 doc/src/sgml/ref/create_subscription.sgml  |   9 +-
 src/backend/replication/logical/launcher.c |  10 +-
 src/backend/replication/logical/worker.c   | 200 ++++++++++++++++--------
 src/test/subscription/t/035_conflicts.pl   |  27 ++
 4 files changed, 196 insertions(+), 50 deletions(-)
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index fc314437311..ed82cf1809e 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -538,10 +538,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<literal>retain_dead_tuples</literal> is enabled, confirm that the
retention duration has exceeded the
<literal>max_retention_duration</literal> set within the corresponding
- subscription. The retention will not be automatically resumed unless a
- new subscription is created with <literal>retain_dead_tuples =
- true</literal>, or the user manually re-enables
- <literal>retain_dead_tuples</literal>.
+ subscription. The retention will automatically resume when at least one
+ apply worker confirms that the retention duration is within the
+ specified limit, or when a new subscription is created with
+ <literal>retain_dead_tuples = true</literal>. Alternatively, retention
+ can be manually resumed by re-enabling <literal>retain_dead_tuples</literal>.
</para>
<para>
Note that overall retention will not stop if other subscriptions that
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index add2e2e066c..c900b6cf3b1 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1261,24 +1261,30 @@ ApplyLauncherMain(Datum main_arg)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
w = logicalrep_worker_find(sub->oid, InvalidOid, false);
- LWLockRelease(LogicalRepWorkerLock);
if (w != NULL)
{
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
- * workers.
+ * workers. This computation is performed while holding
+ * LogicalRepWorkerLock to prevent accessing invalid worker
+ * data in scenarios where a worker might exit and reset its
+ * state concurrently.
*/
if (sub->retaindeadtuples &&
sub->retentionactive &&
can_update_xmin)
compute_min_nonremovable_xid(w, &xmin);
+ LWLockRelease(LogicalRepWorkerLock);
+
/* worker is running already */
continue;
}
+ LWLockRelease(LogicalRepWorkerLock);
+
/*
* Can't advance xmin of the slot unless all the workers
* corresponding to subscriptions actively retaining dead tuples
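
A compilable sketch of the locking pattern the comment above describes: fold
each in-use worker's xmin into a running minimum while the shared lock is
held, so a concurrently exiting worker cannot reset its state mid-scan. POSIX
rwlocks stand in for LWLocks, and a plain integer comparison stands in for
the wraparound-aware TransactionIdPrecedes() the real code uses; all names
below are invented for illustration.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define NUM_WORKERS 8

typedef struct Worker
{
	int			in_use;
	uint32_t	oldest_nonremovable_xid;
} Worker;

static pthread_rwlock_t worker_lock = PTHREAD_RWLOCK_INITIALIZER;
static Worker workers[NUM_WORKERS];

static uint32_t
min_nonremovable_xid(void)
{
	uint32_t	xmin = UINT32_MAX;

	/* Analogous to LWLockAcquire(LogicalRepWorkerLock, LW_SHARED) */
	pthread_rwlock_rdlock(&worker_lock);

	for (int i = 0; i < NUM_WORKERS; i++)
	{
		/* Skip slots whose workers have already exited and reset state */
		if (workers[i].in_use &&
			workers[i].oldest_nonremovable_xid < xmin)
			xmin = workers[i].oldest_nonremovable_xid;
	}

	pthread_rwlock_unlock(&worker_lock);
	return xmin;
}

int
main(void)
{
	workers[0] = (Worker) {1, 742};
	workers[1] = (Worker) {1, 699};
	workers[2] = (Worker) {0, 0};	/* exited worker: must be ignored */

	printf("min xmin = %u\n", (unsigned) min_nonremovable_xid());	/* 699 */
	return 0;
}
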
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ee6ac22329f..9b5885d57cf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -181,6 +181,15 @@
* pg_subscription.subretentionactive is updated to false within a new
* transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
*
+ * - RDT_RESUME_CONFLICT_INFO_RETENTION:
+ * This phase is required only when max_retention_duration is defined. We
+ * enter this phase if the retention was previously stopped, and the time
+ * required to advance the non-removable transaction ID in the
+ * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
+ * (or if max_retention_duration is set to 0). During this phase,
+ * pg_subscription.subretentionactive is updated to true within a new
+ * transaction, and the worker will be restarted.
+ *
* The overall state progression is: GET_CANDIDATE_XID ->
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
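
The stop/resume cycle described above can be pictured as a small state
machine. The toy model below mirrors the RetainDeadTuplesPhase names but
replaces the worker's real control flow with an invented driver loop and
made-up wait-time samples; it only shows when each phase would be entered.

#include <stdbool.h>
#include <stdio.h>

typedef enum
{
	RDT_WAIT_FOR_LOCAL_FLUSH,
	RDT_STOP_CONFLICT_INFO_RETENTION,
	RDT_RESUME_CONFLICT_INFO_RETENTION,
} Phase;

int
main(void)
{
	bool		retention_active = true;
	const int	max_retention_ms = 1000;	/* hypothetical setting */
	const int	advance_ms[] = {120, 4500, 3000, 80};	/* sampled waits */

	for (int i = 0; i < 4; i++)
	{
		Phase		phase;

		if (advance_ms[i] > max_retention_ms)
			phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		else if (!retention_active)
			phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
		else
			phase = RDT_WAIT_FOR_LOCAL_FLUSH;

		switch (phase)
		{
			case RDT_STOP_CONFLICT_INFO_RETENTION:
				if (retention_active)
					printf("cycle %d: stop retention (%d ms > %d ms)\n",
						   i, advance_ms[i], max_retention_ms);
				else
					printf("cycle %d: retention remains stopped\n", i);
				retention_active = false;
				break;
			case RDT_RESUME_CONFLICT_INFO_RETENTION:
				printf("cycle %d: resume retention (%d ms <= %d ms)\n",
					   i, advance_ms[i], max_retention_ms);
				retention_active = true;
				break;
			default:
				printf("cycle %d: advance non-removable xid\n", i);
				break;
		}
	}
	return 0;
}
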
@@ -381,7 +390,8 @@ typedef enum
RDT_REQUEST_PUBLISHER_STATUS,
RDT_WAIT_FOR_PUBLISHER_STATUS,
RDT_WAIT_FOR_LOCAL_FLUSH,
- RDT_STOP_CONFLICT_INFO_RETENTION
+ RDT_STOP_CONFLICT_INFO_RETENTION,
+ RDT_RESUME_CONFLICT_INFO_RETENTION,
} RetainDeadTuplesPhase;
/*
@@ -568,10 +578,14 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static bool update_retention_status(bool active);
static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
bool new_xid_found);
+static void apply_worker_exit(void);
+
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -4367,10 +4381,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
if (!MySubscription->retaindeadtuples)
return false;
- /* No need to advance if we have already stopped retaining */
- if (!MySubscription->retentionactive)
- return false;
-
return true;
}
@@ -4399,6 +4409,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
case RDT_STOP_CONFLICT_INFO_RETENTION:
stop_conflict_info_retention(rdt_data);
break;
+ case RDT_RESUME_CONFLICT_INFO_RETENTION:
+ resume_conflict_info_retention(rdt_data);
+ break;
}
}
@@ -4522,7 +4535,10 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
* retaining conflict information for this worker.
*/
if (should_stop_conflict_info_retention(rdt_data))
+ {
+ rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
return;
+ }
if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
rdt_data->remote_wait_for = rdt_data->remote_nextxid;
@@ -4643,7 +4659,10 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* retaining conflict information for this worker.
*/
if (should_stop_conflict_info_retention(rdt_data))
+ {
+ rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
return;
+ }
/*
* Update and check the remote flush position if we are applying changes
@@ -4673,6 +4692,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
return;
/*
+ * Reaching this point implies should_stop_conflict_info_retention()
+ * returned false earlier, meaning that the most recent duration for
+ * advancing the non-removable transaction ID is within the
+ * max_retention_duration, or max_retention_duration is set to 0.
+ *
+ * Therefore, if conflict info retention was previously stopped due to a
+ * timeout, it is now safe to resume retention.
+ */
+ if (!MySubscription->retentionactive)
+ {
+ rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
+ return;
+ }
+
+ /*
* Reaching here means the remote WAL position has been received, and all
* transactions up to that position on the publisher have been applied and
* flushed locally. So, we can advance the non-removable transaction ID.
@@ -4698,13 +4732,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* Check whether conflict information retention should be stopped due to
* exceeding the maximum wait time (max_retention_duration).
*
- * If retention should be stopped, transition to the
- * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
- * false.
- *
- * Note: Retention won't be resumed automatically. The user must manually
- * disable retain_dead_tuples and re-enable it after confirming that the
- * replication slot maintained by the launcher has been dropped.
+ * If retention should be stopped, return true. Otherwise, return false.
*/
static bool
should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4735,11 +4763,6 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
rdt_data->table_sync_wait_time))
return false;
- rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
-
- /* process the next phase */
- process_rdt_phase_transition(rdt_data, false);
-
return true;
}
@@ -4749,13 +4772,93 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
static void
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
+ /* Stop retention if not already stopped */
+ if (MySubscription->retentionactive)
+ {
+ /*
+ * If the retention status cannot be updated (e.g., due to an active
+ * transaction), skip further processing to avoid inconsistent
+ * retention behavior.
+ */
+ if (!update_retention_status(false))
+ return;
+
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
+ MySubscription->name),
+ errdetail("Retention is stopped as the apply process is not advancing its xmin within the configured max_retention_duration of %u ms.",
+ MySubscription->maxretention));
+ }
+
+ Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
+
+ /*
+ * If retention has been stopped, reset to the initial phase to retry
+ * resuming retention. This reset is required to recalculate the current
+ * wait time and resume retention if the time falls within
+ * max_retention_duration.
+ */
+ reset_retention_data_fields(rdt_data);
+}
+
+/*
+ * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
+ */
+static void
+resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+ /* We can't resume retention without updating retention status. */
+ if (!update_retention_status(true))
+ return;
+
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
+ MySubscription->name),
+ MySubscription->maxretention
+ ? errdetail("Retention is re-enabled as the apply process is advancing its xmin within the configured max_retention_duration of %u ms.",
+ MySubscription->maxretention)
+ : errdetail("Retention is re-enabled as max_retention_duration is set to unlimited."));
+
+ /*
+ * Restart the worker to let the launcher initialize
+ * oldest_nonremovable_xid at startup.
+ *
+ * While it's technically possible to derive this value on-the-fly using
+ * the conflict detection slot's xmin, doing so risks a race condition:
+ * the launcher might clear slot.xmin just after retention resumes. This
+ * would make oldest_nonremovable_xid unreliable, especially during xid
+ * wraparound.
+ *
+ * Although this can be prevented by introducing heavyweight locking, the
+ * complexity it would bring doesn't seem worthwhile given how rarely
+ * retention is resumed.
+ */
+ apply_worker_exit();
+}
+
+/*
+ * Updates pg_subscription.subretentionactive to the given value within a
+ * new transaction.
+ *
+ * If already inside an active transaction, skips the update and returns
+ * false.
+ *
+ * Returns true if the update is successfully performed.
+ */
+static bool
+update_retention_status(bool active)
+{
/*
* Do not update the catalog during an active transaction. The transaction
* may be started during change application, leading to a possible
* rollback of catalog updates if the application fails subsequently.
*/
if (IsTransactionState())
- return;
+ return false;
StartTransactionCommand();
@@ -4765,26 +4868,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
*/
PushActiveSnapshot(GetTransactionSnapshot());
- /* Set pg_subscription.subretentionactive to false */
- UpdateDeadTupleRetentionStatus(MySubscription->oid, false);
+ /* Update pg_subscription.subretentionactive */
+ UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
PopActiveSnapshot();
CommitTransactionCommand();
- SpinLockAcquire(&MyLogicalRepWorker->relmutex);
- MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
- SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
- ereport(LOG,
- errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
- MySubscription->name),
- errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
- MySubscription->maxretention));
-
/* Notify launcher to update the conflict slot */
ApplyLauncherWakeup();
- reset_retention_data_fields(rdt_data);
+ MySubscription->retentionactive = active;
+
+ return true;
}
/*
@@ -4809,19 +4904,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
/*
* Adjust the interval for advancing non-removable transaction IDs.
*
- * If there is no activity on the node, we progressively double the interval
- * used to advance non-removable transaction ID. This helps conserve CPU
- * and network resources when there's little benefit to frequent updates.
+ * If there is no activity on the node or retention has been stopped, we
+ * progressively double the interval used to advance non-removable transaction
+ * ID. This helps conserve CPU and network resources when there's little benefit
+ * to frequent updates.
*
* The interval is capped by the lowest of the following:
- * - wal_receiver_status_interval (if set),
+ * - wal_receiver_status_interval (if set and retention is active),
* - a default maximum of 3 minutes,
- * - max_retention_duration.
+ * - max_retention_duration (if retention is active).
*
- * This ensures the interval never exceeds the retention boundary, even if
- * other limits are higher. Once activity resumes on the node, the interval
- * is reset to lesser of 100ms and max_retention_duration, allowing timely
- * advancement of non-removable transaction ID.
+ * This ensures the interval never exceeds the retention boundary, even if other
+ * limits are higher. Once activity resumes on the node and the retention is
+ * active, the interval is reset to the lesser of 100ms and max_retention_duration,
+ * allowing timely advancement of non-removable transaction ID.
*
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
* consider the other interval or a separate GUC if the need arises.
@@ -4829,7 +4925,7 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
static void
adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
{
- if (!new_xid_found && rdt_data->xid_advance_interval)
+ if (rdt_data->xid_advance_interval && !new_xid_found)
{
int max_interval = wal_receiver_status_interval
? wal_receiver_status_interval * 1000
@@ -4842,6 +4938,18 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
max_interval);
}
+ else if (rdt_data->xid_advance_interval &&
+ !MySubscription->retentionactive)
+ {
+ /*
+ * Retention has been stopped, so double the interval, capped at a
+ * maximum of 3 minutes. The wal_receiver_status_interval is
+ * intentionally not used as an upper bound, since the likelihood of
+ * retention resuming is lower than that of general activity resuming.
+ */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
+ MAX_XID_ADVANCE_INTERVAL);
+ }
else
{
/*
@@ -4851,9 +4959,13 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
}
- /* Ensure the wait time remains within the maximum limit */
- rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
- MySubscription->maxretention);
+ /*
+ * Ensure the wait time remains within the maximum retention time limit
+ * when retention is active.
+ */
+ if (MySubscription->retentionactive)
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+ MySubscription->maxretention);
}
/*
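
The doubling above is plain exponential backoff. A sketch of just the
arithmetic, using the 100 ms floor and 3-minute ceiling cited in the
comments (the constant names mirror worker.c's; the driver loop is
invented):

#include <stdio.h>

#define MIN_XID_ADVANCE_INTERVAL	100		/* ms */
#define MAX_XID_ADVANCE_INTERVAL	180000	/* ms, i.e. 3 minutes */

int
main(void)
{
	int		interval = MIN_XID_ADVANCE_INTERVAL;

	/*
	 * With no new XID found (or retention stopped), the wait doubles each
	 * cycle until it reaches the cap: 100, 200, 400, ..., 102400, 180000.
	 */
	for (int i = 0; i < 12; i++)
	{
		printf("cycle %2d: wait %d ms\n", i, interval);
		interval = interval * 2 < MAX_XID_ADVANCE_INTERVAL
			? interval * 2
			: MAX_XID_ADVANCE_INTERVAL;
	}
	return 0;
}
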
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 880551fc69d..f2aee0f70df 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -632,6 +632,33 @@ $node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
$node_B->reload;
###############################################################################
+# Check that dead tuple retention resumes when max_retention_duration is set
+# to 0.
+###############################################################################
+
+$log_offset = -s $node_A->logfile;
+
+# Set max_retention_duration to 0
+$node_A->safe_psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 0);");
+
+# Confirm that the retention resumes
+$node_A->wait_for_log(
+ qr/logical replication worker for subscription "tap_sub_a_b" will resume retaining the information for detecting conflicts
+.*DETAIL:.* Retention is re-enabled as max_retention_duration is set to unlimited.*/,
+ $log_offset);
+
+ok( $node_A->poll_query_until(
+ 'postgres',
+ "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the xmin value of slot 'pg_conflict_detection' is valid on Node A");
+
+$result = $node_A->safe_psql('postgres',
+ "SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';");
+is($result, qq(t), 'retention is active');
+
+###############################################################################
# Check that the replication slot pg_conflict_detection is dropped after
# removing all the subscriptions.
###############################################################################