/*------------------------------------------------------------------------- * syncutils.c * PostgreSQL logical replication: common synchronization code * * Copyright (c) 2025, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/replication/logical/syncutils.c * * NOTES * This file contains code common for synchronization workers. *------------------------------------------------------------------------- */ #include "postgres.h" #include "catalog/pg_subscription_rel.h" #include "pgstat.h" #include "replication/logicallauncher.h" #include "replication/worker_internal.h" #include "storage/ipc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" /* * Enum for phases of the subscription relations state. * * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations * state is no longer valid, and the subscription relations should be rebuilt. * * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription * relations state is being rebuilt. * * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is * up-to-date and valid. */ typedef enum { SYNC_RELATIONS_STATE_NEEDS_REBUILD, SYNC_RELATIONS_STATE_REBUILD_STARTED, SYNC_RELATIONS_STATE_VALID, } SyncingRelationsState; static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; /* * Exit routine for synchronization worker. */ pg_noreturn void FinishSyncWorker(void) { Assert(am_sequencesync_worker() || am_tablesync_worker()); /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. */ if (IsTransactionState()) { CommitTransactionCommand(); pgstat_report_stat(true); } /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); if (am_sequencesync_worker()) { ereport(LOG, errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", MySubscription->name)); /* * Reset last_seqsync_start_time, so that next time a sequencesync * worker is needed it can be started promptly. */ logicalrep_reset_seqsync_start_time(); } else { StartTransactionCommand(); ereport(LOG, errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))); CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, InvalidOid); } /* Stop gracefully */ proc_exit(0); } /* * Callback from syscache invalidation. */ void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) { relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; } /* * Attempt to launch a sync worker for one or more sequences or a table, if * a worker slot is available and the retry interval has elapsed. * * wtype: sync worker type. * nsyncworkers: Number of currently running sync workers for the subscription. * relid: InvalidOid for sequencesync worker, actual relid for tablesync * worker. * last_start_time: Pointer to the last start time of the worker. */ void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time) { TimestampTz now; Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) || (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid))); /* If there is a free sync worker slot, start a new sync worker */ if (nsyncworkers >= max_sync_workers_per_subscription) return; now = GetCurrentTimestamp(); if (!(*last_start_time) || TimestampDifferenceExceeds(*last_start_time, now, wal_retrieve_retry_interval)) { /* * Set the last_start_time even if we fail to start the worker, so * that we won't retry until wal_retrieve_retry_interval has elapsed. */ *last_start_time = now; (void) logicalrep_worker_launch(wtype, MyLogicalRepWorker->dbid, MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, relid, DSM_HANDLE_INVALID, false); } } /* * Process possible state change(s) of relations that are being synchronized * and start new tablesync workers for the newly added tables. Also, start a * new sequencesync worker for the newly added sequences. */ void ProcessSyncingRelations(XLogRecPtr current_lsn) { switch (MyLogicalRepWorker->type) { case WORKERTYPE_PARALLEL_APPLY: /* * Skip for parallel apply workers because they only operate on * tables that are in a READY state. See pa_can_start() and * should_apply_changes_for_rel(). */ break; case WORKERTYPE_TABLESYNC: ProcessSyncingTablesForSync(current_lsn); break; case WORKERTYPE_APPLY: ProcessSyncingTablesForApply(current_lsn); ProcessSequencesForSync(); break; case WORKERTYPE_SEQUENCESYNC: /* Should never happen. */ elog(ERROR, "sequence synchronization worker is not expected to process relations"); break; case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); } } /* * Common code to fetch the up-to-date sync state info for tables and sequences. * * The pg_subscription_rel catalog is shared by tables and sequences. Changes * to either sequences or tables can affect the validity of relation states, so * we identify non-READY tables and non-READY sequences together to ensure * consistency. * * has_pending_subtables: true if the subscription has one or more tables that * are not in READY state, otherwise false. * has_pending_subsequences: true if the subscription has one or more sequences * that are not in READY state, otherwise false. */ void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx) { /* * has_subtables and has_subsequences_non_ready are declared as static, * since the same value can be used until the system table is invalidated. */ static bool has_subtables = false; static bool has_subsequences_non_ready = false; *started_tx = false; if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) { MemoryContext oldctx; List *rstates; SubscriptionRelState *rstate; relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; has_subsequences_non_ready = false; /* Clean the old lists. */ list_free_deep(table_states_not_ready); table_states_not_ready = NIL; if (!IsTransactionState()) { StartTransactionCommand(); *started_tx = true; } /* Fetch tables and sequences that are in non-READY state. */ rstates = GetSubscriptionRelations(MySubscription->oid, true, true, true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); foreach_ptr(SubscriptionRelState, subrel, rstates) { if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE) has_subsequences_non_ready = true; else { rstate = palloc(sizeof(SubscriptionRelState)); memcpy(rstate, subrel, sizeof(SubscriptionRelState)); table_states_not_ready = lappend(table_states_not_ready, rstate); } } MemoryContextSwitchTo(oldctx); /* * Does the subscription have tables? * * If there were not-READY tables found then we know it does. But if * table_states_not_ready was empty we still need to check again to * see if there are 0 tables. */ has_subtables = (table_states_not_ready != NIL) || HasSubscriptionTables(MySubscription->oid); /* * If the subscription relation cache has been invalidated since we * entered this routine, we still use and return the relations we just * finished constructing, to avoid infinite loops, but we leave the * table states marked as stale so that we'll rebuild it again on next * access. Otherwise, we mark the table states as valid. */ if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } if (has_pending_subtables) *has_pending_subtables = has_subtables; if (has_pending_subsequences) *has_pending_subsequences = has_subsequences_non_ready; }