summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/catalog/pg_subscription.c33
-rw-r--r--src/backend/replication/logical/tablesync.c27
-rw-r--r--src/include/catalog/pg_subscription_rel.h2
3 files changed, 55 insertions, 7 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index add51caadf2..a520109ce5a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -323,8 +323,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
* Update the state of a subscription table.
*/
void
-UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn)
+UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn, bool already_locked)
{
Relation rel;
HeapTuple tup;
@@ -332,9 +332,24 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
Datum values[Natts_pg_subscription_rel];
bool replaces[Natts_pg_subscription_rel];
- LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+ if (already_locked)
+ {
+#ifdef USE_ASSERT_CHECKING
+ LOCKTAG tag;
- rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+ Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
+ RowExclusiveLock, true));
+ SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
+ Assert(LockHeldByMe(&tag, AccessShareLock));
+#endif
+
+ rel = table_open(SubscriptionRelRelationId, NoLock);
+ }
+ else
+ {
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+ }
/* Try finding existing mapping. */
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
@@ -369,6 +384,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
}
/*
+ * Update the state of a subscription table.
+ */
+void
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ UpdateSubscriptionRelStateEx(subid, relid, state, sublsn, false);
+}
+
+/*
* Get state of subscription table.
*
* Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e6159acba00..dc0526e2da2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -379,6 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
+ Relation rel = NULL;
Assert(!IsTransactionState());
@@ -470,7 +471,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* refresh for the subscription where we remove the table
* state and its origin and by this time the origin might be
* already removed. So passing missing_ok = true.
+ *
+ * Lock the subscription and origin in the same order as we
+ * are doing during DDL commands to avoid deadlocks. See
+ * AlterSubscription_refresh.
*/
+ LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+ 0, AccessShareLock);
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
rstate->relid,
originname,
@@ -480,9 +490,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/*
* Update the state to READY only after the origin cleanup.
*/
- UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn);
+ UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn, true);
}
}
else
@@ -533,7 +543,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* This is required to avoid any undetected deadlocks
* due to any existing lock as deadlock detector won't
* be able to detect the waits on the latch.
+ *
+ * Also close any tables prior to the commit.
*/
+ if (rel)
+ {
+ table_close(rel, NoLock);
+ rel = NULL;
+ }
CommitTransactionCommand();
pgstat_report_stat(false);
}
@@ -593,6 +610,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
}
+ /* Close table if opened */
+ if (rel)
+ table_close(rel, NoLock);
+
if (started_tx)
{
CommitTransactionCommand();
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9df99c34181..a2e510511e8 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -84,6 +84,8 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn);
+extern void UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn, bool already_locked);
extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);