/*-------------------------------------------------------------------------
 *
 * compress_lz4.c
 *	 Routines for archivers to write a LZ4 compressed data stream.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	   src/bin/pg_dump/compress_lz4.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres_fe.h"

#include <unistd.h>

#include "compress_lz4.h"
#include "pg_backup_utils.h"

#ifdef USE_LZ4
#include <lz4frame.h>

/*
 * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
 * Redefine it for installations with a lesser version.
 */
#ifndef LZ4F_HEADER_SIZE_MAX
#define LZ4F_HEADER_SIZE_MAX	32
#endif

/*---------------------------------
 * Common to both compression APIs
 *---------------------------------
 */

/*
 * (de)compression state used by both the Compressor and Stream APIs.
 */
typedef struct LZ4State
{
	/*
	 * Used by the Stream API to keep track of the file stream.
	 */
	FILE	   *fp;

	LZ4F_preferences_t prefs;

	LZ4F_compressionContext_t ctx;
	LZ4F_decompressionContext_t dtx;

	/*
	 * Used by the Stream API's lazy initialization.
	 */
	bool		inited;

	/*
	 * Used by the Stream API to distinguish between compression and
	 * decompression operations.
	 */
	bool		compressing;

	/*
	 * I/O buffer area.
	 */
	char	   *buffer;			/* buffer for compressed data */
	size_t		buflen;			/* allocated size of buffer */
	size_t		bufdata;		/* amount of valid data currently in buffer */
	/* These fields are used only while decompressing: */
	size_t		bufnext;		/* next buffer position to decompress */
	char	   *outbuf;			/* buffer for decompressed data */
	size_t		outbuflen;		/* allocated size of outbuf */
	size_t		outbufdata;		/* amount of valid data currently in outbuf */
	size_t		outbufnext;		/* next outbuf position to return */

	/*
	 * Used by both APIs to keep track of error codes.
	 */
	size_t		errcode;
} LZ4State;

/*
 * LZ4State_compression_init
 *		Initialize the required LZ4State members for compression.
 *
 * Write the LZ4 frame header in a buffer keeping track of its length. Users of
 * this function can choose when and how to write the header to a file stream.
 *
 * Returns true on success. In case of a failure returns false, and stores the
 * error code in state->errcode.
 */
static bool
LZ4State_compression_init(LZ4State *state)
{
	size_t		status;

	/*
	 * Compute size needed for buffer, assuming we will present at most
	 * DEFAULT_IO_BUFFER_SIZE input bytes at a time.
	 */
	state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);

	/*
	 * Add some slop to ensure we're not forced to flush every time.
	 *
	 * The present slop factor of 50% is chosen so that the typical output
	 * block size is about 128K when DEFAULT_IO_BUFFER_SIZE = 128K.  We might
	 * need a different slop factor to maintain that equivalence if
	 * DEFAULT_IO_BUFFER_SIZE is changed dramatically.
	 */
	state->buflen += state->buflen / 2;

	/*
	 * LZ4F_compressBegin requires a buffer that is greater or equal to
	 * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
	 */
	if (state->buflen < LZ4F_HEADER_SIZE_MAX)
		state->buflen = LZ4F_HEADER_SIZE_MAX;

	status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
	if (LZ4F_isError(status))
	{
		state->errcode = status;
		return false;
	}

	state->buffer = pg_malloc(state->buflen);

	/*
	 * Insert LZ4 header into buffer.
	 */
	status = LZ4F_compressBegin(state->ctx,
								state->buffer, state->buflen,
								&state->prefs);
	if (LZ4F_isError(status))
	{
		state->errcode = status;
		return false;
	}

	/* On success, status is the number of header bytes placed in buffer */
	state->bufdata = status;

	return true;
}

/*----------------------
 * Compressor API
 *----------------------
 */

/* Private routines that support LZ4 compressed data I/O */

