diff options
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.c | 357 |
1 files changed, 213 insertions, 144 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 927950caff2..e29265953d4 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -61,11 +61,28 @@ #define thandle HANDLE #endif +typedef struct ParallelStateEntry +{ +#ifdef WIN32 + unsigned int threadId; +#else + pid_t pid; +#endif + ArchiveHandle *AH; +} ParallelStateEntry; + +typedef struct ParallelState +{ + int numWorkers; + ParallelStateEntry *pse; +} ParallelState; + /* Arguments needed for a worker child */ typedef struct _restore_args { ArchiveHandle *AH; TocEntry *te; + ParallelStateEntry *pse; } RestoreArgs; /* State for each parallel activity slot */ @@ -75,6 +92,14 @@ typedef struct _parallel_slot RestoreArgs *args; } ParallelSlot; +typedef struct ShutdownInformation +{ + ParallelState *pstate; + Archive *AHX; +} ShutdownInformation; + +static ShutdownInformation shutdown_info; + #define NO_SLOT (-1) #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" @@ -122,10 +147,6 @@ static int _discoverArchiveFormat(ArchiveHandle *AH); static int RestoringToDB(ArchiveHandle *AH); static void dump_lo_buf(ArchiveHandle *AH); -static void vdie_horribly(ArchiveHandle *AH, const char *modulename, - const char *fmt, va_list ap) - __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0), noreturn)); - static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim); static void SetOutput(ArchiveHandle *AH, const char *filename, int compression); static OutputContext SaveOutput(ArchiveHandle *AH); @@ -160,6 +181,11 @@ static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); static ArchiveHandle *CloneArchive(ArchiveHandle *AH); static void DeCloneArchive(ArchiveHandle *AH); +static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH); +static void unsetProcessIdentifier(ParallelStateEntry *pse); +static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate); +static void archive_close_connection(int code, void *arg); + /* * Wrapper functions. @@ -208,8 +234,8 @@ CloseArchive(Archive *AHX) res = fclose(AH->OF); if (res != 0) - die_horribly(AH, modulename, "could not close output file: %s\n", - strerror(errno)); + exit_horribly(modulename, "could not close output file: %s\n", + strerror(errno)); } /* Public */ @@ -234,14 +260,14 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) * connected to, not the one we will create, which is very bad... */ if (ropt->createDB && ropt->dropSchema) - die_horribly(AH, modulename, "-C and -c are incompatible options\n"); + exit_horribly(modulename, "-C and -c are incompatible options\n"); /* * -C is not compatible with -1, because we can't create a database inside * a transaction block. */ if (ropt->createDB && ropt->single_txn) - die_horribly(AH, modulename, "-C and -1 are incompatible options\n"); + exit_horribly(modulename, "-C and -1 are incompatible options\n"); /* * If we're going to do parallel restore, there are some restrictions. @@ -251,11 +277,11 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) { /* We haven't got round to making this work for all archive formats */ if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL) - die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n"); + exit_horribly(modulename, "parallel restore is not supported with this archive file format\n"); /* Doesn't work if the archive represents dependencies as OIDs */ if (AH->version < K_VERS_1_8) - die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n"); + exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n"); /* * It's also not gonna work if we can't reopen the input file, so @@ -274,7 +300,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) { reqs = _tocEntryRequired(te, ropt, false); if (te->hadDumper && (reqs & REQ_DATA) != 0) - die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n"); + exit_horribly(modulename, "cannot restore from compressed archive (compression not supported in this installation)\n"); } } #endif @@ -286,7 +312,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt) { ahlog(AH, 1, "connecting to database for restore\n"); if (AH->version < K_VERS_1_3) - die_horribly(AH, modulename, "direct database connections are not supported in pre-1.3 archives\n"); + exit_horribly(modulename, "direct database connections are not supported in pre-1.3 archives\n"); /* XXX Should get this from the archive */ AHX->minRemoteVersion = 070100; @@ -734,7 +760,7 @@ WriteData(Archive *AHX, const void *data, size_t dLen) ArchiveHandle *AH = (ArchiveHandle *) AHX; if (!AH->currToc) - die_horribly(AH, modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n"); + exit_horribly(modulename, "internal error -- WriteData cannot be called outside the context of a DataDumper routine\n"); return (*AH->WriteDataPtr) (AH, data, dLen); } @@ -886,7 +912,7 @@ StartBlob(Archive *AHX, Oid oid) ArchiveHandle *AH = (ArchiveHandle *) AHX; if (!AH->StartBlobPtr) - die_horribly(AH, modulename, "large-object output not supported in chosen format\n"); + exit_horribly(modulename, "large-object output not supported in chosen format\n"); (*AH->StartBlobPtr) (AH, AH->currToc, oid); @@ -973,13 +999,13 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop) { loOid = lo_create(AH->connection, oid); if (loOid == 0 || loOid != oid) - die_horribly(AH, modulename, "could not create large object %u: %s", - oid, PQerrorMessage(AH->connection)); + exit_horribly(modulename, "could not create large object %u: %s", + oid, PQerrorMessage(AH->connection)); } AH->loFd = lo_open(AH->connection, oid, INV_WRITE); if (AH->loFd == -1) - die_horribly(AH, modulename, "could not open large object %u: %s", - oid, PQerrorMessage(AH->connection)); + exit_horribly(modulename, "could not open large object %u: %s", + oid, PQerrorMessage(AH->connection)); } else { @@ -1035,8 +1061,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt) /* Setup the file */ fh = fopen(ropt->tocFile, PG_BINARY_R); if (!fh) - die_horribly(AH, modulename, "could not open TOC file \"%s\": %s\n", - ropt->tocFile, strerror(errno)); + exit_horribly(modulename, "could not open TOC file \"%s\": %s\n", + ropt->tocFile, strerror(errno)); incomplete_line = false; while (fgets(buf, sizeof(buf), fh) != NULL) @@ -1083,8 +1109,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt) /* Find TOC entry */ te = getTocEntryByDumpId(AH, id); if (!te) - die_horribly(AH, modulename, "could not find entry for ID %d\n", - id); + exit_horribly(modulename, "could not find entry for ID %d\n", + id); /* Mark it wanted */ ropt->idWanted[id - 1] = true; @@ -1104,8 +1130,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt) } if (fclose(fh) != 0) - die_horribly(AH, modulename, "could not close TOC file: %s\n", - strerror(errno)); + exit_horribly(modulename, "could not close TOC file: %s\n", + strerror(errno)); } /* @@ -1221,11 +1247,11 @@ SetOutput(ArchiveHandle *AH, const char *filename, int compression) if (!AH->OF) { if (filename) - die_horribly(AH, modulename, "could not open output file \"%s\": %s\n", - filename, strerror(errno)); + exit_horribly(modulename, "could not open output file \"%s\": %s\n", + filename, strerror(errno)); else - die_horribly(AH, modulename, "could not open output file: %s\n", - strerror(errno)); + exit_horribly(modulename, "could not open output file: %s\n", + strerror(errno)); } } @@ -1251,7 +1277,7 @@ RestoreOutput(ArchiveHandle *AH, OutputContext savedContext) res = fclose(AH->OF); if (res != 0) - die_horribly(AH, modulename, "could not close output file: %s\n", + exit_horribly(modulename, "could not close output file: %s\n", strerror(errno)); AH->gzOut = savedContext.gzOut; @@ -1329,7 +1355,7 @@ dump_lo_buf(ArchiveHandle *AH) AH->lo_buf_used), (unsigned long) AH->lo_buf_used, (unsigned long) res); if (res != AH->lo_buf_used) - die_horribly(AH, modulename, + exit_horribly(modulename, "could not write to large object (result: %lu, expected: %lu)\n", (unsigned long) res, (unsigned long) AH->lo_buf_used); } @@ -1388,7 +1414,7 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) { res = GZWRITE(ptr, size, nmemb, AH->OF); if (res != (nmemb * size)) - die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno)); + exit_horribly(modulename, "could not write to output file: %s\n", strerror(errno)); return res; } else if (AH->CustomOutPtr) @@ -1396,7 +1422,7 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) res = AH->CustomOutPtr (AH, ptr, size * nmemb); if (res != (nmemb * size)) - die_horribly(AH, modulename, "could not write to custom output routine\n"); + exit_horribly(modulename, "could not write to custom output routine\n"); return res; } else @@ -1411,56 +1437,17 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH) { res = fwrite(ptr, size, nmemb, AH->OF); if (res != nmemb) - die_horribly(AH, modulename, "could not write to output file: %s\n", + exit_horribly(modulename, "could not write to output file: %s\n", strerror(errno)); return res; } } } - -/* Report a fatal error and exit(1) */ -static void -vdie_horribly(ArchiveHandle *AH, const char *modulename, - const char *fmt, va_list ap) -{ - vwrite_msg(modulename, fmt, ap); - - if (AH) - { - if (AH->public.verbose) - write_msg(NULL, "*** aborted because of error\n"); - DisconnectDatabase(&AH->public); - } - - exit_nicely(1); -} - -/* As above, but with variable arg list */ -void -die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) -{ - va_list ap; - - va_start(ap, fmt); - vdie_horribly(AH, modulename, fmt, ap); - va_end(ap); -} - -/* As above, but with a complaint about a particular query. */ -void -die_on_query_failure(ArchiveHandle *AH, const char *modulename, - const char *query) -{ - write_msg(modulename, "query failed: %s", - PQerrorMessage(AH->connection)); - die_horribly(AH, modulename, "query was: %s\n", query); -} - /* on some error, we may decide to go on... */ void -warn_or_die_horribly(ArchiveHandle *AH, - const char *modulename, const char *fmt,...) +warn_or_exit_horribly(ArchiveHandle *AH, + const char *modulename, const char *fmt,...) { va_list ap; @@ -1497,14 +1484,13 @@ warn_or_die_horribly(ArchiveHandle *AH, AH->lastErrorTE = AH->currentTE; va_start(ap, fmt); + vwrite_msg(modulename, fmt, ap); + va_end(ap); + if (AH->public.exit_on_error) - vdie_horribly(AH, modulename, fmt, ap); + exit_nicely(1); else - { - vwrite_msg(modulename, fmt, ap); AH->public.n_errors++; - } - va_end(ap); } #ifdef NOT_USED @@ -1623,7 +1609,7 @@ ReadOffset(ArchiveHandle *AH, pgoff_t * o) break; default: - die_horribly(AH, modulename, "unexpected data offset flag %d\n", offsetFlg); + exit_horribly(modulename, "unexpected data offset flag %d\n", offsetFlg); } /* @@ -1636,7 +1622,7 @@ ReadOffset(ArchiveHandle *AH, pgoff_t * o) else { if ((*AH->ReadBytePtr) (AH) != 0) - die_horribly(AH, modulename, "file offset in dump file is too large\n"); + exit_horribly(modulename, "file offset in dump file is too large\n"); } } @@ -1730,7 +1716,7 @@ ReadStr(ArchiveHandle *AH) { buf = (char *) pg_malloc(l + 1); if ((*AH->ReadBufPtr) (AH, (void *) buf, l) != l) - die_horribly(AH, modulename, "unexpected end of file\n"); + exit_horribly(modulename, "unexpected end of file\n"); buf[l] = '\0'; } @@ -1773,8 +1759,8 @@ _discoverArchiveFormat(ArchiveHandle *AH) char buf[MAXPGPATH]; if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH) - die_horribly(AH, modulename, "directory name too long: \"%s\"\n", - AH->fSpec); + exit_horribly(modulename, "directory name too long: \"%s\"\n", + AH->fSpec); if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) { AH->format = archDirectory; @@ -1783,32 +1769,32 @@ _discoverArchiveFormat(ArchiveHandle *AH) #ifdef HAVE_LIBZ if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH) - die_horribly(AH, modulename, "directory name too long: \"%s\"\n", - AH->fSpec); + exit_horribly(modulename, "directory name too long: \"%s\"\n", + AH->fSpec); if (stat(buf, &st) == 0 && S_ISREG(st.st_mode)) { AH->format = archDirectory; return AH->format; } #endif - die_horribly(AH, modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n", - AH->fSpec); + exit_horribly(modulename, "directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)\n", + AH->fSpec); fh = NULL; /* keep compiler quiet */ } else { fh = fopen(AH->fSpec, PG_BINARY_R); if (!fh) - die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", - AH->fSpec, strerror(errno)); + exit_horribly(modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); } } else { fh = stdin; if (!fh) - die_horribly(AH, modulename, "could not open input file: %s\n", - strerror(errno)); + exit_horribly(modulename, "could not open input file: %s\n", + strerror(errno)); } cnt = fread(sig, 1, 5, fh); @@ -1816,10 +1802,10 @@ _discoverArchiveFormat(ArchiveHandle *AH) if (cnt != 5) { if (ferror(fh)) - die_horribly(AH, modulename, "could not read input file: %s\n", strerror(errno)); + exit_horribly(modulename, "could not read input file: %s\n", strerror(errno)); else - die_horribly(AH, modulename, "input file is too short (read %lu, expected 5)\n", - (unsigned long) cnt); + exit_horribly(modulename, "input file is too short (read %lu, expected 5)\n", + (unsigned long) cnt); } /* Save it, just in case we need it later */ @@ -1880,14 +1866,14 @@ _discoverArchiveFormat(ArchiveHandle *AH) strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0)) { /* looks like it's probably a text format dump. so suggest they try psql */ - die_horribly(AH, modulename, "input file appears to be a text format dump. Please use psql.\n"); + exit_horribly(modulename, "input file appears to be a text format dump. Please use psql.\n"); } if (AH->lookaheadLen != 512) - die_horribly(AH, modulename, "input file does not appear to be a valid archive (too short?)\n"); + exit_horribly(modulename, "input file does not appear to be a valid archive (too short?)\n"); if (!isValidTarHeader(AH->lookahead)) - die_horribly(AH, modulename, "input file does not appear to be a valid archive\n"); + exit_horribly(modulename, "input file does not appear to be a valid archive\n"); AH->format = archTar; } @@ -1907,8 +1893,8 @@ _discoverArchiveFormat(ArchiveHandle *AH) /* Close the file */ if (wantClose) if (fclose(fh) != 0) - die_horribly(AH, modulename, "could not close input file: %s\n", - strerror(errno)); + exit_horribly(modulename, "could not close input file: %s\n", + strerror(errno)); return AH->format; } @@ -2027,7 +2013,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, break; default: - die_horribly(AH, modulename, "unrecognized file format \"%d\"\n", fmt); + exit_horribly(modulename, "unrecognized file format \"%d\"\n", fmt); } return AH; @@ -2149,9 +2135,9 @@ ReadToc(ArchiveHandle *AH) /* Sanity check */ if (te->dumpId <= 0) - die_horribly(AH, modulename, - "entry ID %d out of range -- perhaps a corrupt TOC\n", - te->dumpId); + exit_horribly(modulename, + "entry ID %d out of range -- perhaps a corrupt TOC\n", + te->dumpId); te->hadDumper = ReadInt(AH); @@ -2306,13 +2292,13 @@ processEncodingEntry(ArchiveHandle *AH, TocEntry *te) *ptr2 = '\0'; encoding = pg_char_to_encoding(ptr1); if (encoding < 0) - die_horribly(AH, modulename, "unrecognized encoding \"%s\"\n", - ptr1); + exit_horribly(modulename, "unrecognized encoding \"%s\"\n", + ptr1); AH->public.encoding = encoding; } else - die_horribly(AH, modulename, "invalid ENCODING item: %s\n", - te->defn); + exit_horribly(modulename, "invalid ENCODING item: %s\n", + te->defn); free(defn); } @@ -2329,8 +2315,8 @@ processStdStringsEntry(ArchiveHandle *AH, TocEntry *te) else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0) AH->public.std_strings = false; else - die_horribly(AH, modulename, "invalid STDSTRINGS item: %s\n", - te->defn); + exit_horribly(modulename, "invalid STDSTRINGS item: %s\n", + te->defn); } static teReqs @@ -2537,9 +2523,9 @@ _doSetSessionAuth(ArchiveHandle *AH, const char *user) res = PQexec(AH->connection, cmd->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) - /* NOT warn_or_die_horribly... use -O instead to skip this. */ - die_horribly(AH, modulename, "could not set session user to \"%s\": %s", - user, PQerrorMessage(AH->connection)); + /* NOT warn_or_exit_horribly... use -O instead to skip this. */ + exit_horribly(modulename, "could not set session user to \"%s\": %s", + user, PQerrorMessage(AH->connection)); PQclear(res); } @@ -2569,9 +2555,9 @@ _doSetWithOids(ArchiveHandle *AH, const bool withOids) res = PQexec(AH->connection, cmd->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) - warn_or_die_horribly(AH, modulename, - "could not set default_with_oids: %s", - PQerrorMessage(AH->connection)); + warn_or_exit_horribly(AH, modulename, + "could not set default_with_oids: %s", + PQerrorMessage(AH->connection)); PQclear(res); } @@ -2707,9 +2693,9 @@ _selectOutputSchema(ArchiveHandle *AH, const char *schemaName) res = PQexec(AH->connection, qry->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) - warn_or_die_horribly(AH, modulename, - "could not set search_path to \"%s\": %s", - schemaName, PQerrorMessage(AH->connection)); + warn_or_exit_horribly(AH, modulename, + "could not set search_path to \"%s\": %s", + schemaName, PQerrorMessage(AH->connection)); PQclear(res); } @@ -2768,9 +2754,9 @@ _selectTablespace(ArchiveHandle *AH, const char *tablespace) res = PQexec(AH->connection, qry->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) - warn_or_die_horribly(AH, modulename, - "could not set default_tablespace to %s: %s", - fmtId(want), PQerrorMessage(AH->connection)); + warn_or_exit_horribly(AH, modulename, + "could not set default_tablespace to %s: %s", + fmtId(want), PQerrorMessage(AH->connection)); PQclear(res); } @@ -3150,10 +3136,10 @@ ReadHead(ArchiveHandle *AH) if (!AH->readHeader) { if ((*AH->ReadBufPtr) (AH, tmpMag, 5) != 5) - die_horribly(AH, modulename, "unexpected end of file\n"); + exit_horribly(modulename, "unexpected end of file\n"); if (strncmp(tmpMag, "PGDMP", 5) != 0) - die_horribly(AH, modulename, "did not find magic string in file header\n"); + exit_horribly(modulename, "did not find magic string in file header\n"); AH->vmaj = (*AH->ReadBytePtr) (AH); AH->vmin = (*AH->ReadBytePtr) (AH); @@ -3166,13 +3152,13 @@ ReadHead(ArchiveHandle *AH) AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0; if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX) - die_horribly(AH, modulename, "unsupported version (%d.%d) in file header\n", - AH->vmaj, AH->vmin); + exit_horribly(modulename, "unsupported version (%d.%d) in file header\n", + AH->vmaj, AH->vmin); AH->intSize = (*AH->ReadBytePtr) (AH); if (AH->intSize > 32) - die_horribly(AH, modulename, "sanity check on integer size (%lu) failed\n", - (unsigned long) AH->intSize); + exit_horribly(modulename, "sanity check on integer size (%lu) failed\n", + (unsigned long) AH->intSize); if (AH->intSize > sizeof(int)) write_msg(modulename, "WARNING: archive was made on a machine with larger integers, some operations might fail\n"); @@ -3185,8 +3171,8 @@ ReadHead(ArchiveHandle *AH) fmt = (*AH->ReadBytePtr) (AH); if (AH->format != fmt) - die_horribly(AH, modulename, "expected format (%d) differs from format found in file (%d)\n", - AH->format, fmt); + exit_horribly(modulename, "expected format (%d) differs from format found in file (%d)\n", + AH->format, fmt); } if (AH->version >= K_VERS_1_2) @@ -3290,6 +3276,66 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim) ahprintf(AH, "-- %s %s\n\n", msg, buf); } +static void +setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH) +{ +#ifdef WIN32 + pse->threadId = GetCurrentThreadId(); +#else + pse->pid = getpid(); +#endif + pse->AH = AH; +} + +static void +unsetProcessIdentifier(ParallelStateEntry *pse) +{ +#ifdef WIN32 + pse->threadId = 0; +#else + pse->pid = 0; +#endif + pse->AH = NULL; +} + +static ParallelStateEntry * +GetMyPSEntry(ParallelState *pstate) +{ + int i; + + for (i = 0; i < pstate->numWorkers; i++) +#ifdef WIN32 + if (pstate->pse[i].threadId == GetCurrentThreadId()) +#else + if (pstate->pse[i].pid == getpid()) +#endif + return &(pstate->pse[i]); + + return NULL; +} + +static void +archive_close_connection(int code, void *arg) +{ + ShutdownInformation *si = (ShutdownInformation *) arg; + + if (si->pstate) + { + ParallelStateEntry *entry = GetMyPSEntry(si->pstate); + + if (entry != NULL && entry->AH) + DisconnectDatabase(&(entry->AH->public)); + } + else if (si->AHX) + DisconnectDatabase(si->AHX); +} + +void +on_exit_close_archive(Archive *AHX) +{ + shutdown_info.AHX = AHX; + on_exit_nicely(archive_close_connection, &shutdown_info); +} /* * Main engine for parallel restore. @@ -3316,10 +3362,17 @@ restore_toc_entries_parallel(ArchiveHandle *AH) TocEntry *next_work_item; thandle ret_child; TocEntry *te; + ParallelState *pstate; + int i; ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); - slots = (ParallelSlot *) pg_calloc(sizeof(ParallelSlot), n_slots); + slots = (ParallelSlot *) pg_calloc(n_slots, sizeof(ParallelSlot)); + pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); + pstate->pse = (ParallelStateEntry *) pg_calloc(n_slots, sizeof(ParallelStateEntry)); + pstate->numWorkers = ropt->number_of_jobs; + for (i = 0; i < pstate->numWorkers; i++) + unsetProcessIdentifier(&(pstate->pse[i])); /* Adjust dependency information */ fix_dependencies(AH); @@ -3375,6 +3428,12 @@ restore_toc_entries_parallel(ArchiveHandle *AH) */ DisconnectDatabase(&AH->public); + /* + * Set the pstate in the shutdown_info. The exit handler uses pstate if set + * and falls back to AHX otherwise. + */ + shutdown_info.pstate = pstate; + /* blow away any transient state from the old connection */ if (AH->currUser) free(AH->currUser); @@ -3473,6 +3532,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) args = pg_malloc(sizeof(RestoreArgs)); args->AH = CloneArchive(AH); args->te = next_work_item; + args->pse = &pstate->pse[next_slot]; /* run the step in a worker child */ child = spawn_restore(args); @@ -3500,14 +3560,20 @@ restore_toc_entries_parallel(ArchiveHandle *AH) } else { - die_horribly(AH, modulename, "worker process crashed: status %d\n", - work_status); + exit_horribly(modulename, "worker process crashed: status %d\n", + work_status); } } ahlog(AH, 1, "finished main parallel loop\n"); /* + * Remove the pstate again, so the exit handler will now fall back to + * closing AH->connection again. + */ + shutdown_info.pstate = NULL; + + /* * Now reconnect the single parent connection. */ ConnectDatabase((Archive *) AH, ropt->dbname, @@ -3548,23 +3614,23 @@ spawn_restore(RestoreArgs *args) { /* in child process */ parallel_restore(args); - die_horribly(args->AH, modulename, - "parallel_restore should not return\n"); + exit_horribly(modulename, + "parallel_restore should not return\n"); } else if (child < 0) { /* fork failed */ - die_horribly(args->AH, modulename, - "could not create worker process: %s\n", - strerror(errno)); + exit_horribly(modulename, + "could not create worker process: %s\n", + strerror(errno)); } #else child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore, args, 0, NULL); if (child == 0) - die_horribly(args->AH, modulename, - "could not create worker thread: %s\n", - strerror(errno)); + exit_horribly(modulename, + "could not create worker thread: %s\n", + strerror(errno)); #endif return child; @@ -3806,6 +3872,8 @@ parallel_restore(RestoreArgs *args) RestoreOptions *ropt = AH->ropt; int retval; + setProcessIdentifier(args->pse, AH); + /* * Close and reopen the input file so we have a private file pointer that * doesn't stomp on anyone else's file pointer, if we're actually going to @@ -3836,6 +3904,7 @@ parallel_restore(RestoreArgs *args) /* And clean up */ DisconnectDatabase((Archive *) AH); + unsetProcessIdentifier(args->pse); /* If we reopened the file, we are done with it, so close it now */ if (te->section == SECTION_DATA) @@ -3881,7 +3950,7 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, } if (te == NULL) - die_horribly(AH, modulename, "could not find slot of finished worker\n"); + exit_horribly(modulename, "could not find slot of finished worker\n"); ahlog(AH, 1, "finished item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -3896,8 +3965,8 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, else if (status == WORKER_IGNORED_ERRORS) AH->public.n_errors++; else if (status != 0) - die_horribly(AH, modulename, "worker process failed: exit code %d\n", - status); + exit_horribly(modulename, "worker process failed: exit code %d\n", + status); reduce_dependencies(AH, te, ready_list); } |