summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/catalogs.sgml25
-rw-r--r--doc/src/sgml/ref/alter_subscription.sgml5
-rw-r--r--doc/src/sgml/ref/create_subscription.sgml43
-rw-r--r--src/backend/catalog/pg_subscription.c41
-rw-r--r--src/backend/catalog/system_views.sql4
-rw-r--r--src/backend/commands/subscriptioncmds.c163
-rw-r--r--src/backend/replication/logical/launcher.c125
-rw-r--r--src/backend/replication/logical/worker.c271
-rw-r--r--src/bin/pg_dump/pg_dump.c18
-rw-r--r--src/bin/pg_dump/pg_dump.h1
-rw-r--r--src/bin/psql/describe.c12
-rw-r--r--src/bin/psql/tab-complete.in.c6
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_subscription.h16
-rw-r--r--src/include/catalog/pg_subscription_rel.h2
-rw-r--r--src/include/commands/subscriptioncmds.h5
-rw-r--r--src/include/replication/worker_internal.h3
-rw-r--r--src/test/regress/expected/subscription.out186
-rw-r--r--src/test/regress/sql/subscription.sql16
-rw-r--r--src/test/subscription/t/035_conflicts.pl53
20 files changed, 779 insertions, 218 deletions
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index da8a7882580..e9095bedf21 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8096,6 +8096,31 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<row>
<entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>submaxretention</structfield> <type>int4</type>
+ </para>
+ <para>
+ The maximum duration (in milliseconds) for which information (e.g., dead
+ tuples, commit timestamps, and origins) useful for conflict detection can
+ be retained.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>subretentionactive</structfield> <type>bool</type>
+ </para>
+ <para>
+ The retention status of information (e.g., dead tuples, commit
+ timestamps, and origins) useful for conflict detection. True if
+ <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+ is enabled, and the retention duration has not exceeded
+ <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>,
+ when defined.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
<structfield>subconninfo</structfield> <type>text</type>
</para>
<para>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index d48cdc76bd3..12f72ba3167 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -236,8 +236,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
- <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and
- <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>.
+ <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>,
+ <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>, and
+ <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>.
Only a superuser can set <literal>password_required = false</literal>.
</para>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 247c5bd2604..fc314437311 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -448,7 +448,7 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
If set to <literal>true</literal>, the detection of
<xref linkend="conflict-update-deleted"/> is enabled, and a physical
replication slot named <quote><literal>pg_conflict_detection</literal></quote>
- created on the subscriber to prevent the information for detecting
+ is created on the subscriber to prevent the information for detecting
conflicts from being removed.
</para>
@@ -521,6 +521,47 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="sql-createsubscription-params-with-max-retention-duration">
+ <term><literal>max_retention_duration</literal> (<type>integer</type>)</term>
+ <listitem>
+ <para>
+ Maximum duration in milliseconds for which this subscription's apply worker
+ is allowed to retain the information useful for conflict detection when
+ <literal>retain_dead_tuples</literal> is enabled. The default value
+ is <literal>0</literal>, indicating that the information is retained
+ until it is no longer needed for detection purposes.
+ </para>
+ <para>
+ The information useful for conflict detection is no longer retained if
+ all apply workers associated with the subscriptions, where
+ <literal>retain_dead_tuples</literal> is enabled, confirm that the
+ retention duration has exceeded the
+ <literal>max_retention_duration</literal> set within the corresponding
+ subscription. The retention will not be automatically resumed unless a
+ new subscription is created with <literal>retain_dead_tuples =
+ true</literal>, or the user manually re-enables
+ <literal>retain_dead_tuples</literal>.
+ </para>
+ <para>
+ Note that overall retention will not stop if other subscriptions that
+ have a value greater than 0 for this parameter have not exceeded it,
+ or if they set this option to 0.
+ </para>
+ <para>
+ This option is effective only when
+ <literal>retain_conflict_info</literal> is enabled and the apply
+ worker associated with the subscription is active.
+ </para>
+ <warning>
+ <para>
+ Note that setting a non-zero value for this option could lead to
+ information for conflict detection being removed prematurely,
+ potentially resulting in incorrect conflict detection.
+ </para>
+ </warning>
+ </listitem>
+ </varlistentry>
</variablelist></para>
</listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 244acf52f36..b885890de37 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -104,6 +104,8 @@ GetSubscription(Oid subid, bool missing_ok)
sub->runasowner = subform->subrunasowner;
sub->failover = subform->subfailover;
sub->retaindeadtuples = subform->subretaindeadtuples;
+ sub->maxretention = subform->submaxretention;
+ sub->retentionactive = subform->subretentionactive;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
@@ -598,3 +600,42 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
return res;
}
+
+/*
+ * Update the dead tuple retention status for the given subscription.
+ */
+void
+UpdateDeadTupleRetentionStatus(Oid subid, bool active)
+{
+ Relation rel;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+ HeapTuple tup;
+
+ /* Look up the subscription in the catalog */
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /* Form a new tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* Set the subscription to disabled. */
+ values[Anum_pg_subscription_subretentionactive - 1] = active;
+ replaces[Anum_pg_subscription_subretentionactive - 1] = true;
+
+ /* Update the catalog */
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+ heap_freetuple(tup);
+
+ table_close(rel, NoLock);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 1b3c5a55882..c77fa0234bb 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1389,8 +1389,8 @@ REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
subpasswordrequired, subrunasowner, subfailover,
- subretaindeadtuples, subslotname, subsynccommit,
- subpublications, suborigin)
+ subretaindeadtuples, submaxretention, subretentionactive,
+ subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0d74398faf3..82cf65fae73 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,8 +72,9 @@
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_FAILOVER 0x00002000
#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
-#define SUBOPT_LSN 0x00008000
-#define SUBOPT_ORIGIN 0x00010000
+#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
+#define SUBOPT_LSN 0x00010000
+#define SUBOPT_ORIGIN 0x00020000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -100,6 +101,7 @@ typedef struct SubOpts
bool runasowner;
bool failover;
bool retaindeadtuples;
+ int32 maxretention;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -168,6 +170,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->failover = false;
if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
opts->retaindeadtuples = false;
+ if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
+ opts->maxretention = 0;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -322,6 +326,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
opts->retaindeadtuples = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
+ strcmp(defel->defname, "max_retention_duration") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
+ opts->maxretention = defGetInt32(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -579,7 +592,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
+ SUBOPT_RETAIN_DEAD_TUPLES |
+ SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -646,9 +660,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
stmt->subname)));
}
- /* Ensure that we can enable retain_dead_tuples */
- if (opts.retaindeadtuples)
- CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
+ /*
+ * Ensure that system configuration paramters are set appropriately to
+ * support retain_dead_tuples and max_retention_duration.
+ */
+ CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
+ opts.retaindeadtuples, opts.retaindeadtuples,
+ (opts.maxretention > 0));
if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
opts.slot_name == NULL)
@@ -692,6 +710,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
values[Anum_pg_subscription_subretaindeadtuples - 1] =
BoolGetDatum(opts.retaindeadtuples);
+ values[Anum_pg_subscription_submaxretention - 1] =
+ Int32GetDatum(opts.maxretention);
+ values[Anum_pg_subscription_subretentionactive - 1] =
+ Int32GetDatum(opts.retaindeadtuples);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -1175,6 +1197,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
bool update_two_phase = false;
bool check_pub_rdt = false;
bool retain_dead_tuples;
+ int max_retention;
+ bool retention_active;
char *origin;
Subscription *sub;
Form_pg_subscription form;
@@ -1205,6 +1229,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
retain_dead_tuples = sub->retaindeadtuples;
origin = sub->origin;
+ max_retention = sub->maxretention;
+ retention_active = sub->retentionactive;
/*
* Don't allow non-superuser modification of a subscription with
@@ -1234,7 +1260,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
+ SUBOPT_RETAIN_DEAD_TUPLES |
+ SUBOPT_MAX_RETENTION_DURATION |
+ SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1400,6 +1428,29 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
BoolGetDatum(opts.retaindeadtuples);
replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
+ /*
+ * Update the retention status only if there's a change in
+ * the retain_dead_tuples option value.
+ *
+ * Automatically marking retention as active when
+ * retain_dead_tuples is enabled may not always be ideal,
+ * especially if retention was previously stopped and the
+ * user toggles retain_dead_tuples without adjusting the
+ * publisher workload. However, this behavior provides a
+ * convenient way for users to manually refresh the
+ * retention status. Since retention will be stopped again
+ * unless the publisher workload is reduced, this approach
+ * is acceptable for now.
+ */
+ if (opts.retaindeadtuples != sub->retaindeadtuples)
+ {
+ values[Anum_pg_subscription_subretentionactive - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
+ replaces[Anum_pg_subscription_subretentionactive - 1] = true;
+
+ retention_active = opts.retaindeadtuples;
+ }
+
CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
/*
@@ -1417,13 +1468,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
errhint("Try again after some time.")));
/*
- * Remind the user that enabling subscription will prevent
- * the accumulation of dead tuples.
- */
- if (opts.retaindeadtuples)
- CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
-
- /*
* Notify the launcher to manage the replication slot for
* conflict detection. This ensures that replication slot
* is efficiently handled (created, updated, or dropped)
@@ -1435,6 +1479,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
retain_dead_tuples = opts.retaindeadtuples;
}
+ if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
+ {
+ values[Anum_pg_subscription_submaxretention - 1] =
+ Int32GetDatum(opts.maxretention);
+ replaces[Anum_pg_subscription_submaxretention - 1] = true;
+
+ max_retention = opts.maxretention;
+ }
+
+ /*
+ * Ensure that system configuration paramters are set
+ * appropriately to support retain_dead_tuples and
+ * max_retention_duration.
+ */
+ if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
+ IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
+ CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
+ retain_dead_tuples,
+ retention_active,
+ (max_retention > 0));
+
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
@@ -1472,9 +1537,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
* subscription in case it was disabled after creation. See
* comments atop CheckSubDeadTupleRetention() for details.
*/
- if (sub->retaindeadtuples)
- CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
- WARNING);
+ CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
+ WARNING, sub->retaindeadtuples,
+ sub->retentionactive, false);
values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(opts.enabled);
@@ -2467,38 +2532,54 @@ check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
* this setting can be adjusted after subscription creation. Without it, the
* apply worker will simply skip conflict detection.
*
- * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
- * ERROR since users can only modify retain_dead_tuples for disabled
- * subscriptions. And as long as the subscription is enabled promptly, it will
- * not pose issues.
+ * Issue a WARNING or NOTICE if the subscription is disabled and the retention
+ * is active. Do not raise an ERROR since users can only modify
+ * retain_dead_tuples for disabled subscriptions. And as long as the
+ * subscription is enabled promptly, it will not pose issues.
+ *
+ * Issue a NOTICE to inform users that max_retention_duration is
+ * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
+ * is not issued because setting max_retention_duration causes no harm,
+ * even when it is ineffective.
*/
void
CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
- int elevel_for_sub_disabled)
+ int elevel_for_sub_disabled,
+ bool retain_dead_tuples, bool retention_active,
+ bool max_retention_set)
{
Assert(elevel_for_sub_disabled == NOTICE ||
elevel_for_sub_disabled == WARNING);
- if (check_guc && wal_level < WAL_LEVEL_REPLICA)
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
- errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
-
- if (check_guc && !track_commit_timestamp)
- ereport(WARNING,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
- errhint("Consider setting \"%s\" to true.",
- "track_commit_timestamp"));
-
- if (sub_disabled)
- ereport(elevel_for_sub_disabled,
+ if (retain_dead_tuples)
+ {
+ if (check_guc && wal_level < WAL_LEVEL_REPLICA)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
+ errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
+
+ if (check_guc && !track_commit_timestamp)
+ ereport(WARNING,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
+ errhint("Consider setting \"%s\" to true.",
+ "track_commit_timestamp"));
+
+ if (sub_disabled && retention_active)
+ ereport(elevel_for_sub_disabled,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
+ (elevel_for_sub_disabled > NOTICE)
+ ? errhint("Consider setting %s to false.",
+ "retain_dead_tuples") : 0);
+ }
+ else if (max_retention_set)
+ {
+ ereport(NOTICE,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
- (elevel_for_sub_disabled > NOTICE)
- ? errhint("Consider setting %s to false.",
- "retain_dead_tuples") : 0);
+ errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
+ }
}
/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 37377f7eb63..add2e2e066c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -43,6 +43,7 @@
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
@@ -102,7 +103,8 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
static bool acquire_conflict_slot_if_exists(void);
-static void advance_conflict_slot_xmin(TransactionId new_xmin);
+static void update_conflict_slot_xmin(TransactionId new_xmin);
+static void init_conflict_slot_xmin(void);
/*
@@ -152,6 +154,7 @@ get_subscription_list(void)
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
sub->retaindeadtuples = subform->subretaindeadtuples;
+ sub->retentionactive = subform->subretentionactive;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -1181,7 +1184,7 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
- bool can_advance_xmin = true;
+ bool can_update_xmin = true;
bool retain_dead_tuples = false;
TransactionId xmin = InvalidTransactionId;
@@ -1215,17 +1218,6 @@ ApplyLauncherMain(Datum main_arg)
retain_dead_tuples = true;
/*
- * Can't advance xmin of the slot unless all the subscriptions
- * with retain_dead_tuples are enabled. This is required to
- * ensure that we don't advance the xmin of
- * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
- * enabled. Otherwise, we won't be able to detect conflicts
- * reliably for such a subscription even though it has set the
- * retain_dead_tuples option.
- */
- can_advance_xmin &= sub->enabled;
-
- /*
* Create a replication slot to retain information necessary
* for conflict detection such as dead tuples, commit
* timestamps, and origins.
@@ -1240,6 +1232,28 @@ ApplyLauncherMain(Datum main_arg)
* subscription was enabled.
*/
CreateConflictDetectionSlot();
+
+ if (sub->retentionactive)
+ {
+ /*
+ * Can't advance xmin of the slot unless all the
+ * subscriptions actively retaining dead tuples are
+ * enabled. This is required to ensure that we don't
+ * advance the xmin of CONFLICT_DETECTION_SLOT if one of
+ * the subscriptions is not enabled. Otherwise, we won't
+ * be able to detect conflicts reliably for such a
+ * subscription even though it has set the
+ * retain_dead_tuples option.
+ */
+ can_update_xmin &= sub->enabled;
+
+ /*
+ * Initialize the slot once the subscription activiates
+ * retention.
+ */
+ if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
+ init_conflict_slot_xmin();
+ }
}
if (!sub->enabled)
@@ -1254,9 +1268,11 @@ ApplyLauncherMain(Datum main_arg)
/*
* Compute the minimum xmin required to protect dead tuples
* required for conflict detection among all running apply
- * workers that enables retain_dead_tuples.
+ * workers.
*/
- if (sub->retaindeadtuples && can_advance_xmin)
+ if (sub->retaindeadtuples &&
+ sub->retentionactive &&
+ can_update_xmin)
compute_min_nonremovable_xid(w, &xmin);
/* worker is running already */
@@ -1265,12 +1281,12 @@ ApplyLauncherMain(Datum main_arg)
/*
* Can't advance xmin of the slot unless all the workers
- * corresponding to subscriptions with retain_dead_tuples are
- * running, disabling the further computation of the minimum
+ * corresponding to subscriptions actively retaining dead tuples
+ * are running, disabling the further computation of the minimum
* nonremovable xid.
*/
- if (sub->retaindeadtuples)
- can_advance_xmin = false;
+ if (sub->retaindeadtuples && sub->retentionactive)
+ can_update_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg)
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID,
- sub->retaindeadtuples))
+ sub->retaindeadtuples &&
+ sub->retentionactive))
{
/*
* We get here either if we failed to launch a worker
@@ -1320,13 +1337,18 @@ ApplyLauncherMain(Datum main_arg)
* that requires us to retain dead tuples. Otherwise, if required,
* advance the slot's xmin to protect dead tuples required for the
* conflict detection.
+ *
+ * Additionally, if all apply workers for subscriptions with
+ * retain_dead_tuples enabled have requested to stop retention, the
+ * slot's xmin will be set to InvalidTransactionId allowing the
+ * removal of dead tuples.
*/
if (MyReplicationSlot)
{
if (!retain_dead_tuples)
ReplicationSlotDropAcquired();
- else if (can_advance_xmin)
- advance_conflict_slot_xmin(xmin);
+ else if (can_update_xmin)
+ update_conflict_slot_xmin(xmin);
}
/* Switch back to original memory context. */
@@ -1378,7 +1400,15 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
nonremovable_xid = worker->oldest_nonremovable_xid;
SpinLockRelease(&worker->relmutex);
- Assert(TransactionIdIsValid(nonremovable_xid));
+ /*
+ * Return if the apply worker has stopped retention concurrently.
+ *
+ * Although this function is invoked only when retentionactive is true,
+ * the apply worker might stop retention after the launcher fetches the
+ * retentionactive flag.
+ */
+ if (!TransactionIdIsValid(nonremovable_xid))
+ return;
if (!TransactionIdIsValid(*xmin) ||
TransactionIdPrecedes(nonremovable_xid, *xmin))
@@ -1402,17 +1432,17 @@ acquire_conflict_slot_if_exists(void)
}
/*
- * Advance the xmin the replication slot used to retain information required
+ * Update the xmin the replication slot used to retain information required
* for conflict detection.
*/
static void
-advance_conflict_slot_xmin(TransactionId new_xmin)
+update_conflict_slot_xmin(TransactionId new_xmin)
{
Assert(MyReplicationSlot);
- Assert(TransactionIdIsValid(new_xmin));
- Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+ Assert(!TransactionIdIsValid(new_xmin) ||
+ TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
- /* Return if the xmin value of the slot cannot be advanced */
+ /* Return if the xmin value of the slot cannot be updated */
if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
return;
@@ -1439,23 +1469,16 @@ advance_conflict_slot_xmin(TransactionId new_xmin)
}
/*
- * Create and acquire the replication slot used to retain information for
- * conflict detection, if not yet.
+ * Initialize the xmin for the conflict detection slot.
*/
-void
-CreateConflictDetectionSlot(void)
+static void
+init_conflict_slot_xmin(void)
{
TransactionId xmin_horizon;
- /* Exit early, if the replication slot is already created and acquired */
- if (MyReplicationSlot)
- return;
-
- ereport(LOG,
- errmsg("creating replication conflict detection slot"));
-
- ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
- false, false);
+ /* Replication slot must exist but shouldn't be initialized. */
+ Assert(MyReplicationSlot &&
+ !TransactionIdIsValid(MyReplicationSlot->data.xmin));
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -1476,6 +1499,26 @@ CreateConflictDetectionSlot(void)
}
/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if not yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+ /* Exit early, if the replication slot is already created and acquired */
+ if (MyReplicationSlot)
+ return;
+
+ ereport(LOG,
+ errmsg("creating replication conflict detection slot"));
+
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+ false, false);
+
+ init_conflict_slot_xmin();
+}
+
+/*
* Is current process the logical replication launcher?
*/
bool
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..f1ebd63e792 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -173,6 +173,14 @@
* Advance the non-removable transaction ID if the current flush location has
* reached or surpassed the last received WAL position.
*
+ * - RDT_STOP_CONFLICT_INFO_RETENTION:
+ * This phase is required only when max_retention_duration is defined. We
+ * enter this phase if the wait time in either the
+ * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
+ * configured max_retention_duration. In this phase,
+ * pg_subscription.subretentionactive is updated to false within a new
+ * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
+ *
* The overall state progression is: GET_CANDIDATE_XID ->
* REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
@@ -373,7 +381,8 @@ typedef enum
RDT_GET_CANDIDATE_XID,
RDT_REQUEST_PUBLISHER_STATUS,
RDT_WAIT_FOR_PUBLISHER_STATUS,
- RDT_WAIT_FOR_LOCAL_FLUSH
+ RDT_WAIT_FOR_LOCAL_FLUSH,
+ RDT_STOP_CONFLICT_INFO_RETENTION
} RetainDeadTuplesPhase;
/*
@@ -415,6 +424,9 @@ typedef struct RetainDeadTuplesData
* updated in final phase
* (RDT_WAIT_FOR_LOCAL_FLUSH) */
+ long table_sync_wait_time; /* time spent waiting for table sync
+ * to finish */
+
/*
* The following fields are used to determine the timing for the next
* round of transaction ID advancement.
@@ -555,6 +567,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data);
static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
bool status_received);
static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
bool new_xid_found);
@@ -3219,7 +3234,6 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
TimestampTz *delete_time)
{
TransactionId oldestxmin;
- ReplicationSlot *slot;
/*
* Return false if either dead tuples are not retained or commit timestamp
@@ -3229,32 +3243,49 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
return false;
/*
- * For conflict detection, we use the conflict slot's xmin value instead
- * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
- * a threshold to identify tuples that were recently deleted. These tuples
- * are not visible to concurrent transactions, but we log an
- * update_deleted conflict if such a tuple matches the remote update being
- * applied.
+ * For conflict detection, we use the leader worker's
+ * oldest_nonremovable_xid value instead of invoking
+ * GetOldestNonRemovableTransactionId() or using the conflict detection
+ * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
+ * identify tuples that were recently deleted. These deleted tuples are no
+ * longer visible to concurrent transactions. However, if a remote update
+ * matches such a tuple, we log an update_deleted conflict.
*
- * Although GetOldestNonRemovableTransactionId() can return a value older
- * than the slot's xmin, for our current purpose it is acceptable to treat
- * tuples deleted by transactions prior to slot.xmin as update_missing
- * conflicts.
- *
- * Ideally, we would use oldest_nonremovable_xid, which is directly
- * maintained by the leader apply worker. However, this value is not
- * available to table synchronization or parallel apply workers, making
- * slot.xmin a practical alternative in those contexts.
+ * While GetOldestNonRemovableTransactionId() and slot.xmin may return
+ * transaction IDs older than oldest_nonremovable_xid, for our current
+ * purpose, it is acceptable to treat tuples deleted by transactions prior
+ * to oldest_nonremovable_xid as update_missing conflicts.
*/
- slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+ if (am_leader_apply_worker())
+ {
+ oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
+ }
+ else
+ {
+ LogicalRepWorker *leader;
- Assert(slot);
+ /*
+ * Obtain the information from the leader apply worker as only the
+ * leader manages conflict retention (see
+ * maybe_advance_nonremovable_xid() for details).
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ leader = logicalrep_worker_find(MyLogicalRepWorker->subid,
+ InvalidOid, false);
- SpinLockAcquire(&slot->mutex);
- oldestxmin = slot->data.xmin;
- SpinLockRelease(&slot->mutex);
+ SpinLockAcquire(&leader->relmutex);
+ oldestxmin = leader->oldest_nonremovable_xid;
+ SpinLockRelease(&leader->relmutex);
+ LWLockRelease(LogicalRepWorkerLock);
+ }
- Assert(TransactionIdIsValid(oldestxmin));
+ /*
+ * Return false if the leader apply worker has stopped retaining
+ * information for detecting conflicts. This implies that update_deleted
+ * can no longer be reliably detected.
+ */
+ if (!TransactionIdIsValid(oldestxmin))
+ return false;
if (OidIsValid(localidxoid) &&
IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
@@ -4108,11 +4139,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/*
* Ensure to wake up when it's possible to advance the non-removable
- * transaction ID.
+ * transaction ID, or when the retention duration may have exceeded
+ * max_retention_duration.
*/
- if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
- rdt_data.xid_advance_interval)
- wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+ if (MySubscription->retentionactive)
+ {
+ if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
+ rdt_data.xid_advance_interval)
+ wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+ else if (MySubscription->maxretention > 0)
+ wait_time = Min(wait_time, MySubscription->maxretention);
+ }
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -4325,6 +4362,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
if (!MySubscription->retaindeadtuples)
return false;
+ /* No need to advance if we have already stopped retaining */
+ if (!MySubscription->retentionactive)
+ return false;
+
return true;
}
@@ -4350,6 +4391,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
case RDT_WAIT_FOR_LOCAL_FLUSH:
wait_for_local_flush(rdt_data);
break;
+ case RDT_STOP_CONFLICT_INFO_RETENTION:
+ stop_conflict_info_retention(rdt_data);
+ break;
}
}
@@ -4468,6 +4512,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
if (!status_received)
return;
+ /*
+ * We don't need to maintain oldest_nonremovable_xid if we decide to stop
+ * retaining conflict information for this worker.
+ */
+ if (should_stop_conflict_info_retention(rdt_data))
+ return;
+
if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
rdt_data->remote_wait_for = rdt_data->remote_nextxid;
@@ -4549,6 +4600,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* have a WAL position greater than the rdt_data->remote_lsn.
*/
if (!AllTablesyncsReady())
+ {
+ TimestampTz now;
+
+ now = rdt_data->last_recv_time
+ ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Record the time spent waiting for table sync, it is needed for the
+ * timeout check in should_stop_conflict_info_retention().
+ */
+ rdt_data->table_sync_wait_time =
+ TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
+
+ return;
+ }
+
+ /*
+ * We don't need to maintain oldest_nonremovable_xid if we decide to stop
+ * retaining conflict information for this worker.
+ */
+ if (should_stop_conflict_info_retention(rdt_data))
return;
/*
@@ -4594,12 +4666,114 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
/* Notify launcher to update the xmin of the conflict slot */
ApplyLauncherWakeup();
+ reset_retention_data_fields(rdt_data);
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Check whether conflict information retention should be stopped due to
+ * exceeding the maximum wait time (max_retention_duration).
+ *
+ * If retention should be stopped, transition to the
+ * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
+ * false.
+ *
+ * Note: Retention won't be resumed automatically. The user must manually
+ * disable retain_dead_tuples and re-enable it after confirming that the
+ * replication slot maintained by the launcher has been dropped.
+ */
+static bool
+should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+ TimestampTz now;
+
+ Assert(TransactionIdIsValid(rdt_data->candidate_xid));
+ Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
+ rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
+
+ if (!MySubscription->maxretention)
+ return false;
+
+ /*
+ * Use last_recv_time when applying changes in the loop to avoid
+ * unnecessary system time retrieval. If last_recv_time is not available,
+ * obtain the current timestamp.
+ */
+ now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Return early if the wait time has not exceeded the configured maximum
+ * (max_retention_duration). Time spent waiting for table synchronization
+ * is excluded from this calculation, as it occurs infrequently.
+ */
+ if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+ MySubscription->maxretention +
+ rdt_data->table_sync_wait_time))
+ return false;
+
+ rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+
+ return true;
+}
+
+/*
+ * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
+ */
+static void
+stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+ /*
+ * Do not update the catalog during an active transaction. The transaction
+ * may be started during change application, leading to a possible
+ * rollback of catalog updates if the application fails subsequently.
+ */
+ if (IsTransactionState())
+ return;
+
+ StartTransactionCommand();
+
/*
- * Reset all data fields except those used to determine the timing for the
- * next round of transaction ID advancement. We can even use
- * flushpos_update_time in the next round to decide whether to get the
- * latest flush position.
+ * Updating pg_subscription might involve TOAST table access, so ensure we
+ * have a valid snapshot.
*/
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /* Set pg_subscription.subretentionactive to false */
+ UpdateDeadTupleRetentionStatus(MySubscription->oid, false);
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
+ MySubscription->name),
+ errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
+ MySubscription->maxretention));
+
+ /* Notify launcher to update the conflict slot */
+ ApplyLauncherWakeup();
+
+ reset_retention_data_fields(rdt_data);
+}
+
+/*
+ * Reset all data fields of RetainDeadTuplesData except those used to
+ * determine the timing for the next round of transaction ID advancement. We
+ * can even use flushpos_update_time in the next round to decide whether to get
+ * the latest flush position.
+ */
+static void
+reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
+{
rdt_data->phase = RDT_GET_CANDIDATE_XID;
rdt_data->remote_lsn = InvalidXLogRecPtr;
rdt_data->remote_oldestxid = InvalidFullTransactionId;
@@ -4607,22 +4781,25 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
rdt_data->reply_time = 0;
rdt_data->remote_wait_for = InvalidFullTransactionId;
rdt_data->candidate_xid = InvalidTransactionId;
-
- /* process the next phase */
- process_rdt_phase_transition(rdt_data, false);
+ rdt_data->table_sync_wait_time = 0;
}
/*
* Adjust the interval for advancing non-removable transaction IDs.
*
- * We double the interval to try advancing the non-removable transaction IDs
- * if there is no activity on the node. The maximum value of the interval is
- * capped by wal_receiver_status_interval if it is not zero, otherwise to a
- * 3 minutes which should be sufficient to avoid using CPU or network
- * resources without much benefit.
+ * If there is no activity on the node, we progressively double the interval
+ * used to advance non-removable transaction ID. This helps conserve CPU
+ * and network resources when there's little benefit to frequent updates.
+ *
+ * The interval is capped by the lowest of the following:
+ * - wal_receiver_status_interval (if set),
+ * - a default maximum of 3 minutes,
+ * - max_retention_duration.
*
- * The interval is reset to a minimum value of 100ms once there is some
- * activity on the node.
+ * This ensures the interval never exceeds the retention boundary, even if
+ * other limits are higher. Once activity resumes on the node, the interval
+ * is reset to lesser of 100ms and max_retention_duration, allowing timely
+ * advancement of non-removable transaction ID.
*
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
* consider the other interval or a separate GUC if the need arises.
@@ -4651,6 +4828,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
*/
rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
}
+
+ /* Ensure the wait time remains within the maximum limit */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+ MySubscription->maxretention);
}
/*
@@ -5458,11 +5639,12 @@ InitializeLogRepWorker(void)
* dropped, a restart is initiated.
*
* The oldest_nonremovable_xid should be initialized only when the
- * retain_dead_tuples is enabled before launching the worker. See
+ * subscription's retention is active before launching the worker. See
* logicalrep_worker_launch.
*/
if (am_leader_apply_worker() &&
MySubscription->retaindeadtuples &&
+ MySubscription->retentionactive &&
!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
{
ereport(LOG,
@@ -5633,8 +5815,9 @@ DisableSubscriptionAndExit(void)
* an error, as verifying commit timestamps is unnecessary in this
* context.
*/
- if (MySubscription->retaindeadtuples)
- CheckSubDeadTupleRetention(false, true, WARNING);
+ CheckSubDeadTupleRetention(false, true, WARNING,
+ MySubscription->retaindeadtuples,
+ MySubscription->retentionactive, false);
proc_exit(0);
}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index fc7a6639163..bea793456f9 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5048,6 +5048,7 @@ getSubscriptions(Archive *fout)
int i_subenabled;
int i_subfailover;
int i_subretaindeadtuples;
+ int i_submaxretention;
int i,
ntups;
@@ -5127,10 +5128,17 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query,
- " s.subretaindeadtuples\n");
+ " s.subretaindeadtuples,\n");
else
appendPQExpBufferStr(query,
- " false AS subretaindeadtuples\n");
+ " false AS subretaindeadtuples,\n");
+
+ if (fout->remoteVersion >= 190000)
+ appendPQExpBufferStr(query,
+ " s.submaxretention\n");
+ else
+ appendPQExpBuffer(query,
+ " 0 AS submaxretention\n");
appendPQExpBufferStr(query,
"FROM pg_subscription s\n");
@@ -5165,6 +5173,7 @@ getSubscriptions(Archive *fout)
i_subrunasowner = PQfnumber(res, "subrunasowner");
i_subfailover = PQfnumber(res, "subfailover");
i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
+ i_submaxretention = PQfnumber(res, "submaxretention");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -5200,6 +5209,8 @@ getSubscriptions(Archive *fout)
(strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
subinfo[i].subretaindeadtuples =
(strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
+ subinfo[i].submaxretention =
+ atoi(PQgetvalue(res, i, i_submaxretention));
subinfo[i].subconninfo =
pg_strdup(PQgetvalue(res, i, i_subconninfo));
if (PQgetisnull(res, i, i_subslotname))
@@ -5461,6 +5472,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
if (subinfo->subretaindeadtuples)
appendPQExpBufferStr(query, ", retain_dead_tuples = true");
+ if (subinfo->submaxretention)
+ appendPQExpBuffer(query, ", max_retention_duration = %d", subinfo->submaxretention);
+
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index dde85ed156c..bcc94ff07cc 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -717,6 +717,7 @@ typedef struct _SubscriptionInfo
bool subrunasowner;
bool subfailover;
bool subretaindeadtuples;
+ int submaxretention;
char *subconninfo;
char *subslotname;
char *subsynccommit;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 7a06af48842..4aa793d7de7 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose)
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false, false, false, false,
- false, false};
+ false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6815,10 +6815,20 @@ describeSubscriptions(const char *pattern, bool verbose)
", subfailover AS \"%s\"\n",
gettext_noop("Failover"));
if (pset.sversion >= 190000)
+ {
appendPQExpBuffer(&buf,
", subretaindeadtuples AS \"%s\"\n",
gettext_noop("Retain dead tuples"));
+ appendPQExpBuffer(&buf,
+ ", submaxretention AS \"%s\"\n",
+ gettext_noop("Max retention duration"));
+
+ appendPQExpBuffer(&buf,
+ ", subretentionactive AS \"%s\"\n",
+ gettext_noop("Retention active"));
+ }
+
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
", subconninfo AS \"%s\"\n",
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 8b10f2313f3..6b20a4404b2 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2321,7 +2321,8 @@ match_previous_words(int pattern_id,
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION <name> SET ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
- COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
+ COMPLETE_WITH("binary", "disable_on_error", "failover",
+ "max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
"synchronous_commit", "two_phase");
@@ -3780,7 +3781,8 @@ match_previous_words(int pattern_id,
/* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
- "disable_on_error", "enabled", "failover", "origin",
+ "disable_on_error", "enabled", "failover",
+ "max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
"synchronous_commit", "two_phase");
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 0ca415b4261..836369f163e 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202508131
+#define CATALOG_VERSION_NO 202509021
#endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 231ef84ec9a..55cb9b1eefa 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -81,6 +81,15 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subretaindeadtuples; /* True if dead tuples useful for
* conflict detection are retained */
+ int32 submaxretention; /* The maximum duration (in milliseconds)
+ * for which information useful for
+ * conflict detection can be retained */
+
+ bool subretentionactive; /* True if retain_dead_tuples is enabled
+ * and the retention duration has not
+ * exceeded max_retention_duration, when
+ * defined */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -136,6 +145,13 @@ typedef struct Subscription
* to be synchronized to the standbys. */
bool retaindeadtuples; /* True if dead tuples useful for conflict
* detection are retained */
+ int32 maxretention; /* The maximum duration (in milliseconds) for
+ * which information useful for conflict
+ * detection can be retained */
+ bool retentionactive; /* True if retain_dead_tuples is enabled
+ * and the retention duration has not
+ * exceeded max_retention_duration, when
+ * defined */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f458447a0e5..02f97a547dd 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -92,4 +92,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid);
extern bool HasSubscriptionRelations(Oid subid);
extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
+extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active);
+
#endif /* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 9b288ad22a6..fb4e26a51a4 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -31,6 +31,9 @@ extern char defGetStreamingMode(DefElem *def);
extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel);
extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
- int elevel_for_sub_disabled);
+ int elevel_for_sub_disabled,
+ bool retain_dead_tuples,
+ bool retention_active,
+ bool max_retention_set);
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7c0204dd6f4..62ea1a00580 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -94,6 +94,9 @@ typedef struct LogicalRepWorker
* The logical replication launcher manages an internal replication slot
* named "pg_conflict_detection". It asynchronously collects this ID to
* decide when to advance the xmin value of the slot.
+ *
+ * This ID is set to InvalidTransactionId when the apply worker stops
+ * retaining information needed for conflict detection.
*/
TransactionId oldest_nonremovable_xid;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index a98c97f7616..c7f1266fc2f 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345
(1 row)
-- ok - with lsn = NONE
@@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
(1 row)
BEGIN;
@@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000
(1 row)
-- rename back to keep the rest simple
@@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
-- fail - publication already exists
@@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
-- fail - publication used more than once
@@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -433,10 +433,36 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - max_retention_duration must be integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = foo);
+ERROR: max_retention_duration requires an integer value
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = 1000);
+NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabled
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000
+(1 row)
+
+-- ok
+ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index f0f714fe747..ef0c298d2df 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -298,6 +298,22 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+-- fail - max_retention_duration must be integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = foo);
+
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = 1000);
+
+\dRs+
+
+-- ok
+ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
-- let's do some tests with pg_create_subscription rather than superuser
SET SESSION AUTHORIZATION regress_subscription_user3;
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 6b4a9fb8815..51b23a39fa9 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -387,6 +387,59 @@ ok( $logfile =~
'update target row was deleted in tab');
###############################################################################
+# Check that dead tuple retention stops due to the wait time surpassing
+# max_retention_duration.
+###############################################################################
+
+# Create a physical slot
+$node_B->safe_psql('postgres',
+ "SELECT * FROM pg_create_physical_replication_slot('blocker');");
+
+# Add the inactive physical slot to synchronized_standby_slots
+$node_B->append_conf('postgresql.conf',
+ "synchronized_standby_slots = 'blocker'");
+$node_B->reload;
+
+# Enable failover to activate the synchronized_standby_slots setting
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;");
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB SET (failover = true);");
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+
+# Insert a record
+$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (5, 5);");
+
+# Advance the xid on Node A to trigger the next cycle of oldest_nonremovable_xid
+# advancement.
+$node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
+
+$log_offset = -s $node_A->logfile;
+
+# Set max_retention_duration to a minimal value to initiate retention stop.
+$node_A->safe_psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 1);");
+
+# Confirm that the retention is stopped
+$node_A->wait_for_log(
+ qr/logical replication worker for subscription "tap_sub_a_b" has stopped retaining the information for detecting conflicts/,
+ $log_offset);
+
+ok( $node_A->poll_query_until(
+ 'postgres',
+ "SELECT xmin IS NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the xmin value of slot 'pg_conflict_detection' is invalid on Node A");
+
+$result = $node_A->safe_psql('postgres',
+ "SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';");
+is($result, qq(f), 'retention is inactive');
+
+# Drop the physical slot and reset the synchronized_standby_slots setting
+$node_B->safe_psql('postgres',
+ "SELECT * FROM pg_drop_replication_slot('blocker');");
+$node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$node_B->reload;
+
+###############################################################################
# Check that the replication slot pg_conflict_detection is dropped after
# removing all the subscriptions.
###############################################################################