/*
 * Read and decompress the whole of the compressed input.
 *
 * Compressed chunks are fetched with cs->readF until it returns zero; each
 * chunk is fed to LZ4F_decompress, and whatever output that produces is
 * handed to ahwrite().  The decompression context and both work buffers are
 * local to this call.  Any LZ4 error is reported with pg_fatal.
 */
static void
ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
{
	size_t		r;
	size_t		readbuflen;
	char	   *outbuf;
	char	   *readbuf;
	LZ4F_decompressionContext_t ctx = NULL;
	LZ4F_decompressOptions_t dec_opt;
	LZ4F_errorCode_t status;

	memset(&dec_opt, 0, sizeof(dec_opt));
	status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
	if (LZ4F_isError(status))
		pg_fatal("could not create LZ4 decompression context: %s",
				 LZ4F_getErrorName(status));

	outbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
	readbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
	readbuflen = DEFAULT_IO_BUFFER_SIZE;

	/*
	 * readF receives pointers to the buffer and its length (presumably so it
	 * can enlarge the buffer -- confirm against the readF implementations).
	 */
	while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
	{
		char	   *readp;
		char	   *readend;

		/* Process one chunk */
		readp = readbuf;
		readend = readbuf + r;
		while (readp < readend)
		{
			/* In: space/data available.  Out: bytes produced/consumed. */
			size_t		out_size = DEFAULT_IO_BUFFER_SIZE;
			size_t		read_size = readend - readp;

			status = LZ4F_decompress(ctx, outbuf, &out_size,
									 readp, &read_size, &dec_opt);
			if (LZ4F_isError(status))
				pg_fatal("could not decompress: %s",
						 LZ4F_getErrorName(status));

			ahwrite(outbuf, 1, out_size, AH);
			readp += read_size;
		}
	}

	pg_free(outbuf);
	pg_free(readbuf);

	status = LZ4F_freeDecompressionContext(ctx);
	if (LZ4F_isError(status))
		pg_fatal("could not free LZ4 decompression context: %s",
				 LZ4F_getErrorName(status));
}

/*
 * Compress dLen bytes starting at data.
 *
 * The input is presented to LZ4 in chunks of at most DEFAULT_IO_BUFFER_SIZE
 * bytes.  Compressed output accumulates in state->buffer and is passed to
 * cs->writeF only when the worst-case output for the next chunk would not
 * fit in the remaining buffer space.  LZ4 errors are fatal.
 */
static void
WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
					  const void *data, size_t dLen)
{
	LZ4State   *state = (LZ4State *) cs->private_data;
	size_t		remaining = dLen;

	while (remaining > 0)
	{
		size_t		chunk;
		size_t		required;
		size_t		status;

		/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
		chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);

		/* If not enough space, must flush buffer */
		required = LZ4F_compressBound(chunk, &state->prefs);
		if (required > state->buflen - state->bufdata)
		{
			cs->writeF(AH, state->buffer, state->bufdata);
			state->bufdata = 0;
		}

		status = LZ4F_compressUpdate(state->ctx,
									 state->buffer + state->bufdata,
									 state->buflen - state->bufdata,
									 data, chunk, NULL);

		if (LZ4F_isError(status))
			pg_fatal("could not compress data: %s",
					 LZ4F_getErrorName(status));

		state->bufdata += status;

		data = ((const char *) data) + chunk;
		remaining -= chunk;
	}
}

/*
 * Finish compression for the Compressor API.
 *
 * Ends the LZ4 frame, writes the final bufferload through cs->writeF, and
 * releases the compression context, the buffer and the state itself.  Does
 * nothing when there is no private state (the read case set up by
 * InitCompressorLZ4).
 */
