diff options
Diffstat (limited to 'src/backend/replication/logical/syncutils.c')
| -rw-r--r-- | src/backend/replication/logical/syncutils.c | 139 |
1 files changed, 115 insertions, 24 deletions
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index ae8c9385916..a696b1f2dc0 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -16,6 +16,7 @@ #include "catalog/pg_subscription_rel.h" #include "pgstat.h" +#include "replication/logicallauncher.h" #include "replication/worker_internal.h" #include "storage/ipc.h" #include "utils/lsyscache.h" @@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE pg_noreturn void FinishSyncWorker(void) { + Assert(am_sequencesync_worker() || am_tablesync_worker()); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -61,16 +64,31 @@ FinishSyncWorker(void) /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); + if (am_sequencesync_worker()) + { + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); + + /* + * Reset last_seqsync_start_time, so that next time a sequencesync + * worker is needed it can be started promptly. + */ + logicalrep_reset_seqsync_start_time(); + } + else + { + StartTransactionCommand(); + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + CommitTransactionCommand(); - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, - InvalidOid); + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, + InvalidOid); + } /* Stop gracefully */ proc_exit(0); @@ -86,7 +104,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Process possible state change(s) of relations that are being synchronized. + * Attempt to launch a sync worker for one or more sequences or a table, if + * a worker slot is available and the retry interval has elapsed. + * + * wtype: sync worker type. + * nsyncworkers: Number of currently running sync workers for the subscription. + * relid: InvalidOid for sequencesync worker, actual relid for tablesync + * worker. + * last_start_time: Pointer to the last start time of the worker. + */ +void +launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, + TimestampTz *last_start_time) +{ + TimestampTz now; + + Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) || + (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid))); + + /* If there is a free sync worker slot, start a new sync worker */ + if (nsyncworkers >= max_sync_workers_per_subscription) + return; + + now = GetCurrentTimestamp(); + + if (!(*last_start_time) || + TimestampDifferenceExceeds(*last_start_time, now, + wal_retrieve_retry_interval)) + { + /* + * Set the last_start_time even if we fail to start the worker, so + * that we won't retry until wal_retrieve_retry_interval has elapsed. + */ + *last_start_time = now; + (void) logicalrep_worker_launch(wtype, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + relid, DSM_HANDLE_INVALID, false); + } +} + +/* + * Process possible state change(s) of relations that are being synchronized + * and start new tablesync workers for the newly added tables. Also, start a + * new sequencesync worker for the newly added sequences. */ void ProcessSyncingRelations(XLogRecPtr current_lsn) @@ -108,6 +171,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) case WORKERTYPE_APPLY: ProcessSyncingTablesForApply(current_lsn); + ProcessSequencesForSync(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to process relations"); break; case WORKERTYPE_UNKNOWN: @@ -117,17 +186,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) } /* - * Common code to fetch the up-to-date sync state info into the static lists. + * Common code to fetch the up-to-date sync state info for tables and sequences. * - * Returns true if subscription has 1 or more tables, else false. + * The pg_subscription_rel catalog is shared by tables and sequences. Changes + * to either sequences or tables can affect the validity of relation states, so + * we identify non-READY tables and non-READY sequences together to ensure + * consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * has_pending_subtables: true if the subscription has one or more tables that + * are not in READY state, otherwise false. + * has_pending_subsequences: true if the subscription has one or more sequences + * that are not in READY state, otherwise false. */ -bool -FetchRelationStates(bool *started_tx) +void +FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_subsequences, + bool *started_tx) { + /* + * has_subtables and has_subsequences_non_ready are declared as static, + * since the same value can be used until the system table is invalidated. + */ static bool has_subtables = false; + static bool has_subsequences_non_ready = false; *started_tx = false; @@ -135,10 +216,10 @@ FetchRelationStates(bool *started_tx) { MemoryContext oldctx; List *rstates; - ListCell *lc; SubscriptionRelState *rstate; relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + has_subsequences_non_ready = false; /* Clean the old lists. */ list_free_deep(table_states_not_ready); @@ -150,17 +231,23 @@ FetchRelationStates(bool *started_tx) *started_tx = true; } - /* Fetch tables that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true, false, + /* Fetch tables and sequences that are in non-READY state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, true, true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) + foreach_ptr(SubscriptionRelState, subrel, rstates) { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); + if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE) + has_subsequences_non_ready = true; + else + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, subrel, sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, + rstate); + } } MemoryContextSwitchTo(oldctx); @@ -185,5 +272,9 @@ FetchRelationStates(bool *started_tx) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } - return has_subtables; + if (has_pending_subtables) + *has_pending_subtables = has_subtables; + + if (has_pending_subsequences) + *has_pending_subsequences = has_subsequences_non_ready; } |
