summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/syncutils.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/syncutils.c')
-rw-r--r--src/backend/replication/logical/syncutils.c139
1 files changed, 115 insertions, 24 deletions
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index ae8c9385916..a696b1f2dc0 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -16,6 +16,7 @@
#include "catalog/pg_subscription_rel.h"
#include "pgstat.h"
+#include "replication/logicallauncher.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "utils/lsyscache.h"
@@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE
pg_noreturn void
FinishSyncWorker(void)
{
+ Assert(am_sequencesync_worker() || am_tablesync_worker());
+
/*
* Commit any outstanding transaction. This is the usual case, unless
* there was nothing to do for the table.
@@ -61,16 +64,31 @@ FinishSyncWorker(void)
/* And flush all writes. */
XLogFlush(GetXLogWriteRecPtr());
- StartTransactionCommand();
- ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
- CommitTransactionCommand();
+ if (am_sequencesync_worker())
+ {
+ ereport(LOG,
+ errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
+ MySubscription->name));
+
+ /*
+ * Reset last_seqsync_start_time, so that next time a sequencesync
+ * worker is needed it can be started promptly.
+ */
+ logicalrep_reset_seqsync_start_time();
+ }
+ else
+ {
+ StartTransactionCommand();
+ ereport(LOG,
+ errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();
- /* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
- InvalidOid);
+ /* Find the leader apply worker and signal it. */
+ logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
+ InvalidOid);
+ }
/* Stop gracefully */
proc_exit(0);
@@ -86,7 +104,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
}
/*
- * Process possible state change(s) of relations that are being synchronized.
+ * Attempt to launch a sync worker for one or more sequences or a table, if
+ * a worker slot is available and the retry interval has elapsed.
+ *
+ * wtype: sync worker type.
+ * nsyncworkers: Number of currently running sync workers for the subscription.
+ * relid: InvalidOid for sequencesync worker, actual relid for tablesync
+ * worker.
+ * last_start_time: Pointer to the last start time of the worker.
+ */
+void
+launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
+ TimestampTz *last_start_time)
+{
+ TimestampTz now;
+
+ Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
+ (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
+
+ /* If there is a free sync worker slot, start a new sync worker */
+ if (nsyncworkers >= max_sync_workers_per_subscription)
+ return;
+
+ now = GetCurrentTimestamp();
+
+ if (!(*last_start_time) ||
+ TimestampDifferenceExceeds(*last_start_time, now,
+ wal_retrieve_retry_interval))
+ {
+ /*
+ * Set the last_start_time even if we fail to start the worker, so
+ * that we won't retry until wal_retrieve_retry_interval has elapsed.
+ */
+ *last_start_time = now;
+ (void) logicalrep_worker_launch(wtype,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ relid, DSM_HANDLE_INVALID, false);
+ }
+}
+
+/*
+ * Process possible state change(s) of relations that are being synchronized
+ * and start new tablesync workers for the newly added tables. Also, start a
+ * new sequencesync worker for the newly added sequences.
*/
void
ProcessSyncingRelations(XLogRecPtr current_lsn)
@@ -108,6 +171,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
case WORKERTYPE_APPLY:
ProcessSyncingTablesForApply(current_lsn);
+ ProcessSequencesForSync();
+ break;
+
+ case WORKERTYPE_SEQUENCESYNC:
+ /* Should never happen. */
+ elog(ERROR, "sequence synchronization worker is not expected to process relations");
break;
case WORKERTYPE_UNKNOWN:
@@ -117,17 +186,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
}
/*
- * Common code to fetch the up-to-date sync state info into the static lists.
+ * Common code to fetch the up-to-date sync state info for tables and sequences.
*
- * Returns true if subscription has 1 or more tables, else false.
+ * The pg_subscription_rel catalog is shared by tables and sequences. Changes
+ * to either sequences or tables can affect the validity of relation states, so
+ * we identify non-READY tables and non-READY sequences together to ensure
+ * consistency.
*
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
+ * has_pending_subtables: true if the subscription has one or more tables that
+ * are not in READY state, otherwise false.
+ * has_pending_subsequences: true if the subscription has one or more sequences
+ * that are not in READY state, otherwise false.
*/
-bool
-FetchRelationStates(bool *started_tx)
+void
+FetchRelationStates(bool *has_pending_subtables,
+ bool *has_pending_subsequences,
+ bool *started_tx)
{
+ /*
+ * has_subtables and has_subsequences_non_ready are declared as static,
+ * since the same value can be used until the system table is invalidated.
+ */
static bool has_subtables = false;
+ static bool has_subsequences_non_ready = false;
*started_tx = false;
@@ -135,10 +216,10 @@ FetchRelationStates(bool *started_tx)
{
MemoryContext oldctx;
List *rstates;
- ListCell *lc;
SubscriptionRelState *rstate;
relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
+ has_subsequences_non_ready = false;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
@@ -150,17 +231,23 @@ FetchRelationStates(bool *started_tx)
*started_tx = true;
}
- /* Fetch tables that are in non-ready state. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
+ /* Fetch tables and sequences that are in non-READY state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- foreach(lc, rstates)
+ foreach_ptr(SubscriptionRelState, subrel, rstates)
{
- rstate = palloc(sizeof(SubscriptionRelState));
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
- table_states_not_ready = lappend(table_states_not_ready, rstate);
+ if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
+ has_subsequences_non_ready = true;
+ else
+ {
+ rstate = palloc(sizeof(SubscriptionRelState));
+ memcpy(rstate, subrel, sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready,
+ rstate);
+ }
}
MemoryContextSwitchTo(oldctx);
@@ -185,5 +272,9 @@ FetchRelationStates(bool *started_tx)
relation_states_validity = SYNC_RELATIONS_STATE_VALID;
}
- return has_subtables;
+ if (has_pending_subtables)
+ *has_pending_subtables = has_subtables;
+
+ if (has_pending_subsequences)
+ *has_pending_subsequences = has_subsequences_non_ready;
}