static void
EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
{
	LZ4State   *state = (LZ4State *) cs->private_data;
	size_t		required;
	size_t		status;

	/* Nothing needs to be done */
	if (!state)
		return;

	/* We might need to flush the buffer to make room for LZ4F_compressEnd */
	required = LZ4F_compressBound(0, &state->prefs);
	if (required > state->buflen - state->bufdata)
	{
		cs->writeF(AH, state->buffer, state->bufdata);
		state->bufdata = 0;
	}

	status = LZ4F_compressEnd(state->ctx,
							  state->buffer + state->bufdata,
							  state->buflen - state->bufdata,
							  NULL);
	if (LZ4F_isError(status))
		pg_fatal("could not end compression: %s",
				 LZ4F_getErrorName(status));
	state->bufdata += status;

	/* Write the final bufferload */
	cs->writeF(AH, state->buffer, state->bufdata);

	status = LZ4F_freeCompressionContext(state->ctx);
	if (LZ4F_isError(status))
		pg_fatal("could not end compression: %s",
				 LZ4F_getErrorName(status));

	pg_free(state->buffer);
	pg_free(state);

	cs->private_data = NULL;
}

/*
 * Public routines that support LZ4 compressed data I/O
 */

/*
 * Set up a CompressorState to use LZ4.
 *
 * Installs the read/write/end callbacks and records the compression
 * specification.  When cs->readF is set, no private state is created;
 * otherwise an LZ4State is allocated and initialized for compression, with
 * any failure reported through pg_fatal.
 */
void
InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
{
	LZ4State   *state;

	cs->readData = ReadDataFromArchiveLZ4;
	cs->writeData = WriteDataToArchiveLZ4;
	cs->end = EndCompressorLZ4;

	cs->compression_spec = compression_spec;

	/*
	 * Read operations have access to the whole input. No state needs to be
	 * carried between calls.
	 */
	if (cs->readF)
		return;

	state = pg_malloc0(sizeof(*state));
	if (cs->compression_spec.level >= 0)
		state->prefs.compressionLevel = cs->compression_spec.level;

	if (!LZ4State_compression_init(state))
		pg_fatal("could not initialize LZ4 compression: %s",
				 LZ4F_getErrorName(state->errcode));

	cs->private_data = state;
}

/*----------------------
 * Compress Stream API
 *----------------------
 */


/*
 * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
 * more buffered data and the end of the input file has been reached.
 */
static bool
LZ4Stream_eof(CompressFileHandle *CFH)
{
	LZ4State   *state = (LZ4State *) CFH->private_data;

	return state->outbufnext >= state->outbufdata &&
		state->bufnext >= state->bufdata &&
		feof(state->fp);
}

/*
 * Return a message describing the most recent error.
 *
 * If state->errcode holds an LZ4 error code its LZ4 name is returned;
 * otherwise the message is derived from the current errno.
 */
static const char *
LZ4Stream_get_error(CompressFileHandle *CFH)
{
	LZ4State   *state = (LZ4State *) CFH->private_data;
	const char *errmsg;

	if (LZ4F_isError(state->errcode))
		errmsg = LZ4F_getErrorName(state->errcode);
	else
		errmsg = strerror(errno);

	return errmsg;
}

/*
 * Initialize an already alloc'ed LZ4State struct for subsequent calls.
 *
 * Creates the necessary contexts for either compression or decompression. When
 * compressing data (indicated by compressing=true), it additionally writes the
 * LZ4 header in the output buffer.
 *
 * It's expected that a not-yet-initialized LZ4State will be zero-filled.
 *
 * Returns true on success. In case of a failure returns false, and stores the
 * error code in state->errcode.
 */
static bool
LZ4Stream_init(LZ4State *state, bool compressing)
{
	size_t		status;

	if (state->inited)
		return true;

	state->compressing = compressing;

	if (state->compressing)
	{
		if (!LZ4State_compression_init(state))
			return false;
	}
	else
	{
		status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
		if (LZ4F_isError(status))
		{
			state->errcode = status;
			return false;
		}

		state->buflen = DEFAULT_IO_BUFFER_SIZE;
		state->buffer = pg_malloc(state->buflen);
		state->outbuflen = DEFAULT_IO_BUFFER_SIZE;
		state->outbuf = pg_malloc(state->outbuflen);
	}

	state->inited = true;
	return true;
}

