summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/tablesync.c26
-rw-r--r--src/backend/replication/logical/worker.c25
-rw-r--r--src/backend/replication/walsender.c12
3 files changed, 59 insertions, 4 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d3356bc84ee..e6da4028d39 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1789,6 +1789,32 @@ AllTablesyncsReady(void)
}
/*
+ * Return whether the subscription currently has any relations.
+ *
+ * Note: Unlike HasSubscriptionRelations(), this function relies on cached
+ * information for subscription relations. Additionally, it should not be
+ * invoked outside of apply or tablesync workers, as MySubscription must be
+ * initialized first.
+ */
+bool
+HasSubscriptionRelationsCached(void)
+{
+ bool started_tx;
+ bool has_subrels;
+
+ /* We need up-to-date subscription tables info here */
+ has_subrels = FetchTableStates(&started_tx);
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(true);
+ }
+
+ return has_subrels;
+}
+
+/*
* Update the two_phase state of the specified subscription in pg_subscription.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f1ebd63e792..c0f6bef5c28 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4595,11 +4595,28 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* workers is complex and not worth the effort, so we simply return if not
* all tables are in the READY state.
*
- * It is safe to add new tables with initial states to the subscription
- * after this check because any changes applied to these tables should
- * have a WAL position greater than the rdt_data->remote_lsn.
+ * Advancing the transaction ID is necessary even when no tables are
+ * currently subscribed, to avoid retaining dead tuples unnecessarily.
+ * While it might seem safe to skip all phases and directly assign
+ * candidate_xid to oldest_nonremovable_xid during the
+ * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
+ * concurrently add tables to the subscription, the apply worker may not
+ * process invalidations in time. Consequently,
+ * HasSubscriptionRelationsCached() might miss the new tables, leading to
+ * premature advancement of oldest_nonremovable_xid.
+ *
+ * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
+ * invalidations are guaranteed to be processed before applying changes
+ * from newly added tables while waiting for the local flush to reach
+ * remote_lsn.
+ *
+ * Additionally, even if we check for subscription tables during
+ * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
+ * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
+ * subscription tables at this stage to prevent unnecessary tuple
+ * retention.
*/
- if (!AllTablesyncsReady())
+ if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
{
TimestampTz now;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e3dce9dc68d..59822f22b8d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -51,6 +51,7 @@
#include "access/timeline.h"
#include "access/transam.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
@@ -2719,6 +2720,7 @@ ProcessStandbyPSRequestMessage(void)
{
XLogRecPtr lsn = InvalidXLogRecPtr;
TransactionId oldestXidInCommit;
+ TransactionId oldestGXidInCommit;
FullTransactionId nextFullXid;
FullTransactionId fullOldestXidInCommit;
WalSnd *walsnd = MyWalSnd;
@@ -2746,6 +2748,16 @@ ProcessStandbyPSRequestMessage(void)
* ones replicated.
*/
oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+ oldestGXidInCommit = TwoPhaseGetOldestXidInCommit();
+
+ /*
+ * Update the oldest xid for standby transmission if an older prepared
+ * transaction exists and is currently in commit phase.
+ */
+ if (TransactionIdIsValid(oldestGXidInCommit) &&
+ TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit))
+ oldestXidInCommit = oldestGXidInCommit;
+
nextFullXid = ReadNextFullTransactionId();
fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
oldestXidInCommit);