summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2023-02-08 07:58:25 +0530
committerAmit Kapila <akapila@postgresql.org>2023-02-08 07:58:25 +0530
commit8c58624df46222d4d09c5655d8350f3b037880c8 (patch)
treebf75763c08fa8cf3495e4019d3b70dad23fe46df /src/backend/replication/logical/logical.c
parentfee7b77b9000f35e445de9954a8cbf241f181e60 (diff)
Fix the logical replication timeout during large DDLs.
The DDLs like Refresh Materialized views that generate lots of temporary data due to rewrite rules may not be processed by output plugins (for example pgoutput). So, we won't send keep-alive messages for a long time while processing such commands and that can lead the subscriber side to timeout. We have previously fixed a similar case for large transactions in commit f95d53eded where the output plugin filters all or most of the changes but missed to handle the DDLs. We decided not to backpatch this as this adds a new callback in the existing exposed structure and moreover, users can increase the wal_sender_timeout and wal_receiver_timeout to avoid this problem. Author: Wang wei, Hou Zhijie Reviewed-by: Peter Smith, Ashutosh Bapat, Shi yu, Amit Kapila Discussion: https://postgr.es/m/OS3PR01MB6275478E5D29E4A563302D3D9E2B9@OS3PR01MB6275.jpnprd01.prod.outlook.com Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c50
1 files changed, 50 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1a58dd76497..c3ec97a0a62 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
+/* callback to update txn's progress */
+static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
+ ReorderBufferTXN *txn,
+ XLogRecPtr lsn);
+
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
/*
@@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+ /*
+ * Callback to support updating progress during sending data of a
+ * transaction (and its subtransactions) to the output plugin.
+ */
+ ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
+
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
ctx->write = do_write;
@@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr lsn)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "update_progress_txn";
+ state.report_location = lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * Report this change's lsn so replies from clients can give an up-to-date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = lsn;
+
+ ctx->end_xact = false;
+
+ OutputPluginUpdateProgress(ctx, false);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.