summary | refs | log | tree | commit | diff
path: root/src/bin/pg_basebackup/walmethods.c
diff options
context:
space:
mode:
author: Michael Paquier <michael@paquier.xyz> 2021-11-05 11:33:25 +0900
committer: Michael Paquier <michael@paquier.xyz> 2021-11-05 11:33:25 +0900
commit: babbbb595d2322da095a1e6703171b3f1f2815cb (patch)
tree: 2822626ff97ea4f4f88f6776354e79e40a07925f /src/bin/pg_basebackup/walmethods.c
parent: 5cd7eb1f1c32e1b95894f28b277b4e4b89add772 (diff)
Add support for LZ4 compression in pg_receivewal
pg_receivewal gains a new option, --compression-method=lz4, available when the code is compiled with --with-lz4. Similarly to gzip, this gives the possibility to compress archived WAL segments with LZ4. This option is not compatible with --compress. The implementation uses LZ4 frames, and is compatible with simple lz4 commands. Like gzip, using --synchronous ensures that any data will be flushed to disk within the current .partial segment, so that it is possible to retrieve as much WAL data as possible even from a non-completed segment (this requires completing the partial file with zeros up to the WAL segment size supported by the backend after decompression, but this is the same as gzip). The calculation of the streaming start LSN is able to transparently find and check LZ4-compressed segments. Contrary to gzip, where the uncompressed size is directly stored in the object read, the LZ4 chunk protocol does not store the uncompressed size by default. There is contentSize, which can be used with LZ4 frames, but that would not help if using an archive that includes segments compressed with the defaults of a "lz4" command, where this is not stored. So, this commit has taken the most extensible approach by decompressing the already-archived segment to check its uncompressed size, through a blank output buffer in chunks of 64kB (no actual performance difference noticed with 8kB, 16kB or 32kB, and the operation in itself is actually fast). Tests have been added to verify the creation and correctness of the generated LZ4 files. The latter is achieved by the use of the command "lz4", if found in the environment. The tar-based WAL method in walmethods.c, used now only by pg_basebackup, does not yet know about LZ4. Its code could be extended for this purpose.
Author: Georgios Kokolatos
Reviewed-by: Michael Paquier, Jian Guo, Magnus Hagander, Dilip Kumar
Discussion: https://postgr.es/m/ZCm1J5vfyQ2E6dYvXz8si39HQ2gwxSZ3IpYaVgYa3lUwY88SLapx9EEnOf5uEwrddhx2twG7zYKjVeuP5MwZXCNPybtsGouDsAD1o2L_I5E=@pm.me
Diffstat (limited to 'src/bin/pg_basebackup/walmethods.c')
-rw-r--r-- src/bin/pg_basebackup/walmethods.c 160
1 files changed, 159 insertions, 1 deletions
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 52f314af3bb..f1ba2a828a0 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -17,6 +17,10 @@
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
@@ -30,6 +34,9 @@
/* Size of zlib buffer for .tar.gz */
#define ZLIB_OUT_SIZE 4096
+/* Size of LZ4 input chunk for .lz4 */
+#define LZ4_IN_SIZE 4096
+
/*-------------------------------------------------------------------------
* WalDirectoryMethod - write wal to a directory looking like pg_wal
*-------------------------------------------------------------------------
@@ -60,6 +67,11 @@ typedef struct DirectoryMethodFile
#ifdef HAVE_LIBZ
gzFile gzfp;
#endif
+#ifdef HAVE_LIBLZ4
+ LZ4F_compressionContext_t ctx;
+ size_t lz4bufsize;
+ void *lz4buf;
+#endif
} DirectoryMethodFile;
static const char *
@@ -76,7 +88,8 @@ dir_get_file_name(const char *pathname, const char *temp_suffix)
snprintf(filename, MAXPGPATH, "%s%s%s",
pathname,
- dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : "",
+ dir_data->compression_method == COMPRESSION_GZIP ? ".gz" :
+ dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "",
temp_suffix ? temp_suffix : "");
return filename;
@@ -92,6 +105,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
#ifdef HAVE_LIBZ
gzFile gzfp = NULL;
#endif
+#ifdef HAVE_LIBLZ4
+ LZ4F_compressionContext_t ctx = NULL;
+ size_t lz4bufsize = 0;
+ void *lz4buf = NULL;
+#endif
filename = dir_get_file_name(pathname, temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
@@ -126,6 +144,50 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
}
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t ctx_out;
+ size_t header_size;
+
+ ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctx_out))
+ {
+ close(fd);
+ return NULL;
+ }
+
+ lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
+ lz4buf = pg_malloc0(lz4bufsize);
+
+ /* add the header */
+ header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL);
+ if (LZ4F_isError(header_size))
+ {
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ return NULL;
+ }
+
+ errno = 0;
+ if (write(fd, lz4buf, header_size) != header_size)
+ {
+ int save_errno = errno;
+
+ (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+
+ /*
+ * If write didn't set errno, assume problem is no disk space.
+ */
+ errno = save_errno ? save_errno : ENOSPC;
+ return NULL;
+ }
+ }
+#endif
/* Do pre-padding on non-compressed files */
if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
@@ -177,6 +239,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
gzclose(gzfp);
else
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ }
+ else
+#endif
close(fd);
return NULL;
}
@@ -187,6 +259,15 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (dir_data->compression_method == COMPRESSION_GZIP)
f->gzfp = gzfp;
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ f->ctx = ctx;
+ f->lz4buf = lz4buf;
+ f->lz4bufsize = lz4bufsize;
+ }
+#endif
+
f->fd = fd;
f->currpos = 0;
f->pathname = pg_strdup(pathname);
@@ -210,6 +291,43 @@ dir_write(Walfile f, const void *buf, size_t count)
r = (ssize_t) gzwrite(df->gzfp, buf, count);
else
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t chunk;
+ size_t remaining;
+ const void *inbuf = buf;
+
+ remaining = count;
+ while (remaining > 0)
+ {
+ size_t compressed;
+
+ if (remaining > LZ4_IN_SIZE)
+ chunk = LZ4_IN_SIZE;
+ else
+ chunk = remaining;
+
+ remaining -= chunk;
+ compressed = LZ4F_compressUpdate(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ inbuf, chunk,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+
+ inbuf = ((char *) inbuf) + chunk;
+ }
+
+ /* Our caller keeps track of the uncompressed size. */
+ r = (ssize_t) count;
+ }
+ else
+#endif
r = write(df->fd, buf, count);
if (r > 0)
df->currpos += r;
@@ -240,6 +358,25 @@ dir_close(Walfile f, WalCloseMethod method)
r = gzclose(df->gzfp);
else
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t compressed;
+
+ compressed = LZ4F_compressEnd(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+
+ r = close(df->fd);
+ }
+ else
+#endif
r = close(df->fd);
if (r == 0)
@@ -293,6 +430,12 @@ dir_close(Walfile f, WalCloseMethod method)
}
}
+#ifdef HAVE_LIBLZ4
+ pg_free(df->lz4buf);
+ /* supports free on NULL */
+ LZ4F_freeCompressionContext(df->ctx);
+#endif
+
pg_free(df->pathname);
pg_free(df->fullpath);
if (df->temp_suffix)
@@ -317,6 +460,21 @@ dir_sync(Walfile f)
return -1;
}
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ size_t compressed;
+
+ /* Flush any internal buffers */
+ compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+ }
+#endif
return fsync(((DirectoryMethodFile *) f)->fd);
}