summaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/compress_lz4.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/compress_lz4.c')
-rw-r--r--src/bin/pg_dump/compress_lz4.c167
1 files changed, 97 insertions, 70 deletions
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e2f7c468293..47ee2e4bbac 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -60,13 +60,11 @@ typedef struct LZ4State
bool compressing;
/*
- * Used by the Compressor API to mark if the compression headers have been
- * written after initialization.
+ * I/O buffer area.
*/
- bool needs_header_flush;
-
- size_t buflen;
- char *buffer;
+ char *buffer; /* buffer for compressed data */
+ size_t buflen; /* allocated size of buffer */
+ size_t bufdata; /* amount of valid data currently in buffer */
/*
* Used by the Stream API to store already uncompressed data that the
@@ -77,12 +75,6 @@ typedef struct LZ4State
char *overflowbuf;
/*
- * Used by both APIs to keep track of the compressed data length stored in
- * the buffer.
- */
- size_t compressedlen;
-
- /*
* Used by both APIs to keep track of error codes.
*/
size_t errcode;
@@ -103,9 +95,18 @@ LZ4State_compression_init(LZ4State *state)
{
size_t status;
+ /*
+ * Compute size needed for buffer, assuming we will present at most
+ * DEFAULT_IO_BUFFER_SIZE input bytes at a time.
+ */
state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
/*
+ * Then double it, to ensure we're not forced to flush every time.
+ */
+ state->buflen *= 2;
+
+ /*
* LZ4F_compressBegin requires a buffer that is greater or equal to
* LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
*/
@@ -120,6 +121,10 @@ LZ4State_compression_init(LZ4State *state)
}
state->buffer = pg_malloc(state->buflen);
+
+ /*
+ * Insert LZ4 header into buffer.
+ */
status = LZ4F_compressBegin(state->ctx,
state->buffer, state->buflen,
&state->prefs);
@@ -129,7 +134,7 @@ LZ4State_compression_init(LZ4State *state)
return false;
}
- state->compressedlen = status;
+ state->bufdata = status;
return true;
}
@@ -201,36 +206,37 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
{
LZ4State *state = (LZ4State *) cs->private_data;
size_t remaining = dLen;
- size_t status;
- size_t chunk;
-
- /* Write the header if not yet written. */
- if (state->needs_header_flush)
- {
- cs->writeF(AH, state->buffer, state->compressedlen);
- state->needs_header_flush = false;
- }
while (remaining > 0)
{
+ size_t chunk;
+ size_t required;
+ size_t status;
- if (remaining > DEFAULT_IO_BUFFER_SIZE)
- chunk = DEFAULT_IO_BUFFER_SIZE;
- else
- chunk = remaining;
+ /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+ chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+ /* If not enough space, must flush buffer */
+ required = LZ4F_compressBound(chunk, &state->prefs);
+ if (required > state->buflen - state->bufdata)
+ {
+ cs->writeF(AH, state->buffer, state->bufdata);
+ state->bufdata = 0;
+ }
- remaining -= chunk;
status = LZ4F_compressUpdate(state->ctx,
- state->buffer, state->buflen,
+ state->buffer + state->bufdata,
+ state->buflen - state->bufdata,
data, chunk, NULL);
if (LZ4F_isError(status))
pg_fatal("could not compress data: %s",
LZ4F_getErrorName(status));
- cs->writeF(AH, state->buffer, status);
+ state->bufdata += status;
- data = ((char *) data) + chunk;
+ data = ((const char *) data) + chunk;
+ remaining -= chunk;
}
}
@@ -238,29 +244,32 @@ static void
EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
{
LZ4State *state = (LZ4State *) cs->private_data;
+ size_t required;
size_t status;
/* Nothing needs to be done */
if (!state)
return;
- /*
- * Write the header if not yet written. The caller is not required to call
- * writeData if the relation does not contain any data. Thus it is
- * possible to reach here without having flushed the header. Do it before
- * ending the compression.
- */
- if (state->needs_header_flush)
- cs->writeF(AH, state->buffer, state->compressedlen);
+ /* We might need to flush the buffer to make room for LZ4F_compressEnd */
+ required = LZ4F_compressBound(0, &state->prefs);
+ if (required > state->buflen - state->bufdata)
+ {
+ cs->writeF(AH, state->buffer, state->bufdata);
+ state->bufdata = 0;
+ }
status = LZ4F_compressEnd(state->ctx,
- state->buffer, state->buflen,
+ state->buffer + state->bufdata,
+ state->buflen - state->bufdata,
NULL);
if (LZ4F_isError(status))
pg_fatal("could not end compression: %s",
LZ4F_getErrorName(status));
+ state->bufdata += status;
- cs->writeF(AH, state->buffer, status);
+ /* Write the final bufferload */
+ cs->writeF(AH, state->buffer, state->bufdata);
status = LZ4F_freeCompressionContext(state->ctx);
if (LZ4F_isError(status))
@@ -302,8 +311,6 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
pg_fatal("could not initialize LZ4 compression: %s",
LZ4F_getErrorName(state->errcode));
- /* Remember that the header has not been written. */
- state->needs_header_flush = true;
cs->private_data = state;
}
@@ -360,19 +367,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
state->compressing = compressing;
- /* When compressing, write LZ4 header to the output stream. */
if (state->compressing)
{
-
if (!LZ4State_compression_init(state))
return false;
-
- errno = 0;
- if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
- {
- errno = (errno) ? errno : ENOSPC;
- return false;
- }
}
else
{
@@ -573,8 +571,7 @@ static void
LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
{
LZ4State *state = (LZ4State *) CFH->private_data;
- size_t status;
- int remaining = size;
+ size_t remaining = size;
/* Lazy init */
if (!LZ4Stream_init(state, size, true))
@@ -583,23 +580,36 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
while (remaining > 0)
{
- int chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
+ size_t chunk;
+ size_t required;
+ size_t status;
- remaining -= chunk;
+ /* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
+ chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);
+
+ /* If not enough space, must flush buffer */
+ required = LZ4F_compressBound(chunk, &state->prefs);
+ if (required > state->buflen - state->bufdata)
+ {
+ errno = 0;
+ if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+ {
+ errno = (errno) ? errno : ENOSPC;
+ pg_fatal("error during writing: %m");
+ }
+ state->bufdata = 0;
+ }
- status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
+ status = LZ4F_compressUpdate(state->ctx,
+ state->buffer + state->bufdata,
+ state->buflen - state->bufdata,
ptr, chunk, NULL);
if (LZ4F_isError(status))
pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
-
- errno = 0;
- if (fwrite(state->buffer, 1, status, state->fp) != status)
- {
- errno = (errno) ? errno : ENOSPC;
- pg_fatal("error during writing: %m");
- }
+ state->bufdata += status;
ptr = ((const char *) ptr) + chunk;
+ remaining -= chunk;
}
}
@@ -675,6 +685,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
{
FILE *fp;
LZ4State *state = (LZ4State *) CFH->private_data;
+ size_t required;
size_t status;
int ret;
@@ -683,20 +694,36 @@ LZ4Stream_close(CompressFileHandle *CFH)
{
if (state->compressing)
{
- status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
+ /* We might need to flush the buffer to make room */
+ required = LZ4F_compressBound(0, &state->prefs);
+ if (required > state->buflen - state->bufdata)
+ {
+ errno = 0;
+ if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
+ {
+ errno = (errno) ? errno : ENOSPC;
+ pg_log_error("could not write to output file: %m");
+ }
+ state->bufdata = 0;
+ }
+
+ status = LZ4F_compressEnd(state->ctx,
+ state->buffer + state->bufdata,
+ state->buflen - state->bufdata,
+ NULL);
if (LZ4F_isError(status))
{
pg_log_error("could not end compression: %s",
LZ4F_getErrorName(status));
}
else
+ state->bufdata += status;
+
+ errno = 0;
+ if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
{
- errno = 0;
- if (fwrite(state->buffer, 1, status, state->fp) != status)
- {
- errno = (errno) ? errno : ENOSPC;
- pg_log_error("could not write to output file: %m");
- }
+ errno = (errno) ? errno : ENOSPC;
+ pg_log_error("could not write to output file: %m");
}
status = LZ4F_freeCompressionContext(state->ctx);