diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 163 |
1 files changed, 122 insertions, 41 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0d74398faf3..82cf65fae73 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -72,8 +72,9 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 -#define SUBOPT_LSN 0x00008000 -#define SUBOPT_ORIGIN 0x00010000 +#define SUBOPT_MAX_RETENTION_DURATION 0x00008000 +#define SUBOPT_LSN 0x00010000 +#define SUBOPT_ORIGIN 0x00020000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -100,6 +101,7 @@ typedef struct SubOpts bool runasowner; bool failover; bool retaindeadtuples; + int32 maxretention; char *origin; XLogRecPtr lsn; } SubOpts; @@ -168,6 +170,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) opts->retaindeadtuples = false; + if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION)) + opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -322,6 +326,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; opts->retaindeadtuples = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) && + strcmp(defel->defname, "max_retention_duration") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION; + opts->maxretention = defGetInt32(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -579,7 +592,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -646,9 +660,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, stmt->subname))); } - /* Ensure that we can enable retain_dead_tuples */ - if (opts.retaindeadtuples) - CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + /* + * Ensure that system configuration paramters are set appropriately to + * support retain_dead_tuples and max_retention_duration. + */ + CheckSubDeadTupleRetention(true, !opts.enabled, WARNING, + opts.retaindeadtuples, opts.retaindeadtuples, + (opts.maxretention > 0)); if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) @@ -692,6 +710,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); values[Anum_pg_subscription_subretaindeadtuples - 1] = BoolGetDatum(opts.retaindeadtuples); + values[Anum_pg_subscription_submaxretention - 1] = + Int32GetDatum(opts.maxretention); + values[Anum_pg_subscription_subretentionactive - 1] = + Int32GetDatum(opts.retaindeadtuples); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1175,6 +1197,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_two_phase = false; bool check_pub_rdt = false; bool retain_dead_tuples; + int max_retention; + bool retention_active; char *origin; Subscription *sub; Form_pg_subscription form; @@ -1205,6 +1229,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, retain_dead_tuples = sub->retaindeadtuples; origin = sub->origin; + max_retention = sub->maxretention; + retention_active = sub->retentionactive; /* * Don't allow non-superuser modification of a subscription with @@ -1234,7 +1260,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | + SUBOPT_MAX_RETENTION_DURATION | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1400,6 +1428,29 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, BoolGetDatum(opts.retaindeadtuples); replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true; + /* + * Update the retention status only if there's a change in + * the retain_dead_tuples option value. + * + * Automatically marking retention as active when + * retain_dead_tuples is enabled may not always be ideal, + * especially if retention was previously stopped and the + * user toggles retain_dead_tuples without adjusting the + * publisher workload. However, this behavior provides a + * convenient way for users to manually refresh the + * retention status. Since retention will be stopped again + * unless the publisher workload is reduced, this approach + * is acceptable for now. + */ + if (opts.retaindeadtuples != sub->retaindeadtuples) + { + values[Anum_pg_subscription_subretentionactive - 1] = + BoolGetDatum(opts.retaindeadtuples); + replaces[Anum_pg_subscription_subretentionactive - 1] = true; + + retention_active = opts.retaindeadtuples; + } + CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel); /* @@ -1417,13 +1468,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Try again after some time."))); /* - * Remind the user that enabling subscription will prevent - * the accumulation of dead tuples. - */ - if (opts.retaindeadtuples) - CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE); - - /* * Notify the launcher to manage the replication slot for * conflict detection. This ensures that replication slot * is efficiently handled (created, updated, or dropped) @@ -1435,6 +1479,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, retain_dead_tuples = opts.retaindeadtuples; } + if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION)) + { + values[Anum_pg_subscription_submaxretention - 1] = + Int32GetDatum(opts.maxretention); + replaces[Anum_pg_subscription_submaxretention - 1] = true; + + max_retention = opts.maxretention; + } + + /* + * Ensure that system configuration paramters are set + * appropriately to support retain_dead_tuples and + * max_retention_duration. + */ + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) || + IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION)) + CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE, + retain_dead_tuples, + retention_active, + (max_retention > 0)); + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -1472,9 +1537,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * subscription in case it was disabled after creation. See * comments atop CheckSubDeadTupleRetention() for details. */ - if (sub->retaindeadtuples) - CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, - WARNING); + CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, + WARNING, sub->retaindeadtuples, + sub->retentionactive, false); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); @@ -2467,38 +2532,54 @@ check_pub_dead_tuple_retention(WalReceiverConn *wrconn) * this setting can be adjusted after subscription creation. Without it, the * apply worker will simply skip conflict detection. * - * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an - * ERROR since users can only modify retain_dead_tuples for disabled - * subscriptions. And as long as the subscription is enabled promptly, it will - * not pose issues. + * Issue a WARNING or NOTICE if the subscription is disabled and the retention + * is active. Do not raise an ERROR since users can only modify + * retain_dead_tuples for disabled subscriptions. And as long as the + * subscription is enabled promptly, it will not pose issues. + * + * Issue a NOTICE to inform users that max_retention_duration is + * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR + * is not issued because setting max_retention_duration causes no harm, + * even when it is ineffective. */ void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, - int elevel_for_sub_disabled) + int elevel_for_sub_disabled, + bool retain_dead_tuples, bool retention_active, + bool max_retention_set) { Assert(elevel_for_sub_disabled == NOTICE || elevel_for_sub_disabled == WARNING); - if (check_guc && wal_level < WAL_LEVEL_REPLICA) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), - errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); - - if (check_guc && !track_commit_timestamp) - ereport(WARNING, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), - errhint("Consider setting \"%s\" to true.", - "track_commit_timestamp")); - - if (sub_disabled) - ereport(elevel_for_sub_disabled, + if (retain_dead_tuples) + { + if (check_guc && wal_level < WAL_LEVEL_REPLICA) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), + errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); + + if (check_guc && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), + errhint("Consider setting \"%s\" to true.", + "track_commit_timestamp")); + + if (sub_disabled && retention_active) + ereport(elevel_for_sub_disabled, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), + (elevel_for_sub_disabled > NOTICE) + ? errhint("Consider setting %s to false.", + "retain_dead_tuples") : 0); + } + else if (max_retention_set) + { + ereport(NOTICE, errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), - (elevel_for_sub_disabled > NOTICE) - ? errhint("Consider setting %s to false.", - "retain_dead_tuples") : 0); + errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled")); + } } /* |