/*
 * The workhorse for reading decompressed content out of an LZ4 compressed
 * stream.
 *
 * It will read up to 'ptrsize' decompressed content, or up to the new line
 * char if one is found first when the eol_flag is set.
 *
 * Returns the number of bytes of decompressed data copied into the ptr
 * buffer, or -1 in case of error.
 */
static int
LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
{
	int			dsize = 0;
	int			remaining = ptrsize;

	/* Lazy init */
	if (!LZ4Stream_init(state, false /* decompressing */ ))
	{
		pg_log_error("unable to initialize LZ4 library: %s",
					 LZ4F_getErrorName(state->errcode));
		return -1;
	}

	/* Loop until postcondition is satisfied */
	while (remaining > 0)
	{
		/*
		 * If we already have some decompressed data, return that.
		 */
		if (state->outbufnext < state->outbufdata)
		{
			char	   *outptr = state->outbuf + state->outbufnext;
			size_t		readlen = state->outbufdata - state->outbufnext;
			bool		eol_found = false;

			if (readlen > remaining)
				readlen = remaining;
			/* If eol_flag is set, don't read beyond a newline */
			if (eol_flag)
			{
				char	   *eolptr = memchr(outptr, '\n', readlen);

				if (eolptr)
				{
					readlen = eolptr - outptr + 1;
					eol_found = true;
				}
			}
			memcpy(ptr, outptr, readlen);
			ptr = ((char *) ptr) + readlen;
			state->outbufnext += readlen;
			dsize += readlen;
			remaining -= readlen;
			if (eol_found || remaining == 0)
				break;
			/* We must have emptied outbuf */
			Assert(state->outbufnext >= state->outbufdata);
		}

		/*
		 * If we don't have any pending compressed data, load more into
		 * state->buffer.
		 */
		if (state->bufnext >= state->bufdata)
		{
			size_t		rsize;

			rsize = fread(state->buffer, 1, state->buflen, state->fp);
			if (rsize < state->buflen && !feof(state->fp))
			{
				pg_log_error("could not read from input file: %m");
				return -1;
			}
			if (rsize == 0)
				break;			/* must be EOF */
			state->bufdata = rsize;
			state->bufnext = 0;
		}

		/*
		 * Decompress some data into state->outbuf.
		 */
		{
			size_t		status;
			size_t		outlen = state->outbuflen;
			size_t		inlen = state->bufdata - state->bufnext;

			status = LZ4F_decompress(state->dtx,
									 state->outbuf, &outlen,
									 state->buffer + state->bufnext,
									 &inlen,
									 NULL);
			if (LZ4F_isError(status))
			{
				state->errcode = status;
				pg_log_error("could not read from input file: %s",
							 LZ4F_getErrorName(state->errcode));
				return -1;
			}
			/* inlen/outlen now hold the bytes consumed/produced */
			state->bufnext += inlen;
			state->outbufdata = outlen;
			state->outbufnext = 0;
		}
	}

	return dsize;
}

/*
 * Compress size bytes from ptr and write them to the stream.
 *
 * Compressed output accumulates in state->buffer; it is written to the file
 * only when the worst-case output for the next chunk would not fit.  All
 * failures are reported with pg_fatal.
 */
static void
LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
{
	LZ4State   *state = (LZ4State *) CFH->private_data;
	size_t		remaining = size;

	/* Lazy init */
	if (!LZ4Stream_init(state, true))
		pg_fatal("unable to initialize LZ4 library: %s",
				 LZ4F_getErrorName(state->errcode));

	while (remaining > 0)
	{
		size_t		chunk;
		size_t		required;
		size_t		status;

		/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
		chunk = Min(remaining, (size_t) DEFAULT_IO_BUFFER_SIZE);

		/* If not enough space, must flush buffer */
		required = LZ4F_compressBound(chunk, &state->prefs);
		if (required > state->buflen - state->bufdata)
		{
			errno = 0;
			if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
			{
				/* if fwrite didn't set errno, assume out of disk space */
				errno = (errno) ? errno : ENOSPC;
				pg_fatal("error during writing: %m");
			}
			state->bufdata = 0;
		}

		status = LZ4F_compressUpdate(state->ctx,
									 state->buffer + state->bufdata,
									 state->buflen - state->bufdata,
									 ptr, chunk, NULL);
		if (LZ4F_isError(status))
			pg_fatal("error during writing: %s",
					 LZ4F_getErrorName(status));
		state->bufdata += status;

		ptr = ((const char *) ptr) + chunk;
		remaining -= chunk;
	}
}

