diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 125 |
1 files changed, 84 insertions, 41 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 37377f7eb63..add2e2e066c 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -43,6 +43,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -102,7 +103,8 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void update_conflict_slot_xmin(TransactionId new_xmin); +static void init_conflict_slot_xmin(void); /* @@ -152,6 +154,7 @@ get_subscription_list(void) sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); sub->retaindeadtuples = subform->subretaindeadtuples; + sub->retentionactive = subform->subretentionactive; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -1181,7 +1184,7 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; - bool can_advance_xmin = true; + bool can_update_xmin = true; bool retain_dead_tuples = false; TransactionId xmin = InvalidTransactionId; @@ -1215,17 +1218,6 @@ ApplyLauncherMain(Datum main_arg) retain_dead_tuples = true; /* - * Can't advance xmin of the slot unless all the subscriptions - * with retain_dead_tuples are enabled. This is required to - * ensure that we don't advance the xmin of - * CONFLICT_DETECTION_SLOT if one of the subscriptions is not - * enabled. Otherwise, we won't be able to detect conflicts - * reliably for such a subscription even though it has set the - * retain_dead_tuples option. - */ - can_advance_xmin &= sub->enabled; - - /* * Create a replication slot to retain information necessary * for conflict detection such as dead tuples, commit * timestamps, and origins. @@ -1240,6 +1232,28 @@ ApplyLauncherMain(Datum main_arg) * subscription was enabled. */ CreateConflictDetectionSlot(); + + if (sub->retentionactive) + { + /* + * Can't advance xmin of the slot unless all the + * subscriptions actively retaining dead tuples are + * enabled. This is required to ensure that we don't + * advance the xmin of CONFLICT_DETECTION_SLOT if one of + * the subscriptions is not enabled. Otherwise, we won't + * be able to detect conflicts reliably for such a + * subscription even though it has set the + * retain_dead_tuples option. + */ + can_update_xmin &= sub->enabled; + + /* + * Initialize the slot once the subscription activiates + * retention. + */ + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)) + init_conflict_slot_xmin(); + } } if (!sub->enabled) @@ -1254,9 +1268,11 @@ ApplyLauncherMain(Datum main_arg) /* * Compute the minimum xmin required to protect dead tuples * required for conflict detection among all running apply - * workers that enables retain_dead_tuples. + * workers. */ - if (sub->retaindeadtuples && can_advance_xmin) + if (sub->retaindeadtuples && + sub->retentionactive && + can_update_xmin) compute_min_nonremovable_xid(w, &xmin); /* worker is running already */ @@ -1265,12 +1281,12 @@ ApplyLauncherMain(Datum main_arg) /* * Can't advance xmin of the slot unless all the workers - * corresponding to subscriptions with retain_dead_tuples are - * running, disabling the further computation of the minimum + * corresponding to subscriptions actively retaining dead tuples + * are running, disabling the further computation of the minimum * nonremovable xid. */ - if (sub->retaindeadtuples) - can_advance_xmin = false; + if (sub->retaindeadtuples && sub->retentionactive) + can_update_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg) sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID, - sub->retaindeadtuples)) + sub->retaindeadtuples && + sub->retentionactive)) { /* * We get here either if we failed to launch a worker @@ -1320,13 +1337,18 @@ ApplyLauncherMain(Datum main_arg) * that requires us to retain dead tuples. Otherwise, if required, * advance the slot's xmin to protect dead tuples required for the * conflict detection. + * + * Additionally, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to stop retention, the + * slot's xmin will be set to InvalidTransactionId allowing the + * removal of dead tuples. */ if (MyReplicationSlot) { if (!retain_dead_tuples) ReplicationSlotDropAcquired(); - else if (can_advance_xmin) - advance_conflict_slot_xmin(xmin); + else if (can_update_xmin) + update_conflict_slot_xmin(xmin); } /* Switch back to original memory context. */ @@ -1378,7 +1400,15 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) nonremovable_xid = worker->oldest_nonremovable_xid; SpinLockRelease(&worker->relmutex); - Assert(TransactionIdIsValid(nonremovable_xid)); + /* + * Return if the apply worker has stopped retention concurrently. + * + * Although this function is invoked only when retentionactive is true, + * the apply worker might stop retention after the launcher fetches the + * retentionactive flag. + */ + if (!TransactionIdIsValid(nonremovable_xid)) + return; if (!TransactionIdIsValid(*xmin) || TransactionIdPrecedes(nonremovable_xid, *xmin)) @@ -1402,17 +1432,17 @@ acquire_conflict_slot_if_exists(void) } /* - * Advance the xmin the replication slot used to retain information required + * Update the xmin the replication slot used to retain information required * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +update_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(!TransactionIdIsValid(new_xmin) || + TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ + /* Return if the xmin value of the slot cannot be updated */ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) return; @@ -1439,23 +1469,16 @@ advance_conflict_slot_xmin(TransactionId new_xmin) } /* - * Create and acquire the replication slot used to retain information for - * conflict detection, if not yet. + * Initialize the xmin for the conflict detection slot. */ -void -CreateConflictDetectionSlot(void) +static void +init_conflict_slot_xmin(void) { TransactionId xmin_horizon; - /* Exit early, if the replication slot is already created and acquired */ - if (MyReplicationSlot) - return; - - ereport(LOG, - errmsg("creating replication conflict detection slot")); - - ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + /* Replication slot must exist but shouldn't be initialized. */ + Assert(MyReplicationSlot && + !TransactionIdIsValid(MyReplicationSlot->data.xmin)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -1476,6 +1499,26 @@ CreateConflictDetectionSlot(void) } /* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. + */ +void +CreateConflictDetectionSlot(void) +{ + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + init_conflict_slot_xmin(); +} + +/* * Is current process the logical replication launcher? */ bool |