diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 102 |
1 files changed, 81 insertions, 21 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9ebb026187f..40b6377a852 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -55,11 +55,15 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); * accommodate that. */ static void -parse_subscription_options(List *options, bool *connect, bool *enabled_given, - bool *enabled, bool *create_slot, +parse_subscription_options(List *options, + bool *connect, + bool *enabled_given, bool *enabled, + bool *create_slot, bool *slot_name_given, char **slot_name, - bool *copy_data, char **synchronous_commit, - bool *refresh) + bool *copy_data, + char **synchronous_commit, + bool *refresh, + bool *binary_given, bool *binary) { ListCell *lc; bool connect_given = false; @@ -90,6 +94,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *synchronous_commit = NULL; if (refresh) *refresh = true; + if (binary) + { + *binary_given = false; + *binary = false; + } /* Parse options */ foreach(lc, options) @@ -175,6 +184,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, refresh_given = true; *refresh = defGetBoolean(defel); } + else if (strcmp(defel->defname, "binary") == 0 && binary) + { + if (*binary_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *binary_given = true; + *binary = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -322,6 +341,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char *conninfo; char *slotname; bool slotname_given; + bool binary; + bool binary_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -331,10 +352,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname_given, - &slotname, ©_data, &synchronous_commit, - NULL); + parse_subscription_options(stmt->options, + &connect, + &enabled_given, &enabled, + &create_slot, + &slotname_given, &slotname, + ©_data, + &synchronous_commit, + NULL, /* no "refresh" */ + &binary_given, &binary); /* * Since creating a replication slot is not transactional, rolling back @@ -400,6 +426,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -669,10 +696,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *slotname; bool slotname_given; char *synchronous_commit; - - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slotname_given, &slotname, - NULL, &synchronous_commit, NULL); + bool binary_given; + bool binary; + + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + &slotname_given, &slotname, + NULL, /* no "copy_data" */ + &synchronous_commit, + NULL, /* no "refresh" */ + &binary_given, &binary); if (slotname_given) { @@ -697,6 +732,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subsynccommit - 1] = true; } + if (binary_given) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(binary); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + update_tuple = true; break; } @@ -706,9 +748,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool enabled, enabled_given; - parse_subscription_options(stmt->options, NULL, - &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + &enabled_given, &enabled, + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + NULL, /* no "copy_data" */ + NULL, /* no "synchronous_commit" */ + NULL, /* no "refresh" */ + NULL, NULL); /* no "binary" */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -744,9 +792,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool copy_data; bool refresh; - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, NULL, ©_data, - NULL, &refresh); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + ©_data, + NULL, /* no "synchronous_commit" */ + &refresh, + NULL, NULL); /* no "binary" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -781,9 +835,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, NULL, ©_data, - NULL, NULL); + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + ©_data, + NULL, /* no "synchronous_commit" */ + NULL, /* no "refresh" */ + NULL, NULL); /* no "binary" */ AlterSubscription_refresh(sub, copy_data); |