diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e6159acba00..dc0526e2da2 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -379,6 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -470,7 +471,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * refresh for the subscription where we remove the table * state and its origin and by this time the origin might be * already removed. So passing missing_ok = true. + * + * Lock the subscription and origin in the same order as we + * are doing during DDL commands to avoid deadlocks. See + * AlterSubscription_refresh. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + if (!rel) + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -480,9 +490,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* * Update the state to READY only after the origin cleanup. */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn); + UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn, true); } } else @@ -533,7 +543,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. */ + if (rel) + { + table_close(rel, NoLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -593,6 +610,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* Close table if opened */ + if (rel) + table_close(rel, NoLock); + if (started_tx) { CommitTransactionCommand(); |