diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
| -rw-r--r-- | src/backend/replication/logical/tablesync.c | 196 |
1 files changed, 21 insertions, 175 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e6da4028d39..2ba12517e93 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -117,59 +117,16 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" -#include "utils/memutils.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/usercontext.h" -typedef enum -{ - SYNC_TABLE_STATE_NEEDS_REBUILD, - SYNC_TABLE_STATE_REBUILD_STARTED, - SYNC_TABLE_STATE_VALID, -} SyncingTablesState; - -static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -static List *table_states_not_ready = NIL; -static bool FetchTableStates(bool *started_tx); +List *table_states_not_ready = NIL; static StringInfo copybuf = NULL; /* - * Exit routine for synchronization worker. - */ -pg_noreturn static void -finish_sync_worker(void) -{ - /* - * Commit any outstanding transaction. This is the usual case, unless - * there was nothing to do for the table. - */ - if (IsTransactionState()) - { - CommitTransactionCommand(); - pgstat_report_stat(true); - } - - /* And flush all writes. */ - XLogFlush(GetXLogWriteRecPtr()); - - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); - - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - - /* Stop gracefully */ - proc_exit(0); -} - -/* * Wait until the relation sync state is set in the catalog to the expected * one; return true when it happens. * @@ -180,7 +137,7 @@ finish_sync_worker(void) * CATCHUP state to SYNCDONE. */ static bool -wait_for_relation_state_change(Oid relid, char expected_state) +wait_for_table_state_change(Oid relid, char expected_state) { char state; @@ -274,15 +231,6 @@ wait_for_worker_state_change(char expected_state) } /* - * Callback from syscache invalidation. - */ -void -invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) -{ - table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; -} - -/* * Handle table synchronization cooperation from the synchronization * worker. * @@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) * predetermined synchronization point in the WAL stream, mark the table as * SYNCDONE and finish. */ -static void -process_syncing_tables_for_sync(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForSync(XLogRecPtr current_lsn) { SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -349,9 +297,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) /* * Start a new transaction to clean up the tablesync origin tracking. - * This transaction will be ended within the finish_sync_worker(). - * Now, even, if we fail to remove this here, the apply worker will - * ensure to clean it up afterward. + * This transaction will be ended within the FinishSyncWorker(). Now, + * even, if we fail to remove this here, the apply worker will ensure + * to clean it up afterward. * * We need to do this after the table state is set to SYNCDONE. * Otherwise, if an error occurs while performing the database @@ -387,7 +335,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); - finish_sync_worker(); + FinishSyncWorker(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -414,8 +362,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * If the synchronization position is reached (SYNCDONE), then the table can * be marked as READY and is no longer tracked. */ -static void -process_syncing_tables_for_apply(XLogRecPtr current_lsn) +void +ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { struct tablesync_start_time_mapping { @@ -431,7 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchTableStates(&started_tx); + FetchRelationStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -586,8 +534,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) StartTransactionCommand(); started_tx = true; - wait_for_relation_state_change(rstate->relid, - SUBREL_STATE_SYNCDONE); + wait_for_table_state_change(rstate->relid, + SUBREL_STATE_SYNCDONE); } else LWLockRelease(LogicalRepWorkerLock); @@ -690,37 +638,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Process possible state change(s) of tables that are being synchronized. - */ -void -process_syncing_tables(XLogRecPtr current_lsn) -{ - switch (MyLogicalRepWorker->type) - { - case WORKERTYPE_PARALLEL_APPLY: - - /* - * Skip for parallel apply workers because they only operate on - * tables that are in a READY state. See pa_can_start() and - * should_apply_changes_for_rel(). - */ - break; - - case WORKERTYPE_TABLESYNC: - process_syncing_tables_for_sync(current_lsn); - break; - - case WORKERTYPE_APPLY: - process_syncing_tables_for_apply(current_lsn); - break; - - case WORKERTYPE_UNKNOWN: - /* Should never happen. */ - elog(ERROR, "Unknown worker type"); - } -} - -/* * Create list of columns for COPY based on logical relation mapping. */ static List * @@ -1356,7 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: - finish_sync_worker(); /* doesn't return */ + FinishSyncWorker(); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ @@ -1600,77 +1517,6 @@ copy_table_done: } /* - * Common code to fetch the up-to-date sync state info into the static lists. - * - * Returns true if subscription has 1 or more tables, else false. - * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. - */ -static bool -FetchTableStates(bool *started_tx) -{ - static bool has_subrels = false; - - *started_tx = false; - - if (table_states_validity != SYNC_TABLE_STATE_VALID) - { - MemoryContext oldctx; - List *rstates; - ListCell *lc; - SubscriptionRelState *rstate; - - table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED; - - /* Clean the old lists. */ - list_free_deep(table_states_not_ready); - table_states_not_ready = NIL; - - if (!IsTransactionState()) - { - StartTransactionCommand(); - *started_tx = true; - } - - /* Fetch all non-ready tables. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); - - /* Allocate the tracking info in a permanent memory context. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) - { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); - } - MemoryContextSwitchTo(oldctx); - - /* - * Does the subscription have tables? - * - * If there were not-READY relations found then we know it does. But - * if table_states_not_ready was empty we still need to check again to - * see if there are 0 tables. - */ - has_subrels = (table_states_not_ready != NIL) || - HasSubscriptionRelations(MySubscription->oid); - - /* - * If the subscription relation cache has been invalidated since we - * entered this routine, we still use and return the relations we just - * finished constructing, to avoid infinite loops, but we leave the - * table states marked as stale so that we'll rebuild it again on next - * access. Otherwise, we mark the table states as valid. - */ - if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED) - table_states_validity = SYNC_TABLE_STATE_VALID; - } - - return has_subrels; -} - -/* * Execute the initial sync with error handling. Disable the subscription, * if it's required. * @@ -1755,7 +1601,7 @@ TablesyncWorkerMain(Datum main_arg) run_tablesync_worker(); - finish_sync_worker(); + FinishSyncWorker(); } /* @@ -1773,7 +1619,7 @@ AllTablesyncsReady(void) bool has_subrels = false; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchTableStates(&started_tx); + has_subrels = FetchRelationStates(&started_tx); if (started_tx) { @@ -1789,21 +1635,21 @@ AllTablesyncsReady(void) } /* - * Return whether the subscription currently has any relations. + * Return whether the subscription currently has any tables. * - * Note: Unlike HasSubscriptionRelations(), this function relies on cached - * information for subscription relations. Additionally, it should not be + * Note: Unlike HasSubscriptionTables(), this function relies on cached + * information for subscription tables. Additionally, it should not be * invoked outside of apply or tablesync workers, as MySubscription must be * initialized first. */ bool -HasSubscriptionRelationsCached(void) +HasSubscriptionTablesCached(void) { bool started_tx; bool has_subrels; /* We need up-to-date subscription tables info here */ - has_subrels = FetchTableStates(&started_tx); + has_subrels = FetchRelationStates(&started_tx); if (started_tx) { |
