diff options
-rw-r--r-- | doc/src/sgml/catalogs.sgml | 25 | ||||
-rw-r--r-- | doc/src/sgml/ref/alter_subscription.sgml | 5 | ||||
-rw-r--r-- | doc/src/sgml/ref/create_subscription.sgml | 43 | ||||
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 41 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 4 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 163 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 125 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 271 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_dump.c | 18 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_dump.h | 1 | ||||
-rw-r--r-- | src/bin/psql/describe.c | 12 | ||||
-rw-r--r-- | src/bin/psql/tab-complete.in.c | 6 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_subscription.h | 16 | ||||
-rw-r--r-- | src/include/catalog/pg_subscription_rel.h | 2 | ||||
-rw-r--r-- | src/include/commands/subscriptioncmds.h | 5 | ||||
-rw-r--r-- | src/include/replication/worker_internal.h | 3 | ||||
-rw-r--r-- | src/test/regress/expected/subscription.out | 186 | ||||
-rw-r--r-- | src/test/regress/sql/subscription.sql | 16 | ||||
-rw-r--r-- | src/test/subscription/t/035_conflicts.pl | 53 |
20 files changed, 779 insertions, 218 deletions
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index da8a7882580..e9095bedf21 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8096,6 +8096,31 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <row> <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>submaxretention</structfield> <type>int4</type> + </para> + <para> + The maximum duration (in milliseconds) for which information (e.g., dead + tuples, commit timestamps, and origins) useful for conflict detection can + be retained. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>subretentionactive</structfield> <type>bool</type> + </para> + <para> + The retention status of information (e.g., dead tuples, commit + timestamps, and origins) useful for conflict detection. True if + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> + is enabled, and the retention duration has not exceeded + <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>, + when defined. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> <structfield>subconninfo</structfield> <type>text</type> </para> <para> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index d48cdc76bd3..12f72ba3167 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -236,8 +236,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>, <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, - <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and - <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>. + <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>, and + <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>. Only a superuser can set <literal>password_required = false</literal>. </para> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 247c5bd2604..fc314437311 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -448,7 +448,7 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl If set to <literal>true</literal>, the detection of <xref linkend="conflict-update-deleted"/> is enabled, and a physical replication slot named <quote><literal>pg_conflict_detection</literal></quote> - created on the subscriber to prevent the information for detecting + is created on the subscriber to prevent the information for detecting conflicts from being removed. </para> @@ -521,6 +521,47 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </para> </listitem> </varlistentry> + + <varlistentry id="sql-createsubscription-params-with-max-retention-duration"> + <term><literal>max_retention_duration</literal> (<type>integer</type>)</term> + <listitem> + <para> + Maximum duration in milliseconds for which this subscription's apply worker + is allowed to retain the information useful for conflict detection when + <literal>retain_dead_tuples</literal> is enabled. The default value + is <literal>0</literal>, indicating that the information is retained + until it is no longer needed for detection purposes. + </para> + <para> + The information useful for conflict detection is no longer retained if + all apply workers associated with the subscriptions, where + <literal>retain_dead_tuples</literal> is enabled, confirm that the + retention duration has exceeded the + <literal>max_retention_duration</literal> set within the corresponding + subscription. The retention will not be automatically resumed unless a + new subscription is created with <literal>retain_dead_tuples = + true</literal>, or the user manually re-enables + <literal>retain_dead_tuples</literal>. + </para> + <para> + Note that overall retention will not stop if other subscriptions that + have a value greater than 0 for this parameter have not exceeded it, + or if they set this option to 0. + </para> + <para> + This option is effective only when + <literal>retain_conflict_info</literal> is enabled and the apply + worker associated with the subscription is active. + </para> + <warning> + <para> + Note that setting a non-zero value for this option could lead to + information for conflict detection being removed prematurely, + potentially resulting in incorrect conflict detection. + </para> + </warning> + </listitem> + </varlistentry> </variablelist></para> </listitem> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 244acf52f36..b885890de37 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,8 @@ GetSubscription(Oid subid, bool missing_ok) sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; sub->retaindeadtuples = subform->subretaindeadtuples; + sub->maxretention = subform->submaxretention; + sub->retentionactive = subform->subretentionactive; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, @@ -598,3 +600,42 @@ GetSubscriptionRelations(Oid subid, bool not_ready) return res; } + +/* + * Update the dead tuple retention status for the given subscription. + */ +void +UpdateDeadTupleRetentionStatus(Oid subid, bool active) +{ + Relation rel; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + HeapTuple tup; + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for subscription %u", subid); + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Set the subscription to disabled. */ + values[Anum_pg_subscription_subretentionactive - 1] = active; + replaces[Anum_pg_subscription_subretentionactive - 1] = true; + + /* Update the catalog */ + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + heap_freetuple(tup); + + table_close(rel, NoLock); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 1b3c5a55882..c77fa0234bb 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1389,8 +1389,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subretaindeadtuples, subslotname, subsynccommit, - subpublications, suborigin) + subretaindeadtuples, submaxretention, subretentionactive, + subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0d74398faf3..82cf65fae73 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -72,8 +72,9 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 -#define SUBOPT_LSN 0x00008000 -#define SUBOPT_ORIGIN 0x00010000 +#define SUBOPT_MAX_RETENTION_DURATION 0x00008000 +#define SUBOPT_LSN 0x00010000 +#define SUBOPT_ORIGIN 0x00020000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -100,6 +101,7 @@ typedef struct SubOpts bool runasowner; bool failover; bool retaindeadtuples; + int32 maxretention; char *origin; XLogRecPtr lsn; } SubOpts; @@ -168,6 +170,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) opts->retaindeadtuples = false; + if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION)) + opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -322,6 +326,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; opts->retaindeadtuples = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) && + strcmp(defel->defname, "max_retention_duration") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION; + opts->maxretention = defGetInt32(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -579,7 +592,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -646,9 +660,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, stmt->subname))); } - /* Ensure that we can enable retain_dead_tuples */ - if (opts.retaindeadtuples) - CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + /* + * Ensure that system configuration paramters are set appropriately to + * support retain_dead_tuples and max_retention_duration. + */ + CheckSubDeadTupleRetention(true, !opts.enabled, WARNING, + opts.retaindeadtuples, opts.retaindeadtuples, + (opts.maxretention > 0)); if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) @@ -692,6 +710,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); values[Anum_pg_subscription_subretaindeadtuples - 1] = BoolGetDatum(opts.retaindeadtuples); + values[Anum_pg_subscription_submaxretention - 1] = + Int32GetDatum(opts.maxretention); + values[Anum_pg_subscription_subretentionactive - 1] = + Int32GetDatum(opts.retaindeadtuples); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1175,6 +1197,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_two_phase = false; bool check_pub_rdt = false; bool retain_dead_tuples; + int max_retention; + bool retention_active; char *origin; Subscription *sub; Form_pg_subscription form; @@ -1205,6 +1229,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, retain_dead_tuples = sub->retaindeadtuples; origin = sub->origin; + max_retention = sub->maxretention; + retention_active = sub->retentionactive; /* * Don't allow non-superuser modification of a subscription with @@ -1234,7 +1260,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_RETENTION_DURATION | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1400,6 +1428,29 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, BoolGetDatum(opts.retaindeadtuples); replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true; + /* + * Update the retention status only if there's a change in + * the retain_dead_tuples option value. + * + * Automatically marking retention as active when + * retain_dead_tuples is enabled may not always be ideal, + * especially if retention was previously stopped and the + * user toggles retain_dead_tuples without adjusting the + * publisher workload. However, this behavior provides a + * convenient way for users to manually refresh the + * retention status. Since retention will be stopped again + * unless the publisher workload is reduced, this approach + * is acceptable for now. + */ + if (opts.retaindeadtuples != sub->retaindeadtuples) + { + values[Anum_pg_subscription_subretentionactive - 1] = + BoolGetDatum(opts.retaindeadtuples); + replaces[Anum_pg_subscription_subretentionactive - 1] = true; + + retention_active = opts.retaindeadtuples; + } + CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel); /* @@ -1417,13 +1468,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Try again after some time."))); /* - * Remind the user that enabling subscription will prevent - * the accumulation of dead tuples. - */ - if (opts.retaindeadtuples) - CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE); - - /* * Notify the launcher to manage the replication slot for * conflict detection. This ensures that replication slot * is efficiently handled (created, updated, or dropped) @@ -1435,6 +1479,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, retain_dead_tuples = opts.retaindeadtuples; } + if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION)) + { + values[Anum_pg_subscription_submaxretention - 1] = + Int32GetDatum(opts.maxretention); + replaces[Anum_pg_subscription_submaxretention - 1] = true; + + max_retention = opts.maxretention; + } + + /* + * Ensure that system configuration paramters are set + * appropriately to support retain_dead_tuples and + * max_retention_duration. + */ + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) || + IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION)) + CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE, + retain_dead_tuples, + retention_active, + (max_retention > 0)); + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -1472,9 +1537,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * subscription in case it was disabled after creation. See * comments atop CheckSubDeadTupleRetention() for details. */ - if (sub->retaindeadtuples) - CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, - WARNING); + CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, + WARNING, sub->retaindeadtuples, + sub->retentionactive, false); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); @@ -2467,38 +2532,54 @@ check_pub_dead_tuple_retention(WalReceiverConn *wrconn) * this setting can be adjusted after subscription creation. Without it, the * apply worker will simply skip conflict detection. * - * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an - * ERROR since users can only modify retain_dead_tuples for disabled - * subscriptions. And as long as the subscription is enabled promptly, it will - * not pose issues. + * Issue a WARNING or NOTICE if the subscription is disabled and the retention + * is active. Do not raise an ERROR since users can only modify + * retain_dead_tuples for disabled subscriptions. And as long as the + * subscription is enabled promptly, it will not pose issues. + * + * Issue a NOTICE to inform users that max_retention_duration is + * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR + * is not issued because setting max_retention_duration causes no harm, + * even when it is ineffective. */ void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, - int elevel_for_sub_disabled) + int elevel_for_sub_disabled, + bool retain_dead_tuples, bool retention_active, + bool max_retention_set) { Assert(elevel_for_sub_disabled == NOTICE || elevel_for_sub_disabled == WARNING); - if (check_guc && wal_level < WAL_LEVEL_REPLICA) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), - errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); - - if (check_guc && !track_commit_timestamp) - ereport(WARNING, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), - errhint("Consider setting \"%s\" to true.", - "track_commit_timestamp")); - - if (sub_disabled) - ereport(elevel_for_sub_disabled, + if (retain_dead_tuples) + { + if (check_guc && wal_level < WAL_LEVEL_REPLICA) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), + errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); + + if (check_guc && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), + errhint("Consider setting \"%s\" to true.", + "track_commit_timestamp")); + + if (sub_disabled && retention_active) + ereport(elevel_for_sub_disabled, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), + (elevel_for_sub_disabled > NOTICE) + ? errhint("Consider setting %s to false.", + "retain_dead_tuples") : 0); + } + else if (max_retention_set) + { + ereport(NOTICE, errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), - (elevel_for_sub_disabled > NOTICE) - ? errhint("Consider setting %s to false.", - "retain_dead_tuples") : 0); + errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled")); + } } /* diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 37377f7eb63..add2e2e066c 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -43,6 +43,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L @@ -102,7 +103,8 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); -static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void update_conflict_slot_xmin(TransactionId new_xmin); +static void init_conflict_slot_xmin(void); /* @@ -152,6 +154,7 @@ get_subscription_list(void) sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); sub->retaindeadtuples = subform->subretaindeadtuples; + sub->retentionactive = subform->subretentionactive; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -1181,7 +1184,7 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; - bool can_advance_xmin = true; + bool can_update_xmin = true; bool retain_dead_tuples = false; TransactionId xmin = InvalidTransactionId; @@ -1215,17 +1218,6 @@ ApplyLauncherMain(Datum main_arg) retain_dead_tuples = true; /* - * Can't advance xmin of the slot unless all the subscriptions - * with retain_dead_tuples are enabled. This is required to - * ensure that we don't advance the xmin of - * CONFLICT_DETECTION_SLOT if one of the subscriptions is not - * enabled. Otherwise, we won't be able to detect conflicts - * reliably for such a subscription even though it has set the - * retain_dead_tuples option. - */ - can_advance_xmin &= sub->enabled; - - /* * Create a replication slot to retain information necessary * for conflict detection such as dead tuples, commit * timestamps, and origins. @@ -1240,6 +1232,28 @@ ApplyLauncherMain(Datum main_arg) * subscription was enabled. */ CreateConflictDetectionSlot(); + + if (sub->retentionactive) + { + /* + * Can't advance xmin of the slot unless all the + * subscriptions actively retaining dead tuples are + * enabled. This is required to ensure that we don't + * advance the xmin of CONFLICT_DETECTION_SLOT if one of + * the subscriptions is not enabled. Otherwise, we won't + * be able to detect conflicts reliably for such a + * subscription even though it has set the + * retain_dead_tuples option. + */ + can_update_xmin &= sub->enabled; + + /* + * Initialize the slot once the subscription activiates + * retention. + */ + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin)) + init_conflict_slot_xmin(); + } } if (!sub->enabled) @@ -1254,9 +1268,11 @@ ApplyLauncherMain(Datum main_arg) /* * Compute the minimum xmin required to protect dead tuples * required for conflict detection among all running apply - * workers that enables retain_dead_tuples. + * workers. */ - if (sub->retaindeadtuples && can_advance_xmin) + if (sub->retaindeadtuples && + sub->retentionactive && + can_update_xmin) compute_min_nonremovable_xid(w, &xmin); /* worker is running already */ @@ -1265,12 +1281,12 @@ ApplyLauncherMain(Datum main_arg) /* * Can't advance xmin of the slot unless all the workers - * corresponding to subscriptions with retain_dead_tuples are - * running, disabling the further computation of the minimum + * corresponding to subscriptions actively retaining dead tuples + * are running, disabling the further computation of the minimum * nonremovable xid. */ - if (sub->retaindeadtuples) - can_advance_xmin = false; + if (sub->retaindeadtuples && sub->retentionactive) + can_update_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1295,7 +1311,8 @@ ApplyLauncherMain(Datum main_arg) sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID, - sub->retaindeadtuples)) + sub->retaindeadtuples && + sub->retentionactive)) { /* * We get here either if we failed to launch a worker @@ -1320,13 +1337,18 @@ ApplyLauncherMain(Datum main_arg) * that requires us to retain dead tuples. Otherwise, if required, * advance the slot's xmin to protect dead tuples required for the * conflict detection. + * + * Additionally, if all apply workers for subscriptions with + * retain_dead_tuples enabled have requested to stop retention, the + * slot's xmin will be set to InvalidTransactionId allowing the + * removal of dead tuples. */ if (MyReplicationSlot) { if (!retain_dead_tuples) ReplicationSlotDropAcquired(); - else if (can_advance_xmin) - advance_conflict_slot_xmin(xmin); + else if (can_update_xmin) + update_conflict_slot_xmin(xmin); } /* Switch back to original memory context. */ @@ -1378,7 +1400,15 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) nonremovable_xid = worker->oldest_nonremovable_xid; SpinLockRelease(&worker->relmutex); - Assert(TransactionIdIsValid(nonremovable_xid)); + /* + * Return if the apply worker has stopped retention concurrently. + * + * Although this function is invoked only when retentionactive is true, + * the apply worker might stop retention after the launcher fetches the + * retentionactive flag. + */ + if (!TransactionIdIsValid(nonremovable_xid)) + return; if (!TransactionIdIsValid(*xmin) || TransactionIdPrecedes(nonremovable_xid, *xmin)) @@ -1402,17 +1432,17 @@ acquire_conflict_slot_if_exists(void) } /* - * Advance the xmin the replication slot used to retain information required + * Update the xmin the replication slot used to retain information required * for conflict detection. */ static void -advance_conflict_slot_xmin(TransactionId new_xmin) +update_conflict_slot_xmin(TransactionId new_xmin) { Assert(MyReplicationSlot); - Assert(TransactionIdIsValid(new_xmin)); - Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + Assert(!TransactionIdIsValid(new_xmin) || + TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); - /* Return if the xmin value of the slot cannot be advanced */ + /* Return if the xmin value of the slot cannot be updated */ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) return; @@ -1439,23 +1469,16 @@ advance_conflict_slot_xmin(TransactionId new_xmin) } /* - * Create and acquire the replication slot used to retain information for - * conflict detection, if not yet. + * Initialize the xmin for the conflict detection slot. */ -void -CreateConflictDetectionSlot(void) +static void +init_conflict_slot_xmin(void) { TransactionId xmin_horizon; - /* Exit early, if the replication slot is already created and acquired */ - if (MyReplicationSlot) - return; - - ereport(LOG, - errmsg("creating replication conflict detection slot")); - - ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + /* Replication slot must exist but shouldn't be initialized. */ + Assert(MyReplicationSlot && + !TransactionIdIsValid(MyReplicationSlot->data.xmin)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -1476,6 +1499,26 @@ CreateConflictDetectionSlot(void) } /* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. + */ +void +CreateConflictDetectionSlot(void) +{ + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + init_conflict_slot_xmin(); +} + +/* * Is current process the logical replication launcher? */ bool diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 22ad9051db3..f1ebd63e792 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,6 +173,14 @@ * Advance the non-removable transaction ID if the current flush location has * reached or surpassed the last received WAL position. * + * - RDT_STOP_CONFLICT_INFO_RETENTION: + * This phase is required only when max_retention_duration is defined. We + * enter this phase if the wait time in either the + * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds + * configured max_retention_duration. In this phase, + * pg_subscription.subretentionactive is updated to false within a new + * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId. + * * The overall state progression is: GET_CANDIDATE_XID -> * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> @@ -373,7 +381,8 @@ typedef enum RDT_GET_CANDIDATE_XID, RDT_REQUEST_PUBLISHER_STATUS, RDT_WAIT_FOR_PUBLISHER_STATUS, - RDT_WAIT_FOR_LOCAL_FLUSH + RDT_WAIT_FOR_LOCAL_FLUSH, + RDT_STOP_CONFLICT_INFO_RETENTION } RetainDeadTuplesPhase; /* @@ -415,6 +424,9 @@ typedef struct RetainDeadTuplesData * updated in final phase * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + long table_sync_wait_time; /* time spent waiting for table sync + * to finish */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -555,6 +567,9 @@ static void request_publisher_status(RetainDeadTuplesData *rdt_data); static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, bool status_received); static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data); +static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data); static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found); @@ -3219,7 +3234,6 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, TimestampTz *delete_time) { TransactionId oldestxmin; - ReplicationSlot *slot; /* * Return false if either dead tuples are not retained or commit timestamp @@ -3229,32 +3243,49 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid, return false; /* - * For conflict detection, we use the conflict slot's xmin value instead - * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as - * a threshold to identify tuples that were recently deleted. These tuples - * are not visible to concurrent transactions, but we log an - * update_deleted conflict if such a tuple matches the remote update being - * applied. + * For conflict detection, we use the leader worker's + * oldest_nonremovable_xid value instead of invoking + * GetOldestNonRemovableTransactionId() or using the conflict detection + * slot's xmin. The oldest_nonremovable_xid acts as a threshold to + * identify tuples that were recently deleted. These deleted tuples are no + * longer visible to concurrent transactions. However, if a remote update + * matches such a tuple, we log an update_deleted conflict. * - * Although GetOldestNonRemovableTransactionId() can return a value older - * than the slot's xmin, for our current purpose it is acceptable to treat - * tuples deleted by transactions prior to slot.xmin as update_missing - * conflicts. - * - * Ideally, we would use oldest_nonremovable_xid, which is directly - * maintained by the leader apply worker. However, this value is not - * available to table synchronization or parallel apply workers, making - * slot.xmin a practical alternative in those contexts. + * While GetOldestNonRemovableTransactionId() and slot.xmin may return + * transaction IDs older than oldest_nonremovable_xid, for our current + * purpose, it is acceptable to treat tuples deleted by transactions prior + * to oldest_nonremovable_xid as update_missing conflicts. */ - slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true); + if (am_leader_apply_worker()) + { + oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid; + } + else + { + LogicalRepWorker *leader; - Assert(slot); + /* + * Obtain the information from the leader apply worker as only the + * leader manages conflict retention (see + * maybe_advance_nonremovable_xid() for details). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + leader = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); - SpinLockAcquire(&slot->mutex); - oldestxmin = slot->data.xmin; - SpinLockRelease(&slot->mutex); + SpinLockAcquire(&leader->relmutex); + oldestxmin = leader->oldest_nonremovable_xid; + SpinLockRelease(&leader->relmutex); + LWLockRelease(LogicalRepWorkerLock); + } - Assert(TransactionIdIsValid(oldestxmin)); + /* + * Return false if the leader apply worker has stopped retaining + * information for detecting conflicts. This implies that update_deleted + * can no longer be reliably detected. + */ + if (!TransactionIdIsValid(oldestxmin)) + return false; if (OidIsValid(localidxoid) && IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin)) @@ -4108,11 +4139,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Ensure to wake up when it's possible to advance the non-removable - * transaction ID. + * transaction ID, or when the retention duration may have exceeded + * max_retention_duration. */ - if (rdt_data.phase == RDT_GET_CANDIDATE_XID && - rdt_data.xid_advance_interval) - wait_time = Min(wait_time, rdt_data.xid_advance_interval); + if (MySubscription->retentionactive) + { + if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + rdt_data.xid_advance_interval) + wait_time = Min(wait_time, rdt_data.xid_advance_interval); + else if (MySubscription->maxretention > 0) + wait_time = Min(wait_time, MySubscription->maxretention); + } rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | @@ -4325,6 +4362,10 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; + /* No need to advance if we have already stopped retaining */ + if (!MySubscription->retentionactive) + return false; + return true; } @@ -4350,6 +4391,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, case RDT_WAIT_FOR_LOCAL_FLUSH: wait_for_local_flush(rdt_data); break; + case RDT_STOP_CONFLICT_INFO_RETENTION: + stop_conflict_info_retention(rdt_data); + break; } } @@ -4468,6 +4512,13 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data, if (!status_received) return; + /* + * We don't need to maintain oldest_nonremovable_xid if we decide to stop + * retaining conflict information for this worker. + */ + if (should_stop_conflict_info_retention(rdt_data)) + return; + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) rdt_data->remote_wait_for = rdt_data->remote_nextxid; @@ -4549,6 +4600,27 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * have a WAL position greater than the rdt_data->remote_lsn. */ if (!AllTablesyncsReady()) + { + TimestampTz now; + + now = rdt_data->last_recv_time + ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Record the time spent waiting for table sync, it is needed for the + * timeout check in should_stop_conflict_info_retention(). + */ + rdt_data->table_sync_wait_time = + TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now); + + return; + } + + /* + * We don't need to maintain oldest_nonremovable_xid if we decide to stop + * retaining conflict information for this worker. + */ + if (should_stop_conflict_info_retention(rdt_data)) return; /* @@ -4594,12 +4666,114 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) /* Notify launcher to update the xmin of the conflict slot */ ApplyLauncherWakeup(); + reset_retention_data_fields(rdt_data); + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Check whether conflict information retention should be stopped due to + * exceeding the maximum wait time (max_retention_duration). + * + * If retention should be stopped, transition to the + * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return + * false. + * + * Note: Retention won't be resumed automatically. The user must manually + * disable retain_dead_tuples and re-enable it after confirming that the + * replication slot maintained by the launcher has been dropped. + */ +static bool +should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + TimestampTz now; + + Assert(TransactionIdIsValid(rdt_data->candidate_xid)); + Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS || + rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH); + + if (!MySubscription->maxretention) + return false; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Return early if the wait time has not exceeded the configured maximum + * (max_retention_duration). Time spent waiting for table synchronization + * is excluded from this calculation, as it occurs infrequently. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + MySubscription->maxretention + + rdt_data->table_sync_wait_time)) + return false; + + rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); + + return true; +} + +/* + * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase. + */ +static void +stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) +{ + /* + * Do not update the catalog during an active transaction. The transaction + * may be started during change application, leading to a possible + * rollback of catalog updates if the application fails subsequently. + */ + if (IsTransactionState()) + return; + + StartTransactionCommand(); + /* - * Reset all data fields except those used to determine the timing for the - * next round of transaction ID advancement. We can even use - * flushpos_update_time in the next round to decide whether to get the - * latest flush position. + * Updating pg_subscription might involve TOAST table access, so ensure we + * have a valid snapshot. */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Set pg_subscription.subretentionactive to false */ + UpdateDeadTupleRetentionStatus(MySubscription->oid, false); + + PopActiveSnapshot(); + CommitTransactionCommand(); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts", + MySubscription->name), + errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.", + MySubscription->maxretention)); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + + reset_retention_data_fields(rdt_data); +} + +/* + * Reset all data fields of RetainDeadTuplesData except those used to + * determine the timing for the next round of transaction ID advancement. We + * can even use flushpos_update_time in the next round to decide whether to get + * the latest flush position. + */ +static void +reset_retention_data_fields(RetainDeadTuplesData *rdt_data) +{ rdt_data->phase = RDT_GET_CANDIDATE_XID; rdt_data->remote_lsn = InvalidXLogRecPtr; rdt_data->remote_oldestxid = InvalidFullTransactionId; @@ -4607,22 +4781,25 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) rdt_data->reply_time = 0; rdt_data->remote_wait_for = InvalidFullTransactionId; rdt_data->candidate_xid = InvalidTransactionId; - - /* process the next phase */ - process_rdt_phase_transition(rdt_data, false); + rdt_data->table_sync_wait_time = 0; } /* * Adjust the interval for advancing non-removable transaction IDs. * - * We double the interval to try advancing the non-removable transaction IDs - * if there is no activity on the node. The maximum value of the interval is - * capped by wal_receiver_status_interval if it is not zero, otherwise to a - * 3 minutes which should be sufficient to avoid using CPU or network - * resources without much benefit. + * If there is no activity on the node, we progressively double the interval + * used to advance non-removable transaction ID. This helps conserve CPU + * and network resources when there's little benefit to frequent updates. + * + * The interval is capped by the lowest of the following: + * - wal_receiver_status_interval (if set), + * - a default maximum of 3 minutes, + * - max_retention_duration. * - * The interval is reset to a minimum value of 100ms once there is some - * activity on the node. + * This ensures the interval never exceeds the retention boundary, even if + * other limits are higher. Once activity resumes on the node, the interval + * is reset to lesser of 100ms and max_retention_duration, allowing timely + * advancement of non-removable transaction ID. * * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can * consider the other interval or a separate GUC if the need arises. @@ -4651,6 +4828,10 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) */ rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; } + + /* Ensure the wait time remains within the maximum limit */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval, + MySubscription->maxretention); } /* @@ -5458,11 +5639,12 @@ InitializeLogRepWorker(void) * dropped, a restart is initiated. * * The oldest_nonremovable_xid should be initialized only when the - * retain_dead_tuples is enabled before launching the worker. See + * subscription's retention is active before launching the worker. See * logicalrep_worker_launch. */ if (am_leader_apply_worker() && MySubscription->retaindeadtuples && + MySubscription->retentionactive && !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) { ereport(LOG, @@ -5633,8 +5815,9 @@ DisableSubscriptionAndExit(void) * an error, as verifying commit timestamps is unnecessary in this * context. */ - if (MySubscription->retaindeadtuples) - CheckSubDeadTupleRetention(false, true, WARNING); + CheckSubDeadTupleRetention(false, true, WARNING, + MySubscription->retaindeadtuples, + MySubscription->retentionactive, false); proc_exit(0); } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fc7a6639163..bea793456f9 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5048,6 +5048,7 @@ getSubscriptions(Archive *fout) int i_subenabled; int i_subfailover; int i_subretaindeadtuples; + int i_submaxretention; int i, ntups; @@ -5127,10 +5128,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 190000) appendPQExpBufferStr(query, - " s.subretaindeadtuples\n"); + " s.subretaindeadtuples,\n"); else appendPQExpBufferStr(query, - " false AS subretaindeadtuples\n"); + " false AS subretaindeadtuples,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " s.submaxretention\n"); + else + appendPQExpBuffer(query, + " 0 AS submaxretention\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5165,6 +5173,7 @@ getSubscriptions(Archive *fout) i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); + i_submaxretention = PQfnumber(res, "submaxretention"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5200,6 +5209,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); subinfo[i].subretaindeadtuples = (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); + subinfo[i].submaxretention = + atoi(PQgetvalue(res, i, i_submaxretention)); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5461,6 +5472,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subretaindeadtuples) appendPQExpBufferStr(query, ", retain_dead_tuples = true"); + if (subinfo->submaxretention) + appendPQExpBuffer(query, ", max_retention_duration = %d", subinfo->submaxretention); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index dde85ed156c..bcc94ff07cc 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -717,6 +717,7 @@ typedef struct _SubscriptionInfo bool subrunasowner; bool subfailover; bool subretaindeadtuples; + int submaxretention; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7a06af48842..4aa793d7de7 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false}; + false, false, false, false}; if (pset.sversion < 100000) { @@ -6815,10 +6815,20 @@ describeSubscriptions(const char *pattern, bool verbose) ", subfailover AS \"%s\"\n", gettext_noop("Failover")); if (pset.sversion >= 190000) + { appendPQExpBuffer(&buf, ", subretaindeadtuples AS \"%s\"\n", gettext_noop("Retain dead tuples")); + appendPQExpBuffer(&buf, + ", submaxretention AS \"%s\"\n", + gettext_noop("Max retention duration")); + + appendPQExpBuffer(&buf, + ", subretentionactive AS \"%s\"\n", + gettext_noop("Retention active")); + } + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8b10f2313f3..6b20a4404b2 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2321,7 +2321,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION <name> SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", + COMPLETE_WITH("binary", "disable_on_error", "failover", + "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3780,7 +3781,8 @@ match_previous_words(int pattern_id, /* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", "origin", + "disable_on_error", "enabled", "failover", + "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 0ca415b4261..836369f163e 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202508131 +#define CATALOG_VERSION_NO 202509021 #endif diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 231ef84ec9a..55cb9b1eefa 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -81,6 +81,15 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subretaindeadtuples; /* True if dead tuples useful for * conflict detection are retained */ + int32 submaxretention; /* The maximum duration (in milliseconds) + * for which information useful for + * conflict detection can be retained */ + + bool subretentionactive; /* True if retain_dead_tuples is enabled + * and the retention duration has not + * exceeded max_retention_duration, when + * defined */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -136,6 +145,13 @@ typedef struct Subscription * to be synchronized to the standbys. */ bool retaindeadtuples; /* True if dead tuples useful for conflict * detection are retained */ + int32 maxretention; /* The maximum duration (in milliseconds) for + * which information useful for conflict + * detection can be retained */ + bool retentionactive; /* True if retain_dead_tuples is enabled + * and the retention duration has not + * exceeded max_retention_duration, when + * defined */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index f458447a0e5..02f97a547dd 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -92,4 +92,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active); + #endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 9b288ad22a6..fb4e26a51a4 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -31,6 +31,9 @@ extern char defGetStreamingMode(DefElem *def); extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel); extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, - int elevel_for_sub_disabled); + int elevel_for_sub_disabled, + bool retain_dead_tuples, + bool retention_active, + bool max_retention_set); #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7c0204dd6f4..62ea1a00580 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -94,6 +94,9 @@ typedef struct LogicalRepWorker * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to * decide when to advance the xmin value of the slot. + * + * This ID is set to InvalidTransactionId when the apply worker stops + * retaining information needed for conflict detection. */ TransactionId oldest_nonremovable_xid; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a98c97f7616..c7f1266fc2f 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -433,10 +433,36 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - max_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = foo); +ERROR: max_retention_duration requires an integer value +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = 1000); +NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabled +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index f0f714fe747..ef0c298d2df 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -298,6 +298,22 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - max_retention_duration must be integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = foo); + +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, max_retention_duration = 1000); + +\dRs+ + +-- ok +ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 6b4a9fb8815..51b23a39fa9 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -387,6 +387,59 @@ ok( $logfile =~ 'update target row was deleted in tab'); ############################################################################### +# Check that dead tuple retention stops due to the wait time surpassing +# max_retention_duration. +############################################################################### + +# Create a physical slot +$node_B->safe_psql('postgres', + "SELECT * FROM pg_create_physical_replication_slot('blocker');"); + +# Add the inactive physical slot to synchronized_standby_slots +$node_B->append_conf('postgresql.conf', + "synchronized_standby_slots = 'blocker'"); +$node_B->reload; + +# Enable failover to activate the synchronized_standby_slots setting +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;"); +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB SET (failover = true);"); +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); + +# Insert a record +$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (5, 5);"); + +# Advance the xid on Node A to trigger the next cycle of oldest_nonremovable_xid +# advancement. +$node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); + +$log_offset = -s $node_A->logfile; + +# Set max_retention_duration to a minimal value to initiate retention stop. +$node_A->safe_psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 1);"); + +# Confirm that the retention is stopped +$node_A->wait_for_log( + qr/logical replication worker for subscription "tap_sub_a_b" has stopped retaining the information for detecting conflicts/, + $log_offset); + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin IS NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is invalid on Node A"); + +$result = $node_A->safe_psql('postgres', + "SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';"); +is($result, qq(f), 'retention is inactive'); + +# Drop the physical slot and reset the synchronized_standby_slots setting +$node_B->safe_psql('postgres', + "SELECT * FROM pg_drop_replication_slot('blocker');"); +$node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$node_B->reload; + +############################################################################### # Check that the replication slot pg_conflict_detection is dropped after # removing all the subscriptions. ############################################################################### |