diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index bdc12087241..bd0cc0848d7 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -64,6 +64,7 @@ #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 +#define SUBOPT_ORIGIN 0x00001000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -86,6 +87,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + char *origin; XLogRecPtr lsn; } SubOpts; @@ -118,7 +120,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_COPY_DATA)); - /* Set default values for the boolean supported options. */ + /* Set default values for the supported options. */ if (IsSet(supported_opts, SUBOPT_CONNECT)) opts->connect = true; if (IsSet(supported_opts, SUBOPT_ENABLED)) @@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->twophase = false; if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) opts->disableonerr = false; + if (IsSet(supported_opts, SUBOPT_ORIGIN)) + opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); /* Parse options */ foreach(lc, stmt_options) @@ -265,6 +269,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; opts->disableonerr = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_ORIGIN) && + strcmp(defel->defname, "origin") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_ORIGIN)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_ORIGIN; + pfree(opts->origin); + + /* + * Even though the "origin" parameter allows only "none" and "any" + * values, it is implemented as a string type so that the + * parameter can be extended in future versions to support + * filtering using origin names specified by the user. + */ + opts->origin = defGetString(defel); + + if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) && + (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized origin value: \"%s\"", opts->origin)); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -530,7 +557,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR); + SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -617,6 +644,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CStringGetTextDatum(opts.synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); + values[Anum_pg_subscription_suborigin - 1] = + CStringGetTextDatum(opts.origin); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -1014,7 +1043,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1071,6 +1101,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) + { + values[Anum_pg_subscription_suborigin - 1] = + CStringGetTextDatum(opts.origin); + replaces[Anum_pg_subscription_suborigin - 1] = true; + } + update_tuple = true; break; } |