summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2025-10-16 05:10:50 +0000
committerAmit Kapila <akapila@postgresql.org>2025-10-16 05:10:50 +0000
commit41c674d2e31e8304a6edbcb5183d326798ba00f6 (patch)
tree590620215a2e855f26223558abf8197d05592212
parent905e932f0922a837bb3e4e482089c7c2e98bea67 (diff)
Refactor logical worker synchronization code into a separate file.
To support the upcoming addition of a sequence synchronization worker, this patch extracts common synchronization logic shared by table sync workers and the new sequence sync worker into a dedicated file. This modularization improves code reuse, maintainability, and clarity in the logical workers framework. Author: vignesh C <vignesh21@gmail.com> Author: Hou Zhijie <houzj.fnst@fujitsu.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Peter Smith <smithpb2250@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Chao Li <li.evan.chao@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
-rw-r--r--src/backend/catalog/pg_subscription.c4
-rw-r--r--src/backend/replication/logical/Makefile1
-rw-r--r--src/backend/replication/logical/applyparallelworker.c2
-rw-r--r--src/backend/replication/logical/meson.build1
-rw-r--r--src/backend/replication/logical/syncutils.c187
-rw-r--r--src/backend/replication/logical/tablesync.c196
-rw-r--r--src/backend/replication/logical/worker.c22
-rw-r--r--src/bin/pg_dump/common.c4
-rw-r--r--src/bin/pg_dump/pg_dump.c8
-rw-r--r--src/bin/pg_dump/pg_dump.h2
-rw-r--r--src/include/catalog/pg_subscription_rel.h2
-rw-r--r--src/include/replication/worker_internal.h14
-rw-r--r--src/tools/pgindent/typedefs.list2
13 files changed, 243 insertions, 202 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index b885890de37..e06587b0265 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -506,13 +506,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
}
/*
- * Does the subscription have any relations?
+ * Does the subscription have any tables?
*
* Use this function only to know true/false, and when you have no need for the
* List returned by GetSubscriptionRelations.
*/
bool
-HasSubscriptionRelations(Oid subid)
+HasSubscriptionTables(Oid subid)
{
Relation rel;
ScanKeyData skey[1];
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 1e08bbbd4eb..c62c8c67521 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -28,6 +28,7 @@ OBJS = \
reorderbuffer.o \
slotsync.o \
snapbuild.o \
+ syncutils.o \
tablesync.o \
worker.o
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 33b7ec7f029..14325581afc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -970,7 +970,7 @@ ParallelApplyWorkerMain(Datum main_arg)
* the subscription relation state.
*/
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
- invalidate_syncing_table_states,
+ InvalidateSyncingRelStates,
(Datum) 0);
set_apply_error_context_origin(originname);
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 6f19614c79d..9283e996ef4 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -14,6 +14,7 @@ backend_sources += files(
'reorderbuffer.c',
'slotsync.c',
'snapbuild.c',
+ 'syncutils.c',
'tablesync.c',
'worker.c',
)
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
new file mode 100644
index 00000000000..1bb3ca01db0
--- /dev/null
+++ b/src/backend/replication/logical/syncutils.c
@@ -0,0 +1,187 @@
+/*-------------------------------------------------------------------------
+ * syncutils.c
+ * PostgreSQL logical replication: common synchronization code
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/syncutils.c
+ *
+ * NOTES
+ * This file contains code common for synchronization workers.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "catalog/pg_subscription_rel.h"
+#include "pgstat.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+
+/*
+ * Enum for phases of the subscription relations state.
+ *
+ * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
+ * state is no longer valid, and the subscription relations should be rebuilt.
+ *
+ * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
+ * relations state is being rebuilt.
+ *
+ * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
+ * up-to-date and valid.
+ */
+typedef enum
+{
+ SYNC_RELATIONS_STATE_NEEDS_REBUILD,
+ SYNC_RELATIONS_STATE_REBUILD_STARTED,
+ SYNC_RELATIONS_STATE_VALID,
+} SyncingRelationsState;
+
+static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
+
+/*
+ * Exit routine for synchronization worker.
+ */
+pg_noreturn void
+FinishSyncWorker(void)
+{
+ /*
+ * Commit any outstanding transaction. This is the usual case, unless
+ * there was nothing to do for the table.
+ */
+ if (IsTransactionState())
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(true);
+ }
+
+ /* And flush all writes. */
+ XLogFlush(GetXLogWriteRecPtr());
+
+ StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));
+ CommitTransactionCommand();
+
+ /* Find the leader apply worker and signal it. */
+ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+
+ /* Stop gracefully */
+ proc_exit(0);
+}
+
+/*
+ * Callback from syscache invalidation.
+ */
+void
+InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
+{
+ relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
+}
+
+/*
+ * Process possible state change(s) of relations that are being synchronized.
+ */
+void
+ProcessSyncingRelations(XLogRecPtr current_lsn)
+{
+ switch (MyLogicalRepWorker->type)
+ {
+ case WORKERTYPE_PARALLEL_APPLY:
+
+ /*
+ * Skip for parallel apply workers because they only operate on
+ * tables that are in a READY state. See pa_can_start() and
+ * should_apply_changes_for_rel().
+ */
+ break;
+
+ case WORKERTYPE_TABLESYNC:
+ ProcessSyncingTablesForSync(current_lsn);
+ break;
+
+ case WORKERTYPE_APPLY:
+ ProcessSyncingTablesForApply(current_lsn);
+ break;
+
+ case WORKERTYPE_UNKNOWN:
+ /* Should never happen. */
+ elog(ERROR, "Unknown worker type");
+ }
+}
+
+/*
+ * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * Returns true if subscription has 1 or more tables, else false.
+ *
+ * Note: If this function started the transaction (indicated by the parameter)
+ * then it is the caller's responsibility to commit it.
+ */
+bool
+FetchRelationStates(bool *started_tx)
+{
+ static bool has_subtables = false;
+
+ *started_tx = false;
+
+ if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
+ {
+ MemoryContext oldctx;
+ List *rstates;
+ ListCell *lc;
+ SubscriptionRelState *rstate;
+
+ relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
+
+ /* Clean the old lists. */
+ list_free_deep(table_states_not_ready);
+ table_states_not_ready = NIL;
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ *started_tx = true;
+ }
+
+ /* Fetch tables and sequences that are in non-ready state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true);
+
+ /* Allocate the tracking info in a permanent memory context. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ foreach(lc, rstates)
+ {
+ rstate = palloc(sizeof(SubscriptionRelState));
+ memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready, rstate);
+ }
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Does the subscription have tables?
+ *
+ * If there were not-READY tables found then we know it does. But if
+ * table_states_not_ready was empty we still need to check again to
+ * see if there are 0 tables.
+ */
+ has_subtables = (table_states_not_ready != NIL) ||
+ HasSubscriptionTables(MySubscription->oid);
+
+ /*
+ * If the subscription relation cache has been invalidated since we
+ * entered this routine, we still use and return the relations we just
+ * finished constructing, to avoid infinite loops, but we leave the
+ * table states marked as stale so that we'll rebuild it again on next
+ * access. Otherwise, we mark the table states as valid.
+ */
+ if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
+ relation_states_validity = SYNC_RELATIONS_STATE_VALID;
+ }
+
+ return has_subtables;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e6da4028d39..2ba12517e93 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -117,59 +117,16 @@
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
-#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
-typedef enum
-{
- SYNC_TABLE_STATE_NEEDS_REBUILD,
- SYNC_TABLE_STATE_REBUILD_STARTED,
- SYNC_TABLE_STATE_VALID,
-} SyncingTablesState;
-
-static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
-static List *table_states_not_ready = NIL;
-static bool FetchTableStates(bool *started_tx);
+List *table_states_not_ready = NIL;
static StringInfo copybuf = NULL;
/*
- * Exit routine for synchronization worker.
- */
-pg_noreturn static void
-finish_sync_worker(void)
-{
- /*
- * Commit any outstanding transaction. This is the usual case, unless
- * there was nothing to do for the table.
- */
- if (IsTransactionState())
- {
- CommitTransactionCommand();
- pgstat_report_stat(true);
- }
-
- /* And flush all writes. */
- XLogFlush(GetXLogWriteRecPtr());
-
- StartTransactionCommand();
- ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
- CommitTransactionCommand();
-
- /* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
-
- /* Stop gracefully */
- proc_exit(0);
-}
-
-/*
* Wait until the relation sync state is set in the catalog to the expected
* one; return true when it happens.
*
@@ -180,7 +137,7 @@ finish_sync_worker(void)
* CATCHUP state to SYNCDONE.
*/
static bool
-wait_for_relation_state_change(Oid relid, char expected_state)
+wait_for_table_state_change(Oid relid, char expected_state)
{
char state;
@@ -274,15 +231,6 @@ wait_for_worker_state_change(char expected_state)
}
/*
- * Callback from syscache invalidation.
- */
-void
-invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
-{
- table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
-}
-
-/*
* Handle table synchronization cooperation from the synchronization
* worker.
*
@@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
* predetermined synchronization point in the WAL stream, mark the table as
* SYNCDONE and finish.
*/
-static void
-process_syncing_tables_for_sync(XLogRecPtr current_lsn)
+void
+ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
{
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
@@ -349,9 +297,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
/*
* Start a new transaction to clean up the tablesync origin tracking.
- * This transaction will be ended within the finish_sync_worker().
- * Now, even, if we fail to remove this here, the apply worker will
- * ensure to clean it up afterward.
+ * This transaction will be ended within the FinishSyncWorker(). Now,
+ * even, if we fail to remove this here, the apply worker will ensure
+ * to clean it up afterward.
*
* We need to do this after the table state is set to SYNCDONE.
* Otherwise, if an error occurs while performing the database
@@ -387,7 +335,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*/
replorigin_drop_by_name(originname, true, false);
- finish_sync_worker();
+ FinishSyncWorker();
}
else
SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -414,8 +362,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* If the synchronization position is reached (SYNCDONE), then the table can
* be marked as READY and is no longer tracked.
*/
-static void
-process_syncing_tables_for_apply(XLogRecPtr current_lsn)
+void
+ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
{
struct tablesync_start_time_mapping
{
@@ -431,7 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
- FetchTableStates(&started_tx);
+ FetchRelationStates(&started_tx);
/*
* Prepare a hash table for tracking last start times of workers, to avoid
@@ -586,8 +534,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
StartTransactionCommand();
started_tx = true;
- wait_for_relation_state_change(rstate->relid,
- SUBREL_STATE_SYNCDONE);
+ wait_for_table_state_change(rstate->relid,
+ SUBREL_STATE_SYNCDONE);
}
else
LWLockRelease(LogicalRepWorkerLock);
@@ -690,37 +638,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
/*
- * Process possible state change(s) of tables that are being synchronized.
- */
-void
-process_syncing_tables(XLogRecPtr current_lsn)
-{
- switch (MyLogicalRepWorker->type)
- {
- case WORKERTYPE_PARALLEL_APPLY:
-
- /*
- * Skip for parallel apply workers because they only operate on
- * tables that are in a READY state. See pa_can_start() and
- * should_apply_changes_for_rel().
- */
- break;
-
- case WORKERTYPE_TABLESYNC:
- process_syncing_tables_for_sync(current_lsn);
- break;
-
- case WORKERTYPE_APPLY:
- process_syncing_tables_for_apply(current_lsn);
- break;
-
- case WORKERTYPE_UNKNOWN:
- /* Should never happen. */
- elog(ERROR, "Unknown worker type");
- }
-}
-
-/*
* Create list of columns for COPY based on logical relation mapping.
*/
static List *
@@ -1356,7 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
case SUBREL_STATE_SYNCDONE:
case SUBREL_STATE_READY:
case SUBREL_STATE_UNKNOWN:
- finish_sync_worker(); /* doesn't return */
+ FinishSyncWorker(); /* doesn't return */
}
/* Calculate the name of the tablesync slot. */
@@ -1600,77 +1517,6 @@ copy_table_done:
}
/*
- * Common code to fetch the up-to-date sync state info into the static lists.
- *
- * Returns true if subscription has 1 or more tables, else false.
- *
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
- */
-static bool
-FetchTableStates(bool *started_tx)
-{
- static bool has_subrels = false;
-
- *started_tx = false;
-
- if (table_states_validity != SYNC_TABLE_STATE_VALID)
- {
- MemoryContext oldctx;
- List *rstates;
- ListCell *lc;
- SubscriptionRelState *rstate;
-
- table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
-
- /* Clean the old lists. */
- list_free_deep(table_states_not_ready);
- table_states_not_ready = NIL;
-
- if (!IsTransactionState())
- {
- StartTransactionCommand();
- *started_tx = true;
- }
-
- /* Fetch all non-ready tables. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true);
-
- /* Allocate the tracking info in a permanent memory context. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- foreach(lc, rstates)
- {
- rstate = palloc(sizeof(SubscriptionRelState));
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
- table_states_not_ready = lappend(table_states_not_ready, rstate);
- }
- MemoryContextSwitchTo(oldctx);
-
- /*
- * Does the subscription have tables?
- *
- * If there were not-READY relations found then we know it does. But
- * if table_states_not_ready was empty we still need to check again to
- * see if there are 0 tables.
- */
- has_subrels = (table_states_not_ready != NIL) ||
- HasSubscriptionRelations(MySubscription->oid);
-
- /*
- * If the subscription relation cache has been invalidated since we
- * entered this routine, we still use and return the relations we just
- * finished constructing, to avoid infinite loops, but we leave the
- * table states marked as stale so that we'll rebuild it again on next
- * access. Otherwise, we mark the table states as valid.
- */
- if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
- table_states_validity = SYNC_TABLE_STATE_VALID;
- }
-
- return has_subrels;
-}
-
-/*
* Execute the initial sync with error handling. Disable the subscription,
* if it's required.
*
@@ -1755,7 +1601,7 @@ TablesyncWorkerMain(Datum main_arg)
run_tablesync_worker();
- finish_sync_worker();
+ FinishSyncWorker();
}
/*
@@ -1773,7 +1619,7 @@ AllTablesyncsReady(void)
bool has_subrels = false;
/* We need up-to-date sync state info for subscription tables here. */
- has_subrels = FetchTableStates(&started_tx);
+ has_subrels = FetchRelationStates(&started_tx);
if (started_tx)
{
@@ -1789,21 +1635,21 @@ AllTablesyncsReady(void)
}
/*
- * Return whether the subscription currently has any relations.
+ * Return whether the subscription currently has any tables.
*
- * Note: Unlike HasSubscriptionRelations(), this function relies on cached
- * information for subscription relations. Additionally, it should not be
+ * Note: Unlike HasSubscriptionTables(), this function relies on cached
+ * information for subscription tables. Additionally, it should not be
* invoked outside of apply or tablesync workers, as MySubscription must be
* initialized first.
*/
bool
-HasSubscriptionRelationsCached(void)
+HasSubscriptionTablesCached(void)
{
bool started_tx;
bool has_subrels;
/* We need up-to-date subscription tables info here */
- has_subrels = FetchTableStates(&started_tx);
+ has_subrels = FetchRelationStates(&started_tx);
if (started_tx)
{
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 419e478b4c6..3c58ad88476 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -91,7 +91,7 @@
* behave as if two_phase = off. When the apply worker detects that all
* tablesyncs have become READY (while the tri-state was PENDING) it will
* restart the apply worker process. This happens in
- * process_syncing_tables_for_apply.
+ * ProcessSyncingTablesForApply.
*
* When the (re-started) apply worker finds that all tablesyncs are READY for a
* two_phase tri-state of PENDING it start streaming messages with the
@@ -1243,7 +1243,7 @@ apply_handle_commit(StringInfo s)
apply_handle_commit_internal(&commit_data);
/* Process any tables that are being synchronized in parallel. */
- process_syncing_tables(commit_data.end_lsn);
+ ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
@@ -1365,7 +1365,7 @@ apply_handle_prepare(StringInfo s)
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
- process_syncing_tables(prepare_data.end_lsn);
+ ProcessSyncingRelations(prepare_data.end_lsn);
/*
* Since we have already prepared the transaction, in a case where the
@@ -1421,7 +1421,7 @@ apply_handle_commit_prepared(StringInfo s)
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
- process_syncing_tables(prepare_data.end_lsn);
+ ProcessSyncingRelations(prepare_data.end_lsn);
clear_subscription_skip_lsn(prepare_data.end_lsn);
@@ -1487,7 +1487,7 @@ apply_handle_rollback_prepared(StringInfo s)
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
- process_syncing_tables(rollback_data.rollback_end_lsn);
+ ProcessSyncingRelations(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
@@ -1622,7 +1622,7 @@ apply_handle_stream_prepare(StringInfo s)
pgstat_report_stat(false);
/* Process any tables that are being synchronized in parallel. */
- process_syncing_tables(prepare_data.end_lsn);
+ ProcessSyncingRelations(prepare_data.end_lsn);
/*
* Similar to prepare case, the subskiplsn could be left in a case of
@@ -2464,7 +2464,7 @@ apply_handle_stream_commit(StringInfo s)
}
/* Process any tables that are being synchronized in parallel. */
- process_syncing_tables(commit_data.end_lsn);
+ ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -4133,7 +4133,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_reread_subscription();
/* Process any table synchronization changes. */
- process_syncing_tables(last_received);
+ ProcessSyncingRelations(last_received);
}
/* Cleanup the memory. */
@@ -4623,7 +4623,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
* concurrently add tables to the subscription, the apply worker may not
* process invalidations in time. Consequently,
- * HasSubscriptionRelationsCached() might miss the new tables, leading to
+ * HasSubscriptionTablesCached() might miss the new tables, leading to
* premature advancement of oldest_nonremovable_xid.
*
* Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
@@ -4637,7 +4637,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* subscription tables at this stage to prevent unnecessary tuple
* retention.
*/
- if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
+ if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
{
TimestampTz now;
@@ -5876,7 +5876,7 @@ SetupApplyOrSyncWorker(int worker_slot)
* the subscription relation state.
*/
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
- invalidate_syncing_table_states,
+ InvalidateSyncingRelStates,
(Datum) 0);
}
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index a1976fae607..4e7303ea631 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -244,8 +244,8 @@ getSchemaData(Archive *fout, int *numTablesPtr)
pg_log_info("reading subscriptions");
getSubscriptions(fout);
- pg_log_info("reading subscription membership of tables");
- getSubscriptionTables(fout);
+ pg_log_info("reading subscription membership of relations");
+ getSubscriptionRelations(fout);
free(inhinfo); /* not needed any longer */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 641bece12c7..890db7b08c2 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5305,12 +5305,12 @@ getSubscriptions(Archive *fout)
}
/*
- * getSubscriptionTables
- * Get information about subscription membership for dumpable tables. This
+ * getSubscriptionRelations
+ * Get information about subscription membership for dumpable relations. This
* will be used only in binary-upgrade mode for PG17 or later versions.
*/
void
-getSubscriptionTables(Archive *fout)
+getSubscriptionRelations(Archive *fout)
{
DumpOptions *dopt = fout->dopt;
SubscriptionInfo *subinfo = NULL;
@@ -5364,7 +5364,7 @@ getSubscriptionTables(Archive *fout)
tblinfo = findTableByOid(relid);
if (tblinfo == NULL)
- pg_fatal("failed sanity check, table with OID %u not found",
+ pg_fatal("failed sanity check, relation with OID %u not found",
relid);
/* OK, make a DumpableObject for this relationship */
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index fa6d1a510f7..72a00e1bc20 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -829,6 +829,6 @@ extern void getPublicationNamespaces(Archive *fout);
extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
int numTables);
extern void getSubscriptions(Archive *fout);
-extern void getSubscriptionTables(Archive *fout);
+extern void getSubscriptionRelations(Archive *fout);
#endif /* PG_DUMP_H */
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 02f97a547dd..61b63c6bb7a 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -89,7 +89,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
-extern bool HasSubscriptionRelations(Oid subid);
+extern bool HasSubscriptionTables(Oid subid);
extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index de003802612..ae352f6e691 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -251,6 +251,8 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
+extern PGDLLIMPORT List *table_states_not_ready;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
@@ -272,12 +274,16 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
char *originname, Size szoriginname);
extern bool AllTablesyncsReady(void);
-extern bool HasSubscriptionRelationsCached(void);
+extern bool HasSubscriptionTablesCached(void);
extern void UpdateTwoPhaseState(Oid suboid, char new_state);
-extern void process_syncing_tables(XLogRecPtr current_lsn);
-extern void invalidate_syncing_table_states(Datum arg, int cacheid,
- uint32 hashvalue);
+extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
+extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
+
+pg_noreturn extern void FinishSyncWorker(void);
+extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
+extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
+extern bool FetchRelationStates(bool *started_tx);
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5290b91e83e..ee1cab6190f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2922,7 +2922,7 @@ SyncRepStandbyData
SyncRequestHandler
SyncRequestType
SyncStandbySlotsConfigData
-SyncingTablesState
+SyncingRelationsState
SysFKRelationship
SysScanDesc
SyscacheCallbackFunction