diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 26 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 25 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 12 |
3 files changed, 59 insertions, 4 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d3356bc84ee..e6da4028d39 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1789,6 +1789,32 @@ AllTablesyncsReady(void) } /* + * Return whether the subscription currently has any relations. + * + * Note: Unlike HasSubscriptionRelations(), this function relies on cached + * information for subscription relations. Additionally, it should not be + * invoked outside of apply or tablesync workers, as MySubscription must be + * initialized first. + */ +bool +HasSubscriptionRelationsCached(void) +{ + bool started_tx; + bool has_subrels; + + /* We need up-to-date subscription tables info here */ + has_subrels = FetchTableStates(&started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + return has_subrels; +} + +/* * Update the two_phase state of the specified subscription in pg_subscription. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f1ebd63e792..c0f6bef5c28 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4595,11 +4595,28 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * workers is complex and not worth the effort, so we simply return if not * all tables are in the READY state. * - * It is safe to add new tables with initial states to the subscription - * after this check because any changes applied to these tables should - * have a WAL position greater than the rdt_data->remote_lsn. + * Advancing the transaction ID is necessary even when no tables are + * currently subscribed, to avoid retaining dead tuples unnecessarily. + * While it might seem safe to skip all phases and directly assign + * candidate_xid to oldest_nonremovable_xid during the + * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users + * concurrently add tables to the subscription, the apply worker may not + * process invalidations in time. Consequently, + * HasSubscriptionRelationsCached() might miss the new tables, leading to + * premature advancement of oldest_nonremovable_xid. + * + * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as + * invalidations are guaranteed to be processed before applying changes + * from newly added tables while waiting for the local flush to reach + * remote_lsn. + * + * Additionally, even if we check for subscription tables during + * RDT_GET_CANDIDATE_XID, they might be dropped before reaching + * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify + * subscription tables at this stage to prevent unnecessary tuple + * retention. */ - if (!AllTablesyncsReady()) + if (HasSubscriptionRelationsCached() && !AllTablesyncsReady()) { TimestampTz now; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e3dce9dc68d..59822f22b8d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -51,6 +51,7 @@ #include "access/timeline.h" #include "access/transam.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xlogreader.h" @@ -2719,6 +2720,7 @@ ProcessStandbyPSRequestMessage(void) { XLogRecPtr lsn = InvalidXLogRecPtr; TransactionId oldestXidInCommit; + TransactionId oldestGXidInCommit; FullTransactionId nextFullXid; FullTransactionId fullOldestXidInCommit; WalSnd *walsnd = MyWalSnd; @@ -2746,6 +2748,16 @@ ProcessStandbyPSRequestMessage(void) * ones replicated. */ oldestXidInCommit = GetOldestActiveTransactionId(true, false); + oldestGXidInCommit = TwoPhaseGetOldestXidInCommit(); + + /* + * Update the oldest xid for standby transmission if an older prepared + * transaction exists and is currently in commit phase. + */ + if (TransactionIdIsValid(oldestGXidInCommit) && + TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit)) + oldestXidInCommit = oldestGXidInCommit; + nextFullXid = ReadNextFullTransactionId(); fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid, oldestXidInCommit); |