summaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/pg_backup_archiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r--src/bin/pg_dump/pg_backup_archiver.c357
1 files changed, 213 insertions, 144 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 927950caff2..e29265953d4 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -61,11 +61,28 @@
#define thandle HANDLE
#endif
+typedef struct ParallelStateEntry
+{
+#ifdef WIN32
+ unsigned int threadId;
+#else
+ pid_t pid;
+#endif
+ ArchiveHandle *AH;
+} ParallelStateEntry;
+
+typedef struct ParallelState
+{
+ int numWorkers;
+ ParallelStateEntry *pse;
+} ParallelState;
+
/* Arguments needed for a worker child */
typedef struct _restore_args
{
ArchiveHandle *AH;
TocEntry *te;
+ ParallelStateEntry *pse;
} RestoreArgs;
/* State for each parallel activity slot */
@@ -75,6 +92,14 @@ typedef struct _parallel_slot
RestoreArgs *args;
} ParallelSlot;
+typedef struct ShutdownInformation
+{
+ ParallelState *pstate;
+ Archive *AHX;
+} ShutdownInformation;
+
+static ShutdownInformation shutdown_info;
+
#define NO_SLOT (-1)
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
@@ -122,10 +147,6 @@ static int _discoverArchiveFormat(ArchiveHandle *AH);
static int RestoringToDB(ArchiveHandle *AH);
static void dump_lo_buf(ArchiveHandle *AH);
-static void vdie_horribly(ArchiveHandle *AH, const char *modulename,
- const char *fmt, va_list ap)
- __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0), noreturn));
-
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
static OutputContext SaveOutput(ArchiveHandle *AH);
@@ -160,6 +181,11 @@ static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
static void DeCloneArchive(ArchiveHandle *AH);
+static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH);
+static void unsetProcessIdentifier(ParallelStateEntry *pse);
+static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate);
+static void archive_close_connection(int code, void *arg);
+
/*
* Wrapper functions.
@@ -208,8 +234,8 @@ CloseArchive(Archive *AHX)
res = fclose(AH->OF);
if (res != 0)
- die_horribly(AH, modulename, "could not close output file: %s\n",
- strerror(errno));
+ exit_horribly(modulename, "could not close output file: %s\n",
+ strerror(errno));
}
/* Public */
@@ -234,14 +260,14 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
* connected to, not the one we will create, which is very bad...
*/
if (ropt->createDB && ropt->dropSchema)
- die_horribly(AH, modulename, "-C and -c are incompatible options\n");
+ exit_horribly(modulename, "-C and -c are incompatible options\n");
/*
* -C is not compatible with -1, because we can't create a database inside
* a transaction block.
*/
if (ropt->createDB && ropt->single_txn)
- die_horribly(AH, modulename, "-C and -1 are incompatible options\n");
+ exit_horribly(modulename, "-C and -1 are incompatible options\n");
/*
* If we're going to do parallel restore, there are some restrictions.
@@ -251,11 +277,11 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
/* We haven't got round to making this work for all archive formats */
if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
- die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
+ exit_horribly(modulename, "parallel restore is not supported with this archive file format\n");
/* Doesn't work if the archive represents dependencies as OIDs */
if (AH->version < K_VERS_1_8)
- die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
+ exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
/*
* It's also not gonna work if we can't reopen the input file, so
@@ -274,7 +300,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
reqs = _tocEntryRequired(te, ropt, false);
if (te->hadDumper && (reqs & REQ_DATA) != 0)
- die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
+ exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
}
}
#endif
@@ -286,7 +312,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
ahlog(AH, 1, "connecting to database for restore\n");
if (AH->version < K_VERS_1_3)
- die_horribly(AH, modulename, "direct database connections are not supported in pre-1.3 archives\n");
+ exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n");
/* XXX Should get this from the archive */
AHX->minRemoteVersion = 070100;
@@ -734,7 +760,7 @@ WriteData(Archive *AHX, const void *data, size_t dLen)
ArchiveHandle *AH = (ArchiveHandle *) AHX;
if (!AH->currToc)
- die_horribly(AH, modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
+ exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n");
return (*AH->WriteDataPtr) (AH, data, dLen);
}
@@ -886,7 +912,7 @@ StartBlob(Archive *AHX, Oid oid)
ArchiveHandle *AH = (ArchiveHandle *) AHX;
if (!AH->StartBlobPtr)
- die_horribly(AH, modulename, "large-object output not supported in chosen format\n");
+ exit_horribly(modulename, "large-object output not supported in chosen format\n");
(*AH->StartBlobPtr) (AH, AH->currToc, oid);
@@ -973,13 +999,13 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
{
loOid = lo_create(AH->connection, oid);
if (loOid == 0 || loOid != oid)
- die_horribly(AH, modulename, "could not create large object %u: %s",
- oid, PQerrorMessage(AH->connection));
+ exit_horribly(modulename, "could not create large object %u: %s",
+ oid, PQerrorMessage(AH->connection));
}
AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
if (AH->loFd == -1)
- die_horribly(AH, modulename, "could not open large object %u: %s",
- oid, PQerrorMessage(AH->connection));
+ exit_horribly(modulename, "could not open large object %u: %s",
+ oid, PQerrorMessage(AH->connection));
}
else
{
@@ -1035,8 +1061,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
/* Setup the file */
fh = fopen(ropt->tocFile, PG_BINARY_R);
if (!fh)
- die_horribly(AH, modulename, "could not open TOC file \"%s\": %s\n",
- ropt->tocFile, strerror(errno));
+ exit_horribly(modulename, "could not open TOC file \"%s\": %s\n",
+ ropt->tocFile, strerror(errno));
incomplete_line = false;
while (fgets(buf, sizeof(buf), fh) != NULL)
@@ -1083,8 +1109,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
/* Find TOC entry */
te = getTocEntryByDumpId(AH, id);
if (!te)
- die_horribly(AH, modulename, "could not find entry for ID %d\n",
- id);
+ exit_horribly(modulename, "could not find entry for ID %d\n",
+ id);
/* Mark it wanted */
ropt->idWanted[id - 1] = true;
@@ -1104,8 +1130,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
}
if (fclose(fh) != 0)
- die_horribly(AH, modulename, "could not close TOC file: %s\n",
- strerror(errno));
+ exit_horribly(modulename, "could not close TOC file: %s\n",
+ strerror(errno));
}
/*
@@ -1221,11 +1247,11 @@ SetOutput(ArchiveHandle *AH, const char *filename, int compression)
if (!AH->OF)
{
if (filename)
- die_horribly(AH, modulename, "could not open output file \"%s\": %s\n",
- filename, strerror(errno));
+ exit_horribly(modulename, "could not open output file \"%s\": %s\n",
+ filename, strerror(errno));
else
- die_horribly(AH, modulename, "could not open output file: %s\n",
- strerror(errno));
+ exit_horribly(modulename, "could not open output file: %s\n",
+ strerror(errno));
}
}
@@ -1251,7 +1277,7 @@ RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
res = fclose(AH->OF);
if (res != 0)
- die_horribly(AH, modulename, "could not close output file: %s\n",
+ exit_horribly(modulename, "could not close output file: %s\n",
strerror(errno));
AH->gzOut = savedContext.gzOut;
@@ -1329,7 +1355,7 @@ dump_lo_buf(ArchiveHandle *AH)
AH->lo_buf_used),
(unsigned long) AH->lo_buf_used, (unsigned long) res);
if (res != AH->lo_buf_used)
- die_horribly(AH, modulename,
+ exit_horribly(modulename,
"could not write to large object (result: %lu, expected: %lu)\n",
(unsigned long) res, (unsigned long) AH->lo_buf_used);
}
@@ -1388,7 +1414,7 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
res = GZWRITE(ptr, size, nmemb, AH->OF);
if (res != (nmemb * size))
- die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno));
+ exit_horribly(modulename, "could not write to output file: %s\n", strerror(errno));
return res;
}
else if (AH->CustomOutPtr)
@@ -1396,7 +1422,7 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
res = AH->CustomOutPtr (AH, ptr, size * nmemb);
if (res != (nmemb * size))
- die_horribly(AH, modulename, "could not write to custom output routine\n");
+ exit_horribly(modulename, "could not write to custom output routine\n");
return res;
}
else
@@ -1411,56 +1437,17 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
res = fwrite(ptr, size, nmemb, AH->OF);
if (res != nmemb)
- die_horribly(AH, modulename, "could not write to output file: %s\n",
+ exit_horribly(modulename, "could not write to output file: %s\n",
strerror(errno));
return res;
}
}
}
-
-/* Report a fatal error and exit(1) */
-static void
-vdie_horribly(ArchiveHandle *AH, const char *modulename,
- const char *fmt, va_list ap)
-{
- vwrite_msg(modulename, fmt, ap);
-
- if (AH)
- {
- if (AH->public.verbose)
- write_msg(NULL, "*** aborted because of error\n");
- DisconnectDatabase(&AH->public);
- }
-
- exit_nicely(1);
-}
-
-/* As above, but with variable arg list */
-void
-die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- vdie_horribly(AH, modulename, fmt, ap);
- va_end(ap);
-}
-
-/* As above, but with a complaint about a particular query. */
-void
-die_on_query_failure(ArchiveHandle *AH, const char *modulename,
- const char *query)
-{
- write_msg(modulename, "query failed: %s",
- PQerrorMessage(AH->connection));
- die_horribly(AH, modulename, "query was: %s\n", query);
-}
-
/* on some error, we may decide to go on... */
void
-warn_or_die_horribly(ArchiveHandle *AH,
- const char *modulename, const char *fmt,...)
+warn_or_exit_horribly(ArchiveHandle *AH,
+ const char *modulename, const char *fmt,...)
{
va_list ap;
@@ -1497,14 +1484,13 @@ warn_or_die_horribly(ArchiveHandle *AH,
AH->lastErrorTE = AH->currentTE;
va_start(ap, fmt);
+ vwrite_msg(modulename, fmt, ap);
+ va_end(ap);
+
if (AH->public.exit_on_error)
- vdie_horribly(AH, modulename, fmt, ap);
+ exit_nicely(1);
else
- {
- vwrite_msg(modulename, fmt, ap);
AH->public.n_errors++;
- }
- va_end(ap);
}
#ifdef NOT_USED
@@ -1623,7 +1609,7 @@ ReadOffset(ArchiveHandle *AH, pgoff_t * o)
break;
default:
- die_horribly(AH, modulename, "unexpected data offset flag %d\n", offsetFlg);
+ exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg);
}
/*
@@ -1636,7 +1622,7 @@ ReadOffset(ArchiveHandle *AH, pgoff_t * o)
else
{
if ((*AH->ReadBytePtr) (AH) != 0)
- die_horribly(AH, modulename, "file offset in dump file is too large\n");
+ exit_horribly(modulename, "file offset in dump file is too large\n");
}
}
@@ -1730,7 +1716,7 @@ ReadStr(ArchiveHandle *AH)
{
buf = (char *) pg_malloc(l + 1);
if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l)
- die_horribly(AH, modulename, "unexpected end of file\n");
+ exit_horribly(modulename, "unexpected end of file\n");
buf[l] = '\0';
}
@@ -1773,8 +1759,8 @@ _discoverArchiveFormat(ArchiveHandle *AH)
char buf[MAXPGPATH];
if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
- die_horribly(AH, modulename, "directory name too long: \"%s\"\n",
- AH->fSpec);
+ exit_horribly(modulename, "directory name too long: \"%s\"\n",
+ AH->fSpec);
if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
{
AH->format = archDirectory;
@@ -1783,32 +1769,32 @@ _discoverArchiveFormat(ArchiveHandle *AH)
#ifdef HAVE_LIBZ
if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
- die_horribly(AH, modulename, "directory name too long: \"%s\"\n",
- AH->fSpec);
+ exit_horribly(modulename, "directory name too long: \"%s\"\n",
+ AH->fSpec);
if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
{
AH->format = archDirectory;
return AH->format;
}
#endif
- die_horribly(AH, modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
- AH->fSpec);
+ exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n",
+ AH->fSpec);
fh = NULL; /* keep compiler quiet */
}
else
{
fh = fopen(AH->fSpec, PG_BINARY_R);
if (!fh)
- die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
- AH->fSpec, strerror(errno));
+ exit_horribly(modulename, "could not open input file \"%s\": %s\n",
+ AH->fSpec, strerror(errno));
}
}
else
{
fh = stdin;
if (!fh)
- die_horribly(AH, modulename, "could not open input file: %s\n",
- strerror(errno));
+ exit_horribly(modulename, "could not open input file: %s\n",
+ strerror(errno));
}
cnt = fread(sig, 1, 5, fh);
@@ -1816,10 +1802,10 @@ _discoverArchiveFormat(ArchiveHandle *AH)
if (cnt != 5)
{
if (ferror(fh))
- die_horribly(AH, modulename, "could not read input file: %s\n", strerror(errno));
+ exit_horribly(modulename, "could not read input file: %s\n", strerror(errno));
else
- die_horribly(AH, modulename, "input file is too short (read %lu, expected 5)\n",
- (unsigned long) cnt);
+ exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n",
+ (unsigned long) cnt);
}
/* Save it, just in case we need it later */
@@ -1880,14 +1866,14 @@ _discoverArchiveFormat(ArchiveHandle *AH)
strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
{
/* looks like it's probably a text format dump. so suggest they try psql */
- die_horribly(AH, modulename, "input file appears to be a text format dump. Please use psql.\n");
+ exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n");
}
if (AH->lookaheadLen != 512)
- die_horribly(AH, modulename, "input file does not appear to be a valid archive (too short?)\n");
+ exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n");
if (!isValidTarHeader(AH->lookahead))
- die_horribly(AH, modulename, "input file does not appear to be a valid archive\n");
+ exit_horribly(modulename, "input file does not appear to be a valid archive\n");
AH->format = archTar;
}
@@ -1907,8 +1893,8 @@ _discoverArchiveFormat(ArchiveHandle *AH)
/* Close the file */
if (wantClose)
if (fclose(fh) != 0)
- die_horribly(AH, modulename, "could not close input file: %s\n",
- strerror(errno));
+ exit_horribly(modulename, "could not close input file: %s\n",
+ strerror(errno));
return AH->format;
}
@@ -2027,7 +2013,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
break;
default:
- die_horribly(AH, modulename, "unrecognized file format \"%d\"\n", fmt);
+ exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt);
}
return AH;
@@ -2149,9 +2135,9 @@ ReadToc(ArchiveHandle *AH)
/* Sanity check */
if (te->dumpId <= 0)
- die_horribly(AH, modulename,
- "entry ID %d out of range -- perhaps a corrupt TOC\n",
- te->dumpId);
+ exit_horribly(modulename,
+ "entry ID %d out of range -- perhaps a corrupt TOC\n",
+ te->dumpId);
te->hadDumper = ReadInt(AH);
@@ -2306,13 +2292,13 @@ processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
*ptr2 = '\0';
encoding = pg_char_to_encoding(ptr1);
if (encoding < 0)
- die_horribly(AH, modulename, "unrecognized encoding \"%s\"\n",
- ptr1);
+ exit_horribly(modulename, "unrecognized encoding \"%s\"\n",
+ ptr1);
AH->public.encoding = encoding;
}
else
- die_horribly(AH, modulename, "invalid ENCODING item: %s\n",
- te->defn);
+ exit_horribly(modulename, "invalid ENCODING item: %s\n",
+ te->defn);
free(defn);
}
@@ -2329,8 +2315,8 @@ processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
AH->public.std_strings = false;
else
- die_horribly(AH, modulename, "invalid STDSTRINGS item: %s\n",
- te->defn);
+ exit_horribly(modulename, "invalid STDSTRINGS item: %s\n",
+ te->defn);
}
static teReqs
@@ -2537,9 +2523,9 @@ _doSetSessionAuth(ArchiveHandle *AH, const char *user)
res = PQexec(AH->connection, cmd->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
- /* NOT warn_or_die_horribly... use -O instead to skip this. */
- die_horribly(AH, modulename, "could not set session user to \"%s\": %s",
- user, PQerrorMessage(AH->connection));
+ /* NOT warn_or_exit_horribly... use -O instead to skip this. */
+ exit_horribly(modulename, "could not set session user to \"%s\": %s",
+ user, PQerrorMessage(AH->connection));
PQclear(res);
}
@@ -2569,9 +2555,9 @@ _doSetWithOids(ArchiveHandle *AH, const bool withOids)
res = PQexec(AH->connection, cmd->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
- warn_or_die_horribly(AH, modulename,
- "could not set default_with_oids: %s",
- PQerrorMessage(AH->connection));
+ warn_or_exit_horribly(AH, modulename,
+ "could not set default_with_oids: %s",
+ PQerrorMessage(AH->connection));
PQclear(res);
}
@@ -2707,9 +2693,9 @@ _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
res = PQexec(AH->connection, qry->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
- warn_or_die_horribly(AH, modulename,
- "could not set search_path to \"%s\": %s",
- schemaName, PQerrorMessage(AH->connection));
+ warn_or_exit_horribly(AH, modulename,
+ "could not set search_path to \"%s\": %s",
+ schemaName, PQerrorMessage(AH->connection));
PQclear(res);
}
@@ -2768,9 +2754,9 @@ _selectTablespace(ArchiveHandle *AH, const char *tablespace)
res = PQexec(AH->connection, qry->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
- warn_or_die_horribly(AH, modulename,
- "could not set default_tablespace to %s: %s",
- fmtId(want), PQerrorMessage(AH->connection));
+ warn_or_exit_horribly(AH, modulename,
+ "could not set default_tablespace to %s: %s",
+ fmtId(want), PQerrorMessage(AH->connection));
PQclear(res);
}
@@ -3150,10 +3136,10 @@ ReadHead(ArchiveHandle *AH)
if (!AH->readHeader)
{
if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5)
- die_horribly(AH, modulename, "unexpected end of file\n");
+ exit_horribly(modulename, "unexpected end of file\n");
if (strncmp(tmpMag, "PGDMP", 5) != 0)
- die_horribly(AH, modulename, "did not find magic string in file header\n");
+ exit_horribly(modulename, "did not find magic string in file header\n");
AH->vmaj = (*AH->ReadBytePtr) (AH);
AH->vmin = (*AH->ReadBytePtr) (AH);
@@ -3166,13 +3152,13 @@ ReadHead(ArchiveHandle *AH)
AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
- die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n",
- AH->vmaj, AH->vmin);
+ exit_horribly(modulename, "unsupported version (%d.%d) in file header\n",
+ AH->vmaj, AH->vmin);
AH->intSize = (*AH->ReadBytePtr) (AH);
if (AH->intSize > 32)
- die_horribly(AH, modulename, "sanity check on integer size (%lu) failed\n",
- (unsigned long) AH->intSize);
+ exit_horribly(modulename, "sanity check on integer size (%lu) failed\n",
+ (unsigned long) AH->intSize);
if (AH->intSize > sizeof(int))
write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n");
@@ -3185,8 +3171,8 @@ ReadHead(ArchiveHandle *AH)
fmt = (*AH->ReadBytePtr) (AH);
if (AH->format != fmt)
- die_horribly(AH, modulename, "expected format (%d) differs from format found in file (%d)\n",
- AH->format, fmt);
+ exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n",
+ AH->format, fmt);
}
if (AH->version >= K_VERS_1_2)
@@ -3290,6 +3276,66 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
ahprintf(AH, "-- %s %s\n\n", msg, buf);
}
+static void
+setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH)
+{
+#ifdef WIN32
+ pse->threadId = GetCurrentThreadId();
+#else
+ pse->pid = getpid();
+#endif
+ pse->AH = AH;
+}
+
+static void
+unsetProcessIdentifier(ParallelStateEntry *pse)
+{
+#ifdef WIN32
+ pse->threadId = 0;
+#else
+ pse->pid = 0;
+#endif
+ pse->AH = NULL;
+}
+
+static ParallelStateEntry *
+GetMyPSEntry(ParallelState *pstate)
+{
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+#ifdef WIN32
+ if (pstate->pse[i].threadId == GetCurrentThreadId())
+#else
+ if (pstate->pse[i].pid == getpid())
+#endif
+ return &(pstate->pse[i]);
+
+ return NULL;
+}
+
+static void
+archive_close_connection(int code, void *arg)
+{
+ ShutdownInformation *si = (ShutdownInformation *) arg;
+
+ if (si->pstate)
+ {
+ ParallelStateEntry *entry = GetMyPSEntry(si->pstate);
+
+ if (entry != NULL && entry->AH)
+ DisconnectDatabase(&(entry->AH->public));
+ }
+ else if (si->AHX)
+ DisconnectDatabase(si->AHX);
+}
+
+void
+on_exit_close_archive(Archive *AHX)
+{
+ shutdown_info.AHX = AHX;
+ on_exit_nicely(archive_close_connection, &shutdown_info);
+}
/*
* Main engine for parallel restore.
@@ -3316,10 +3362,17 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
TocEntry *next_work_item;
thandle ret_child;
TocEntry *te;
+ ParallelState *pstate;
+ int i;
ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
- slots = (ParallelSlot *) pg_calloc(sizeof(ParallelSlot), n_slots);
+ slots = (ParallelSlot *) pg_calloc(n_slots, sizeof(ParallelSlot));
+ pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+ pstate->pse = (ParallelStateEntry *) pg_calloc(n_slots, sizeof(ParallelStateEntry));
+ pstate->numWorkers = ropt->number_of_jobs;
+ for (i = 0; i < pstate->numWorkers; i++)
+ unsetProcessIdentifier(&(pstate->pse[i]));
/* Adjust dependency information */
fix_dependencies(AH);
@@ -3375,6 +3428,12 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
*/
DisconnectDatabase(&AH->public);
+ /*
+ * Set the pstate in the shutdown_info. The exit handler uses pstate if set
+ * and falls back to AHX otherwise.
+ */
+ shutdown_info.pstate = pstate;
+
/* blow away any transient state from the old connection */
if (AH->currUser)
free(AH->currUser);
@@ -3473,6 +3532,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
args = pg_malloc(sizeof(RestoreArgs));
args->AH = CloneArchive(AH);
args->te = next_work_item;
+ args->pse = &pstate->pse[next_slot];
/* run the step in a worker child */
child = spawn_restore(args);
@@ -3500,14 +3560,20 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
}
else
{
- die_horribly(AH, modulename, "worker process crashed: status %d\n",
- work_status);
+ exit_horribly(modulename, "worker process crashed: status %d\n",
+ work_status);
}
}
ahlog(AH, 1, "finished main parallel loop\n");
/*
+ * Remove the pstate again, so the exit handler will now fall back to
+ * closing AH->connection again.
+ */
+ shutdown_info.pstate = NULL;
+
+ /*
* Now reconnect the single parent connection.
*/
ConnectDatabase((Archive *) AH, ropt->dbname,
@@ -3548,23 +3614,23 @@ spawn_restore(RestoreArgs *args)
{
/* in child process */
parallel_restore(args);
- die_horribly(args->AH, modulename,
- "parallel_restore should not return\n");
+ exit_horribly(modulename,
+ "parallel_restore should not return\n");
}
else if (child < 0)
{
/* fork failed */
- die_horribly(args->AH, modulename,
- "could not create worker process: %s\n",
- strerror(errno));
+ exit_horribly(modulename,
+ "could not create worker process: %s\n",
+ strerror(errno));
}
#else
child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
args, 0, NULL);
if (child == 0)
- die_horribly(args->AH, modulename,
- "could not create worker thread: %s\n",
- strerror(errno));
+ exit_horribly(modulename,
+ "could not create worker thread: %s\n",
+ strerror(errno));
#endif
return child;
@@ -3806,6 +3872,8 @@ parallel_restore(RestoreArgs *args)
RestoreOptions *ropt = AH->ropt;
int retval;
+ setProcessIdentifier(args->pse, AH);
+
/*
* Close and reopen the input file so we have a private file pointer that
* doesn't stomp on anyone else's file pointer, if we're actually going to
@@ -3836,6 +3904,7 @@ parallel_restore(RestoreArgs *args)
/* And clean up */
DisconnectDatabase((Archive *) AH);
+ unsetProcessIdentifier(args->pse);
/* If we reopened the file, we are done with it, so close it now */
if (te->section == SECTION_DATA)
@@ -3881,7 +3950,7 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
}
if (te == NULL)
- die_horribly(AH, modulename, "could not find slot of finished worker\n");
+ exit_horribly(modulename, "could not find slot of finished worker\n");
ahlog(AH, 1, "finished item %d %s %s\n",
te->dumpId, te->desc, te->tag);
@@ -3896,8 +3965,8 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
else if (status == WORKER_IGNORED_ERRORS)
AH->public.n_errors++;
else if (status != 0)
- die_horribly(AH, modulename, "worker process failed: exit code %d\n",
- status);
+ exit_horribly(modulename, "worker process failed: exit code %d\n",
+ status);
reduce_dependencies(AH, te, ready_list);
}