summaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c43
1 files changed, 40 insertions, 3 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index bdc12087241..bd0cc0848d7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -64,6 +64,7 @@
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_LSN 0x00000800
+#define SUBOPT_ORIGIN 0x00001000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
bool streaming;
bool twophase;
bool disableonerr;
+ char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_COPY_DATA));
- /* Set default values for the boolean supported options. */
+ /* Set default values for the supported options. */
if (IsSet(supported_opts, SUBOPT_CONNECT))
opts->connect = true;
if (IsSet(supported_opts, SUBOPT_ENABLED))
@@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
opts->disableonerr = false;
+ if (IsSet(supported_opts, SUBOPT_ORIGIN))
+ opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
/* Parse options */
foreach(lc, stmt_options)
@@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
opts->disableonerr = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
+ strcmp(defel->defname, "origin") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_ORIGIN;
+ pfree(opts->origin);
+
+ /*
+ * Even though the "origin" parameter allows only "none" and "any"
+ * values, it is implemented as a string type so that the
+ * parameter can be extended in future versions to support
+ * filtering using origin names specified by the user.
+ */
+ opts->origin = defGetString(defel);
+
+ if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
+ (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized origin value: \"%s\"", opts->origin));
+ }
else if (IsSet(supported_opts, SUBOPT_LSN) &&
strcmp(defel->defname, "lsn") == 0)
{
@@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
- SUBOPT_DISABLE_ON_ERR);
+ SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CStringGetTextDatum(opts.synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publications);
+ values[Anum_pg_subscription_suborigin - 1] =
+ CStringGetTextDatum(opts.origin);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
+ SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+ SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
= true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
+ {
+ values[Anum_pg_subscription_suborigin - 1] =
+ CStringGetTextDatum(opts.origin);
+ replaces[Anum_pg_subscription_suborigin - 1] = true;
+ }
+
update_tuple = true;
break;
}