diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 54 |
1 files changed, 45 insertions, 9 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 7b8b11cb81f..519c6846e35 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,6 +44,7 @@ #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -60,7 +61,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name, - bool *copy_data) + bool *copy_data, char **synchronous_commit) { ListCell *lc; bool connect_given = false; @@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *slot_name = NULL; if (copy_data) *copy_data = true; + if (synchronous_commit) + *synchronous_commit = NULL; /* Parse options */ foreach (lc, options) @@ -165,6 +168,21 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, copy_data_given = true; *copy_data = !defGetBoolean(defel); } + else if (strcmp(defel->defname, "synchronous_commit") == 0 && + synchronous_commit) + { + if (*synchronous_commit) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *synchronous_commit = defGetString(defel); + + /* Test if the given value is valid for synchronous_commit GUC. */ + (void) set_config_option("synchronous_commit", *synchronous_commit, + PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, + false, 0, false); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -269,6 +287,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + char *synchronous_commit; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -280,7 +299,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data); + &enabled, &create_slot, &slotname, ©_data, + &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -311,6 +331,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) if (slotname == NULL) slotname = stmt->subname; + /* The default for synchronous_commit of subscriptions is off. */ + if (synchronous_commit == NULL) + synchronous_commit = "off"; conninfo = stmt->conninfo; publications = stmt->publication; @@ -334,6 +357,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) CStringGetTextDatum(conninfo); values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(slotname)); + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); @@ -582,13 +607,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_OPTIONS: { char *slot_name; + char *synchronous_commit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL); + NULL, &slot_name, NULL, + &synchronous_commit); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; + if (slot_name) + { + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + } + if (synchronous_commit) + { + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum(synchronous_commit); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; + } update_tuple = true; break; @@ -601,7 +637,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL); + NULL, NULL, NULL); Assert(enabled_given); values[Anum_pg_subscription_subenabled - 1] = @@ -626,7 +662,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -652,7 +688,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data); + NULL, NULL, ©_data, NULL); AlterSubscription_refresh(sub, copy_data); |