diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 125 | ||||
-rw-r--r-- | src/backend/replication/logical/slotsync.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 271 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 3 |
4 files changed, 315 insertions, 87 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 37377f7eb63..add2e2e066c 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -43,6 +43,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -102,7 +103,8 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void update_conflict_slot_xmin(TransactionId new_xmin); +static void init_conflict_slot_xmin(void); /* @@ -152,6 +154,7 @@ get_subscription_list(void) sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); sub->retaindeadtuples = subform->subretaindeadtuples; + sub->retentionactive = subform->subretentionactive; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -1181,7 +1184,7 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; - bool can_advance_xmin = true; + bool can_update_xmin = true; bool retain_dead_tuples = false; TransactionId xmin = InvalidTransactionId; @@ -1215,17 +1218,6 @@ ApplyLauncherMain(Datum main_arg) retain_dead_tuples = true; /* - * Can't advance xmin of the slot unless all the subscriptions - * with retain_dead_tuples are enabled. This is required to - * ensure that we don't advance the xmin of - * CONFLICT_DETECTION_SLOT if one of the subscriptions is not - * enabled. Otherwise, we won't be able to detect conflicts - * reliably for such a subscription even though it has set the - * retain_dead_tuples option. - */ - can_advance_xmin &= sub->enabled; - - /* * Create a replication slot to retain information necessary * for conflict detection such as dead tuples, commit * timestamps, and origins. @@ -1240,6 +1232,28 @@ ApplyLauncherMain(Datum main_arg) * subscription was enabled. */ CreateConflictDetectionSlot(); + + if (sub->retentionactive) + { + /* + * Can't advance xmin of the slot unless all the + * subscriptions actively retaining dead tuples are + * enabled. This is required to ensure that we don't + * advance the xmin of CONFLICT_DETECTION_SLOT if one of + * the subscriptions is not enabled. Otherwise, we won't + * be able to detect conflicts reliably for such a + * subscription even though it has set the + * retain_dead_tuples option. + */ + can_update_xmin &= sub->enabled; + + /* + * Initialize the slot once the subscription activiates + * retention. + */ + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)) + init_conflict_slot_xmin(); + } } if (!sub->enabled) @@ -1254,9 +1268,11 @@ ApplyLauncherMain(Datum main_arg) /* * Compute the minimum xmin required to protect dead tuples * required for conflict detection among all running apply - * workers that enables retain_dead_tuples. + * workers. */ - if (sub->retaindeadtuples && can_advance_xmin) + if (sub->retaindeadtuples && + sub->retentionactive && + can_update_xmin) compute_min_nonremovable_xid(w, &xmin); /* worker is running already */ @@ -1265,12 +1281,12 @@ ApplyLauncherMain(Datum main_arg) /* * Can't advance xmin of the slot unless all the workers - * corresponding to subscriptions with retain_dead_tuples are - * running, disabling the further computation of the minimum + * corresponding to subscriptions actively retaining dead tuples + * are running, disabling the further computation of the minimum * nonremovable xid. */ - if (sub->retaindeadtuples) - can_advance_xmin = false; + if (sub->retaindeadtuples && sub->retentionactive) + can_update_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg) sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID, - sub->retaindeadtuples)) + sub->retaindeadtuples && + sub->retentionactive)) { /* * We get here either if we failed to launch a worker @@ -1320,13 +1337,18 @@ ApplyLauncherMain(Datum main_arg) * that requires us to retain dead tuples. Otherwise, if required, * advance the slot's xmin to protect dead tuples required for the * conflict detection. + * + * Additionally, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to stop retention, the + * slot's xmin will be set to InvalidTransactionId allowing the + * removal of dead tuples. */ if (MyReplicationSlot) { if (!retain_dead_tuples) ReplicationSlotDropAcquired(); - else if (can_advance_xmin) - advance_conflict_slot_xmin(xmin); + else if (can_update_xmin) + update_conflict_slot_xmin(xmin); } /* Switch back to original memory context. */ @@ -1378,7 +1400,15 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) nonremovable_xid = worker->oldest_nonremovable_xid; SpinLockRelease(&worker->relmutex); - Assert(TransactionIdIsValid(nonremovable_xid)); + /* + * Return if the apply worker has stopped retention concurrently. + * + * Although this function is invoked only when retentionactive is true, + * the apply worker might stop retention after the launcher fetches the + * retentionactive flag. + */ + if (!TransactionIdIsValid(nonremovable_xid)) + return; if (!TransactionIdIsValid(*xmin) || TransactionIdPrecedes(nonremovable_xid, *xmin)) @@ -1402,17 +1432,17 @@ acquire_conflict_slot_if_exists(void) } /* - * Advance the xmin the replication slot used to retain information required + * Update the xmin the replication slot used to retain information required * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +update_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(!TransactionIdIsValid(new_xmin) || + TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ + /* Return if the xmin value of the slot cannot be updated */ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) return; @@ -1439,23 +1469,16 @@ advance_conflict_slot_xmin(TransactionId new_xmin) } /* - * Create and acquire the replication slot used to retain information for - * conflict detection, if not yet. + * Initialize the xmin for the conflict detection slot. */ -void -CreateConflictDetectionSlot(void) +static void +init_conflict_slot_xmin(void) { TransactionId xmin_horizon; - /* Exit early, if the replication slot is already created and acquired */ - if (MyReplicationSlot) - return; - - ereport(LOG, - errmsg("creating replication conflict detection slot")); - - ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + /* Replication slot must exist but shouldn't be initialized. */ + Assert(MyReplicationSlot && + !TransactionIdIsValid(MyReplicationSlot->data.xmin)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -1476,6 +1499,26 @@ CreateConflictDetectionSlot(void) } /* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. + */ +void +CreateConflictDetectionSlot(void) +{ + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + init_conflict_slot_xmin(); +} + +/* * Is current process the logical replication launcher? */ bool diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index f5501c106dc..9d0072a49ed 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1476,7 +1476,6 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) */ wrconn = walrcv_connect(PrimaryConnInfo, false, false, false, app_name.data, &err); - pfree(app_name.data); if (!wrconn) ereport(ERROR, @@ -1484,6 +1483,8 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) errmsg("synchronization worker \"%s\" could not connect to the primary server: %s", app_name.data, err)); + pfree(app_name.data); + /* * Register the disconnection callback. * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 22ad9051db3..f1ebd63e792 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,6 +173,14 @@ * Advance the non-removable transaction ID if the current flush location has * reached or surpassed the last received WAL position. * + * - RDT_STOP_CONFLICT_INFO_RETENTION: + * This phase is required only when max_retention_duration is defined. We + * enter this phase if the wait time in either the + * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds + * configured max_retention_duration. In this phase, + * pg_subscription.subretentionactive is updated to false within a new + * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. + * * The overall state progression is: GET_CANDIDATE_XID -> * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> @@ -373,7 +381,8 @@ typedef enum RDT_GET_CANDIDATE_XID, RDT_REQUEST_PUBLISHER_STATUS, RDT_WAIT_FOR_PUBLISHER_STATUS, - RDT_WAIT_FOR_LOCAL_FLUSH + RDT_WAIT_FOR_LOCAL_FLUSH, + RDT_STOP_CONFLICT_INFO_RETENTION } RetainDeadTuplesPhase; /* @@ -415,6 +424,9 @@ typedef struct RetainDeadTuplesData * updated in final phase * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + long table_sync_wait_time; /* time spent waiting for table sync + * to finish */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -555,6 +567,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data); static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -3219,7 +3234,6 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TimestampTz *delete_time) { TransactionId oldestxmin; - ReplicationSlot *slot; /* * Return false if either dead tuples are not retained or commit timestamp @@ -3229,32 +3243,49 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, return false; /* - * For conflict detection, we use the conflict slot's xmin value instead - * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as - * a threshold to identify tuples that were recently deleted. These tuples - * are not visible to concurrent transactions, but we log an - * update_deleted conflict if such a tuple matches the remote update being - * applied. + * For conflict detection, we use the leader worker's + * oldest_nonremovable_xid value instead of invoking + * GetOldestNonRemovableTransactionId() or using the conflict detection + * slot's xmin. The oldest_nonremovable_xid acts as a threshold to + * identify tuples that were recently deleted. These deleted tuples are no + * longer visible to concurrent transactions. However, if a remote update + * matches such a tuple, we log an update_deleted conflict. * - * Although GetOldestNonRemovableTransactionId() can return a value older - * than the slot's xmin, for our current purpose it is acceptable to treat - * tuples deleted by transactions prior to slot.xmin as update_missing - * conflicts. - * - * Ideally, we would use oldest_nonremovable_xid, which is directly - * maintained by the leader apply worker. However, this value is not - * available to table synchronization or parallel apply workers, making - * slot.xmin a practical alternative in those contexts. + * While GetOldestNonRemovableTransactionId() and slot.xmin may return + * transaction IDs older than oldest_nonremovable_xid, for our current + * purpose, it is acceptable to treat tuples deleted by transactions prior + * to oldest_nonremovable_xid as update_missing conflicts. */ - slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true); + if (am_leader_apply_worker()) + { + oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid; + } + else + { + LogicalRepWorker *leader; - Assert(slot); + /* + * Obtain the information from the leader apply worker as only the + * leader manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + leader = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); - SpinLockAcquire(&slot->mutex); - oldestxmin = slot->data.xmin; - SpinLockRelease(&slot->mutex); + SpinLockAcquire(&leader->relmutex); + oldestxmin = leader->oldest_nonremovable_xid; + SpinLockRelease(&leader->relmutex); + LWLockRelease(LogicalRepWorkerLock); + } - Assert(TransactionIdIsValid(oldestxmin)); + /* + * Return false if the leader apply worker has stopped retaining + * information for detecting conflicts. This implies that update_deleted + * can no longer be reliably detected. + */ + if (!TransactionIdIsValid(oldestxmin)) + return false; if (OidIsValid(localidxoid) && IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) @@ -4108,11 +4139,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Ensure to wake up when it's possible to advance the non-removable - * transaction ID. + * transaction ID, or when the retention duration may have exceeded + * max_retention_duration. */ - if (rdt_data.phase == RDT_GET_CANDIDATE_XID && - rdt_data.xid_advance_interval) - wait_time = Min(wait_time, rdt_data.xid_advance_interval); + if (MySubscription->retentionactive) + { + if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + rdt_data.xid_advance_interval) + wait_time = Min(wait_time, rdt_data.xid_advance_interval); + else if (MySubscription->maxretention > 0) + wait_time = Min(wait_time, MySubscription->maxretention); + } rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | @@ -4325,6 +4362,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; + /* No need to advance if we have already stopped retaining */ + if (!MySubscription->retentionactive) + return false; + return true; } @@ -4350,6 +4391,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, case RDT_WAIT_FOR_LOCAL_FLUSH: wait_for_local_flush(rdt_data); break; + case RDT_STOP_CONFLICT_INFO_RETENTION: + stop_conflict_info_retention(rdt_data); + break; } } @@ -4468,6 +4512,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, if (!status_received) return; + /* + * We don't need to maintain oldest_nonremovable_xid if we decide to stop + * retaining conflict information for this worker. + */ + if (should_stop_conflict_info_retention(rdt_data)) + return; + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) rdt_data->remote_wait_for = rdt_data->remote_nextxid; @@ -4549,6 +4600,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * have a WAL position greater than the rdt_data->remote_lsn. */ if (!AllTablesyncsReady()) + { + TimestampTz now; + + now = rdt_data->last_recv_time + ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Record the time spent waiting for table sync, it is needed for the + * timeout check in should_stop_conflict_info_retention(). + */ + rdt_data->table_sync_wait_time = + TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now); + + return; + } + + /* + * We don't need to maintain oldest_nonremovable_xid if we decide to stop + * retaining conflict information for this worker. + */ + if (should_stop_conflict_info_retention(rdt_data)) return; /* @@ -4594,12 +4666,114 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Check whether conflict information retention should be stopped due to + * exceeding the maximum wait time (max_retention_duration). + * + * If retention should be stopped, transition to the + * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return + * false. + * + * Note: Retention won't be resumed automatically. The user must manually + * disable retain_dead_tuples and re-enable it after confirming that the + * replication slot maintained by the launcher has been dropped. + */ +static bool +should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rdt_data->candidate_xid)); + Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || + rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); + + if (!MySubscription->maxretention) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return early if the wait time has not exceeded the configured maximum + * (max_retention_duration). Time spent waiting for table synchronization + * is excluded from this calculation, as it occurs infrequently. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + MySubscription->maxretention + + rdt_data->table_sync_wait_time)) + return false; + + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); + + return true; +} + +/* + * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase. + */ +static void +stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + /* + * Do not update the catalog during an active transaction. The transaction + * may be started during change application, leading to a possible + * rollback of catalog updates if the application fails subsequently. + */ + if (IsTransactionState()) + return; + + StartTransactionCommand(); + /* - * Reset all data fields except those used to determine the timing for the - * next round of transaction ID advancement. We can even use - * flushpos_update_time in the next round to decide whether to get the - * latest flush position. + * Updating pg_subscription might involve TOAST table access, so ensure we + * have a valid snapshot. */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Set pg_subscription.subretentionactive to false */ + UpdateDeadTupleRetentionStatus(MySubscription->oid, false); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + MySubscription->name), + errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.", + MySubscription->maxretention)); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + + reset_retention_data_fields(rdt_data); +} + +/* + * Reset all data fields of RetainDeadTuplesData except those used to + * determine the timing for the next round of transaction ID advancement. We + * can even use flushpos_update_time in the next round to decide whether to get + * the latest flush position. + */ +static void +reset_retention_data_fields(RetainDeadTuplesData *rdt_data) +{ rdt_data->phase = RDT_GET_CANDIDATE_XID; rdt_data->remote_lsn = InvalidXLogRecPtr; rdt_data->remote_oldestxid = InvalidFullTransactionId; @@ -4607,22 +4781,25 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) rdt_data->reply_time = 0; rdt_data->remote_wait_for = InvalidFullTransactionId; rdt_data->candidate_xid = InvalidTransactionId; - - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); + rdt_data->table_sync_wait_time = 0; } /* * Adjust the interval for advancing non-removable transaction IDs. * - * We double the interval to try advancing the non-removable transaction IDs - * if there is no activity on the node. The maximum value of the interval is - * capped by wal_receiver_status_interval if it is not zero, otherwise to a - * 3 minutes which should be sufficient to avoid using CPU or network - * resources without much benefit. + * If there is no activity on the node, we progressively double the interval + * used to advance non-removable transaction ID. This helps conserve CPU + * and network resources when there's little benefit to frequent updates. + * + * The interval is capped by the lowest of the following: + * - wal_receiver_status_interval (if set), + * - a default maximum of 3 minutes, + * - max_retention_duration. * - * The interval is reset to a minimum value of 100ms once there is some - * activity on the node. + * This ensures the interval never exceeds the retention boundary, even if + * other limits are higher. Once activity resumes on the node, the interval + * is reset to lesser of 100ms and max_retention_duration, allowing timely + * advancement of non-removable transaction ID. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises. @@ -4651,6 +4828,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) */ rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; } + + /* Ensure the wait time remains within the maximum limit */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + MySubscription->maxretention); } /* @@ -5458,11 +5639,12 @@ InitializeLogRepWorker(void) * dropped, a restart is initiated. * * The oldest_nonremovable_xid should be initialized only when the - * retain_dead_tuples is enabled before launching the worker. See + * subscription's retention is active before launching the worker. See * logicalrep_worker_launch. */ if (am_leader_apply_worker() && MySubscription->retaindeadtuples && + MySubscription->retentionactive && !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) { ereport(LOG, @@ -5633,8 +5815,9 @@ DisableSubscriptionAndExit(void) * an error, as verifying commit timestamps is unnecessary in this * context. */ - if (MySubscription->retaindeadtuples) - CheckSubDeadTupleRetention(false, true, WARNING); + CheckSubDeadTupleRetention(false, true, WARNING, + MySubscription->retaindeadtuples, + MySubscription->retentionactive, false); proc_exit(0); } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 69f4c6157c5..b8f21153e7b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -921,7 +921,6 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) /* Connect to the primary server. */ wrconn = walrcv_connect(PrimaryConnInfo, false, false, false, app_name.data, &err); - pfree(app_name.data); if (!wrconn) ereport(ERROR, @@ -929,6 +928,8 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) errmsg("synchronization worker \"%s\" could not connect to the primary server: %s", app_name.data, err)); + pfree(app_name.data); + SyncReplicationSlots(wrconn); walrcv_disconnect(wrconn); |