diff options
Diffstat (limited to 'src/bin/pg_dump/compress_lz4.c')
| -rw-r--r-- | src/bin/pg_dump/compress_lz4.c | 167 |
1 files changed, 97 insertions, 70 deletions
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c index e2f7c468293..47ee2e4bbac 100644 --- a/src/bin/pg_dump/compress_lz4.c +++ b/src/bin/pg_dump/compress_lz4.c @@ -60,13 +60,11 @@ typedef struct LZ4State bool compressing; /* - * Used by the Compressor API to mark if the compression headers have been - * written after initialization. + * I/O buffer area. */ - bool needs_header_flush; - - size_t buflen; - char *buffer; + char *buffer; /* buffer for compressed data */ + size_t buflen; /* allocated size of buffer */ + size_t bufdata; /* amount of valid data currently in buffer */ /* * Used by the Stream API to store already uncompressed data that the @@ -77,12 +75,6 @@ typedef struct LZ4State char *overflowbuf; /* - * Used by both APIs to keep track of the compressed data length stored in - * the buffer. - */ - size_t compressedlen; - - /* * Used by both APIs to keep track of error codes. */ size_t errcode; @@ -103,9 +95,18 @@ LZ4State_compression_init(LZ4State *state) { size_t status; + /* + * Compute size needed for buffer, assuming we will present at most + * DEFAULT_IO_BUFFER_SIZE input bytes at a time. + */ state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs); /* + * Then double it, to ensure we're not forced to flush every time. + */ + state->buflen *= 2; + + /* * LZ4F_compressBegin requires a buffer that is greater or equal to * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met. */ @@ -120,6 +121,10 @@ LZ4State_compression_init(LZ4State *state) } state->buffer = pg_malloc(state->buflen); + + /* + * Insert LZ4 header into buffer. + */ status = LZ4F_compressBegin(state->ctx, state->buffer, state->buflen, &state->prefs); @@ -129,7 +134,7 @@ LZ4State_compression_init(LZ4State *state) return false; } - state->compressedlen = status; + state->bufdata = status; return true; } @@ -201,36 +206,37 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs, { LZ4State *state = (LZ4State *) cs->private_data; size_t remaining = dLen; - size_t status; - size_t chunk; - - /* Write the header if not yet written. */ - if (state->needs_header_flush) - { - cs->writeF(AH, state->buffer, state->compressedlen); - state->needs_header_flush = false; - } while (remaining > 0) { + size_t chunk; + size_t required; + size_t status; - if (remaining > DEFAULT_IO_BUFFER_SIZE) - chunk = DEFAULT_IO_BUFFER_SIZE; - else - chunk = remaining; + /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */ + chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE); + + /* If not enough space, must flush buffer */ + required = LZ4F_compressBound(chunk, &state->prefs); + if (required > state->buflen - state->bufdata) + { + cs->writeF(AH, state->buffer, state->bufdata); + state->bufdata = 0; + } - remaining -= chunk; status = LZ4F_compressUpdate(state->ctx, - state->buffer, state->buflen, + state->buffer + state->bufdata, + state->buflen - state->bufdata, data, chunk, NULL); if (LZ4F_isError(status)) pg_fatal("could not compress data: %s", LZ4F_getErrorName(status)); - cs->writeF(AH, state->buffer, status); + state->bufdata += status; - data = ((char *) data) + chunk; + data = ((const char *) data) + chunk; + remaining -= chunk; } } @@ -238,29 +244,32 @@ static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs) { LZ4State *state = (LZ4State *) cs->private_data; + size_t required; size_t status; /* Nothing needs to be done */ if (!state) return; - /* - * Write the header if not yet written. The caller is not required to call - * writeData if the relation does not contain any data. Thus it is - * possible to reach here without having flushed the header. Do it before - * ending the compression. - */ - if (state->needs_header_flush) - cs->writeF(AH, state->buffer, state->compressedlen); + /* We might need to flush the buffer to make room for LZ4F_compressEnd */ + required = LZ4F_compressBound(0, &state->prefs); + if (required > state->buflen - state->bufdata) + { + cs->writeF(AH, state->buffer, state->bufdata); + state->bufdata = 0; + } status = LZ4F_compressEnd(state->ctx, - state->buffer, state->buflen, + state->buffer + state->bufdata, + state->buflen - state->bufdata, NULL); if (LZ4F_isError(status)) pg_fatal("could not end compression: %s", LZ4F_getErrorName(status)); + state->bufdata += status; - cs->writeF(AH, state->buffer, status); + /* Write the final bufferload */ + cs->writeF(AH, state->buffer, state->bufdata); status = LZ4F_freeCompressionContext(state->ctx); if (LZ4F_isError(status)) @@ -302,8 +311,6 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi pg_fatal("could not initialize LZ4 compression: %s", LZ4F_getErrorName(state->errcode)); - /* Remember that the header has not been written. */ - state->needs_header_flush = true; cs->private_data = state; } @@ -360,19 +367,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing) state->compressing = compressing; - /* When compressing, write LZ4 header to the output stream. */ if (state->compressing) { - if (!LZ4State_compression_init(state)) return false; - - errno = 0; - if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen) - { - errno = (errno) ? errno : ENOSPC; - return false; - } } else { @@ -573,8 +571,7 @@ static void LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH) { LZ4State *state = (LZ4State *) CFH->private_data; - size_t status; - int remaining = size; + size_t remaining = size; /* Lazy init */ if (!LZ4Stream_init(state, size, true)) @@ -583,23 +580,36 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH) while (remaining > 0) { - int chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE); + size_t chunk; + size_t required; + size_t status; - remaining -= chunk; + /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */ + chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE); + + /* If not enough space, must flush buffer */ + required = LZ4F_compressBound(chunk, &state->prefs); + if (required > state->buflen - state->bufdata) + { + errno = 0; + if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata) + { + errno = (errno) ? errno : ENOSPC; + pg_fatal("error during writing: %m"); + } + state->bufdata = 0; + } - status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen, + status = LZ4F_compressUpdate(state->ctx, + state->buffer + state->bufdata, + state->buflen - state->bufdata, ptr, chunk, NULL); if (LZ4F_isError(status)) pg_fatal("error during writing: %s", LZ4F_getErrorName(status)); - - errno = 0; - if (fwrite(state->buffer, 1, status, state->fp) != status) - { - errno = (errno) ? errno : ENOSPC; - pg_fatal("error during writing: %m"); - } + state->bufdata += status; ptr = ((const char *) ptr) + chunk; + remaining -= chunk; } } @@ -675,6 +685,7 @@ LZ4Stream_close(CompressFileHandle *CFH) { FILE *fp; LZ4State *state = (LZ4State *) CFH->private_data; + size_t required; size_t status; int ret; @@ -683,20 +694,36 @@ LZ4Stream_close(CompressFileHandle *CFH) { if (state->compressing) { - status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL); + /* We might need to flush the buffer to make room */ + required = LZ4F_compressBound(0, &state->prefs); + if (required > state->buflen - state->bufdata) + { + errno = 0; + if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata) + { + errno = (errno) ? errno : ENOSPC; + pg_log_error("could not write to output file: %m"); + } + state->bufdata = 0; + } + + status = LZ4F_compressEnd(state->ctx, + state->buffer + state->bufdata, + state->buflen - state->bufdata, + NULL); if (LZ4F_isError(status)) { pg_log_error("could not end compression: %s", LZ4F_getErrorName(status)); } else + state->bufdata += status; + + errno = 0; + if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata) { - errno = 0; - if (fwrite(state->buffer, 1, status, state->fp) != status) - { - errno = (errno) ? errno : ENOSPC; - pg_log_error("could not write to output file: %m"); - } + errno = (errno) ? errno : ENOSPC; + pg_log_error("could not write to output file: %m"); } status = LZ4F_freeCompressionContext(state->ctx); |
