diff options
author | Amit Kapila <akapila@postgresql.org> | 2022-03-14 09:32:40 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2022-03-14 09:32:40 +0530 |
commit | 705e20f8550c0e8e47c0b6b20b5f5ffd6ffd9e33 (patch) | |
tree | d6606b7864bdcc3439caaeccfc0dc3a31a9386e3 /src/backend/commands/subscriptioncmds.c | |
parent | 369398ed886dc13956654777467536625e6fc7ee (diff) |
Optionally disable subscriptions on error.
Logical replication apply workers for a subscription can easily get stuck
in an infinite loop of attempting to apply a change, triggering an error
(such as a constraint violation), exiting with the error written to the
subscription server log, and restarting.
To partially remedy the situation, this patch adds a new subscription
option named 'disable_on_error'. To be consistent with old behavior, this
option defaults to false. When true, both the tablesync worker and apply
worker catch any errors thrown and disable the subscription in order to
break the loop. The error is still also written in the logs.
Once the subscription is disabled, users can either manually resolve the
conflict/error or skip the conflicting transaction by using
pg_replication_origin_advance() function. After resolving the conflict,
users need to enable the subscription to allow apply process to proceed.
Author: Osumi Takamichi and Mark Dilger
Reviewed-by: Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying, Peter Smith, Masahiko Sawada, Shi Yu
Discussion : https://postgr.es/m/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 27 |
1 files changed, 25 insertions, 2 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3ef6607d246..3922658bbca 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -61,6 +61,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_DISABLE_ON_ERR 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -82,6 +83,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + bool disableonerr; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) + opts->disableonerr = false; /* Parse options */ foreach(lc, stmt_options) @@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; opts->twophase = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) && + strcmp(defel->defname, "disable_on_error") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; + opts->disableonerr = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_DISABLE_ON_ERR); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR)) + { + values[Anum_pg_subscription_subdisableonerr - 1] + = BoolGetDatum(opts.disableonerr); + replaces[Anum_pg_subscription_subdisableonerr - 1] + = true; + } + update_tuple = true; break; } |