diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 27 |
1 files changed, 25 insertions, 2 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3ef6607d246..3922658bbca 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -61,6 +61,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_DISABLE_ON_ERR 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -82,6 +83,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + bool disableonerr; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) + opts->disableonerr = false; /* Parse options */ foreach(lc, stmt_options) @@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; opts->twophase = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) && + strcmp(defel->defname, "disable_on_error") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; + opts->disableonerr = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_DISABLE_ON_ERR); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR)) + { + values[Anum_pg_subscription_subdisableonerr - 1] + = BoolGetDatum(opts.disableonerr); + replaces[Anum_pg_subscription_subdisableonerr - 1] + = true; + } + update_tuple = true; break; } |