summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/bin/pg_dump/compress_lz4.c | 242
1 files changed, 89 insertions, 153 deletions
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 47ee2e4bbac..b817a083d38 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -65,14 +65,12 @@ typedef struct LZ4State
char *buffer; /* buffer for compressed data */
size_t buflen; /* allocated size of buffer */
size_t bufdata; /* amount of valid data currently in buffer */
-
- /*
- * Used by the Stream API to store already uncompressed data that the
- * caller has not consumed.
- */
- size_t overflowalloclen;
- size_t overflowlen;
- char *overflowbuf;
+ /* These fields are used only while decompressing: */
+ size_t bufnext; /* next buffer position to decompress */
+ char *outbuf; /* buffer for decompressed data */
+ size_t outbuflen; /* allocated size of outbuf */
+ size_t outbufdata; /* amount of valid data currently in outbuf */
+ size_t outbufnext; /* next outbuf position to return */
/*
* Used by both APIs to keep track of error codes.
@@ -163,8 +161,8 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
pg_fatal("could not create LZ4 decompression context: %s",
LZ4F_getErrorName(status));
- outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
- readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
+ outbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
+ readbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
readbuflen = DEFAULT_IO_BUFFER_SIZE;
while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
{
@@ -179,7 +177,6 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
size_t out_size = DEFAULT_IO_BUFFER_SIZE;
size_t read_size = readend - readp;
- memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
status = LZ4F_decompress(ctx, outbuf, &out_size,
readp, &read_size, &dec_opt);
if (LZ4F_isError(status))
@@ -322,15 +319,16 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
/*
* LZ4 equivalent to feof() or gzeof(). Return true iff there is no
- * decompressed output in the overflow buffer and the end of the backing file
- * is reached.
+ * more buffered data and the end of the input file has been reached.
*/
static bool
LZ4Stream_eof(CompressFileHandle *CFH)
{
LZ4State *state = (LZ4State *) CFH->private_data;
- return state->overflowlen == 0 && feof(state->fp);
+ return state->outbufnext >= state->outbufdata &&
+ state->bufnext >= state->bufdata &&
+ feof(state->fp);
}
static const char *
@@ -352,13 +350,15 @@ LZ4Stream_get_error(CompressFileHandle *CFH)
*
* Creates the necessary contexts for either compression or decompression. When
* compressing data (indicated by compressing=true), it additionally writes the
- * LZ4 header in the output stream.
+ * LZ4 header in the output buffer.
+ *
+ * It's expected that a not-yet-initialized LZ4State will be zero-filled.
*
* Returns true on success. In case of a failure returns false, and stores the
* error code in state->errcode.
*/
static bool
-LZ4Stream_init(LZ4State *state, int size, bool compressing)
+LZ4Stream_init(LZ4State *state, bool compressing)
{
size_t status;
@@ -381,12 +381,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
return false;
}
- state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
+ state->buflen = DEFAULT_IO_BUFFER_SIZE;
state->buffer = pg_malloc(state->buflen);
-
- state->overflowalloclen = state->buflen;
- state->overflowbuf = pg_malloc(state->overflowalloclen);
- state->overflowlen = 0;
+ state->outbuflen = DEFAULT_IO_BUFFER_SIZE;
+ state->outbuf = pg_malloc(state->outbuflen);
}
state->inited = true;
@@ -394,53 +392,11 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
}
/*
- * Read already decompressed content from the overflow buffer into 'ptr' up to
- * 'size' bytes, if available. If the eol_flag is set, then stop at the first
- * occurrence of the newline char prior to 'size' bytes.
- *
- * Any unread content in the overflow buffer is moved to the beginning.
- *
- * Returns the number of bytes read from the overflow buffer (and copied into
- * the 'ptr' buffer), or 0 if the overflow buffer is empty.
- */
-static int
-LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
-{
- char *p;
- int readlen = 0;
-
- if (state->overflowlen == 0)
- return 0;
-
- if (state->overflowlen >= size)
- readlen = size;
- else
- readlen = state->overflowlen;
-
- if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
- /* Include the line terminating char */
- readlen = p - state->overflowbuf + 1;
-
- memcpy(ptr, state->overflowbuf, readlen);
- state->overflowlen -= readlen;
-
- if (state->overflowlen > 0)
- memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
-
- return readlen;
-}
-
-/*
* The workhorse for reading decompressed content out of an LZ4 compressed
* stream.
*
* It will read up to 'ptrsize' decompressed content, or up to the new line
- * char if found first when the eol_flag is set. It is possible that the
- * decompressed output generated by reading any compressed input via the
- * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
- * at an overflow buffer within LZ4State. Of course, when the function is
- * called, it will first try to consume any decompressed content already
- * present in the overflow buffer, before decompressing new content.
+ * char if one is found first when the eol_flag is set.
*
* Returns the number of bytes of decompressed data copied into the ptr
* buffer, or -1 in case of error.
@@ -449,62 +405,85 @@ static int
LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
{
int dsize = 0;
- int rsize;
- int size = ptrsize;
- bool eol_found = false;
-
- void *readbuf;
+ int remaining = ptrsize;
/* Lazy init */
- if (!LZ4Stream_init(state, size, false /* decompressing */ ))
+ if (!LZ4Stream_init(state, false /* decompressing */ ))
{
pg_log_error("unable to initialize LZ4 library: %s",
LZ4F_getErrorName(state->errcode));
return -1;
}
- /* No work needs to be done for a zero-sized output buffer */
- if (size <= 0)
- return 0;
-
- /* Verify that there is enough space in the outbuf */
- if (size > state->buflen)
+ /* Loop until postcondition is satisfied */
+ while (remaining > 0)
{
- state->buflen = size;
- state->buffer = pg_realloc(state->buffer, size);
- }
-
- /* use already decompressed content if available */
- dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
- if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
- return dsize;
-
- readbuf = pg_malloc(size);
+ /*
+ * If we already have some decompressed data, return that.
+ */
+ if (state->outbufnext < state->outbufdata)
+ {
+ char *outptr = state->outbuf + state->outbufnext;
+ size_t readlen = state->outbufdata - state->outbufnext;
+ bool eol_found = false;
+
+ if (readlen > remaining)
+ readlen = remaining;
+ /* If eol_flag is set, don't read beyond a newline */
+ if (eol_flag)
+ {
+ char *eolptr = memchr(outptr, '\n', readlen);
- do
- {
- char *rp;
- char *rend;
+ if (eolptr)
+ {
+ readlen = eolptr - outptr + 1;
+ eol_found = true;
+ }
+ }
+ memcpy(ptr, outptr, readlen);
+ ptr = ((char *) ptr) + readlen;
+ state->outbufnext += readlen;
+ dsize += readlen;
+ remaining -= readlen;
+ if (eol_found || remaining == 0)
+ break;
+ /* We must have emptied outbuf */
+ Assert(state->outbufnext >= state->outbufdata);
+ }
- rsize = fread(readbuf, 1, size, state->fp);
- if (rsize < size && !feof(state->fp))
+ /*
+ * If we don't have any pending compressed data, load more into
+ * state->buffer.
+ */
+ if (state->bufnext >= state->bufdata)
{
- pg_log_error("could not read from input file: %m");
- return -1;
- }
+ size_t rsize;
- rp = (char *) readbuf;
- rend = (char *) readbuf + rsize;
+ rsize = fread(state->buffer, 1, state->buflen, state->fp);
+ if (rsize < state->buflen && !feof(state->fp))
+ {
+ pg_log_error("could not read from input file: %m");
+ return -1;
+ }
+ if (rsize == 0)
+ break; /* must be EOF */
+ state->bufdata = rsize;
+ state->bufnext = 0;
+ }
- while (rp < rend)
+ /*
+ * Decompress some data into state->outbuf.
+ */
{
size_t status;
- size_t outlen = state->buflen;
- size_t read_remain = rend - rp;
-
- memset(state->buffer, 0, outlen);
- status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
- rp, &read_remain, NULL);
+ size_t outlen = state->outbuflen;
+ size_t inlen = state->bufdata - state->bufnext;
+
+ status = LZ4F_decompress(state->dtx,
+ state->outbuf, &outlen,
+ state->buffer + state->bufnext,
+ &inlen,
+ NULL);
if (LZ4F_isError(status))
{
state->errcode = status;
@@ -512,54 +491,11 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
LZ4F_getErrorName(state->errcode));
return -1;
}
-
- rp += read_remain;
-
- /*
- * fill in what space is available in ptr if the eol flag is set,
- * either skip if one already found or fill up to EOL if present
- * in the outbuf
- */
- if (outlen > 0 && dsize < size && eol_found == false)
- {
- char *p;
- size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
- size_t len = outlen < lib ? outlen : lib;
-
- if (eol_flag &&
- (p = memchr(state->buffer, '\n', outlen)) &&
- (size_t) (p - state->buffer + 1) <= len)
- {
- len = p - state->buffer + 1;
- eol_found = true;
- }
-
- memcpy((char *) ptr + dsize, state->buffer, len);
- dsize += len;
-
- /* move what did not fit, if any, at the beginning of the buf */
- if (len < outlen)
- memmove(state->buffer, state->buffer + len, outlen - len);
- outlen -= len;
- }
-
- /* if there is available output, save it */
- if (outlen > 0)
- {
- while (state->overflowlen + outlen > state->overflowalloclen)
- {
- state->overflowalloclen *= 2;
- state->overflowbuf = pg_realloc(state->overflowbuf,
- state->overflowalloclen);
- }
-
- memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
- state->overflowlen += outlen;
- }
+ state->bufnext += inlen;
+ state->outbufdata = outlen;
+ state->outbufnext = 0;
}
- } while (rsize == size && dsize < size && eol_found == false);
-
- pg_free(readbuf);
+ }
return dsize;
}
@@ -574,7 +510,7 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
size_t remaining = size;
/* Lazy init */
- if (!LZ4Stream_init(state, size, true))
+ if (!LZ4Stream_init(state, true))
pg_fatal("unable to initialize LZ4 library: %s",
LZ4F_getErrorName(state->errcode));
@@ -737,7 +673,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
if (LZ4F_isError(status))
pg_log_error("could not end decompression: %s",
LZ4F_getErrorName(status));
- pg_free(state->overflowbuf);
+ pg_free(state->outbuf);
}
pg_free(state->buffer);