summaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c163
1 files changed, 122 insertions, 41 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0d74398faf3..82cf65fae73 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,8 +72,9 @@
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_FAILOVER 0x00002000
#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
-#define SUBOPT_LSN 0x00008000
-#define SUBOPT_ORIGIN 0x00010000
+#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
+#define SUBOPT_LSN 0x00010000
+#define SUBOPT_ORIGIN 0x00020000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -100,6 +101,7 @@ typedef struct SubOpts
bool runasowner;
bool failover;
bool retaindeadtuples;
+ int32 maxretention;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -168,6 +170,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->failover = false;
if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
opts->retaindeadtuples = false;
+ if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
+ opts->maxretention = 0;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -322,6 +326,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
opts->retaindeadtuples = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
+ strcmp(defel->defname, "max_retention_duration") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
+ opts->maxretention = defGetInt32(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -579,7 +592,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
+ SUBOPT_RETAIN_DEAD_TUPLES |
+ SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -646,9 +660,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
stmt->subname)));
}
- /* Ensure that we can enable retain_dead_tuples */
- if (opts.retaindeadtuples)
- CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
+ /*
+ * Ensure that system configuration paramters are set appropriately to
+ * support retain_dead_tuples and max_retention_duration.
+ */
+ CheckSubDeadTupleRetention(true, !opts.enabled, WARNING,
+ opts.retaindeadtuples, opts.retaindeadtuples,
+ (opts.maxretention > 0));
if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
opts.slot_name == NULL)
@@ -692,6 +710,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
values[Anum_pg_subscription_subretaindeadtuples - 1] =
BoolGetDatum(opts.retaindeadtuples);
+ values[Anum_pg_subscription_submaxretention - 1] =
+ Int32GetDatum(opts.maxretention);
+ values[Anum_pg_subscription_subretentionactive - 1] =
+ Int32GetDatum(opts.retaindeadtuples);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -1175,6 +1197,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
bool update_two_phase = false;
bool check_pub_rdt = false;
bool retain_dead_tuples;
+ int max_retention;
+ bool retention_active;
char *origin;
Subscription *sub;
Form_pg_subscription form;
@@ -1205,6 +1229,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
retain_dead_tuples = sub->retaindeadtuples;
origin = sub->origin;
+ max_retention = sub->maxretention;
+ retention_active = sub->retentionactive;
/*
* Don't allow non-superuser modification of a subscription with
@@ -1234,7 +1260,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
+ SUBOPT_RETAIN_DEAD_TUPLES |
+ SUBOPT_MAX_RETENTION_DURATION |
+ SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1400,6 +1428,29 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
BoolGetDatum(opts.retaindeadtuples);
replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
+ /*
+ * Update the retention status only if there's a change in
+ * the retain_dead_tuples option value.
+ *
+ * Automatically marking retention as active when
+ * retain_dead_tuples is enabled may not always be ideal,
+ * especially if retention was previously stopped and the
+ * user toggles retain_dead_tuples without adjusting the
+ * publisher workload. However, this behavior provides a
+ * convenient way for users to manually refresh the
+ * retention status. Since retention will be stopped again
+ * unless the publisher workload is reduced, this approach
+ * is acceptable for now.
+ */
+ if (opts.retaindeadtuples != sub->retaindeadtuples)
+ {
+ values[Anum_pg_subscription_subretentionactive - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
+ replaces[Anum_pg_subscription_subretentionactive - 1] = true;
+
+ retention_active = opts.retaindeadtuples;
+ }
+
CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
/*
@@ -1417,13 +1468,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
errhint("Try again after some time.")));
/*
- * Remind the user that enabling subscription will prevent
- * the accumulation of dead tuples.
- */
- if (opts.retaindeadtuples)
- CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
-
- /*
* Notify the launcher to manage the replication slot for
* conflict detection. This ensures that replication slot
* is efficiently handled (created, updated, or dropped)
@@ -1435,6 +1479,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
retain_dead_tuples = opts.retaindeadtuples;
}
+ if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
+ {
+ values[Anum_pg_subscription_submaxretention - 1] =
+ Int32GetDatum(opts.maxretention);
+ replaces[Anum_pg_subscription_submaxretention - 1] = true;
+
+ max_retention = opts.maxretention;
+ }
+
+ /*
+ * Ensure that system configuration paramters are set
+ * appropriately to support retain_dead_tuples and
+ * max_retention_duration.
+ */
+ if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
+ IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
+ CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE,
+ retain_dead_tuples,
+ retention_active,
+ (max_retention > 0));
+
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
@@ -1472,9 +1537,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
* subscription in case it was disabled after creation. See
* comments atop CheckSubDeadTupleRetention() for details.
*/
- if (sub->retaindeadtuples)
- CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
- WARNING);
+ CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
+ WARNING, sub->retaindeadtuples,
+ sub->retentionactive, false);
values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(opts.enabled);
@@ -2467,38 +2532,54 @@ check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
* this setting can be adjusted after subscription creation. Without it, the
* apply worker will simply skip conflict detection.
*
- * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
- * ERROR since users can only modify retain_dead_tuples for disabled
- * subscriptions. And as long as the subscription is enabled promptly, it will
- * not pose issues.
+ * Issue a WARNING or NOTICE if the subscription is disabled and the retention
+ * is active. Do not raise an ERROR since users can only modify
+ * retain_dead_tuples for disabled subscriptions. And as long as the
+ * subscription is enabled promptly, it will not pose issues.
+ *
+ * Issue a NOTICE to inform users that max_retention_duration is
+ * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
+ * is not issued because setting max_retention_duration causes no harm,
+ * even when it is ineffective.
*/
void
CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
- int elevel_for_sub_disabled)
+ int elevel_for_sub_disabled,
+ bool retain_dead_tuples, bool retention_active,
+ bool max_retention_set)
{
Assert(elevel_for_sub_disabled == NOTICE ||
elevel_for_sub_disabled == WARNING);
- if (check_guc && wal_level < WAL_LEVEL_REPLICA)
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
- errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
-
- if (check_guc && !track_commit_timestamp)
- ereport(WARNING,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
- errhint("Consider setting \"%s\" to true.",
- "track_commit_timestamp"));
-
- if (sub_disabled)
- ereport(elevel_for_sub_disabled,
+ if (retain_dead_tuples)
+ {
+ if (check_guc && wal_level < WAL_LEVEL_REPLICA)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
+ errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
+
+ if (check_guc && !track_commit_timestamp)
+ ereport(WARNING,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
+ errhint("Consider setting \"%s\" to true.",
+ "track_commit_timestamp"));
+
+ if (sub_disabled && retention_active)
+ ereport(elevel_for_sub_disabled,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
+ (elevel_for_sub_disabled > NOTICE)
+ ? errhint("Consider setting %s to false.",
+ "retain_dead_tuples") : 0);
+ }
+ else if (max_retention_set)
+ {
+ ereport(NOTICE,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
- (elevel_for_sub_disabled > NOTICE)
- ? errhint("Consider setting %s to false.",
- "retain_dead_tuples") : 0);
+ errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
+ }
}
/*