summaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c102
1 files changed, 81 insertions, 21 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9ebb026187f..40b6377a852 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -55,11 +55,15 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
* accommodate that.
*/
static void
-parse_subscription_options(List *options, bool *connect, bool *enabled_given,
- bool *enabled, bool *create_slot,
+parse_subscription_options(List *options,
+ bool *connect,
+ bool *enabled_given, bool *enabled,
+ bool *create_slot,
bool *slot_name_given, char **slot_name,
- bool *copy_data, char **synchronous_commit,
- bool *refresh)
+ bool *copy_data,
+ char **synchronous_commit,
+ bool *refresh,
+ bool *binary_given, bool *binary)
{
ListCell *lc;
bool connect_given = false;
@@ -90,6 +94,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
*synchronous_commit = NULL;
if (refresh)
*refresh = true;
+ if (binary)
+ {
+ *binary_given = false;
+ *binary = false;
+ }
/* Parse options */
foreach(lc, options)
@@ -175,6 +184,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
refresh_given = true;
*refresh = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "binary") == 0 && binary)
+ {
+ if (*binary_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ *binary_given = true;
+ *binary = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -322,6 +341,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char *conninfo;
char *slotname;
bool slotname_given;
+ bool binary;
+ bool binary_given;
char originname[NAMEDATALEN];
bool create_slot;
List *publications;
@@ -331,10 +352,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*
* Connection and publication should not be specified here.
*/
- parse_subscription_options(stmt->options, &connect, &enabled_given,
- &enabled, &create_slot, &slotname_given,
- &slotname, &copy_data, &synchronous_commit,
- NULL);
+ parse_subscription_options(stmt->options,
+ &connect,
+ &enabled_given, &enabled,
+ &create_slot,
+ &slotname_given, &slotname,
+ &copy_data,
+ &synchronous_commit,
+ NULL, /* no "refresh" */
+ &binary_given, &binary);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -400,6 +426,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+ values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
@@ -669,10 +696,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
char *slotname;
bool slotname_given;
char *synchronous_commit;
-
- parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, &slotname_given, &slotname,
- NULL, &synchronous_commit, NULL);
+ bool binary_given;
+ bool binary;
+
+ parse_subscription_options(stmt->options,
+ NULL, /* no "connect" */
+ NULL, NULL, /* no "enabled" */
+ NULL, /* no "create_slot" */
+ &slotname_given, &slotname,
+ NULL, /* no "copy_data" */
+ &synchronous_commit,
+ NULL, /* no "refresh" */
+ &binary_given, &binary);
if (slotname_given)
{
@@ -697,6 +732,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
}
+ if (binary_given)
+ {
+ values[Anum_pg_subscription_subbinary - 1] =
+ BoolGetDatum(binary);
+ replaces[Anum_pg_subscription_subbinary - 1] = true;
+ }
+
update_tuple = true;
break;
}
@@ -706,9 +748,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
bool enabled,
enabled_given;
- parse_subscription_options(stmt->options, NULL,
- &enabled_given, &enabled, NULL,
- NULL, NULL, NULL, NULL, NULL);
+ parse_subscription_options(stmt->options,
+ NULL, /* no "connect" */
+ &enabled_given, &enabled,
+ NULL, /* no "create_slot" */
+ NULL, NULL, /* no "slot_name" */
+ NULL, /* no "copy_data" */
+ NULL, /* no "synchronous_commit" */
+ NULL, /* no "refresh" */
+ NULL, NULL); /* no "binary" */
Assert(enabled_given);
if (!sub->slotname && enabled)
@@ -744,9 +792,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
bool copy_data;
bool refresh;
- parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, NULL, &copy_data,
- NULL, &refresh);
+ parse_subscription_options(stmt->options,
+ NULL, /* no "connect" */
+ NULL, NULL, /* no "enabled" */
+ NULL, /* no "create_slot" */
+ NULL, NULL, /* no "slot_name" */
+ &copy_data,
+ NULL, /* no "synchronous_commit" */
+ &refresh,
+ NULL, NULL); /* no "binary" */
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
@@ -781,9 +835,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
- parse_subscription_options(stmt->options, NULL, NULL, NULL,
- NULL, NULL, NULL, &copy_data,
- NULL, NULL);
+ parse_subscription_options(stmt->options,
+ NULL, /* no "connect" */
+ NULL, NULL, /* no "enabled" */
+ NULL, /* no "create_slot" */
+ NULL, NULL, /* no "slot_name" */
+ &copy_data,
+ NULL, /* no "synchronous_commit" */
+ NULL, /* no "refresh" */
+ NULL, NULL); /* no "binary" */
AlterSubscription_refresh(sub, copy_data);