summaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/subscriptioncmds.c27
1 files changed, 25 insertions, 2 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d246..3922658bbca 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
#define SUBOPT_BINARY 0x00000080
#define SUBOPT_STREAMING 0x00000100
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
+#define SUBOPT_DISABLE_ON_ERR 0x00000400
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
bool binary;
bool streaming;
bool twophase;
+ bool disableonerr;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->streaming = false;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
+ if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+ opts->disableonerr = false;
/* Parse options */
foreach(lc, stmt_options)
@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
opts->twophase = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+ strcmp(defel->defname, "disable_on_error") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+ opts->disableonerr = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED);
+ values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING);
+ SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_substream - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+ {
+ values[Anum_pg_subscription_subdisableonerr - 1]
+ = BoolGetDatum(opts.disableonerr);
+ replaces[Anum_pg_subscription_subdisableonerr - 1]
+ = true;
+ }
+
update_tuple = true;
break;
}