summaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/pg_backup_archiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r--src/bin/pg_dump/pg_backup_archiver.c735
1 files changed, 274 insertions, 461 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 19d12788d9d..3c2671bb2d5 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -22,8 +22,10 @@
#include "pg_backup_db.h"
#include "dumputils.h"
+#include "parallel.h"
#include <ctype.h>
+#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
@@ -35,72 +37,6 @@
#include "libpq/libpq-fs.h"
-/*
- * Special exit values from worker children. We reserve 0 for normal
- * success; 1 and other small values should be interpreted as crashes.
- */
-#define WORKER_CREATE_DONE 10
-#define WORKER_INHIBIT_DATA 11
-#define WORKER_IGNORED_ERRORS 12
-
-/*
- * Unix uses exit to return result from worker child, so function is void.
- * Windows thread result comes via function return.
- */
-#ifndef WIN32
-#define parallel_restore_result void
-#else
-#define parallel_restore_result DWORD
-#endif
-
-/* IDs for worker children are either PIDs or thread handles */
-#ifndef WIN32
-#define thandle pid_t
-#else
-#define thandle HANDLE
-#endif
-
-typedef struct ParallelStateEntry
-{
-#ifdef WIN32
- unsigned int threadId;
-#else
- pid_t pid;
-#endif
- ArchiveHandle *AH;
-} ParallelStateEntry;
-
-typedef struct ParallelState
-{
- int numWorkers;
- ParallelStateEntry *pse;
-} ParallelState;
-
-/* Arguments needed for a worker child */
-typedef struct _restore_args
-{
- ArchiveHandle *AH;
- TocEntry *te;
- ParallelStateEntry *pse;
-} RestoreArgs;
-
-/* State for each parallel activity slot */
-typedef struct _parallel_slot
-{
- thandle child_id;
- RestoreArgs *args;
-} ParallelSlot;
-
-typedef struct ShutdownInformation
-{
- ParallelState *pstate;
- Archive *AHX;
-} ShutdownInformation;
-
-static ShutdownInformation shutdown_info;
-
-#define NO_SLOT (-1)
-
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
@@ -116,7 +52,7 @@ static const char *modulename = gettext_noop("archiver");
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, ArchiveMode mode);
+ const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr);
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
ArchiveHandle *AH);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass);
@@ -136,7 +72,6 @@ static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void buildTocEntryArrays(ArchiveHandle *AH);
-static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
static int _discoverArchiveFormat(ArchiveHandle *AH);
@@ -149,21 +84,19 @@ static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
RestoreOptions *ropt, bool is_parallel);
-static void restore_toc_entries_parallel(ArchiveHandle *AH);
-static thandle spawn_restore(RestoreArgs *args);
-static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
-static bool work_in_progress(ParallelSlot *slots, int n_slots);
-static int get_next_slot(ParallelSlot *slots, int n_slots);
+static void restore_toc_entries_prefork(ArchiveHandle *AH);
+static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+ TocEntry *pending_list);
+static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
TocEntry *ready_list,
- ParallelSlot *slots, int n_slots);
-static parallel_restore_result parallel_restore(RestoreArgs *args);
+ ParallelState *pstate);
static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
- thandle worker, int status,
- ParallelSlot *slots, int n_slots);
+ int worker, int status,
+ ParallelState *pstate);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH);
@@ -172,14 +105,6 @@ static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
-static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
-static void DeCloneArchive(ArchiveHandle *AH);
-
-static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH);
-static void unsetProcessIdentifier(ParallelStateEntry *pse);
-static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate);
-static void archive_close_connection(int code, void *arg);
-
/*
* Wrapper functions.
@@ -189,15 +114,28 @@ static void archive_close_connection(int code, void *arg);
*
*/
+/*
+ * The dump worker setup needs lots of knowledge of the internals of pg_dump,
+ * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
+ * setup doesn't need to know anything much, so it's defined here.
+ */
+static void
+setupRestoreWorker(Archive *AHX, RestoreOptions *ropt)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ (AH->ReopenPtr) (AH);
+}
+
/* Create a new archive */
/* Public */
Archive *
CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, ArchiveMode mode)
+ const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker)
{
- ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode);
+ ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker);
return (Archive *) AH;
}
@@ -207,7 +145,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
Archive *
OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
{
- ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead);
+ ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker);
return (Archive *) AH;
}
@@ -311,7 +249,7 @@ RestoreArchive(Archive *AHX)
/*
* If we're going to do parallel restore, there are some restrictions.
*/
- parallel_mode = (ropt->number_of_jobs > 1 && ropt->useDB);
+ parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
if (parallel_mode)
{
/* We haven't got round to making this work for all archive formats */
@@ -499,7 +437,25 @@ RestoreArchive(Archive *AHX)
* In parallel mode, turn control over to the parallel-restore logic.
*/
if (parallel_mode)
- restore_toc_entries_parallel(AH);
+ {
+ ParallelState *pstate;
+ TocEntry pending_list;
+
+ par_list_header_init(&pending_list);
+
+ /* This runs PRE_DATA items and then disconnects from the database */
+ restore_toc_entries_prefork(AH);
+ Assert(AH->connection == NULL);
+
+ /* ParallelBackupStart() will actually fork the processes */
+ pstate = ParallelBackupStart(AH, ropt);
+ restore_toc_entries_parallel(AH, pstate, &pending_list);
+ ParallelBackupEnd(AH, pstate);
+
+ /* reconnect the master and see if we missed something */
+ restore_toc_entries_postfork(AH, &pending_list);
+ Assert(AH->connection != NULL);
+ }
else
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
@@ -558,7 +514,7 @@ static int
restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
RestoreOptions *ropt, bool is_parallel)
{
- int retval = 0;
+ int status = WORKER_OK;
teReqs reqs;
bool defnDumped;
@@ -611,7 +567,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
if (ropt->noDataForFailedTables)
{
if (is_parallel)
- retval = WORKER_INHIBIT_DATA;
+ status = WORKER_INHIBIT_DATA;
else
inhibit_data_for_failed_table(AH, te);
}
@@ -626,7 +582,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
* just set the return value.
*/
if (is_parallel)
- retval = WORKER_CREATE_DONE;
+ status = WORKER_CREATE_DONE;
else
mark_create_done(AH, te);
}
@@ -744,7 +700,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
}
}
- return retval;
+ if (AH->public.n_errors > 0 && status == WORKER_OK)
+ status = WORKER_IGNORED_ERRORS;
+
+ return status;
}
/*
@@ -1634,7 +1593,7 @@ buildTocEntryArrays(ArchiveHandle *AH)
}
}
-static TocEntry *
+TocEntry *
getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
{
/* build index arrays if we didn't already */
@@ -2018,7 +1977,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
*/
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, ArchiveMode mode)
+ const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr)
{
ArchiveHandle *AH;
@@ -2100,6 +2059,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
}
#endif
+ AH->SetupWorkerPtr = setupWorkerPtr;
+
if (fmt == archUnknown)
AH->format = _discoverArchiveFormat(AH);
else
@@ -2132,50 +2093,66 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
return AH;
}
-
void
-WriteDataChunks(ArchiveHandle *AH)
+WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
{
TocEntry *te;
- StartDataPtr startPtr;
- EndDataPtr endPtr;
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
- if (te->dataDumper != NULL && (te->reqs & REQ_DATA) != 0)
- {
- AH->currToc = te;
- /* printf("Writing data for %d (%x)\n", te->id, te); */
-
- if (strcmp(te->desc, "BLOBS") == 0)
- {
- startPtr = AH->StartBlobsPtr;
- endPtr = AH->EndBlobsPtr;
- }
- else
- {
- startPtr = AH->StartDataPtr;
- endPtr = AH->EndDataPtr;
- }
+ if (!te->dataDumper)
+ continue;
- if (startPtr != NULL)
- (*startPtr) (AH, te);
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+ if (pstate && pstate->numWorkers > 1)
+ {
/*
- * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
+ * If we are in a parallel backup, then we are always the master
+ * process.
*/
+ EnsureIdleWorker(AH, pstate);
+ Assert(GetIdleWorker(pstate) != NO_SLOT);
+ DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
+ }
+ else
+ WriteDataChunksForTocEntry(AH, te);
+ }
+ EnsureWorkersFinished(AH, pstate);
+}
- /*
- * The user-provided DataDumper routine needs to call
- * AH->WriteData
- */
- (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+void
+WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
+{
+ StartDataPtr startPtr;
+ EndDataPtr endPtr;
- if (endPtr != NULL)
- (*endPtr) (AH, te);
- AH->currToc = NULL;
- }
+ AH->currToc = te;
+
+ if (strcmp(te->desc, "BLOBS") == 0)
+ {
+ startPtr = AH->StartBlobsPtr;
+ endPtr = AH->EndBlobsPtr;
}
+ else
+ {
+ startPtr = AH->StartDataPtr;
+ endPtr = AH->EndDataPtr;
+ }
+
+ if (startPtr != NULL)
+ (*startPtr) (AH, te);
+
+ /*
+ * The user-provided DataDumper routine needs to call AH->WriteData
+ */
+ (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+
+ if (endPtr != NULL)
+ (*endPtr) (AH, te);
+
+ AH->currToc = NULL;
}
void
@@ -2911,7 +2888,7 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
const char *type = te->desc;
/* Use ALTER TABLE for views and sequences */
- if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0||
+ if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
strcmp(type, "MATERIALIZED VIEW") == 0)
type = "TABLE";
@@ -3404,67 +3381,6 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
ahprintf(AH, "-- %s %s\n\n", msg, buf);
}
-static void
-setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH)
-{
-#ifdef WIN32
- pse->threadId = GetCurrentThreadId();
-#else
- pse->pid = getpid();
-#endif
- pse->AH = AH;
-}
-
-static void
-unsetProcessIdentifier(ParallelStateEntry *pse)
-{
-#ifdef WIN32
- pse->threadId = 0;
-#else
- pse->pid = 0;
-#endif
- pse->AH = NULL;
-}
-
-static ParallelStateEntry *
-GetMyPSEntry(ParallelState *pstate)
-{
- int i;
-
- for (i = 0; i < pstate->numWorkers; i++)
-#ifdef WIN32
- if (pstate->pse[i].threadId == GetCurrentThreadId())
-#else
- if (pstate->pse[i].pid == getpid())
-#endif
- return &(pstate->pse[i]);
-
- return NULL;
-}
-
-static void
-archive_close_connection(int code, void *arg)
-{
- ShutdownInformation *si = (ShutdownInformation *) arg;
-
- if (si->pstate)
- {
- ParallelStateEntry *entry = GetMyPSEntry(si->pstate);
-
- if (entry != NULL && entry->AH)
- DisconnectDatabase(&(entry->AH->public));
- }
- else if (si->AHX)
- DisconnectDatabase(si->AHX);
-}
-
-void
-on_exit_close_archive(Archive *AHX)
-{
- shutdown_info.AHX = AHX;
- on_exit_nicely(archive_close_connection, &shutdown_info);
-}
-
/*
* Main engine for parallel restore.
*
@@ -3477,30 +3393,13 @@ on_exit_close_archive(Archive *AHX)
* RestoreArchive).
*/
static void
-restore_toc_entries_parallel(ArchiveHandle *AH)
+restore_toc_entries_prefork(ArchiveHandle *AH)
{
RestoreOptions *ropt = AH->ropt;
- int n_slots = ropt->number_of_jobs;
- ParallelSlot *slots;
- int work_status;
- int next_slot;
bool skipped_some;
- TocEntry pending_list;
- TocEntry ready_list;
TocEntry *next_work_item;
- thandle ret_child;
- TocEntry *te;
- ParallelState *pstate;
- int i;
-
- ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
- slots = (ParallelSlot *) pg_malloc0(n_slots * sizeof(ParallelSlot));
- pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
- pstate->pse = (ParallelStateEntry *) pg_malloc0(n_slots * sizeof(ParallelStateEntry));
- pstate->numWorkers = ropt->number_of_jobs;
- for (i = 0; i < pstate->numWorkers; i++)
- unsetProcessIdentifier(&(pstate->pse[i]));
+ ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
/* Adjust dependency information */
fix_dependencies(AH);
@@ -3509,7 +3408,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
* Do all the early stuff in a single connection in the parent. There's no
* great point in running it in parallel, in fact it will actually run
* faster in a single connection because we avoid all the connection and
- * setup overhead. Also, pre-9.2 pg_dump versions were not very good
+ * setup overhead. Also, pre-9.2 pg_dump versions were not very good
* about showing all the dependencies of SECTION_PRE_DATA items, so we do
* not risk trying to process them out-of-order.
*
@@ -3561,12 +3460,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
*/
DisconnectDatabase(&AH->public);
- /*
- * Set the pstate in the shutdown_info. The exit handler uses pstate if
- * set and falls back to AHX otherwise.
- */
- shutdown_info.pstate = pstate;
-
/* blow away any transient state from the old connection */
if (AH->currUser)
free(AH->currUser);
@@ -3578,17 +3471,42 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
free(AH->currTablespace);
AH->currTablespace = NULL;
AH->currWithOids = -1;
+}
+
+/*
+ * Main engine for parallel restore.
+ *
+ * Work is done in three phases.
+ * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
+ * just as for a standard restore. This is done in restore_toc_entries_prefork().
+ * Second we process the remaining non-ACL steps in parallel worker children
+ * (threads on Windows, processes on Unix), these fork off and set up their
+ * connections before we call restore_toc_entries_parallel_forked.
+ * Finally we process all the ACL entries in a single connection (that happens
+ * back in RestoreArchive).
+ */
+static void
+restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+ TocEntry *pending_list)
+{
+ int work_status;
+ bool skipped_some;
+ TocEntry ready_list;
+ TocEntry *next_work_item;
+ int ret_child;
+
+ ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
/*
- * Initialize the lists of pending and ready items. After this setup, the
- * pending list is everything that needs to be done but is blocked by one
- * or more dependencies, while the ready list contains items that have no
- * remaining dependencies. Note: we don't yet filter out entries that
- * aren't going to be restored. They might participate in dependency
+ * Initialize the lists of ready items, the list for pending items has
+ * already been initialized in the caller. After this setup, the pending
+ * list is everything that needs to be done but is blocked by one or more
+ * dependencies, while the ready list contains items that have no
+ * remaining dependencies. Note: we don't yet filter out entries that
+ * aren't going to be restored. They might participate in dependency
* chains connecting entries that should be restored, so we treat them as
* live until we actually process them.
*/
- par_list_header_init(&pending_list);
par_list_header_init(&ready_list);
skipped_some = false;
for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
@@ -3613,7 +3531,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
}
if (next_work_item->depCount > 0)
- par_list_append(&pending_list, next_work_item);
+ par_list_append(pending_list, next_work_item);
else
par_list_append(&ready_list, next_work_item);
}
@@ -3627,9 +3545,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
ahlog(AH, 1, "entering main parallel loop\n");
- while ((next_work_item = get_next_work_item(AH, &ready_list,
- slots, n_slots)) != NULL ||
- work_in_progress(slots, n_slots))
+ while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
+ !IsEveryWorkerIdle(pstate))
{
if (next_work_item != NULL)
{
@@ -3647,62 +3564,72 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
continue;
}
- if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
- {
- /* There is work still to do and a worker slot available */
- thandle child;
- RestoreArgs *args;
+ ahlog(AH, 1, "launching item %d %s %s\n",
+ next_work_item->dumpId,
+ next_work_item->desc, next_work_item->tag);
- ahlog(AH, 1, "launching item %d %s %s\n",
- next_work_item->dumpId,
- next_work_item->desc, next_work_item->tag);
+ par_list_remove(next_work_item);
- par_list_remove(next_work_item);
-
- /* this memory is dealloced in mark_work_done() */
- args = pg_malloc(sizeof(RestoreArgs));
- args->AH = CloneArchive(AH);
- args->te = next_work_item;
- args->pse = &pstate->pse[next_slot];
+ Assert(GetIdleWorker(pstate) != NO_SLOT);
+ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
+ }
+ else
+ /* at least one child is working and we have nothing ready. */
+ Assert(!IsEveryWorkerIdle(pstate));
- /* run the step in a worker child */
- child = spawn_restore(args);
+ for (;;)
+ {
+ int nTerm = 0;
- slots[next_slot].child_id = child;
- slots[next_slot].args = args;
+ /*
+ * In order to reduce dependencies as soon as possible and
+ * especially to reap the status of workers who are working on
+ * items that pending items depend on, we do a non-blocking check
+ * for ended workers first.
+ *
+ * However, if we do not have any other work items currently that
+ * workers can work on, we do not busy-loop here but instead
+ * really wait for at least one worker to terminate. Hence we call
+ * ListenToWorkers(..., ..., do_wait = true) in this case.
+ */
+ ListenToWorkers(AH, pstate, !next_work_item);
- continue;
+ while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
+ {
+ nTerm++;
+ mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
}
- }
- /*
- * If we get here there must be work being done. Either there is no
- * work available to schedule (and work_in_progress returned true) or
- * there are no slots available. So we wait for a worker to finish,
- * and process the result.
- */
- ret_child = reap_child(slots, n_slots, &work_status);
+ /*
+ * We need to make sure that we have an idle worker before
+ * re-running the loop. If nTerm > 0 we already have that (quick
+ * check).
+ */
+ if (nTerm > 0)
+ break;
- if (WIFEXITED(work_status))
- {
- mark_work_done(AH, &ready_list,
- ret_child, WEXITSTATUS(work_status),
- slots, n_slots);
- }
- else
- {
- exit_horribly(modulename, "worker process crashed: status %d\n",
- work_status);
+ /* if nobody terminated, explicitly check for an idle worker */
+ if (GetIdleWorker(pstate) != NO_SLOT)
+ break;
+
+ /*
+ * If we have no idle worker, read the result of one or more
+ * workers and loop the loop to call ReapWorkerStatus() on them.
+ */
+ ListenToWorkers(AH, pstate, true);
}
}
ahlog(AH, 1, "finished main parallel loop\n");
+}
- /*
- * Remove the pstate again, so the exit handler will now fall back to
- * closing AH->connection again.
- */
- shutdown_info.pstate = NULL;
+static void
+restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
+{
+ RestoreOptions *ropt = AH->ropt;
+ TocEntry *te;
+
+ ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
/*
* Now reconnect the single parent connection.
@@ -3718,7 +3645,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
* dependencies, or some other pathological condition. If so, do it in the
* single parent connection.
*/
- for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
+ for (te = pending_list->par_next; te != pending_list; te = te->par_next)
{
ahlog(AH, 1, "processing missed item %d %s %s\n",
te->dumpId, te->desc, te->tag);
@@ -3729,121 +3656,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
}
/*
- * create a worker child to perform a restore step in parallel
- */
-static thandle
-spawn_restore(RestoreArgs *args)
-{
- thandle child;
-
- /* Ensure stdio state is quiesced before forking */
- fflush(NULL);
-
-#ifndef WIN32
- child = fork();
- if (child == 0)
- {
- /* in child process */
- parallel_restore(args);
- exit_horribly(modulename,
- "parallel_restore should not return\n");
- }
- else if (child < 0)
- {
- /* fork failed */
- exit_horribly(modulename,
- "could not create worker process: %s\n",
- strerror(errno));
- }
-#else
- child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
- args, 0, NULL);
- if (child == 0)
- exit_horribly(modulename,
- "could not create worker thread: %s\n",
- strerror(errno));
-#endif
-
- return child;
-}
-
-/*
- * collect status from a completed worker child
- */
-static thandle
-reap_child(ParallelSlot *slots, int n_slots, int *work_status)
-{
-#ifndef WIN32
- /* Unix is so much easier ... */
- return wait(work_status);
-#else
- static HANDLE *handles = NULL;
- int hindex,
- snum,
- tnum;
- thandle ret_child;
- DWORD res;
-
- /* first time around only, make space for handles to listen on */
- if (handles == NULL)
- handles = (HANDLE *) pg_malloc0(n_slots * sizeof(HANDLE));
-
- /* set up list of handles to listen to */
- for (snum = 0, tnum = 0; snum < n_slots; snum++)
- if (slots[snum].child_id != 0)
- handles[tnum++] = slots[snum].child_id;
-
- /* wait for one to finish */
- hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
-
- /* get handle of finished thread */
- ret_child = handles[hindex - WAIT_OBJECT_0];
-
- /* get the result */
- GetExitCodeThread(ret_child, &res);
- *work_status = res;
-
- /* dispose of handle to stop leaks */
- CloseHandle(ret_child);
-
- return ret_child;
-#endif
-}
-
-/*
- * are we doing anything now?
- */
-static bool
-work_in_progress(ParallelSlot *slots, int n_slots)
-{
- int i;
-
- for (i = 0; i < n_slots; i++)
- {
- if (slots[i].child_id != 0)
- return true;
- }
- return false;
-}
-
-/*
- * find the first free parallel slot (if any).
- */
-static int
-get_next_slot(ParallelSlot *slots, int n_slots)
-{
- int i;
-
- for (i = 0; i < n_slots; i++)
- {
- if (slots[i].child_id == 0)
- return i;
- }
- return NO_SLOT;
-}
-
-
-/*
* Check if te1 has an exclusive lock requirement for an item that te2 also
* requires, whether or not te2's requirement is for an exclusive lock.
*/
@@ -3916,7 +3728,7 @@ par_list_remove(TocEntry *te)
*/
static TocEntry *
get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
- ParallelSlot *slots, int n_slots)
+ ParallelState *pstate)
{
bool pref_non_data = false; /* or get from AH->ropt */
TocEntry *data_te = NULL;
@@ -3931,11 +3743,11 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
{
int count = 0;
- for (k = 0; k < n_slots; k++)
- if (slots[k].args->te != NULL &&
- slots[k].args->te->section == SECTION_DATA)
+ for (k = 0; k < pstate->numWorkers; k++)
+ if (pstate->parallelSlot[k].args->te != NULL &&
+ pstate->parallelSlot[k].args->te->section == SECTION_DATA)
count++;
- if (n_slots == 0 || count * 4 < n_slots)
+ if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
pref_non_data = false;
}
@@ -3951,13 +3763,13 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
* that a currently running item also needs lock on, or vice versa. If
* so, we don't want to schedule them together.
*/
- for (i = 0; i < n_slots && !conflicts; i++)
+ for (i = 0; i < pstate->numWorkers && !conflicts; i++)
{
TocEntry *running_te;
- if (slots[i].args == NULL)
+ if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
continue;
- running_te = slots[i].args->te;
+ running_te = pstate->parallelSlot[i].args->te;
if (has_lock_conflicts(te, running_te) ||
has_lock_conflicts(running_te, te))
@@ -3992,63 +3804,29 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
/*
* Restore a single TOC item in parallel with others
*
- * this is the procedure run as a thread (Windows) or a
- * separate process (everything else).
+ * this is run in the worker, i.e. in a thread (Windows) or a separate process
+ * (everything else). A worker process executes several such work items during
+ * a parallel backup or restore. Once we terminate here and report back that
+ * our work is finished, the master process will assign us a new work item.
*/
-static parallel_restore_result
-parallel_restore(RestoreArgs *args)
+int
+parallel_restore(ParallelArgs * args)
{
ArchiveHandle *AH = args->AH;
TocEntry *te = args->te;
RestoreOptions *ropt = AH->ropt;
- int retval;
-
- setProcessIdentifier(args->pse, AH);
-
- /*
- * Close and reopen the input file so we have a private file pointer that
- * doesn't stomp on anyone else's file pointer, if we're actually going to
- * need to read from the file. Otherwise, just close it except on Windows,
- * where it will possibly be needed by other threads.
- *
- * Note: on Windows, since we are using threads not processes, the reopen
- * call *doesn't* close the original file pointer but just open a new one.
- */
- if (te->section == SECTION_DATA)
- (AH->ReopenPtr) (AH);
-#ifndef WIN32
- else
- (AH->ClosePtr) (AH);
-#endif
-
- /*
- * We need our own database connection, too
- */
- ConnectDatabase((Archive *) AH, ropt->dbname,
- ropt->pghost, ropt->pgport, ropt->username,
- ropt->promptPassword);
+ int status;
_doSetFixedOutputState(AH);
- /* Restore the TOC item */
- retval = restore_toc_entry(AH, te, ropt, true);
-
- /* And clean up */
- DisconnectDatabase((Archive *) AH);
- unsetProcessIdentifier(args->pse);
+ Assert(AH->connection != NULL);
- /* If we reopened the file, we are done with it, so close it now */
- if (te->section == SECTION_DATA)
- (AH->ClosePtr) (AH);
+ AH->public.n_errors = 0;
- if (retval == 0 && AH->public.n_errors)
- retval = WORKER_IGNORED_ERRORS;
+ /* Restore the TOC item */
+ status = restore_toc_entry(AH, te, ropt, true);
-#ifndef WIN32
- exit(retval);
-#else
- return retval;
-#endif
+ return status;
}
@@ -4060,25 +3838,12 @@ parallel_restore(RestoreArgs *args)
*/
static void
mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
- thandle worker, int status,
- ParallelSlot *slots, int n_slots)
+ int worker, int status,
+ ParallelState *pstate)
{
TocEntry *te = NULL;
- int i;
-
- for (i = 0; i < n_slots; i++)
- {
- if (slots[i].child_id == worker)
- {
- slots[i].child_id = 0;
- te = slots[i].args->te;
- DeCloneArchive(slots[i].args->AH);
- free(slots[i].args);
- slots[i].args = NULL;
- break;
- }
- }
+ te = pstate->parallelSlot[worker].args->te;
if (te == NULL)
exit_horribly(modulename, "could not find slot of finished worker\n");
@@ -4179,8 +3944,8 @@ fix_dependencies(ArchiveHandle *AH)
/*
* Count the incoming dependencies for each item. Also, it is possible
* that the dependencies list items that are not in the archive at all
- * (that should not happen in 9.2 and later, but is highly likely in
- * older archives). Subtract such items from the depCounts.
+ * (that should not happen in 9.2 and later, but is highly likely in older
+ * archives). Subtract such items from the depCounts.
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
@@ -4377,16 +4142,13 @@ inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
}
}
-
/*
* Clone and de-clone routines used in parallel restoration.
*
* Enough of the structure is cloned to ensure that there is no
* conflict between different threads each with their own clone.
- *
- * These could be public, but no need at present.
*/
-static ArchiveHandle *
+ArchiveHandle *
CloneArchive(ArchiveHandle *AH)
{
ArchiveHandle *clone;
@@ -4412,9 +4174,60 @@ CloneArchive(ArchiveHandle *AH)
/* clone has its own error count, too */
clone->public.n_errors = 0;
+ /*
+ * Connect our new clone object to the database: In parallel restore the
+ * parent is already disconnected, because we can connect the worker
+ * processes independently to the database (no snapshot sync required). In
+ * parallel backup we clone the parent's existing connection.
+ */
+ if (AH->mode == archModeRead)
+ {
+ RestoreOptions *ropt = AH->ropt;
+
+ Assert(AH->connection == NULL);
+ /* this also sets clone->connection */
+ ConnectDatabase((Archive *) clone, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->promptPassword);
+ }
+ else
+ {
+ char *dbname;
+ char *pghost;
+ char *pgport;
+ char *username;
+ const char *encname;
+
+ Assert(AH->connection != NULL);
+
+ /*
+ * Even though we are technically accessing the parent's database
+ * object here, these functions are fine to be called like that
+ * because all just return a pointer and do not actually send/receive
+ * any data to/from the database.
+ */
+ dbname = PQdb(AH->connection);
+ pghost = PQhost(AH->connection);
+ pgport = PQport(AH->connection);
+ username = PQuser(AH->connection);
+ encname = pg_encoding_to_char(AH->public.encoding);
+
+ /* this also sets clone->connection */
+ ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
+
+ /*
+ * Set the same encoding, whatever we set here is what we got from
+ * pg_encoding_to_char(), so we really shouldn't run into an error
+ * setting that very same value. Also see the comment in
+ * SetupConnection().
+ */
+ PQsetClientEncoding(clone->connection, encname);
+ }
+
/* Let the format-specific code have a chance too */
(clone->ClonePtr) (clone);
+ Assert(clone->connection != NULL);
return clone;
}
@@ -4423,7 +4236,7 @@ CloneArchive(ArchiveHandle *AH)
*
* Note: we assume any clone-local connection was already closed.
*/
-static void
+void
DeCloneArchive(ArchiveHandle *AH)
{
/* Clear format-specific state */