summaryrefslogtreecommitdiff
path: root/src/backend/replication/basebackup_zstd.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2022-03-30 09:35:14 -0400
committerRobert Haas <rhaas@postgresql.org>2022-03-30 09:41:26 -0400
commit51c0d186d99a18e6aae53003f5138f20991e15a6 (patch)
tree0c37c8899e6448c1bd56919741f58e8e5a99c7b5 /src/backend/replication/basebackup_zstd.c
parentc6863b85829149e2241faafa161b6c5af1f06cb9 (diff)
Allow parallel zstd compression when taking a base backup.
libzstd allows transparent parallel compression just by setting an option when creating the compression context, so permit that for both client and server-side backup compression. To use this, use something like pg_basebackup --compress WHERE-zstd:workers=N where WHERE is "client" or "server" and N is an integer. When compression is performed on the server side, this will spawn threads inside the PostgreSQL backend. While there is almost no PostgreSQL server code which is thread-safe, the threads here are used internally by libzstd and touch only data structures controlled by libzstd. Patch by me, based in part on earlier work by Dipesh Pandit and Jeevan Ladhe. Reviewed by Justin Pryzby. Discussion: http://postgr.es/m/CA+Tgmobj6u-nWF-j=FemygUhobhryLxf9h-wJN7W-2rSsseHNA@mail.gmail.com
Diffstat (limited to 'src/backend/replication/basebackup_zstd.c')
-rw-r--r--src/backend/replication/basebackup_zstd.c45
1 files changed, 28 insertions, 17 deletions
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index 5496eaa72b7..f6876f48118 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -25,8 +25,8 @@ typedef struct bbsink_zstd
/* Common information for all types of sink. */
bbsink base;
- /* Compression level */
- int compresslevel;
+ /* Compression options */
+ bc_specification *compress;
ZSTD_CCtx *cctx;
ZSTD_outBuffer zstd_outBuf;
@@ -67,22 +67,13 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
return NULL; /* keep compiler quiet */
#else
bbsink_zstd *sink;
- int compresslevel;
Assert(next != NULL);
- if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
- compresslevel = 0;
- else
- {
- compresslevel = compress->level;
- Assert(compresslevel >= 1 && compresslevel <= 22);
- }
-
sink = palloc0(sizeof(bbsink_zstd));
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
sink->base.bbs_next = next;
- sink->compresslevel = compresslevel;
+ sink->compress = compress;
return &sink->base;
#endif
@@ -99,16 +90,36 @@ bbsink_zstd_begin_backup(bbsink *sink)
bbsink_zstd *mysink = (bbsink_zstd *) sink;
size_t output_buffer_bound;
size_t ret;
+ bc_specification *compress = mysink->compress;
mysink->cctx = ZSTD_createCCtx();
if (!mysink->cctx)
elog(ERROR, "could not create zstd compression context");
- ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
- mysink->compresslevel);
- if (ZSTD_isError(ret))
- elog(ERROR, "could not set zstd compression level to %d: %s",
- mysink->compresslevel, ZSTD_getErrorName(ret));
+ if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
+ {
+ ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
+ compress->level);
+ if (ZSTD_isError(ret))
+ elog(ERROR, "could not set zstd compression level to %d: %s",
+ compress->level, ZSTD_getErrorName(ret));
+ }
+
+ if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
+ {
+ /*
+ * On older versions of libzstd, this option does not exist, and trying
+ * to set it will fail. Similarly for newer versions if they are
+ * compiled without threading support.
+ */
+ ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+ compress->workers);
+ if (ZSTD_isError(ret))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not set compression worker count to %d: %s",
+ compress->workers, ZSTD_getErrorName(ret)));
+ }
/*
* We need our own buffer, because we're going to pass different data to