/*
 * fread() equivalent implementation for LZ4 compressed files.
 */
static size_t
LZ4Stream_read(void *ptr, size_t size, CompressFileHandle *CFH)
{
	LZ4State   *state = (LZ4State *) CFH->private_data;
	int			ret;

	if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
		pg_fatal("could not read from input file: %s",
				 LZ4Stream_get_error(CFH));

	return (size_t) ret;
}

/*
 * fgetc() equivalent implementation for LZ4 compressed files.
 */
static int
LZ4Stream_getc(CompressFileHandle *CFH)
{
	LZ4State   *state = (LZ4State *) CFH->private_data;
	unsigned char c;

	if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
	{
		if (!LZ4Stream_eof(CFH))
			pg_fatal("could not read from input file: %s",
					 LZ4Stream_get_error(CFH));
		else
			pg_fatal("could not read from input file: end of file");
	}

	return c;
}

/*
 * fgets() equivalent implementation for LZ4 compressed files.
 *
 * Reads at most size - 1 bytes, stopping after a newline if one is found.
 * The newline, when present, is kept in the result, and the result is always
 * NUL-terminated.  Returns ptr, or NULL on error or when EOF is hit before
 * any byte was read.
 */
static char *
LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
{
	LZ4State   *state = (LZ4State *) CFH->private_data;
	int			ret;

	ret = LZ4Stream_read_internal(state, ptr, size - 1, true);

	/*
	 * LZ4Stream_read_internal returning 0 or -1 means that it was either an
	 * EOF or an error, but gets_func is defined to return NULL in either case
	 * so we can treat both the same here.
	 */
	if (ret <= 0)
		return NULL;

	/*
	 * Our caller expects the returned string to be NUL-terminated.  We asked
	 * for at most size - 1 bytes, so ret <= size - 1 and ptr[ret] is within
	 * the caller's buffer.  The terminator must go after the last byte read,
	 * not on top of it, or we would lose the newline (or a data byte).
	 */
	ptr[ret] = '\0';

	return ptr;
}

/*
 * Finalize (de)compression of a stream. When compressing it will write any
 * remaining content and/or generated footer from the LZ4 API.
 *
 * All resources are released and the file is closed regardless of errors.
 * Returns true only if the remaining output was written, the LZ4 frame was
 * ended, and the file was closed without error; each failure is logged as
 * it is detected.
 */
