diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f8e8cf71eb8..f8649e142c3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; static bool MySubscriptionValid = false; +static List *on_commit_wakeup_workers_subids = NIL; + bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; @@ -4092,3 +4094,53 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Request wakeup of the workers for the given subscription OID + * at commit of the current transaction. + * + * This is used to ensure that the workers process assorted changes + * as soon as possible. + */ +void +LogicalRepWorkersWakeupAtCommit(Oid subid) +{ + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + on_commit_wakeup_workers_subids = + list_append_unique_oid(on_commit_wakeup_workers_subids, subid); + MemoryContextSwitchTo(oldcxt); +} + +/* + * Wake up the workers of any subscriptions that were changed in this xact. + */ +void +AtEOXact_LogicalRepWorkers(bool isCommit) +{ + if (isCommit && on_commit_wakeup_workers_subids != NIL) + { + ListCell *lc; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + foreach(lc, on_commit_wakeup_workers_subids) + { + Oid subid = lfirst_oid(lc); + List *workers; + ListCell *lc2; + + workers = logicalrep_workers_find(subid, true); + foreach(lc2, workers) + { + LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2); + + logicalrep_worker_wakeup_ptr(worker); + } + } + LWLockRelease(LogicalRepWorkerLock); + } + + /* The List storage will be reclaimed automatically in xact cleanup. */ + on_commit_wakeup_workers_subids = NIL; +} |