diff options
Diffstat (limited to 'src/bin/pg_basebackup/walmethods.c')
-rw-r--r-- | src/bin/pg_basebackup/walmethods.c | 160 |
1 files changed, 159 insertions, 1 deletions
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c index 52f314af3bb..f1ba2a828a0 100644 --- a/src/bin/pg_basebackup/walmethods.c +++ b/src/bin/pg_basebackup/walmethods.c @@ -17,6 +17,10 @@ #include <sys/stat.h> #include <time.h> #include <unistd.h> + +#ifdef HAVE_LIBLZ4 +#include <lz4frame.h> +#endif #ifdef HAVE_LIBZ #include <zlib.h> #endif @@ -30,6 +34,9 @@ /* Size of zlib buffer for .tar.gz */ #define ZLIB_OUT_SIZE 4096 +/* Size of LZ4 input chunk for .lz4 */ +#define LZ4_IN_SIZE 4096 + /*------------------------------------------------------------------------- * WalDirectoryMethod - write wal to a directory looking like pg_wal *------------------------------------------------------------------------- @@ -60,6 +67,11 @@ typedef struct DirectoryMethodFile #ifdef HAVE_LIBZ gzFile gzfp; #endif +#ifdef HAVE_LIBLZ4 + LZ4F_compressionContext_t ctx; + size_t lz4bufsize; + void *lz4buf; +#endif } DirectoryMethodFile; static const char * @@ -76,7 +88,8 @@ dir_get_file_name(const char *pathname, const char *temp_suffix) snprintf(filename, MAXPGPATH, "%s%s%s", pathname, - dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : "", + dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : + dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "", temp_suffix ? temp_suffix : ""); return filename; @@ -92,6 +105,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ #ifdef HAVE_LIBZ gzFile gzfp = NULL; #endif +#ifdef HAVE_LIBLZ4 + LZ4F_compressionContext_t ctx = NULL; + size_t lz4bufsize = 0; + void *lz4buf = NULL; +#endif filename = dir_get_file_name(pathname, temp_suffix); snprintf(tmppath, sizeof(tmppath), "%s/%s", @@ -126,6 +144,50 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ } } #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + size_t ctx_out; + size_t header_size; + + ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); + if (LZ4F_isError(ctx_out)) + { + close(fd); + return NULL; + } + + lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL); + lz4buf = pg_malloc0(lz4bufsize); + + /* add the header */ + header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL); + if (LZ4F_isError(header_size)) + { + (void) LZ4F_freeCompressionContext(ctx); + pg_free(lz4buf); + close(fd); + return NULL; + } + + errno = 0; + if (write(fd, lz4buf, header_size) != header_size) + { + int save_errno = errno; + + (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL); + (void) LZ4F_freeCompressionContext(ctx); + pg_free(lz4buf); + close(fd); + + /* + * If write didn't set errno, assume problem is no disk space. + */ + errno = save_errno ? save_errno : ENOSPC; + return NULL; + } + } +#endif /* Do pre-padding on non-compressed files */ if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE) @@ -177,6 +239,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ gzclose(gzfp); else #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL); + (void) LZ4F_freeCompressionContext(ctx); + pg_free(lz4buf); + close(fd); + } + else +#endif close(fd); return NULL; } @@ -187,6 +259,15 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ if (dir_data->compression_method == COMPRESSION_GZIP) f->gzfp = gzfp; #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + f->ctx = ctx; + f->lz4buf = lz4buf; + f->lz4bufsize = lz4bufsize; + } +#endif + f->fd = fd; f->currpos = 0; f->pathname = pg_strdup(pathname); @@ -210,6 +291,43 @@ dir_write(Walfile f, const void *buf, size_t count) r = (ssize_t) gzwrite(df->gzfp, buf, count); else #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + size_t chunk; + size_t remaining; + const void *inbuf = buf; + + remaining = count; + while (remaining > 0) + { + size_t compressed; + + if (remaining > LZ4_IN_SIZE) + chunk = LZ4_IN_SIZE; + else + chunk = remaining; + + remaining -= chunk; + compressed = LZ4F_compressUpdate(df->ctx, + df->lz4buf, df->lz4bufsize, + inbuf, chunk, + NULL); + + if (LZ4F_isError(compressed)) + return -1; + + if (write(df->fd, df->lz4buf, compressed) != compressed) + return -1; + + inbuf = ((char *) inbuf) + chunk; + } + + /* Our caller keeps track of the uncompressed size. */ + r = (ssize_t) count; + } + else +#endif r = write(df->fd, buf, count); if (r > 0) df->currpos += r; @@ -240,6 +358,25 @@ dir_close(Walfile f, WalCloseMethod method) r = gzclose(df->gzfp); else #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + size_t compressed; + + compressed = LZ4F_compressEnd(df->ctx, + df->lz4buf, df->lz4bufsize, + NULL); + + if (LZ4F_isError(compressed)) + return -1; + + if (write(df->fd, df->lz4buf, compressed) != compressed) + return -1; + + r = close(df->fd); + } + else +#endif r = close(df->fd); if (r == 0) @@ -293,6 +430,12 @@ dir_close(Walfile f, WalCloseMethod method) } } +#ifdef HAVE_LIBLZ4 + pg_free(df->lz4buf); + /* supports free on NULL */ + LZ4F_freeCompressionContext(df->ctx); +#endif + pg_free(df->pathname); pg_free(df->fullpath); if (df->temp_suffix) @@ -317,6 +460,21 @@ dir_sync(Walfile f) return -1; } #endif +#ifdef HAVE_LIBLZ4 + if (dir_data->compression_method == COMPRESSION_LZ4) + { + DirectoryMethodFile *df = (DirectoryMethodFile *) f; + size_t compressed; + + /* Flush any internal buffers */ + compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL); + if (LZ4F_isError(compressed)) + return -1; + + if (write(df->fd, df->lz4buf, compressed) != compressed) + return -1; + } +#endif return fsync(((DirectoryMethodFile *) f)->fd); } |