diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 169 |
1 files changed, 134 insertions, 35 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 16d83b32539..d124bfe55ca 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -16,6 +16,7 @@ #include "access/htup_details.h" #include "access/table.h" +#include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" @@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn, static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +static void CheckAlterSubOption(Subscription *sub, const char *option, + bool slot_needs_update, bool isTopLevel); /* @@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetStreamingMode(defel); } - else if (strcmp(defel->defname, "two_phase") == 0) + else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) && + strcmp(defel->defname, "two_phase") == 0) { - /* - * Do not allow toggling of two_phase option. Doing so could cause - * missing of transactions and lead to an inconsistent replica. - * See comments atop worker.c - * - * Note: Unsupported twophase indicates that this call originated - * from AlterSubscription. - */ - if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized subscription parameter: \"%s\"", defel->defname))); - if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) errorConflictingDefElem(defel, pstate); @@ -1080,6 +1071,60 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* + * Common checks for altering failover and two_phase options. + */ +static void +CheckAlterSubOption(Subscription *sub, const char *option, + bool slot_needs_update, bool isTopLevel) +{ + /* + * The checks in this function are required only for failover and + * two_phase options. + */ + Assert(strcmp(option, "failover") == 0 || + strcmp(option, "two_phase") == 0); + + /* + * Do not allow changing the option if the subscription is enabled. This + * is because both failover and two_phase options of the slot on the + * publisher cannot be modified if the slot is currently acquired by the + * existing walsender. + * + * Note that two_phase is enabled (aka changed from 'false' to 'true') on + * the publisher by the existing walsender, so we could have allowed that + * even when the subscription is enabled. But we kept this restriction for + * the sake of consistency and simplicity. + */ + if (sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for enabled subscription", + option))); + + if (slot_needs_update) + { + StringInfoData cmd; + + /* + * A valid slot must be associated with the subscription for us to + * modify any of the slot's properties. + */ + if (!sub->slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for a subscription that does not have a slot name", + option))); + + /* The changed option of the slot can't be rolled back. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option); + + PreventInTransactionBlock(isTopLevel, cmd.data); + pfree(cmd.data); + } +} + +/* * Alter the existing subscription. */ ObjectAddress @@ -1094,6 +1139,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, HeapTuple tup; Oid subid; bool update_tuple = false; + bool update_failover = false; + bool update_two_phase = false; Subscription *sub; Form_pg_subscription form; bits32 supported_opts; @@ -1145,7 +1192,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); @@ -1227,31 +1275,81 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subrunasowner - 1] = true; } - if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) + if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) { - if (!sub->slotname) + /* + * We need to update both the slot and the subscription + * for the two_phase option. We can enable the two_phase + * option for a slot only once the initial data + * synchronization is done. This is to avoid missing some + * data as explained in comments atop worker.c. + */ + update_two_phase = !opts.twophase; + + CheckAlterSubOption(sub, "two_phase", update_two_phase, + isTopLevel); + + /* + * Modifying the two_phase slot option requires a slot + * lookup by slot name, so changing the slot name at the + * same time is not allowed. + */ + if (update_two_phase && + IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name and two_phase cannot be altered at the same time"))); + + /* + * Note that workers may still survive even if the + * subscription has been disabled. + * + * Ensure workers have already been exited to avoid + * getting prepared transactions while we are disabling + * the two_phase option. Otherwise, the changes of an + * already prepared transaction can be replicated again + * along with its corresponding commit, leading to + * duplicate data or errors. + */ + if (logicalrep_workers_find(subid, true, true)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot set %s for a subscription that does not have a slot name", - "failover"))); + errmsg("cannot alter two_phase when logical replication worker is still running"), + errhint("Try again after some time."))); /* - * Do not allow changing the failover state if the - * subscription is enabled. This is because the failover - * state of the slot on the publisher cannot be modified - * if the slot is currently acquired by the apply worker. + * two_phase cannot be disabled if there are any + * uncommitted prepared transactions present otherwise it + * can lead to duplicate data or errors as explained in + * the comment above. */ - if (sub->enabled) + if (update_two_phase && + sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + LookupGXactBySubid(subid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot set %s for enabled subscription", - "failover"))); + errmsg("cannot disable two_phase when prepared transactions are present"), + errhint("Resolve these transactions and try again."))); + + /* Change system catalog accordingly */ + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(opts.twophase ? + LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) + { /* - * The changed failover option of the slot can't be rolled - * back. + * Similar to the two_phase case above, we need to update + * the failover option for both the slot and the + * subscription. */ - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (failover)"); + update_failover = true; + + CheckAlterSubOption(sub, "failover", update_failover, + isTopLevel); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); @@ -1501,13 +1599,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering slot. + * Try to acquire the connection necessary for altering the slot, if + * needed. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (replaces[Anum_pg_subscription_subfailover - 1]) + if (update_failover || update_two_phase) { bool must_use_password; char *err; @@ -1528,7 +1627,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover); + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { @@ -1675,9 +1776,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * New workers won't be started because we hold an exclusive lock on the * subscription till the end of the transaction. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - subworkers = logicalrep_workers_find(subid, false); - LWLockRelease(LogicalRepWorkerLock); + subworkers = logicalrep_workers_find(subid, false, true); foreach(lc, subworkers) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); |