summaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c169
1 files changed, 134 insertions, 35 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 16d83b32539..d124bfe55ca 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -16,6 +16,7 @@
#include "access/htup_details.h"
#include "access/table.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
@@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn,
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static void CheckAlterSubOption(Subscription *sub, const char *option,
+ bool slot_needs_update, bool isTopLevel);
/*
@@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetStreamingMode(defel);
}
- else if (strcmp(defel->defname, "two_phase") == 0)
+ else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
+ strcmp(defel->defname, "two_phase") == 0)
{
- /*
- * Do not allow toggling of two_phase option. Doing so could cause
- * missing of transactions and lead to an inconsistent replica.
- * See comments atop worker.c
- *
- * Note: Unsupported twophase indicates that this call originated
- * from AlterSubscription.
- */
- if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
-
if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
errorConflictingDefElem(defel, pstate);
@@ -1080,6 +1071,60 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
+ * Common checks for altering failover and two_phase options.
+ */
+static void
+CheckAlterSubOption(Subscription *sub, const char *option,
+ bool slot_needs_update, bool isTopLevel)
+{
+ /*
+ * The checks in this function are required only for failover and
+ * two_phase options.
+ */
+ Assert(strcmp(option, "failover") == 0 ||
+ strcmp(option, "two_phase") == 0);
+
+ /*
+ * Do not allow changing the option if the subscription is enabled. This
+ * is because both failover and two_phase options of the slot on the
+ * publisher cannot be modified if the slot is currently acquired by the
+ * existing walsender.
+ *
+ * Note that two_phase is enabled (aka changed from 'false' to 'true') on
+ * the publisher by the existing walsender, so we could have allowed that
+ * even when the subscription is enabled. But we kept this restriction for
+ * the sake of consistency and simplicity.
+ */
+ if (sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for enabled subscription",
+ option)));
+
+ if (slot_needs_update)
+ {
+ StringInfoData cmd;
+
+ /*
+ * A valid slot must be associated with the subscription for us to
+ * modify any of the slot's properties.
+ */
+ if (!sub->slotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for a subscription that does not have a slot name",
+ option)));
+
+ /* The changed option of the slot can't be rolled back. */
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
+
+ PreventInTransactionBlock(isTopLevel, cmd.data);
+ pfree(cmd.data);
+ }
+}
+
+/*
* Alter the existing subscription.
*/
ObjectAddress
@@ -1094,6 +1139,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
HeapTuple tup;
Oid subid;
bool update_tuple = false;
+ bool update_failover = false;
+ bool update_two_phase = false;
Subscription *sub;
Form_pg_subscription form;
bits32 supported_opts;
@@ -1145,7 +1192,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_ORIGIN);
@@ -1227,31 +1275,81 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subrunasowner - 1] = true;
}
- if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+ if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
{
- if (!sub->slotname)
+ /*
+ * We need to update both the slot and the subscription
+ * for the two_phase option. We can enable the two_phase
+ * option for a slot only once the initial data
+ * synchronization is done. This is to avoid missing some
+ * data as explained in comments atop worker.c.
+ */
+ update_two_phase = !opts.twophase;
+
+ CheckAlterSubOption(sub, "two_phase", update_two_phase,
+ isTopLevel);
+
+ /*
+ * Modifying the two_phase slot option requires a slot
+ * lookup by slot name, so changing the slot name at the
+ * same time is not allowed.
+ */
+ if (update_two_phase &&
+ IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("slot_name and two_phase cannot be altered at the same time")));
+
+ /*
+ * Note that workers may still survive even if the
+ * subscription has been disabled.
+ *
+ * Ensure workers have already been exited to avoid
+ * getting prepared transactions while we are disabling
+ * the two_phase option. Otherwise, the changes of an
+ * already prepared transaction can be replicated again
+ * along with its corresponding commit, leading to
+ * duplicate data or errors.
+ */
+ if (logicalrep_workers_find(subid, true, true))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot set %s for a subscription that does not have a slot name",
- "failover")));
+ errmsg("cannot alter two_phase when logical replication worker is still running"),
+ errhint("Try again after some time.")));
/*
- * Do not allow changing the failover state if the
- * subscription is enabled. This is because the failover
- * state of the slot on the publisher cannot be modified
- * if the slot is currently acquired by the apply worker.
+ * two_phase cannot be disabled if there are any
+ * uncommitted prepared transactions present otherwise it
+ * can lead to duplicate data or errors as explained in
+ * the comment above.
*/
- if (sub->enabled)
+ if (update_two_phase &&
+ sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+ LookupGXactBySubid(subid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot set %s for enabled subscription",
- "failover")));
+ errmsg("cannot disable two_phase when prepared transactions are present"),
+ errhint("Resolve these transactions and try again.")));
+
+ /* Change system catalog accordingly */
+ values[Anum_pg_subscription_subtwophasestate - 1] =
+ CharGetDatum(opts.twophase ?
+ LOGICALREP_TWOPHASE_STATE_PENDING :
+ LOGICALREP_TWOPHASE_STATE_DISABLED);
+ replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+ }
+ if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+ {
/*
- * The changed failover option of the slot can't be rolled
- * back.
+ * Similar to the two_phase case above, we need to update
+ * the failover option for both the slot and the
+ * subscription.
*/
- PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (failover)");
+ update_failover = true;
+
+ CheckAlterSubOption(sub, "failover", update_failover,
+ isTopLevel);
values[Anum_pg_subscription_subfailover - 1] =
BoolGetDatum(opts.failover);
@@ -1501,13 +1599,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
- * Try to acquire the connection necessary for altering slot.
+ * Try to acquire the connection necessary for altering the slot, if
+ * needed.
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won't be able to rollback altered
* slot.
*/
- if (replaces[Anum_pg_subscription_subfailover - 1])
+ if (update_failover || update_two_phase)
{
bool must_use_password;
char *err;
@@ -1528,7 +1627,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY();
{
- walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+ walrcv_alter_slot(wrconn, sub->slotname,
+ update_failover ? &opts.failover : NULL,
+ update_two_phase ? &opts.twophase : NULL);
}
PG_FINALLY();
{
@@ -1675,9 +1776,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* New workers won't be started because we hold an exclusive lock on the
* subscription till the end of the transaction.
*/
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- subworkers = logicalrep_workers_find(subid, false);
- LWLockRelease(LogicalRepWorkerLock);
+ subworkers = logicalrep_workers_find(subid, false, true);
foreach(lc, subworkers)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);