summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c76
1 files changed, 57 insertions, 19 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7edd1c9cf06..e1c757c911e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
(rel->state == SUBREL_STATE_SYNCDONE &&
rel->statelsn <= remote_final_lsn));
+ case WORKERTYPE_SEQUENCESYNC:
+ /* Should never happen. */
+ elog(ERROR, "sequence synchronization worker is not expected to apply changes");
+ break;
+
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
@@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s)
apply_handle_commit_internal(&commit_data);
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s)
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
/*
@@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s)
store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
clear_subscription_skip_lsn(prepare_data.end_lsn);
@@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s)
store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s)
pgstat_report_stat(false);
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
/*
@@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s)
break;
}
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
AcceptInvalidationMessages();
maybe_reread_subscription();
- /* Process any table synchronization changes. */
+ /*
+ * Process any relations that are being synchronized in parallel
+ * and any newly added tables or sequences.
+ */
ProcessSyncingRelations(last_received);
}
@@ -5700,8 +5726,8 @@ run_apply_worker()
}
/*
- * Common initialization for leader apply worker, parallel apply worker and
- * tablesync worker.
+ * Common initialization for leader apply worker, parallel apply worker,
+ * tablesync worker and sequencesync worker.
*
* Initialize the database connection, in-memory subscription and necessary
* config options.
@@ -5809,13 +5835,17 @@ InitializeLogRepWorker(void)
if (am_tablesync_worker())
ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
+ errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid)));
+ else if (am_sequencesync_worker())
+ ereport(LOG,
+ errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
+ MySubscription->name));
else
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" has started",
- MySubscription->name)));
+ errmsg("logical replication apply worker for subscription \"%s\" has started",
+ MySubscription->name));
CommitTransactionCommand();
}
@@ -5831,14 +5861,16 @@ replorigin_reset(int code, Datum arg)
replorigin_session_origin_timestamp = 0;
}
-/* Common function to setup the leader apply or tablesync worker. */
+/*
+ * Common function to setup the leader apply, tablesync and sequencesync worker.
+ */
void
SetupApplyOrSyncWorker(int worker_slot)
{
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
- Assert(am_tablesync_worker() || am_leader_apply_worker());
+ Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -5921,9 +5953,15 @@ DisableSubscriptionAndExit(void)
RESUME_INTERRUPTS();
- /* Report the worker failed during either table synchronization or apply */
- pgstat_report_subscription_error(MyLogicalRepWorker->subid,
- !am_tablesync_worker());
+ if (am_leader_apply_worker() || am_tablesync_worker())
+ {
+ /*
+ * Report the worker failed during either table synchronization or
+ * apply.
+ */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ !am_tablesync_worker());
+ }
/* Disable the subscription */
StartTransactionCommand();