diff options
| author | Tom Lane <tgl@sss.pgh.pa.us> | 2025-10-13 13:17:45 -0400 |
|---|---|---|
| committer | Tom Lane <tgl@sss.pgh.pa.us> | 2025-10-13 13:17:45 -0400 |
| commit | 1f8062dd9668572d66549fc798a7d2057aa34ee1 (patch) | |
| tree | 9ab6ecd05fa101bae13388a14ff5141fbd711ac0 | |
| parent | fe8192a95e6c7159d639e341740e32966c9cf385 (diff) | |
Fix serious performance problems in LZ4Stream_read_internal.
I was distressed to find that reading an LZ4-compressed toc.dat
file was hundreds of times slower than it ought to be. On
investigation, the blame mostly affixes to LZ4Stream_read_overflow's
habit of memmove'ing all the remaining buffered data after each read
operation. Since reading a TOC file tends to involve a lot of small
(even one-byte) decompression calls, that amounts to an O(N^2) cost.
This could have been fixed with a minimal patch, but to my
eyes LZ4Stream_read_internal and LZ4Stream_read_overflow are
badly-written spaghetti code; in particular the eol_flag logic
is inefficient and duplicative. I chose to throw the code
away and rewrite from scratch. This version is about sixty
lines shorter as well as not having the performance issue.
Fortunately, AFAICT the only way to get to this problem is to
manually LZ4-compress the toc.dat and/or blobs.toc files within a
directory-style archive; in the main data files, we read blocks
that are large enough that the O(N^2) behavior doesn't manifest.
Few people do that, which likely explains the lack of field
complaints. Otherwise this performance bug might be considered
bad enough to warrant back-patching.
Author: Tom Lane <tgl@sss.pgh.pa.us>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
| -rw-r--r-- | src/bin/pg_dump/compress_lz4.c | 242 |
1 files changed, 89 insertions, 153 deletions
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c index 47ee2e4bbac..b817a083d38 100644 --- a/src/bin/pg_dump/compress_lz4.c +++ b/src/bin/pg_dump/compress_lz4.c @@ -65,14 +65,12 @@ typedef struct LZ4State char *buffer; /* buffer for compressed data */ size_t buflen; /* allocated size of buffer */ size_t bufdata; /* amount of valid data currently in buffer */ - - /* - * Used by the Stream API to store already uncompressed data that the - * caller has not consumed. - */ - size_t overflowalloclen; - size_t overflowlen; - char *overflowbuf; + /* These fields are used only while decompressing: */ + size_t bufnext; /* next buffer position to decompress */ + char *outbuf; /* buffer for decompressed data */ + size_t outbuflen; /* allocated size of outbuf */ + size_t outbufdata; /* amount of valid data currently in outbuf */ + size_t outbufnext; /* next outbuf position to return */ /* * Used by both APIs to keep track of error codes. @@ -163,8 +161,8 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs) pg_fatal("could not create LZ4 decompression context: %s", LZ4F_getErrorName(status)); - outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE); - readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE); + outbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE); + readbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE); readbuflen = DEFAULT_IO_BUFFER_SIZE; while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0) { @@ -179,7 +177,6 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs) size_t out_size = DEFAULT_IO_BUFFER_SIZE; size_t read_size = readend - readp; - memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE); status = LZ4F_decompress(ctx, outbuf, &out_size, readp, &read_size, &dec_opt); if (LZ4F_isError(status)) @@ -322,15 +319,16 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi /* * LZ4 equivalent to feof() or gzeof(). Return true iff there is no - * decompressed output in the overflow buffer and the end of the backing file - * is reached. + * more buffered data and the end of the input file has been reached. */ static bool LZ4Stream_eof(CompressFileHandle *CFH) { LZ4State *state = (LZ4State *) CFH->private_data; - return state->overflowlen == 0 && feof(state->fp); + return state->outbufnext >= state->outbufdata && + state->bufnext >= state->bufdata && + feof(state->fp); } static const char * @@ -352,13 +350,15 @@ LZ4Stream_get_error(CompressFileHandle *CFH) * * Creates the necessary contexts for either compression or decompression. When * compressing data (indicated by compressing=true), it additionally writes the - * LZ4 header in the output stream. + * LZ4 header in the output buffer. + * + * It's expected that a not-yet-initialized LZ4State will be zero-filled. * * Returns true on success. In case of a failure returns false, and stores the * error code in state->errcode. */ static bool -LZ4Stream_init(LZ4State *state, int size, bool compressing) +LZ4Stream_init(LZ4State *state, bool compressing) { size_t status; @@ -381,12 +381,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing) return false; } - state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE); + state->buflen = DEFAULT_IO_BUFFER_SIZE; state->buffer = pg_malloc(state->buflen); - - state->overflowalloclen = state->buflen; - state->overflowbuf = pg_malloc(state->overflowalloclen); - state->overflowlen = 0; + state->outbuflen = DEFAULT_IO_BUFFER_SIZE; + state->outbuf = pg_malloc(state->outbuflen); } state->inited = true; @@ -394,53 +392,11 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing) } /* - * Read already decompressed content from the overflow buffer into 'ptr' up to - * 'size' bytes, if available. If the eol_flag is set, then stop at the first - * occurrence of the newline char prior to 'size' bytes. - * - * Any unread content in the overflow buffer is moved to the beginning. - * - * Returns the number of bytes read from the overflow buffer (and copied into - * the 'ptr' buffer), or 0 if the overflow buffer is empty. - */ -static int -LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag) -{ - char *p; - int readlen = 0; - - if (state->overflowlen == 0) - return 0; - - if (state->overflowlen >= size) - readlen = size; - else - readlen = state->overflowlen; - - if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen))) - /* Include the line terminating char */ - readlen = p - state->overflowbuf + 1; - - memcpy(ptr, state->overflowbuf, readlen); - state->overflowlen -= readlen; - - if (state->overflowlen > 0) - memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen); - - return readlen; -} - -/* * The workhorse for reading decompressed content out of an LZ4 compressed * stream. * * It will read up to 'ptrsize' decompressed content, or up to the new line - * char if found first when the eol_flag is set. It is possible that the - * decompressed output generated by reading any compressed input via the - * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored - * at an overflow buffer within LZ4State. Of course, when the function is - * called, it will first try to consume any decompressed content already - * present in the overflow buffer, before decompressing new content. + * char if one is found first when the eol_flag is set. * * Returns the number of bytes of decompressed data copied into the ptr * buffer, or -1 in case of error. @@ -449,62 +405,85 @@ static int LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag) { int dsize = 0; - int rsize; - int size = ptrsize; - bool eol_found = false; - - void *readbuf; + int remaining = ptrsize; /* Lazy init */ - if (!LZ4Stream_init(state, size, false /* decompressing */ )) + if (!LZ4Stream_init(state, false /* decompressing */ )) { pg_log_error("unable to initialize LZ4 library: %s", LZ4F_getErrorName(state->errcode)); return -1; } - /* No work needs to be done for a zero-sized output buffer */ - if (size <= 0) - return 0; - - /* Verify that there is enough space in the outbuf */ - if (size > state->buflen) + /* Loop until postcondition is satisfied */ + while (remaining > 0) { - state->buflen = size; - state->buffer = pg_realloc(state->buffer, size); - } - - /* use already decompressed content if available */ - dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag); - if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize))) - return dsize; - - readbuf = pg_malloc(size); + /* + * If we already have some decompressed data, return that. + */ + if (state->outbufnext < state->outbufdata) + { + char *outptr = state->outbuf + state->outbufnext; + size_t readlen = state->outbufdata - state->outbufnext; + bool eol_found = false; + + if (readlen > remaining) + readlen = remaining; + /* If eol_flag is set, don't read beyond a newline */ + if (eol_flag) + { + char *eolptr = memchr(outptr, '\n', readlen); - do - { - char *rp; - char *rend; + if (eolptr) + { + readlen = eolptr - outptr + 1; + eol_found = true; + } + } + memcpy(ptr, outptr, readlen); + ptr = ((char *) ptr) + readlen; + state->outbufnext += readlen; + dsize += readlen; + remaining -= readlen; + if (eol_found || remaining == 0) + break; + /* We must have emptied outbuf */ + Assert(state->outbufnext >= state->outbufdata); + } - rsize = fread(readbuf, 1, size, state->fp); - if (rsize < size && !feof(state->fp)) + /* + * If we don't have any pending compressed data, load more into + * state->buffer. + */ + if (state->bufnext >= state->bufdata) { - pg_log_error("could not read from input file: %m"); - return -1; - } + size_t rsize; - rp = (char *) readbuf; - rend = (char *) readbuf + rsize; + rsize = fread(state->buffer, 1, state->buflen, state->fp); + if (rsize < state->buflen && !feof(state->fp)) + { + pg_log_error("could not read from input file: %m"); + return -1; + } + if (rsize == 0) + break; /* must be EOF */ + state->bufdata = rsize; + state->bufnext = 0; + } - while (rp < rend) + /* + * Decompress some data into state->outbuf. + */ { size_t status; - size_t outlen = state->buflen; - size_t read_remain = rend - rp; - - memset(state->buffer, 0, outlen); - status = LZ4F_decompress(state->dtx, state->buffer, &outlen, - rp, &read_remain, NULL); + size_t outlen = state->outbuflen; + size_t inlen = state->bufdata - state->bufnext; + + status = LZ4F_decompress(state->dtx, + state->outbuf, &outlen, + state->buffer + state->bufnext, + &inlen, + NULL); if (LZ4F_isError(status)) { state->errcode = status; @@ -512,54 +491,11 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag) LZ4F_getErrorName(state->errcode)); return -1; } - - rp += read_remain; - - /* - * fill in what space is available in ptr if the eol flag is set, - * either skip if one already found or fill up to EOL if present - * in the outbuf - */ - if (outlen > 0 && dsize < size && eol_found == false) - { - char *p; - size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize; - size_t len = outlen < lib ? outlen : lib; - - if (eol_flag && - (p = memchr(state->buffer, '\n', outlen)) && - (size_t) (p - state->buffer + 1) <= len) - { - len = p - state->buffer + 1; - eol_found = true; - } - - memcpy((char *) ptr + dsize, state->buffer, len); - dsize += len; - - /* move what did not fit, if any, at the beginning of the buf */ - if (len < outlen) - memmove(state->buffer, state->buffer + len, outlen - len); - outlen -= len; - } - - /* if there is available output, save it */ - if (outlen > 0) - { - while (state->overflowlen + outlen > state->overflowalloclen) - { - state->overflowalloclen *= 2; - state->overflowbuf = pg_realloc(state->overflowbuf, - state->overflowalloclen); - } - - memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen); - state->overflowlen += outlen; - } + state->bufnext += inlen; + state->outbufdata = outlen; + state->outbufnext = 0; } - } while (rsize == size && dsize < size && eol_found == false); - - pg_free(readbuf); + } return dsize; } @@ -574,7 +510,7 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH) size_t remaining = size; /* Lazy init */ - if (!LZ4Stream_init(state, size, true)) + if (!LZ4Stream_init(state, true)) pg_fatal("unable to initialize LZ4 library: %s", LZ4F_getErrorName(state->errcode)); @@ -737,7 +673,7 @@ LZ4Stream_close(CompressFileHandle *CFH) if (LZ4F_isError(status)) pg_log_error("could not end decompression: %s", LZ4F_getErrorName(status)); - pg_free(state->overflowbuf); + pg_free(state->outbuf); } pg_free(state->buffer); |
