summaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/walmethods.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/walmethods.c')
-rw-r--r--src/bin/pg_basebackup/walmethods.c160
1 files changed, 159 insertions, 1 deletions
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 52f314af3bb..f1ba2a828a0 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -17,6 +17,10 @@
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
@@ -30,6 +34,9 @@
/* Size of zlib buffer for .tar.gz */
#define ZLIB_OUT_SIZE 4096
+/* Size of LZ4 input chunk for .lz4 */
+#define LZ4_IN_SIZE 4096
+
/*-------------------------------------------------------------------------
* WalDirectoryMethod - write wal to a directory looking like pg_wal
*-------------------------------------------------------------------------
@@ -60,6 +67,11 @@ typedef struct DirectoryMethodFile
#ifdef HAVE_LIBZ
gzFile gzfp;
#endif
+#ifdef HAVE_LIBLZ4
+ LZ4F_compressionContext_t ctx;
+ size_t lz4bufsize;
+ void *lz4buf;
+#endif
} DirectoryMethodFile;
static const char *
@@ -76,7 +88,8 @@ dir_get_file_name(const char *pathname, const char *temp_suffix)
snprintf(filename, MAXPGPATH, "%s%s%s",
pathname,
- dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : "",
+ dir_data->compression_method == COMPRESSION_GZIP ? ".gz" :
+ dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "",
temp_suffix ? temp_suffix : "");
return filename;
@@ -92,6 +105,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
#ifdef HAVE_LIBZ
gzFile gzfp = NULL;
#endif
+#ifdef HAVE_LIBLZ4
+ LZ4F_compressionContext_t ctx = NULL;
+ size_t lz4bufsize = 0;
+ void *lz4buf = NULL;
+#endif
filename = dir_get_file_name(pathname, temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
@@ -126,6 +144,50 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
}
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t ctx_out;
+ size_t header_size;
+
+ ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctx_out))
+ {
+ close(fd);
+ return NULL;
+ }
+
+ lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
+ lz4buf = pg_malloc0(lz4bufsize);
+
+ /* add the header */
+ header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL);
+ if (LZ4F_isError(header_size))
+ {
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ return NULL;
+ }
+
+ errno = 0;
+ if (write(fd, lz4buf, header_size) != header_size)
+ {
+ int save_errno = errno;
+
+ (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+
+ /*
+ * If write didn't set errno, assume problem is no disk space.
+ */
+ errno = save_errno ? save_errno : ENOSPC;
+ return NULL;
+ }
+ }
+#endif
/* Do pre-padding on non-compressed files */
if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
@@ -177,6 +239,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
gzclose(gzfp);
else
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ }
+ else
+#endif
close(fd);
return NULL;
}
@@ -187,6 +259,15 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (dir_data->compression_method == COMPRESSION_GZIP)
f->gzfp = gzfp;
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ f->ctx = ctx;
+ f->lz4buf = lz4buf;
+ f->lz4bufsize = lz4bufsize;
+ }
+#endif
+
f->fd = fd;
f->currpos = 0;
f->pathname = pg_strdup(pathname);
@@ -210,6 +291,43 @@ dir_write(Walfile f, const void *buf, size_t count)
r = (ssize_t) gzwrite(df->gzfp, buf, count);
else
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t chunk;
+ size_t remaining;
+ const void *inbuf = buf;
+
+ remaining = count;
+ while (remaining > 0)
+ {
+ size_t compressed;
+
+ if (remaining > LZ4_IN_SIZE)
+ chunk = LZ4_IN_SIZE;
+ else
+ chunk = remaining;
+
+ remaining -= chunk;
+ compressed = LZ4F_compressUpdate(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ inbuf, chunk,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+
+ inbuf = ((char *) inbuf) + chunk;
+ }
+
+ /* Our caller keeps track of the uncompressed size. */
+ r = (ssize_t) count;
+ }
+ else
+#endif
r = write(df->fd, buf, count);
if (r > 0)
df->currpos += r;
@@ -240,6 +358,25 @@ dir_close(Walfile f, WalCloseMethod method)
r = gzclose(df->gzfp);
else
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t compressed;
+
+ compressed = LZ4F_compressEnd(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+
+ r = close(df->fd);
+ }
+ else
+#endif
r = close(df->fd);
if (r == 0)
@@ -293,6 +430,12 @@ dir_close(Walfile f, WalCloseMethod method)
}
}
+#ifdef HAVE_LIBLZ4
+ pg_free(df->lz4buf);
+ /* supports free on NULL */
+ LZ4F_freeCompressionContext(df->ctx);
+#endif
+
pg_free(df->pathname);
pg_free(df->fullpath);
if (df->temp_suffix)
@@ -317,6 +460,21 @@ dir_sync(Walfile f)
return -1;
}
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ size_t compressed;
+
+ /* Flush any internal buffers */
+ compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+ }
+#endif
return fsync(((DirectoryMethodFile *) f)->fd);
}