static bool
LZ4Stream_close(CompressFileHandle *CFH)
{
	FILE	   *fp;
	LZ4State   *state = (LZ4State *) CFH->private_data;
	size_t		required;
	size_t		status;
	bool		success = true;

	/* Remember the stream; state is freed before we close it */
	fp = state->fp;
	if (state->inited)
	{
		if (state->compressing)
		{
			/* We might need to flush the buffer to make room */
			required = LZ4F_compressBound(0, &state->prefs);
			if (required > state->buflen - state->bufdata)
			{
				errno = 0;
				if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
				{
					errno = (errno) ? errno : ENOSPC;
					pg_log_error("could not write to output file: %m");
					success = false;
				}
				state->bufdata = 0;
			}

			status = LZ4F_compressEnd(state->ctx,
									  state->buffer + state->bufdata,
									  state->buflen - state->bufdata,
									  NULL);
			if (LZ4F_isError(status))
			{
				pg_log_error("could not end compression: %s",
							 LZ4F_getErrorName(status));
				success = false;
			}
			else
				state->bufdata += status;

			errno = 0;
			if (fwrite(state->buffer, 1, state->bufdata, state->fp) != state->bufdata)
			{
				errno = (errno) ? errno : ENOSPC;
				pg_log_error("could not write to output file: %m");
				success = false;
			}

			/*
			 * A failure to free the context doesn't affect the output file,
			 * so it is only logged.
			 */
			status = LZ4F_freeCompressionContext(state->ctx);
			if (LZ4F_isError(status))
				pg_log_error("could not end compression: %s",
							 LZ4F_getErrorName(status));
		}
		else
		{
			status = LZ4F_freeDecompressionContext(state->dtx);
			if (LZ4F_isError(status))
				pg_log_error("could not end decompression: %s",
							 LZ4F_getErrorName(status));
			pg_free(state->outbuf);
		}

		pg_free(state->buffer);
	}

	pg_free(state);
	CFH->private_data = NULL;

	errno = 0;
	if (fclose(fp) != 0)
	{
		pg_log_error("could not close file: %m");
		success = false;
	}

	return success;
}

/*
 * Open the underlying file stream.
 *
 * If fd is non-negative, a duplicate of it is opened with fdopen();
 * otherwise path is opened with fopen().  Returns true on success.  On
 * failure returns false and stores errno in state->errcode.
 */
static bool
LZ4Stream_open(const char *path, int fd, const char *mode,
			   CompressFileHandle *CFH)
{
	LZ4State   *state = (LZ4State *) CFH->private_data;

	if (fd >= 0)
		state->fp = fdopen(dup(fd), mode);
	else
		state->fp = fopen(path, mode);
	if (state->fp == NULL)
	{
		state->errcode = errno;
		return false;
	}

	return true;
}

/*
 * Open "<path>.lz4" for writing through CFH->open_func.
 *
 * errno as left by open_func is preserved across the release of the
 * temporary file name, so callers can still report it.
 */
static bool
LZ4Stream_open_write(const char *path, const char *mode,
					 CompressFileHandle *CFH)
{
	char	   *fname;
	int			save_errno;
	bool		ret;

	fname = psprintf("%s.lz4", path);
	ret = CFH->open_func(fname, -1, mode, CFH);

	save_errno = errno;
	pg_free(fname);
	errno = save_errno;

	return ret;
}

/*
 * Public routines
 */

/*
 * Set up a CompressFileHandle to use the LZ4 Stream API.
 *
 * Installs the callbacks and allocates a zero-filled LZ4State; the LZ4
 * contexts themselves are created lazily on first read or write.  A
 * non-negative compression level from the specification is copied into the
 * LZ4 preferences.
 */
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
						  const pg_compress_specification compression_spec)
{
	LZ4State   *state;

	CFH->open_func = LZ4Stream_open;
	CFH->open_write_func = LZ4Stream_open_write;
	CFH->read_func = LZ4Stream_read;
	CFH->write_func = LZ4Stream_write;
	CFH->gets_func = LZ4Stream_gets;
	CFH->getc_func = LZ4Stream_getc;
	CFH->eof_func = LZ4Stream_eof;
	CFH->close_func = LZ4Stream_close;
	CFH->get_error_func = LZ4Stream_get_error;

	CFH->compression_spec = compression_spec;
	state = pg_malloc0(sizeof(*state));
	if (CFH->compression_spec.level >= 0)
		state->prefs.compressionLevel = CFH->compression_spec.level;

	CFH->private_data = state;
}
#else							/* USE_LZ4 */
/* Stub used when the build lacks LZ4 support: always fails. */
void
InitCompressorLZ4(CompressorState *cs,
				  const pg_compress_specification compression_spec)
{
	pg_fatal("this build does not support compression with %s", "LZ4");
}

/* Stub used when the build lacks LZ4 support: always fails. */
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
						  const pg_compress_specification compression_spec)
{
	pg_fatal("this build does not support compression with %s", "LZ4");
}
#endif							/* USE_LZ4 */