diff options
| -rw-r--r-- | src/bin/pg_dump/compress_lz4.c | 242 |
1 files changed, 89 insertions, 153 deletions
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c index 47ee2e4bbac..b817a083d38 100644 --- a/src/bin/pg_dump/compress_lz4.c +++ b/src/bin/pg_dump/compress_lz4.c @@ -65,14 +65,12 @@ typedef struct LZ4State char *buffer; /* buffer for compressed data */ size_t buflen; /* allocated size of buffer */ size_t bufdata; /* amount of valid data currently in buffer */ - - /* - * Used by the Stream API to store already uncompressed data that the - * caller has not consumed. - */ - size_t overflowalloclen; - size_t overflowlen; - char *overflowbuf; + /* These fields are used only while decompressing: */ + size_t bufnext; /* next buffer position to decompress */ + char *outbuf; /* buffer for decompressed data */ + size_t outbuflen; /* allocated size of outbuf */ + size_t outbufdata; /* amount of valid data currently in outbuf */ + size_t outbufnext; /* next outbuf position to return */ /* * Used by both APIs to keep track of error codes. @@ -163,8 +161,8 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs) pg_fatal("could not create LZ4 decompression context: %s", LZ4F_getErrorName(status)); - outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE); - readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE); + outbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE); + readbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE); readbuflen = DEFAULT_IO_BUFFER_SIZE; while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0) { @@ -179,7 +177,6 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs) size_t out_size = DEFAULT_IO_BUFFER_SIZE; size_t read_size = readend - readp; - memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE); status = LZ4F_decompress(ctx, outbuf, &out_size, readp, &read_size, &dec_opt); if (LZ4F_isError(status)) @@ -322,15 +319,16 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi /* * LZ4 equivalent to feof() or gzeof(). Return true iff there is no - * decompressed output in the overflow buffer and the end of the backing file - * is reached. + * more buffered data and the end of the input file has been reached. */ static bool LZ4Stream_eof(CompressFileHandle *CFH) { LZ4State *state = (LZ4State *) CFH->private_data; - return state->overflowlen == 0 && feof(state->fp); + return state->outbufnext >= state->outbufdata && + state->bufnext >= state->bufdata && + feof(state->fp); } static const char * @@ -352,13 +350,15 @@ LZ4Stream_get_error(CompressFileHandle *CFH) * * Creates the necessary contexts for either compression or decompression. When * compressing data (indicated by compressing=true), it additionally writes the - * LZ4 header in the output stream. + * LZ4 header in the output buffer. + * + * It's expected that a not-yet-initialized LZ4State will be zero-filled. * * Returns true on success. In case of a failure returns false, and stores the * error code in state->errcode. */ static bool -LZ4Stream_init(LZ4State *state, int size, bool compressing) +LZ4Stream_init(LZ4State *state, bool compressing) { size_t status; @@ -381,12 +381,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing) return false; } - state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE); + state->buflen = DEFAULT_IO_BUFFER_SIZE; state->buffer = pg_malloc(state->buflen); - - state->overflowalloclen = state->buflen; - state->overflowbuf = pg_malloc(state->overflowalloclen); - state->overflowlen = 0; + state->outbuflen = DEFAULT_IO_BUFFER_SIZE; + state->outbuf = pg_malloc(state->outbuflen); } state->inited = true; @@ -394,53 +392,11 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing) } /* - * Read already decompressed content from the overflow buffer into 'ptr' up to - * 'size' bytes, if available. If the eol_flag is set, then stop at the first - * occurrence of the newline char prior to 'size' bytes. - * - * Any unread content in the overflow buffer is moved to the beginning. - * - * Returns the number of bytes read from the overflow buffer (and copied into - * the 'ptr' buffer), or 0 if the overflow buffer is empty. - */ -static int -LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag) -{ - char *p; - int readlen = 0; - - if (state->overflowlen == 0) - return 0; - - if (state->overflowlen >= size) - readlen = size; - else - readlen = state->overflowlen; - - if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen))) - /* Include the line terminating char */ - readlen = p - state->overflowbuf + 1; - - memcpy(ptr, state->overflowbuf, readlen); - state->overflowlen -= readlen; - - if (state->overflowlen > 0) - memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen); - - return readlen; -} - -/* * The workhorse for reading decompressed content out of an LZ4 compressed * stream. * * It will read up to 'ptrsize' decompressed content, or up to the new line - * char if found first when the eol_flag is set. It is possible that the - * decompressed output generated by reading any compressed input via the - * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored - * at an overflow buffer within LZ4State. Of course, when the function is - * called, it will first try to consume any decompressed content already - * present in the overflow buffer, before decompressing new content. + * char if one is found first when the eol_flag is set. * * Returns the number of bytes of decompressed data copied into the ptr * buffer, or -1 in case of error. @@ -449,62 +405,85 @@ static int LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag) { int dsize = 0; - int rsize; - int size = ptrsize; - bool eol_found = false; - - void *readbuf; + int remaining = ptrsize; /* Lazy init */ - if (!LZ4Stream_init(state, size, false /* decompressing */ )) + if (!LZ4Stream_init(state, false /* decompressing */ )) { pg_log_error("unable to initialize LZ4 library: %s", LZ4F_getErrorName(state->errcode)); return -1; } - /* No work needs to be done for a zero-sized output buffer */ - if (size <= 0) - return 0; - - /* Verify that there is enough space in the outbuf */ - if (size > state->buflen) + /* Loop until postcondition is satisfied */ + while (remaining > 0) { - state->buflen = size; - state->buffer = pg_realloc(state->buffer, size); - } - - /* use already decompressed content if available */ - dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag); - if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize))) - return dsize; - - readbuf = pg_malloc(size); + /* + * If we already have some decompressed data, return that. + */ + if (state->outbufnext < state->outbufdata) + { + char *outptr = state->outbuf + state->outbufnext; + size_t readlen = state->outbufdata - state->outbufnext; + bool eol_found = false; + + if (readlen > remaining) + readlen = remaining; + /* If eol_flag is set, don't read beyond a newline */ + if (eol_flag) + { + char *eolptr = memchr(outptr, '\n', readlen); - do - { - char *rp; - char *rend; + if (eolptr) + { + readlen = eolptr - outptr + 1; + eol_found = true; + } + } + memcpy(ptr, outptr, readlen); + ptr = ((char *) ptr) + readlen; + state->outbufnext += readlen; + dsize += readlen; + remaining -= readlen; + if (eol_found || remaining == 0) + break; + /* We must have emptied outbuf */ + Assert(state->outbufnext >= state->outbufdata); + } - rsize = fread(readbuf, 1, size, state->fp); - if (rsize < size && !feof(state->fp)) + /* + * If we don't have any pending compressed data, load more into + * state->buffer. + */ + if (state->bufnext >= state->bufdata) { - pg_log_error("could not read from input file: %m"); - return -1; - } + size_t rsize; - rp = (char *) readbuf; - rend = (char *) readbuf + rsize; + rsize = fread(state->buffer, 1, state->buflen, state->fp); + if (rsize < state->buflen && !feof(state->fp)) + { + pg_log_error("could not read from input file: %m"); + return -1; + } + if (rsize == 0) + break; /* must be EOF */ + state->bufdata = rsize; + state->bufnext = 0; + } - while (rp < rend) + /* + * Decompress some data into state->outbuf. + */ { size_t status; - size_t outlen = state->buflen; - size_t read_remain = rend - rp; - - memset(state->buffer, 0, outlen); - status = LZ4F_decompress(state->dtx, state->buffer, &outlen, - rp, &read_remain, NULL); + size_t outlen = state->outbuflen; + size_t inlen = state->bufdata - state->bufnext; + + status = LZ4F_decompress(state->dtx, + state->outbuf, &outlen, + state->buffer + state->bufnext, + &inlen, + NULL); if (LZ4F_isError(status)) { state->errcode = status; @@ -512,54 +491,11 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag) LZ4F_getErrorName(state->errcode)); return -1; } - - rp += read_remain; - - /* - * fill in what space is available in ptr if the eol flag is set, - * either skip if one already found or fill up to EOL if present - * in the outbuf - */ - if (outlen > 0 && dsize < size && eol_found == false) - { - char *p; - size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize; - size_t len = outlen < lib ? outlen : lib; - - if (eol_flag && - (p = memchr(state->buffer, '\n', outlen)) && - (size_t) (p - state->buffer + 1) <= len) - { - len = p - state->buffer + 1; - eol_found = true; - } - - memcpy((char *) ptr + dsize, state->buffer, len); - dsize += len; - - /* move what did not fit, if any, at the beginning of the buf */ - if (len < outlen) - memmove(state->buffer, state->buffer + len, outlen - len); - outlen -= len; - } - - /* if there is available output, save it */ - if (outlen > 0) - { - while (state->overflowlen + outlen > state->overflowalloclen) - { - state->overflowalloclen *= 2; - state->overflowbuf = pg_realloc(state->overflowbuf, - state->overflowalloclen); - } - - memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen); - state->overflowlen += outlen; - } + state->bufnext += inlen; + state->outbufdata = outlen; + state->outbufnext = 0; } - } while (rsize == size && dsize < size && eol_found == false); - - pg_free(readbuf); + } return dsize; } @@ -574,7 +510,7 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH) size_t remaining = size; /* Lazy init */ - if (!LZ4Stream_init(state, size, true)) + if (!LZ4Stream_init(state, true)) pg_fatal("unable to initialize LZ4 library: %s", LZ4F_getErrorName(state->errcode)); @@ -737,7 +673,7 @@ LZ4Stream_close(CompressFileHandle *CFH) if (LZ4F_isError(status)) pg_log_error("could not end decompression: %s", LZ4F_getErrorName(status)); - pg_free(state->overflowbuf); + pg_free(state->outbuf); } pg_free(state->buffer); |
