summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c27
1 files changed, 24 insertions, 3 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e6159acba00..dc0526e2da2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -379,6 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
+ Relation rel = NULL;
Assert(!IsTransactionState());
@@ -470,7 +471,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* refresh for the subscription where we remove the table
* state and its origin and by this time the origin might be
* already removed. So passing missing_ok = true.
+ *
+ * Lock the subscription and origin in the same order as we
+ * are doing during DDL commands to avoid deadlocks. See
+ * AlterSubscription_refresh.
*/
+ LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+ 0, AccessShareLock);
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
rstate->relid,
originname,
@@ -480,9 +490,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/*
* Update the state to READY only after the origin cleanup.
*/
- UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn);
+ UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn, true);
}
}
else
@@ -533,7 +543,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* This is required to avoid any undetected deadlocks
* due to any existing lock as deadlock detector won't
* be able to detect the waits on the latch.
+ *
+ * Also close any tables prior to the commit.
*/
+ if (rel)
+ {
+ table_close(rel, NoLock);
+ rel = NULL;
+ }
CommitTransactionCommand();
pgstat_report_stat(false);
}
@@ -593,6 +610,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
}
+ /* Close table if opened */
+ if (rel)
+ table_close(rel, NoLock);
+
if (started_tx)
{
CommitTransactionCommand();