summaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/compress_io.c
diff options
context:
space:
mode:
authorAndrew Dunstan <andrew@dunslane.net>2013-03-24 11:27:20 -0400
committerAndrew Dunstan <andrew@dunslane.net>2013-03-24 11:27:20 -0400
commit9e257a181cc1dc5e19eb5d770ce09cc98f470f5f (patch)
treea2b5c7a40cfe004d4838cd3be32e0177096fafbf /src/bin/pg_dump/compress_io.c
parent3b91fe185a71c05ac4528f93a39ba27232acc9e0 (diff)
Add parallel pg_dump option.
New infrastructure is added which creates a set number of workers (threads on Windows, forked processes on Unix). Jobs are then handed out to these workers by the master process as needed. pg_restore is adjusted to use this new infrastructure in place of the old setup which created a new worker for each step on the fly. Parallel dumps acquire a snapshot clone in order to stay consistent, if available. The parallel option is selected by the -j / --jobs command line parameter of pg_dump. Joachim Wieland, lightly editorialized by Andrew Dunstan.
Diffstat (limited to 'src/bin/pg_dump/compress_io.c')
-rw-r--r--src/bin/pg_dump/compress_io.c10
1 files changed, 10 insertions, 0 deletions
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 768b923ae5f..0308f66c49f 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -54,6 +54,7 @@
#include "compress_io.h"
#include "dumputils.h"
+#include "parallel.h"
/*----------------------
* Compressor API
@@ -182,6 +183,9 @@ size_t
WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen)
{
+ /* Are we aborting? */
+ checkAborting(AH);
+
switch (cs->comprAlg)
{
case COMPR_ALG_LIBZ:
@@ -351,6 +355,9 @@ ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
/* no minimal chunk size for zlib */
while ((cnt = readF(AH, &buf, &buflen)))
{
+ /* Are we aborting? */
+ checkAborting(AH);
+
zp->next_in = (void *) buf;
zp->avail_in = cnt;
@@ -411,6 +418,9 @@ ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
while ((cnt = readF(AH, &buf, &buflen)))
{
+ /* Are we aborting? */
+ checkAborting(AH);
+
ahwrite(buf, 1, cnt, AH);
}