summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/launcher.c125
-rw-r--r--src/backend/replication/logical/slotsync.c3
-rw-r--r--src/backend/replication/logical/worker.c271
3 files changed, 313 insertions, 86 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 37377f7eb63..add2e2e066c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -43,6 +43,7 @@
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
@@ -102,7 +103,8 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
static bool acquire_conflict_slot_if_exists(void);
-static void advance_conflict_slot_xmin(TransactionId new_xmin);
+static void update_conflict_slot_xmin(TransactionId new_xmin);
+static void init_conflict_slot_xmin(void);
/*
@@ -152,6 +154,7 @@ get_subscription_list(void)
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
sub->retaindeadtuples = subform->subretaindeadtuples;
+ sub->retentionactive = subform->subretentionactive;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -1181,7 +1184,7 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
- bool can_advance_xmin = true;
+ bool can_update_xmin = true;
bool retain_dead_tuples = false;
TransactionId xmin = InvalidTransactionId;
@@ -1215,17 +1218,6 @@ ApplyLauncherMain(Datum main_arg)
retain_dead_tuples = true;
/*
- * Can't advance xmin of the slot unless all the subscriptions
- * with retain_dead_tuples are enabled. This is required to
- * ensure that we don't advance the xmin of
- * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
- * enabled. Otherwise, we won't be able to detect conflicts
- * reliably for such a subscription even though it has set the
- * retain_dead_tuples option.
- */
- can_advance_xmin &= sub->enabled;
-
- /*
* Create a replication slot to retain information necessary
* for conflict detection such as dead tuples, commit
* timestamps, and origins.
@@ -1240,6 +1232,28 @@ ApplyLauncherMain(Datum main_arg)
* subscription was enabled.
*/
CreateConflictDetectionSlot();
+
+ if (sub->retentionactive)
+ {
+ /*
+ * Can't advance xmin of the slot unless all the
+ * subscriptions actively retaining dead tuples are
+ * enabled. This is required to ensure that we don't
+ * advance the xmin of CONFLICT_DETECTION_SLOT if one of
+ * the subscriptions is not enabled. Otherwise, we won't
+ * be able to detect conflicts reliably for such a
+ * subscription even though it has set the
+ * retain_dead_tuples option.
+ */
+ can_update_xmin &= sub->enabled;
+
+ /*
+ * Initialize the slot once the subscription activiates
+ * retention.
+ */
+ if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
+ init_conflict_slot_xmin();
+ }
}
if (!sub->enabled)
@@ -1254,9 +1268,11 @@ ApplyLauncherMain(Datum main_arg)
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
- * workers that enables retain_dead_tuples.
+ * workers.
*/
- if (sub->retaindeadtuples && can_advance_xmin)
+ if (sub->retaindeadtuples &&
+ sub->retentionactive &&
+ can_update_xmin)
compute_min_nonremovable_xid(w, &xmin);
/* worker is running already */
@@ -1265,12 +1281,12 @@ ApplyLauncherMain(Datum main_arg)
/*
* Can't advance xmin of the slot unless all the workers
- * corresponding to subscriptions with retain_dead_tuples are
- * running, disabling the further computation of the minimum
+ * corresponding to subscriptions actively retaining dead tuples
+ * are running, disabling the further computation of the minimum
* nonremovable xid.
*/
- if (sub->retaindeadtuples)
- can_advance_xmin = false;
+ if (sub->retaindeadtuples && sub->retentionactive)
+ can_update_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg)
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID,
- sub->retaindeadtuples))
+ sub->retaindeadtuples &&
+ sub->retentionactive))
{
/*
* We get here either if we failed to launch a worker
@@ -1320,13 +1337,18 @@ ApplyLauncherMain(Datum main_arg)
* that requires us to retain dead tuples. Otherwise, if required,
* advance the slot's xmin to protect dead tuples required for the
* conflict detection.
+ *
+ * Additionally, if all apply workers for subscriptions with
+ * retain_dead_tuples enabled have requested to stop retention, the
+ * slot's xmin will be set to InvalidTransactionId allowing the
+ * removal of dead tuples.
*/
if (MyReplicationSlot)
{
if (!retain_dead_tuples)
ReplicationSlotDropAcquired();
- else if (can_advance_xmin)
- advance_conflict_slot_xmin(xmin);
+ else if (can_update_xmin)
+ update_conflict_slot_xmin(xmin);
}
/* Switch back to original memory context. */
@@ -1378,7 +1400,15 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
nonremovable_xid = worker->oldest_nonremovable_xid;
SpinLockRelease(&worker->relmutex);
- Assert(TransactionIdIsValid(nonremovable_xid));
+ /*
+ * Return if the apply worker has stopped retention concurrently.
+ *
+ * Although this function is invoked only when retentionactive is true,
+ * the apply worker might stop retention after the launcher fetches the
+ * retentionactive flag.
+ */
+ if (!TransactionIdIsValid(nonremovable_xid))
+ return;
if (!TransactionIdIsValid(*xmin) ||
TransactionIdPrecedes(nonremovable_xid, *xmin))
@@ -1402,17 +1432,17 @@ acquire_conflict_slot_if_exists(void)
}
/*
- * Advance the xmin the replication slot used to retain information required
+ * Update the xmin the replication slot used to retain information required
* for conflict detection.
*/
static void
-advance_conflict_slot_xmin(TransactionId new_xmin)
+update_conflict_slot_xmin(TransactionId new_xmin)
{
Assert(MyReplicationSlot);
- Assert(TransactionIdIsValid(new_xmin));
- Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+ Assert(!TransactionIdIsValid(new_xmin) ||
+ TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
- /* Return if the xmin value of the slot cannot be advanced */
+ /* Return if the xmin value of the slot cannot be updated */
if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
return;
@@ -1439,23 +1469,16 @@ advance_conflict_slot_xmin(TransactionId new_xmin)
}
/*
- * Create and acquire the replication slot used to retain information for
- * conflict detection, if not yet.
+ * Initialize the xmin for the conflict detection slot.
*/
-void
-CreateConflictDetectionSlot(void)
+static void
+init_conflict_slot_xmin(void)
{
TransactionId xmin_horizon;
- /* Exit early, if the replication slot is already created and acquired */
- if (MyReplicationSlot)
- return;
-
- ereport(LOG,
- errmsg("creating replication conflict detection slot"));
-
- ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
- false, false);
+ /* Replication slot must exist but shouldn't be initialized. */
+ Assert(MyReplicationSlot &&
+ !TransactionIdIsValid(MyReplicationSlot->data.xmin));
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -1476,6 +1499,26 @@ CreateConflictDetectionSlot(void)
}
/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if not yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+ /* Exit early, if the replication slot is already created and acquired */
+ if (MyReplicationSlot)
+ return;
+
+ ereport(LOG,
+ errmsg("creating replication conflict detection slot"));
+
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+ false, false);
+
+ init_conflict_slot_xmin();
+}
+
+/*
* Is current process the logical replication launcher?
*/
bool
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f5501c106dc..9d0072a49ed 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1476,7 +1476,6 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
*/
wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
app_name.data, &err);
- pfree(app_name.data);
if (!wrconn)
ereport(ERROR,
@@ -1484,6 +1483,8 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
app_name.data, err));
+ pfree(app_name.data);
+
/*
* Register the disconnection callback.
*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..f1ebd63e792 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -173,6 +173,14 @@
* Advance the non-removable transaction ID if the current flush location has
* reached or surpassed the last received WAL position.
*
+ * - RDT_STOP_CONFLICT_INFO_RETENTION:
+ * This phase is required only when max_retention_duration is defined. We
+ * enter this phase if the wait time in either the
+ * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
+ * configured max_retention_duration. In this phase,
+ * pg_subscription.subretentionactive is updated to false within a new
+ * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
+ *
* The overall state progression is: GET_CANDIDATE_XID ->
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
@@ -373,7 +381,8 @@ typedef enum
RDT_GET_CANDIDATE_XID,
RDT_REQUEST_PUBLISHER_STATUS,
RDT_WAIT_FOR_PUBLISHER_STATUS,
- RDT_WAIT_FOR_LOCAL_FLUSH
+ RDT_WAIT_FOR_LOCAL_FLUSH,
+ RDT_STOP_CONFLICT_INFO_RETENTION
} RetainDeadTuplesPhase;
/*
@@ -415,6 +424,9 @@ typedef struct RetainDeadTuplesData
* updated in final phase
* (RDT_WAIT_FOR_LOCAL_FLUSH) */
+ long table_sync_wait_time; /* time spent waiting for table sync
+ * to finish */
+
/*
* The following fields are used to determine the timing for the next
* round of transaction ID advancement.
@@ -555,6 +567,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data);
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
bool status_received);
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
bool new_xid_found);
@@ -3219,7 +3234,6 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
TimestampTz *delete_time)
{
TransactionId oldestxmin;
- ReplicationSlot *slot;
/*
* Return false if either dead tuples are not retained or commit timestamp
@@ -3229,32 +3243,49 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
return false;
/*
- * For conflict detection, we use the conflict slot's xmin value instead
- * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
- * a threshold to identify tuples that were recently deleted. These tuples
- * are not visible to concurrent transactions, but we log an
- * update_deleted conflict if such a tuple matches the remote update being
- * applied.
+ * For conflict detection, we use the leader worker's
+ * oldest_nonremovable_xid value instead of invoking
+ * GetOldestNonRemovableTransactionId() or using the conflict detection
+ * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
+ * identify tuples that were recently deleted. These deleted tuples are no
+ * longer visible to concurrent transactions. However, if a remote update
+ * matches such a tuple, we log an update_deleted conflict.
*
- * Although GetOldestNonRemovableTransactionId() can return a value older
- * than the slot's xmin, for our current purpose it is acceptable to treat
- * tuples deleted by transactions prior to slot.xmin as update_missing
- * conflicts.
- *
- * Ideally, we would use oldest_nonremovable_xid, which is directly
- * maintained by the leader apply worker. However, this value is not
- * available to table synchronization or parallel apply workers, making
- * slot.xmin a practical alternative in those contexts.
+ * While GetOldestNonRemovableTransactionId() and slot.xmin may return
+ * transaction IDs older than oldest_nonremovable_xid, for our current
+ * purpose, it is acceptable to treat tuples deleted by transactions prior
+ * to oldest_nonremovable_xid as update_missing conflicts.
*/
- slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+ if (am_leader_apply_worker())
+ {
+ oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
+ }
+ else
+ {
+ LogicalRepWorker *leader;
- Assert(slot);
+ /*
+ * Obtain the information from the leader apply worker as only the
+ * leader manages conflict retention (see
+ * maybe_advance_nonremovable_xid() for details).
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ leader = logicalrep_worker_find(MyLogicalRepWorker->subid,
+ InvalidOid, false);
- SpinLockAcquire(&slot->mutex);
- oldestxmin = slot->data.xmin;
- SpinLockRelease(&slot->mutex);
+ SpinLockAcquire(&leader->relmutex);
+ oldestxmin = leader->oldest_nonremovable_xid;
+ SpinLockRelease(&leader->relmutex);
+ LWLockRelease(LogicalRepWorkerLock);
+ }
- Assert(TransactionIdIsValid(oldestxmin));
+ /*
+ * Return false if the leader apply worker has stopped retaining
+ * information for detecting conflicts. This implies that update_deleted
+ * can no longer be reliably detected.
+ */
+ if (!TransactionIdIsValid(oldestxmin))
+ return false;
if (OidIsValid(localidxoid) &&
IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
@@ -4108,11 +4139,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/*
* Ensure to wake up when it's possible to advance the non-removable
- * transaction ID.
+ * transaction ID, or when the retention duration may have exceeded
+ * max_retention_duration.
*/
- if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
- rdt_data.xid_advance_interval)
- wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+ if (MySubscription->retentionactive)
+ {
+ if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
+ rdt_data.xid_advance_interval)
+ wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+ else if (MySubscription->maxretention > 0)
+ wait_time = Min(wait_time, MySubscription->maxretention);
+ }
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -4325,6 +4362,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
if (!MySubscription->retaindeadtuples)
return false;
+ /* No need to advance if we have already stopped retaining */
+ if (!MySubscription->retentionactive)
+ return false;
+
return true;
}
@@ -4350,6 +4391,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
case RDT_WAIT_FOR_LOCAL_FLUSH:
wait_for_local_flush(rdt_data);
break;
+ case RDT_STOP_CONFLICT_INFO_RETENTION:
+ stop_conflict_info_retention(rdt_data);
+ break;
}
}
@@ -4468,6 +4512,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
if (!status_received)
return;
+ /*
+ * We don't need to maintain oldest_nonremovable_xid if we decide to stop
+ * retaining conflict information for this worker.
+ */
+ if (should_stop_conflict_info_retention(rdt_data))
+ return;
+
if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
rdt_data->remote_wait_for = rdt_data->remote_nextxid;
@@ -4549,6 +4600,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* have a WAL position greater than the rdt_data->remote_lsn.
*/
if (!AllTablesyncsReady())
+ {
+ TimestampTz now;
+
+ now = rdt_data->last_recv_time
+ ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Record the time spent waiting for table sync, it is needed for the
+ * timeout check in should_stop_conflict_info_retention().
+ */
+ rdt_data->table_sync_wait_time =
+ TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
+
+ return;
+ }
+
+ /*
+ * We don't need to maintain oldest_nonremovable_xid if we decide to stop
+ * retaining conflict information for this worker.
+ */
+ if (should_stop_conflict_info_retention(rdt_data))
return;
/*
@@ -4594,12 +4666,114 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
/* Notify launcher to update the xmin of the conflict slot */
ApplyLauncherWakeup();
+ reset_retention_data_fields(rdt_data);
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Check whether conflict information retention should be stopped due to
+ * exceeding the maximum wait time (max_retention_duration).
+ *
+ * If retention should be stopped, transition to the
+ * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
+ * false.
+ *
+ * Note: Retention won't be resumed automatically. The user must manually
+ * disable retain_dead_tuples and re-enable it after confirming that the
+ * replication slot maintained by the launcher has been dropped.
+ */
+static bool
+should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+ TimestampTz now;
+
+ Assert(TransactionIdIsValid(rdt_data->candidate_xid));
+ Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
+ rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
+
+ if (!MySubscription->maxretention)
+ return false;
+
+ /*
+ * Use last_recv_time when applying changes in the loop to avoid
+ * unnecessary system time retrieval. If last_recv_time is not available,
+ * obtain the current timestamp.
+ */
+ now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Return early if the wait time has not exceeded the configured maximum
+ * (max_retention_duration). Time spent waiting for table synchronization
+ * is excluded from this calculation, as it occurs infrequently.
+ */
+ if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+ MySubscription->maxretention +
+ rdt_data->table_sync_wait_time))
+ return false;
+
+ rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+
+ return true;
+}
+
+/*
+ * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
+ */
+static void
+stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+ /*
+ * Do not update the catalog during an active transaction. The transaction
+ * may be started during change application, leading to a possible
+ * rollback of catalog updates if the application fails subsequently.
+ */
+ if (IsTransactionState())
+ return;
+
+ StartTransactionCommand();
+
/*
- * Reset all data fields except those used to determine the timing for the
- * next round of transaction ID advancement. We can even use
- * flushpos_update_time in the next round to decide whether to get the
- * latest flush position.
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
*/
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /* Set pg_subscription.subretentionactive to false */
+ UpdateDeadTupleRetentionStatus(MySubscription->oid, false);
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
+ MySubscription->name),
+ errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
+ MySubscription->maxretention));
+
+ /* Notify launcher to update the conflict slot */
+ ApplyLauncherWakeup();
+
+ reset_retention_data_fields(rdt_data);
+}
+
+/*
+ * Reset all data fields of RetainDeadTuplesData except those used to
+ * determine the timing for the next round of transaction ID advancement. We
+ * can even use flushpos_update_time in the next round to decide whether to get
+ * the latest flush position.
+ */
+static void
+reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
+{
rdt_data->phase = RDT_GET_CANDIDATE_XID;
rdt_data->remote_lsn = InvalidXLogRecPtr;
rdt_data->remote_oldestxid = InvalidFullTransactionId;
@@ -4607,22 +4781,25 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
rdt_data->reply_time = 0;
rdt_data->remote_wait_for = InvalidFullTransactionId;
rdt_data->candidate_xid = InvalidTransactionId;
-
- /* process the next phase */
- process_rdt_phase_transition(rdt_data, false);
+ rdt_data->table_sync_wait_time = 0;
}
/*
* Adjust the interval for advancing non-removable transaction IDs.
*
- * We double the interval to try advancing the non-removable transaction IDs
- * if there is no activity on the node. The maximum value of the interval is
- * capped by wal_receiver_status_interval if it is not zero, otherwise to a
- * 3 minutes which should be sufficient to avoid using CPU or network
- * resources without much benefit.
+ * If there is no activity on the node, we progressively double the interval
+ * used to advance non-removable transaction ID. This helps conserve CPU
+ * and network resources when there's little benefit to frequent updates.
+ *
+ * The interval is capped by the lowest of the following:
+ * - wal_receiver_status_interval (if set),
+ * - a default maximum of 3 minutes,
+ * - max_retention_duration.
*
- * The interval is reset to a minimum value of 100ms once there is some
- * activity on the node.
+ * This ensures the interval never exceeds the retention boundary, even if
+ * other limits are higher. Once activity resumes on the node, the interval
+ * is reset to lesser of 100ms and max_retention_duration, allowing timely
+ * advancement of non-removable transaction ID.
*
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
* consider the other interval or a separate GUC if the need arises.
@@ -4651,6 +4828,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
*/
rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
}
+
+ /* Ensure the wait time remains within the maximum limit */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+ MySubscription->maxretention);
}
/*
@@ -5458,11 +5639,12 @@ InitializeLogRepWorker(void)
* dropped, a restart is initiated.
*
* The oldest_nonremovable_xid should be initialized only when the
- * retain_dead_tuples is enabled before launching the worker. See
+ * subscription's retention is active before launching the worker. See
* logicalrep_worker_launch.
*/
if (am_leader_apply_worker() &&
MySubscription->retaindeadtuples &&
+ MySubscription->retentionactive &&
!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
{
ereport(LOG,
@@ -5633,8 +5815,9 @@ DisableSubscriptionAndExit(void)
* an error, as verifying commit timestamps is unnecessary in this
* context.
*/
- if (MySubscription->retaindeadtuples)
- CheckSubDeadTupleRetention(false, true, WARNING);
+ CheckSubDeadTupleRetention(false, true, WARNING,
+ MySubscription->retaindeadtuples,
+ MySubscription->retentionactive, false);
proc_exit(0);
}