diff options
author | Amit Kapila <akapila@postgresql.org> | 2025-09-15 08:44:54 +0000 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2025-09-15 08:46:55 +0000 |
commit | 0d48d393d465b6f1abe18b86bd5ac2de0636a40e (patch) | |
tree | 8c1a42ff5bc36afeee8c67365174b46866ee148f | |
parent | 282d0bdee6192f1a859ee34672ae73abf49794dc (diff) |
Resume conflict-relevant data retention automatically.
This commit resumes automatic retention of conflict-relevant data for a
subscription. Previously, retention would stop if the apply process failed
to advance its xmin (oldest_nonremovable_xid) within the configured
max_retention_duration and user needs to manually re-enable
retain_dead_tuples option. With this change, retention will resume
automatically once the apply worker catches up and begins advancing its
xmin (oldest_nonremovable_xid) within the configured threshold.
Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
-rw-r--r-- | doc/src/sgml/ref/create_subscription.sgml | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 10 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 200 | ||||
-rw-r--r-- | src/test/subscription/t/035_conflicts.pl | 27 |
4 files changed, 196 insertions, 50 deletions
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index fc314437311..ed82cf1809e 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -538,10 +538,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl <literal>retain_dead_tuples</literal> is enabled, confirm that the retention duration has exceeded the <literal>max_retention_duration</literal> set within the corresponding - subscription. The retention will not be automatically resumed unless a - new subscription is created with <literal>retain_dead_tuples = - true</literal>, or the user manually re-enables - <literal>retain_dead_tuples</literal>. + subscription. The retention will automatically resume when at least one + apply worker confirms that the retention duration is within the + specified limit, or when a new subscription is created with + <literal>retain_dead_tuples = true</literal>. Alternatively, retention + can be manually resumed by re-enabling <literal>retain_dead_tuples</literal>. </para> <para> Note that overall retention will not stop if other subscriptions that diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index add2e2e066c..c900b6cf3b1 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1261,24 +1261,30 @@ ApplyLauncherMain(Datum main_arg) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); if (w != NULL) { /* * Compute the minimum xmin required to protect dead tuples * required for conflict detection among all running apply - * workers. + * workers. This computation is performed while holding + * LogicalRepWorkerLock to prevent accessing invalid worker + * data, in scenarios where a worker might exit and reset its + * state concurrently. */ if (sub->retaindeadtuples && sub->retentionactive && can_update_xmin) compute_min_nonremovable_xid(w, &xmin); + LWLockRelease(LogicalRepWorkerLock); + /* worker is running already */ continue; } + LWLockRelease(LogicalRepWorkerLock); + /* * Can't advance xmin of the slot unless all the workers * corresponding to subscriptions actively retaining dead tuples diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ee6ac22329f..9b5885d57cf 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -181,6 +181,15 @@ * pg_subscription.subretentionactive is updated to false within a new * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. * + * - RDT_RESUME_CONFLICT_INFO_RETENTION: + * This phase is required only when max_retention_duration is defined. We + * enter this phase if the retention was previously stopped, and the time + * required to advance the non-removable transaction ID in the + * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits + * (or if max_retention_duration is set to 0). During this phase, + * pg_subscription.subretentionactive is updated to true within a new + * transaction, and the worker will be restarted. + * * The overall state progression is: GET_CANDIDATE_XID -> * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> @@ -381,7 +390,8 @@ typedef enum RDT_REQUEST_PUBLISHER_STATUS, RDT_WAIT_FOR_PUBLISHER_STATUS, RDT_WAIT_FOR_LOCAL_FLUSH, - RDT_STOP_CONFLICT_INFO_RETENTION + RDT_STOP_CONFLICT_INFO_RETENTION, + RDT_RESUME_CONFLICT_INFO_RETENTION, } RetainDeadTuplesPhase; /* @@ -568,10 +578,14 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static bool update_retention_status(bool active); static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); +static void apply_worker_exit(void); + static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -4367,10 +4381,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; - /* No need to advance if we have already stopped retaining */ - if (!MySubscription->retentionactive) - return false; - return true; } @@ -4399,6 +4409,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, case RDT_STOP_CONFLICT_INFO_RETENTION: stop_conflict_info_retention(rdt_data); break; + case RDT_RESUME_CONFLICT_INFO_RETENTION: + resume_conflict_info_retention(rdt_data); + break; } } @@ -4522,7 +4535,10 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, * retaining conflict information for this worker. */ if (should_stop_conflict_info_retention(rdt_data)) + { + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; return; + } if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) rdt_data->remote_wait_for = rdt_data->remote_nextxid; @@ -4643,7 +4659,10 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * retaining conflict information for this worker. */ if (should_stop_conflict_info_retention(rdt_data)) + { + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; return; + } /* * Update and check the remote flush position if we are applying changes @@ -4673,6 +4692,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) return; /* + * Reaching this point implies should_stop_conflict_info_retention() + * returned false earlier, meaning that the most recent duration for + * advancing the non-removable transaction ID is within the + * max_retention_duration or max_retention_duration is set to 0. + * + * Therefore, if conflict info retention was previously stopped due to a + * timeout, it is now safe to resume retention. + */ + if (!MySubscription->retentionactive) + { + rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION; + return; + } + + /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and * flushed locally. So, we can advance the non-removable transaction ID. @@ -4698,13 +4732,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * Check whether conflict information retention should be stopped due to * exceeding the maximum wait time (max_retention_duration). * - * If retention should be stopped, transition to the - * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return - * false. - * - * Note: Retention won't be resumed automatically. The user must manually - * disable retain_dead_tuples and re-enable it after confirming that the - * replication slot maintained by the launcher has been dropped. + * If retention should be stopped, return true. Otherwise, return false. */ static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) @@ -4735,11 +4763,6 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) rdt_data->table_sync_wait_time)) return false; - rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; - - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); - return true; } @@ -4749,13 +4772,93 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) { + /* Stop retention if not yet */ + if (MySubscription->retentionactive) + { + /* + * If the retention status cannot be updated (e.g., due to active + * transaction), skip further processing to avoid inconsistent + * retention behavior. + */ + if (!update_retention_status(false)) + return; + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + MySubscription->name), + errdetail("Retention is stopped as the apply process is not advancing its xmin within the configured max_retention_duration of %u ms.", + MySubscription->maxretention)); + } + + Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)); + + /* + * If retention has been stopped, reset to the initial phase to retry + * resuming retention. This reset is required to recalculate the current + * wait time and resume retention if the time falls within + * max_retention_duration. + */ + reset_retention_data_fields(rdt_data); +} + +/* + * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase. + */ +static void +resume_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + /* We can't resume retention without updating retention status. */ + if (!update_retention_status(true)) + return; + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts", + MySubscription->name), + MySubscription->maxretention + ? errdetail("Retention is re-enabled as the apply process is advancing its xmin within the configured max_retention_duration of %u ms.", + MySubscription->maxretention) + : errdetail("Retention is re-enabled as max_retention_duration is set to unlimited.")); + + /* + * Restart the worker to let the launcher initialize + * oldest_nonremovable_xid at startup. + * + * While it's technically possible to derive this value on-the-fly using + * the conflict detection slot's xmin, doing so risks a race condition: + * the launcher might clean slot.xmin just after retention resumes. This + * would make oldest_nonremovable_xid unreliable, especially during xid + * wraparound. + * + * Although this can be prevented by introducing heavy weight locking, the + * complexity it will bring doesn't seem worthwhile given how rarely + * retention is resumed. + */ + apply_worker_exit(); +} + +/* + * Updates pg_subscription.subretentionactive to the given value within a + * new transaction. + * + * If already inside an active transaction, skips the update and returns + * false. + * + * Returns true if the update is successfully performed. + */ +static bool +update_retention_status(bool active) +{ /* * Do not update the catalog during an active transaction. The transaction * may be started during change application, leading to a possible * rollback of catalog updates if the application fails subsequently. */ if (IsTransactionState()) - return; + return false; StartTransactionCommand(); @@ -4765,26 +4868,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) */ PushActiveSnapshot(GetTransactionSnapshot()); - /* Set pg_subscription.subretentionactive to false */ - UpdateDeadTupleRetentionStatus(MySubscription->oid, false); + /* Update pg_subscription.subretentionactive */ + UpdateDeadTupleRetentionStatus(MySubscription->oid, active); PopActiveSnapshot(); CommitTransactionCommand(); - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", - MySubscription->name), - errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.", - MySubscription->maxretention)); - /* Notify launcher to update the conflict slot */ ApplyLauncherWakeup(); - reset_retention_data_fields(rdt_data); + MySubscription->retentionactive = active; + + return true; } /* @@ -4809,19 +4904,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) /* * Adjust the interval for advancing non-removable transaction IDs. * - * If there is no activity on the node, we progressively double the interval - * used to advance non-removable transaction ID. This helps conserve CPU - * and network resources when there's little benefit to frequent updates. + * If there is no activity on the node or retention has been stopped, we + * progressively double the interval used to advance non-removable transaction + * ID. This helps conserve CPU and network resources when there's little benefit + * to frequent updates. * * The interval is capped by the lowest of the following: - * - wal_receiver_status_interval (if set), + * - wal_receiver_status_interval (if set and retention is active), * - a default maximum of 3 minutes, - * - max_retention_duration. + * - max_retention_duration (if retention is active). * - * This ensures the interval never exceeds the retention boundary, even if - * other limits are higher. Once activity resumes on the node, the interval - * is reset to lesser of 100ms and max_retention_duration, allowing timely - * advancement of non-removable transaction ID. + * This ensures the interval never exceeds the retention boundary, even if other + * limits are higher. Once activity resumes on the node and the retention is + * active, the interval is reset to lesser of 100ms and max_retention_duration, + * allowing timely advancement of non-removable transaction ID. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises. @@ -4829,7 +4925,7 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) { - if (!new_xid_found && rdt_data->xid_advance_interval) + if (rdt_data->xid_advance_interval && !new_xid_found) { int max_interval = wal_receiver_status_interval ? wal_receiver_status_interval * 1000 @@ -4842,6 +4938,18 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, max_interval); } + else if (rdt_data->xid_advance_interval && + !MySubscription->retentionactive) + { + /* + * Retention has been stopped, so double the interval-capped at a + * maximum of 3 minutes. The wal_receiver_status_interval is + * intentionally not used as a upper bound, since the likelihood of + * retention resuming is lower than that of general activity resuming. + */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, + MAX_XID_ADVANCE_INTERVAL); + } else { /* @@ -4851,9 +4959,13 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; } - /* Ensure the wait time remains within the maximum limit */ - rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, - MySubscription->maxretention); + /* + * Ensure the wait time remains within the maximum retention time limit + * when retention is active. + */ + if (MySubscription->retentionactive) + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + MySubscription->maxretention); } /* diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 880551fc69d..f2aee0f70df 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -632,6 +632,33 @@ $node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); $node_B->reload; ############################################################################### +# Check that dead tuple retention resumes when the max_retention_duration is set +# 0. +############################################################################### + +$log_offset = -s $node_A->logfile; + +# Set max_retention_duration to 0 +$node_A->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 0);"); + +# Confirm that the retention resumes +$node_A->wait_for_log( + qr/logical replication worker for subscription "tap_sub_a_b" will resume retaining the information for detecting conflicts +.*DETAIL:.* Retention is re-enabled as max_retention_duration is set to unlimited.*/, + $log_offset); + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); + +$result = $node_A->safe_psql('postgres', + "SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';"); +is($result, qq(t), 'retention is active'); + +############################################################################### # Check that the replication slot pg_conflict_detection is dropped after # removing all the subscriptions. ############################################################################### |