| author | Andrew Dunstan <andrew@dunslane.net> | 2013-03-24 11:27:20 -0400 |
|---|---|---|
| committer | Andrew Dunstan <andrew@dunslane.net> | 2013-03-24 11:27:20 -0400 |
| commit | 9e257a181cc1dc5e19eb5d770ce09cc98f470f5f (patch) | |
| tree | a2b5c7a40cfe004d4838cd3be32e0177096fafbf /src/bin/pg_dump/pg_backup_archiver.c | |
| parent | 3b91fe185a71c05ac4528f93a39ba27232acc9e0 (diff) | |
Add parallel pg_dump option.
New infrastructure is added which creates a set number of workers
(threads on Windows, forked processes on Unix). Jobs are then
handed out to these workers by the master process as needed.
pg_restore is adjusted to use this new infrastructure in place of the
old setup, which created a new worker for each step on the fly. Parallel
dumps acquire a snapshot clone, if available, in order to stay
consistent.
The parallel option is selected by the -j / --jobs command line
parameter of pg_dump.
Joachim Wieland, lightly editorialized by Andrew Dunstan.
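As a usage note (not part of the commit itself; the database name and directory below are placeholders): a parallel dump requires the directory archive format, and the same -j flag also drives a parallel restore.

```sh
# Dump with 4 worker processes (threads on Windows) into a directory-format archive
pg_dump -F d -j 4 -f /path/to/dumpdir mydb

# Restore that archive with 4 parallel jobs
pg_restore -j 4 -d mydb /path/to/dumpdir
```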
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r-- src/bin/pg_dump/pg_backup_archiver.c | 735
1 file changed, 274 insertions, 461 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 19d12788d9d..3c2671bb2d5 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -22,8 +22,10 @@ #include "pg_backup_db.h" #include "dumputils.h" +#include "parallel.h" #include <ctype.h> +#include <fcntl.h> #include <unistd.h> #include <sys/stat.h> #include <sys/types.h> @@ -35,72 +37,6 @@ #include "libpq/libpq-fs.h" -/* - * Special exit values from worker children. We reserve 0 for normal - * success; 1 and other small values should be interpreted as crashes. - */ -#define WORKER_CREATE_DONE 10 -#define WORKER_INHIBIT_DATA 11 -#define WORKER_IGNORED_ERRORS 12 - -/* - * Unix uses exit to return result from worker child, so function is void. - * Windows thread result comes via function return. - */ -#ifndef WIN32 -#define parallel_restore_result void -#else -#define parallel_restore_result DWORD -#endif - -/* IDs for worker children are either PIDs or thread handles */ -#ifndef WIN32 -#define thandle pid_t -#else -#define thandle HANDLE -#endif - -typedef struct ParallelStateEntry -{ -#ifdef WIN32 - unsigned int threadId; -#else - pid_t pid; -#endif - ArchiveHandle *AH; -} ParallelStateEntry; - -typedef struct ParallelState -{ - int numWorkers; - ParallelStateEntry *pse; -} ParallelState; - -/* Arguments needed for a worker child */ -typedef struct _restore_args -{ - ArchiveHandle *AH; - TocEntry *te; - ParallelStateEntry *pse; -} RestoreArgs; - -/* State for each parallel activity slot */ -typedef struct _parallel_slot -{ - thandle child_id; - RestoreArgs *args; -} ParallelSlot; - -typedef struct ShutdownInformation -{ - ParallelState *pstate; - Archive *AHX; -} ShutdownInformation; - -static ShutdownInformation shutdown_info; - -#define NO_SLOT (-1) - #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" @@ -116,7 +52,7 @@ static const char *modulename = gettext_noop("archiver"); static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, - const int compression, ArchiveMode mode); + const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr); static void _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH); static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isData, bool acl_pass); @@ -136,7 +72,6 @@ static bool _tocEntryIsACL(TocEntry *te); static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void buildTocEntryArrays(ArchiveHandle *AH); -static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id); static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te); static int _discoverArchiveFormat(ArchiveHandle *AH); @@ -149,21 +84,19 @@ static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext); static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool is_parallel); -static void restore_toc_entries_parallel(ArchiveHandle *AH); -static thandle spawn_restore(RestoreArgs *args); -static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); -static bool work_in_progress(ParallelSlot *slots, int n_slots); -static int get_next_slot(ParallelSlot *slots, int n_slots); +static void restore_toc_entries_prefork(ArchiveHandle *AH); +static void 
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, + TocEntry *pending_list); +static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list); static void par_list_header_init(TocEntry *l); static void par_list_append(TocEntry *l, TocEntry *te); static void par_list_remove(TocEntry *te); static TocEntry *get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, - ParallelSlot *slots, int n_slots); -static parallel_restore_result parallel_restore(RestoreArgs *args); + ParallelState *pstate); static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, - thandle worker, int status, - ParallelSlot *slots, int n_slots); + int worker, int status, + ParallelState *pstate); static void fix_dependencies(ArchiveHandle *AH); static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); @@ -172,14 +105,6 @@ static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); -static ArchiveHandle *CloneArchive(ArchiveHandle *AH); -static void DeCloneArchive(ArchiveHandle *AH); - -static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH); -static void unsetProcessIdentifier(ParallelStateEntry *pse); -static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate); -static void archive_close_connection(int code, void *arg); - /* * Wrapper functions. @@ -189,15 +114,28 @@ static void archive_close_connection(int code, void *arg); * */ +/* + * The dump worker setup needs lots of knowledge of the internals of pg_dump, + * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker + * setup doesn't need to know anything much, so it's defined here. + */ +static void +setupRestoreWorker(Archive *AHX, RestoreOptions *ropt) +{ + ArchiveHandle *AH = (ArchiveHandle *) AHX; + + (AH->ReopenPtr) (AH); +} + /* Create a new archive */ /* Public */ Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, - const int compression, ArchiveMode mode) + const int compression, ArchiveMode mode, SetupWorkerPtr setupDumpWorker) { - ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode); + ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, mode, setupDumpWorker); return (Archive *) AH; } @@ -207,7 +145,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt, Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt) { - ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead); + ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, archModeRead, setupRestoreWorker); return (Archive *) AH; } @@ -311,7 +249,7 @@ RestoreArchive(Archive *AHX) /* * If we're going to do parallel restore, there are some restrictions. */ - parallel_mode = (ropt->number_of_jobs > 1 && ropt->useDB); + parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB); if (parallel_mode) { /* We haven't got round to making this work for all archive formats */ @@ -499,7 +437,25 @@ RestoreArchive(Archive *AHX) * In parallel mode, turn control over to the parallel-restore logic. 
*/ if (parallel_mode) - restore_toc_entries_parallel(AH); + { + ParallelState *pstate; + TocEntry pending_list; + + par_list_header_init(&pending_list); + + /* This runs PRE_DATA items and then disconnects from the database */ + restore_toc_entries_prefork(AH); + Assert(AH->connection == NULL); + + /* ParallelBackupStart() will actually fork the processes */ + pstate = ParallelBackupStart(AH, ropt); + restore_toc_entries_parallel(AH, pstate, &pending_list); + ParallelBackupEnd(AH, pstate); + + /* reconnect the master and see if we missed something */ + restore_toc_entries_postfork(AH, &pending_list); + Assert(AH->connection != NULL); + } else { for (te = AH->toc->next; te != AH->toc; te = te->next) @@ -558,7 +514,7 @@ static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool is_parallel) { - int retval = 0; + int status = WORKER_OK; teReqs reqs; bool defnDumped; @@ -611,7 +567,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, if (ropt->noDataForFailedTables) { if (is_parallel) - retval = WORKER_INHIBIT_DATA; + status = WORKER_INHIBIT_DATA; else inhibit_data_for_failed_table(AH, te); } @@ -626,7 +582,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, * just set the return value. */ if (is_parallel) - retval = WORKER_CREATE_DONE; + status = WORKER_CREATE_DONE; else mark_create_done(AH, te); } @@ -744,7 +700,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, } } - return retval; + if (AH->public.n_errors > 0 && status == WORKER_OK) + status = WORKER_IGNORED_ERRORS; + + return status; } /* @@ -1634,7 +1593,7 @@ buildTocEntryArrays(ArchiveHandle *AH) } } -static TocEntry * +TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id) { /* build index arrays if we didn't already */ @@ -2018,7 +1977,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) */ static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, - const int compression, ArchiveMode mode) + const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr) { ArchiveHandle *AH; @@ -2100,6 +2059,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, } #endif + AH->SetupWorkerPtr = setupWorkerPtr; + if (fmt == archUnknown) AH->format = _discoverArchiveFormat(AH); else @@ -2132,50 +2093,66 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, return AH; } - void -WriteDataChunks(ArchiveHandle *AH) +WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) { TocEntry *te; - StartDataPtr startPtr; - EndDataPtr endPtr; for (te = AH->toc->next; te != AH->toc; te = te->next) { - if (te->dataDumper != NULL && (te->reqs & REQ_DATA) != 0) - { - AH->currToc = te; - /* printf("Writing data for %d (%x)\n", te->id, te); */ - - if (strcmp(te->desc, "BLOBS") == 0) - { - startPtr = AH->StartBlobsPtr; - endPtr = AH->EndBlobsPtr; - } - else - { - startPtr = AH->StartDataPtr; - endPtr = AH->EndDataPtr; - } + if (!te->dataDumper) + continue; - if (startPtr != NULL) - (*startPtr) (AH, te); + if ((te->reqs & REQ_DATA) == 0) + continue; + if (pstate && pstate->numWorkers > 1) + { /* - * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg); + * If we are in a parallel backup, then we are always the master + * process. 
*/ + EnsureIdleWorker(AH, pstate); + Assert(GetIdleWorker(pstate) != NO_SLOT); + DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP); + } + else + WriteDataChunksForTocEntry(AH, te); + } + EnsureWorkersFinished(AH, pstate); +} - /* - * The user-provided DataDumper routine needs to call - * AH->WriteData - */ - (*te->dataDumper) ((Archive *) AH, te->dataDumperArg); +void +WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te) +{ + StartDataPtr startPtr; + EndDataPtr endPtr; - if (endPtr != NULL) - (*endPtr) (AH, te); - AH->currToc = NULL; - } + AH->currToc = te; + + if (strcmp(te->desc, "BLOBS") == 0) + { + startPtr = AH->StartBlobsPtr; + endPtr = AH->EndBlobsPtr; } + else + { + startPtr = AH->StartDataPtr; + endPtr = AH->EndDataPtr; + } + + if (startPtr != NULL) + (*startPtr) (AH, te); + + /* + * The user-provided DataDumper routine needs to call AH->WriteData + */ + (*te->dataDumper) ((Archive *) AH, te->dataDumperArg); + + if (endPtr != NULL) + (*endPtr) (AH, te); + + AH->currToc = NULL; } void @@ -2911,7 +2888,7 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH) const char *type = te->desc; /* Use ALTER TABLE for views and sequences */ - if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0|| + if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 || strcmp(type, "MATERIALIZED VIEW") == 0) type = "TABLE"; @@ -3404,67 +3381,6 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim) ahprintf(AH, "-- %s %s\n\n", msg, buf); } -static void -setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH) -{ -#ifdef WIN32 - pse->threadId = GetCurrentThreadId(); -#else - pse->pid = getpid(); -#endif - pse->AH = AH; -} - -static void -unsetProcessIdentifier(ParallelStateEntry *pse) -{ -#ifdef WIN32 - pse->threadId = 0; -#else - pse->pid = 0; -#endif - pse->AH = NULL; -} - -static ParallelStateEntry * -GetMyPSEntry(ParallelState *pstate) -{ - int i; - - for (i = 0; i < pstate->numWorkers; i++) -#ifdef WIN32 - if (pstate->pse[i].threadId == GetCurrentThreadId()) -#else - if (pstate->pse[i].pid == getpid()) -#endif - return &(pstate->pse[i]); - - return NULL; -} - -static void -archive_close_connection(int code, void *arg) -{ - ShutdownInformation *si = (ShutdownInformation *) arg; - - if (si->pstate) - { - ParallelStateEntry *entry = GetMyPSEntry(si->pstate); - - if (entry != NULL && entry->AH) - DisconnectDatabase(&(entry->AH->public)); - } - else if (si->AHX) - DisconnectDatabase(si->AHX); -} - -void -on_exit_close_archive(Archive *AHX) -{ - shutdown_info.AHX = AHX; - on_exit_nicely(archive_close_connection, &shutdown_info); -} - /* * Main engine for parallel restore. * @@ -3477,30 +3393,13 @@ on_exit_close_archive(Archive *AHX) * RestoreArchive). 
*/ static void -restore_toc_entries_parallel(ArchiveHandle *AH) +restore_toc_entries_prefork(ArchiveHandle *AH) { RestoreOptions *ropt = AH->ropt; - int n_slots = ropt->number_of_jobs; - ParallelSlot *slots; - int work_status; - int next_slot; bool skipped_some; - TocEntry pending_list; - TocEntry ready_list; TocEntry *next_work_item; - thandle ret_child; - TocEntry *te; - ParallelState *pstate; - int i; - - ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); - slots = (ParallelSlot *) pg_malloc0(n_slots * sizeof(ParallelSlot)); - pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); - pstate->pse = (ParallelStateEntry *) pg_malloc0(n_slots * sizeof(ParallelStateEntry)); - pstate->numWorkers = ropt->number_of_jobs; - for (i = 0; i < pstate->numWorkers; i++) - unsetProcessIdentifier(&(pstate->pse[i])); + ahlog(AH, 2, "entering restore_toc_entries_prefork\n"); /* Adjust dependency information */ fix_dependencies(AH); @@ -3509,7 +3408,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) * Do all the early stuff in a single connection in the parent. There's no * great point in running it in parallel, in fact it will actually run * faster in a single connection because we avoid all the connection and - * setup overhead. Also, pre-9.2 pg_dump versions were not very good + * setup overhead. Also, pre-9.2 pg_dump versions were not very good * about showing all the dependencies of SECTION_PRE_DATA items, so we do * not risk trying to process them out-of-order. * @@ -3561,12 +3460,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH) */ DisconnectDatabase(&AH->public); - /* - * Set the pstate in the shutdown_info. The exit handler uses pstate if - * set and falls back to AHX otherwise. - */ - shutdown_info.pstate = pstate; - /* blow away any transient state from the old connection */ if (AH->currUser) free(AH->currUser); @@ -3578,17 +3471,42 @@ restore_toc_entries_parallel(ArchiveHandle *AH) free(AH->currTablespace); AH->currTablespace = NULL; AH->currWithOids = -1; +} + +/* + * Main engine for parallel restore. + * + * Work is done in three phases. + * First we process all SECTION_PRE_DATA tocEntries, in a single connection, + * just as for a standard restore. This is done in restore_toc_entries_prefork(). + * Second we process the remaining non-ACL steps in parallel worker children + * (threads on Windows, processes on Unix), these fork off and set up their + * connections before we call restore_toc_entries_parallel_forked. + * Finally we process all the ACL entries in a single connection (that happens + * back in RestoreArchive). + */ +static void +restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, + TocEntry *pending_list) +{ + int work_status; + bool skipped_some; + TocEntry ready_list; + TocEntry *next_work_item; + int ret_child; + + ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); /* - * Initialize the lists of pending and ready items. After this setup, the - * pending list is everything that needs to be done but is blocked by one - * or more dependencies, while the ready list contains items that have no - * remaining dependencies. Note: we don't yet filter out entries that - * aren't going to be restored. They might participate in dependency + * Initialize the lists of ready items, the list for pending items has + * already been initialized in the caller. After this setup, the pending + * list is everything that needs to be done but is blocked by one or more + * dependencies, while the ready list contains items that have no + * remaining dependencies. 
Note: we don't yet filter out entries that + * aren't going to be restored. They might participate in dependency * chains connecting entries that should be restored, so we treat them as * live until we actually process them. */ - par_list_header_init(&pending_list); par_list_header_init(&ready_list); skipped_some = false; for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) @@ -3613,7 +3531,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) } if (next_work_item->depCount > 0) - par_list_append(&pending_list, next_work_item); + par_list_append(pending_list, next_work_item); else par_list_append(&ready_list, next_work_item); } @@ -3627,9 +3545,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH) ahlog(AH, 1, "entering main parallel loop\n"); - while ((next_work_item = get_next_work_item(AH, &ready_list, - slots, n_slots)) != NULL || - work_in_progress(slots, n_slots)) + while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL || + !IsEveryWorkerIdle(pstate)) { if (next_work_item != NULL) { @@ -3647,62 +3564,72 @@ restore_toc_entries_parallel(ArchiveHandle *AH) continue; } - if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT) - { - /* There is work still to do and a worker slot available */ - thandle child; - RestoreArgs *args; + ahlog(AH, 1, "launching item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); - ahlog(AH, 1, "launching item %d %s %s\n", - next_work_item->dumpId, - next_work_item->desc, next_work_item->tag); + par_list_remove(next_work_item); - par_list_remove(next_work_item); - - /* this memory is dealloced in mark_work_done() */ - args = pg_malloc(sizeof(RestoreArgs)); - args->AH = CloneArchive(AH); - args->te = next_work_item; - args->pse = &pstate->pse[next_slot]; + Assert(GetIdleWorker(pstate) != NO_SLOT); + DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE); + } + else + /* at least one child is working and we have nothing ready. */ + Assert(!IsEveryWorkerIdle(pstate)); - /* run the step in a worker child */ - child = spawn_restore(args); + for (;;) + { + int nTerm = 0; - slots[next_slot].child_id = child; - slots[next_slot].args = args; + /* + * In order to reduce dependencies as soon as possible and + * especially to reap the status of workers who are working on + * items that pending items depend on, we do a non-blocking check + * for ended workers first. + * + * However, if we do not have any other work items currently that + * workers can work on, we do not busy-loop here but instead + * really wait for at least one worker to terminate. Hence we call + * ListenToWorkers(..., ..., do_wait = true) in this case. + */ + ListenToWorkers(AH, pstate, !next_work_item); - continue; + while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT) + { + nTerm++; + mark_work_done(AH, &ready_list, ret_child, work_status, pstate); } - } - /* - * If we get here there must be work being done. Either there is no - * work available to schedule (and work_in_progress returned true) or - * there are no slots available. So we wait for a worker to finish, - * and process the result. - */ - ret_child = reap_child(slots, n_slots, &work_status); + /* + * We need to make sure that we have an idle worker before + * re-running the loop. If nTerm > 0 we already have that (quick + * check). 
+ */ + if (nTerm > 0) + break; - if (WIFEXITED(work_status)) - { - mark_work_done(AH, &ready_list, - ret_child, WEXITSTATUS(work_status), - slots, n_slots); - } - else - { - exit_horribly(modulename, "worker process crashed: status %d\n", - work_status); + /* if nobody terminated, explicitly check for an idle worker */ + if (GetIdleWorker(pstate) != NO_SLOT) + break; + + /* + * If we have no idle worker, read the result of one or more + * workers and loop the loop to call ReapWorkerStatus() on them. + */ + ListenToWorkers(AH, pstate, true); } } ahlog(AH, 1, "finished main parallel loop\n"); +} - /* - * Remove the pstate again, so the exit handler will now fall back to - * closing AH->connection again. - */ - shutdown_info.pstate = NULL; +static void +restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) +{ + RestoreOptions *ropt = AH->ropt; + TocEntry *te; + + ahlog(AH, 2, "entering restore_toc_entries_postfork\n"); /* * Now reconnect the single parent connection. @@ -3718,7 +3645,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH) * dependencies, or some other pathological condition. If so, do it in the * single parent connection. */ - for (te = pending_list.par_next; te != &pending_list; te = te->par_next) + for (te = pending_list->par_next; te != pending_list; te = te->par_next) { ahlog(AH, 1, "processing missed item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -3729,121 +3656,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH) } /* - * create a worker child to perform a restore step in parallel - */ -static thandle -spawn_restore(RestoreArgs *args) -{ - thandle child; - - /* Ensure stdio state is quiesced before forking */ - fflush(NULL); - -#ifndef WIN32 - child = fork(); - if (child == 0) - { - /* in child process */ - parallel_restore(args); - exit_horribly(modulename, - "parallel_restore should not return\n"); - } - else if (child < 0) - { - /* fork failed */ - exit_horribly(modulename, - "could not create worker process: %s\n", - strerror(errno)); - } -#else - child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore, - args, 0, NULL); - if (child == 0) - exit_horribly(modulename, - "could not create worker thread: %s\n", - strerror(errno)); -#endif - - return child; -} - -/* - * collect status from a completed worker child - */ -static thandle -reap_child(ParallelSlot *slots, int n_slots, int *work_status) -{ -#ifndef WIN32 - /* Unix is so much easier ... */ - return wait(work_status); -#else - static HANDLE *handles = NULL; - int hindex, - snum, - tnum; - thandle ret_child; - DWORD res; - - /* first time around only, make space for handles to listen on */ - if (handles == NULL) - handles = (HANDLE *) pg_malloc0(n_slots * sizeof(HANDLE)); - - /* set up list of handles to listen to */ - for (snum = 0, tnum = 0; snum < n_slots; snum++) - if (slots[snum].child_id != 0) - handles[tnum++] = slots[snum].child_id; - - /* wait for one to finish */ - hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE); - - /* get handle of finished thread */ - ret_child = handles[hindex - WAIT_OBJECT_0]; - - /* get the result */ - GetExitCodeThread(ret_child, &res); - *work_status = res; - - /* dispose of handle to stop leaks */ - CloseHandle(ret_child); - - return ret_child; -#endif -} - -/* - * are we doing anything now? - */ -static bool -work_in_progress(ParallelSlot *slots, int n_slots) -{ - int i; - - for (i = 0; i < n_slots; i++) - { - if (slots[i].child_id != 0) - return true; - } - return false; -} - -/* - * find the first free parallel slot (if any). 
- */ -static int -get_next_slot(ParallelSlot *slots, int n_slots) -{ - int i; - - for (i = 0; i < n_slots; i++) - { - if (slots[i].child_id == 0) - return i; - } - return NO_SLOT; -} - - -/* * Check if te1 has an exclusive lock requirement for an item that te2 also * requires, whether or not te2's requirement is for an exclusive lock. */ @@ -3916,7 +3728,7 @@ par_list_remove(TocEntry *te) */ static TocEntry * get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, - ParallelSlot *slots, int n_slots) + ParallelState *pstate) { bool pref_non_data = false; /* or get from AH->ropt */ TocEntry *data_te = NULL; @@ -3931,11 +3743,11 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, { int count = 0; - for (k = 0; k < n_slots; k++) - if (slots[k].args->te != NULL && - slots[k].args->te->section == SECTION_DATA) + for (k = 0; k < pstate->numWorkers; k++) + if (pstate->parallelSlot[k].args->te != NULL && + pstate->parallelSlot[k].args->te->section == SECTION_DATA) count++; - if (n_slots == 0 || count * 4 < n_slots) + if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers) pref_non_data = false; } @@ -3951,13 +3763,13 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, * that a currently running item also needs lock on, or vice versa. If * so, we don't want to schedule them together. */ - for (i = 0; i < n_slots && !conflicts; i++) + for (i = 0; i < pstate->numWorkers && !conflicts; i++) { TocEntry *running_te; - if (slots[i].args == NULL) + if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING) continue; - running_te = slots[i].args->te; + running_te = pstate->parallelSlot[i].args->te; if (has_lock_conflicts(te, running_te) || has_lock_conflicts(running_te, te)) @@ -3992,63 +3804,29 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, /* * Restore a single TOC item in parallel with others * - * this is the procedure run as a thread (Windows) or a - * separate process (everything else). + * this is run in the worker, i.e. in a thread (Windows) or a separate process + * (everything else). A worker process executes several such work items during + * a parallel backup or restore. Once we terminate here and report back that + * our work is finished, the master process will assign us a new work item. */ -static parallel_restore_result -parallel_restore(RestoreArgs *args) +int +parallel_restore(ParallelArgs * args) { ArchiveHandle *AH = args->AH; TocEntry *te = args->te; RestoreOptions *ropt = AH->ropt; - int retval; - - setProcessIdentifier(args->pse, AH); - - /* - * Close and reopen the input file so we have a private file pointer that - * doesn't stomp on anyone else's file pointer, if we're actually going to - * need to read from the file. Otherwise, just close it except on Windows, - * where it will possibly be needed by other threads. - * - * Note: on Windows, since we are using threads not processes, the reopen - * call *doesn't* close the original file pointer but just open a new one. 
- */ - if (te->section == SECTION_DATA) - (AH->ReopenPtr) (AH); -#ifndef WIN32 - else - (AH->ClosePtr) (AH); -#endif - - /* - * We need our own database connection, too - */ - ConnectDatabase((Archive *) AH, ropt->dbname, - ropt->pghost, ropt->pgport, ropt->username, - ropt->promptPassword); + int status; _doSetFixedOutputState(AH); - /* Restore the TOC item */ - retval = restore_toc_entry(AH, te, ropt, true); - - /* And clean up */ - DisconnectDatabase((Archive *) AH); - unsetProcessIdentifier(args->pse); + Assert(AH->connection != NULL); - /* If we reopened the file, we are done with it, so close it now */ - if (te->section == SECTION_DATA) - (AH->ClosePtr) (AH); + AH->public.n_errors = 0; - if (retval == 0 && AH->public.n_errors) - retval = WORKER_IGNORED_ERRORS; + /* Restore the TOC item */ + status = restore_toc_entry(AH, te, ropt, true); -#ifndef WIN32 - exit(retval); -#else - return retval; -#endif + return status; } @@ -4060,25 +3838,12 @@ parallel_restore(RestoreArgs *args) */ static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, - thandle worker, int status, - ParallelSlot *slots, int n_slots) + int worker, int status, + ParallelState *pstate) { TocEntry *te = NULL; - int i; - - for (i = 0; i < n_slots; i++) - { - if (slots[i].child_id == worker) - { - slots[i].child_id = 0; - te = slots[i].args->te; - DeCloneArchive(slots[i].args->AH); - free(slots[i].args); - slots[i].args = NULL; - break; - } - } + te = pstate->parallelSlot[worker].args->te; if (te == NULL) exit_horribly(modulename, "could not find slot of finished worker\n"); @@ -4179,8 +3944,8 @@ fix_dependencies(ArchiveHandle *AH) /* * Count the incoming dependencies for each item. Also, it is possible * that the dependencies list items that are not in the archive at all - * (that should not happen in 9.2 and later, but is highly likely in - * older archives). Subtract such items from the depCounts. + * (that should not happen in 9.2 and later, but is highly likely in older + * archives). Subtract such items from the depCounts. */ for (te = AH->toc->next; te != AH->toc; te = te->next) { @@ -4377,16 +4142,13 @@ inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te) } } - /* * Clone and de-clone routines used in parallel restoration. * * Enough of the structure is cloned to ensure that there is no * conflict between different threads each with their own clone. - * - * These could be public, but no need at present. */ -static ArchiveHandle * +ArchiveHandle * CloneArchive(ArchiveHandle *AH) { ArchiveHandle *clone; @@ -4412,9 +4174,60 @@ CloneArchive(ArchiveHandle *AH) /* clone has its own error count, too */ clone->public.n_errors = 0; + /* + * Connect our new clone object to the database: In parallel restore the + * parent is already disconnected, because we can connect the worker + * processes independently to the database (no snapshot sync required). In + * parallel backup we clone the parent's existing connection. 
+ */ + if (AH->mode == archModeRead) + { + RestoreOptions *ropt = AH->ropt; + + Assert(AH->connection == NULL); + /* this also sets clone->connection */ + ConnectDatabase((Archive *) clone, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->promptPassword); + } + else + { + char *dbname; + char *pghost; + char *pgport; + char *username; + const char *encname; + + Assert(AH->connection != NULL); + + /* + * Even though we are technically accessing the parent's database + * object here, these functions are fine to be called like that + * because all just return a pointer and do not actually send/receive + * any data to/from the database. + */ + dbname = PQdb(AH->connection); + pghost = PQhost(AH->connection); + pgport = PQport(AH->connection); + username = PQuser(AH->connection); + encname = pg_encoding_to_char(AH->public.encoding); + + /* this also sets clone->connection */ + ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO); + + /* + * Set the same encoding, whatever we set here is what we got from + * pg_encoding_to_char(), so we really shouldn't run into an error + * setting that very same value. Also see the comment in + * SetupConnection(). + */ + PQsetClientEncoding(clone->connection, encname); + } + /* Let the format-specific code have a chance too */ (clone->ClonePtr) (clone); + Assert(clone->connection != NULL); return clone; } @@ -4423,7 +4236,7 @@ CloneArchive(ArchiveHandle *AH) * * Note: we assume any clone-local connection was already closed. */ -static void +void DeCloneArchive(ArchiveHandle *AH) { /* Clear format-specific state */ |