summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c125
1 files changed, 84 insertions, 41 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 37377f7eb63..add2e2e066c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -43,6 +43,7 @@
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
@@ -102,7 +103,8 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
static bool acquire_conflict_slot_if_exists(void);
-static void advance_conflict_slot_xmin(TransactionId new_xmin);
+static void update_conflict_slot_xmin(TransactionId new_xmin);
+static void init_conflict_slot_xmin(void);
/*
@@ -152,6 +154,7 @@ get_subscription_list(void)
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
sub->retaindeadtuples = subform->subretaindeadtuples;
+ sub->retentionactive = subform->subretentionactive;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -1181,7 +1184,7 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
- bool can_advance_xmin = true;
+ bool can_update_xmin = true;
bool retain_dead_tuples = false;
TransactionId xmin = InvalidTransactionId;
@@ -1215,17 +1218,6 @@ ApplyLauncherMain(Datum main_arg)
retain_dead_tuples = true;
/*
- * Can't advance xmin of the slot unless all the subscriptions
- * with retain_dead_tuples are enabled. This is required to
- * ensure that we don't advance the xmin of
- * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
- * enabled. Otherwise, we won't be able to detect conflicts
- * reliably for such a subscription even though it has set the
- * retain_dead_tuples option.
- */
- can_advance_xmin &= sub->enabled;
-
- /*
* Create a replication slot to retain information necessary
* for conflict detection such as dead tuples, commit
* timestamps, and origins.
@@ -1240,6 +1232,28 @@ ApplyLauncherMain(Datum main_arg)
* subscription was enabled.
*/
CreateConflictDetectionSlot();
+
+ if (sub->retentionactive)
+ {
+ /*
+ * Can't advance xmin of the slot unless all the
+ * subscriptions actively retaining dead tuples are
+ * enabled. This is required to ensure that we don't
+ * advance the xmin of CONFLICT_DETECTION_SLOT if one of
+ * the subscriptions is not enabled. Otherwise, we won't
+ * be able to detect conflicts reliably for such a
+ * subscription even though it has set the
+ * retain_dead_tuples option.
+ */
+ can_update_xmin &= sub->enabled;
+
+ /*
+ * Initialize the slot once the subscription activiates
+ * retention.
+ */
+ if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
+ init_conflict_slot_xmin();
+ }
}
if (!sub->enabled)
@@ -1254,9 +1268,11 @@ ApplyLauncherMain(Datum main_arg)
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
- * workers that enables retain_dead_tuples.
+ * workers.
*/
- if (sub->retaindeadtuples && can_advance_xmin)
+ if (sub->retaindeadtuples &&
+ sub->retentionactive &&
+ can_update_xmin)
compute_min_nonremovable_xid(w, &xmin);
/* worker is running already */
@@ -1265,12 +1281,12 @@ ApplyLauncherMain(Datum main_arg)
/*
* Can't advance xmin of the slot unless all the workers
- * corresponding to subscriptions with retain_dead_tuples are
- * running, disabling the further computation of the minimum
+ * corresponding to subscriptions actively retaining dead tuples
+ * are running, disabling the further computation of the minimum
* nonremovable xid.
*/
- if (sub->retaindeadtuples)
- can_advance_xmin = false;
+ if (sub->retaindeadtuples && sub->retentionactive)
+ can_update_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg)
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID,
- sub->retaindeadtuples))
+ sub->retaindeadtuples &&
+ sub->retentionactive))
{
/*
* We get here either if we failed to launch a worker
@@ -1320,13 +1337,18 @@ ApplyLauncherMain(Datum main_arg)
* that requires us to retain dead tuples. Otherwise, if required,
* advance the slot's xmin to protect dead tuples required for the
* conflict detection.
+ *
+ * Additionally, if all apply workers for subscriptions with
+ * retain_dead_tuples enabled have requested to stop retention, the
+ * slot's xmin will be set to InvalidTransactionId allowing the
+ * removal of dead tuples.
*/
if (MyReplicationSlot)
{
if (!retain_dead_tuples)
ReplicationSlotDropAcquired();
- else if (can_advance_xmin)
- advance_conflict_slot_xmin(xmin);
+ else if (can_update_xmin)
+ update_conflict_slot_xmin(xmin);
}
/* Switch back to original memory context. */
@@ -1378,7 +1400,15 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
nonremovable_xid = worker->oldest_nonremovable_xid;
SpinLockRelease(&worker->relmutex);
- Assert(TransactionIdIsValid(nonremovable_xid));
+ /*
+ * Return if the apply worker has stopped retention concurrently.
+ *
+ * Although this function is invoked only when retentionactive is true,
+ * the apply worker might stop retention after the launcher fetches the
+ * retentionactive flag.
+ */
+ if (!TransactionIdIsValid(nonremovable_xid))
+ return;
if (!TransactionIdIsValid(*xmin) ||
TransactionIdPrecedes(nonremovable_xid, *xmin))
@@ -1402,17 +1432,17 @@ acquire_conflict_slot_if_exists(void)
}
/*
- * Advance the xmin the replication slot used to retain information required
+ * Update the xmin the replication slot used to retain information required
* for conflict detection.
*/
static void
-advance_conflict_slot_xmin(TransactionId new_xmin)
+update_conflict_slot_xmin(TransactionId new_xmin)
{
Assert(MyReplicationSlot);
- Assert(TransactionIdIsValid(new_xmin));
- Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+ Assert(!TransactionIdIsValid(new_xmin) ||
+ TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
- /* Return if the xmin value of the slot cannot be advanced */
+ /* Return if the xmin value of the slot cannot be updated */
if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
return;
@@ -1439,23 +1469,16 @@ advance_conflict_slot_xmin(TransactionId new_xmin)
}
/*
- * Create and acquire the replication slot used to retain information for
- * conflict detection, if not yet.
+ * Initialize the xmin for the conflict detection slot.
*/
-void
-CreateConflictDetectionSlot(void)
+static void
+init_conflict_slot_xmin(void)
{
TransactionId xmin_horizon;
- /* Exit early, if the replication slot is already created and acquired */
- if (MyReplicationSlot)
- return;
-
- ereport(LOG,
- errmsg("creating replication conflict detection slot"));
-
- ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
- false, false);
+ /* Replication slot must exist but shouldn't be initialized. */
+ Assert(MyReplicationSlot &&
+ !TransactionIdIsValid(MyReplicationSlot->data.xmin));
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -1476,6 +1499,26 @@ CreateConflictDetectionSlot(void)
}
/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if not yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+ /* Exit early, if the replication slot is already created and acquired */
+ if (MyReplicationSlot)
+ return;
+
+ ereport(LOG,
+ errmsg("creating replication conflict detection slot"));
+
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+ false, false);
+
+ init_conflict_slot_xmin();
+}
+
+/*
* Is current process the logical replication launcher?
*/
bool