diff options
author | Amit Kapila <akapila@postgresql.org> | 2023-02-08 07:58:25 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2023-02-08 07:58:25 +0530 |
commit | 8c58624df46222d4d09c5655d8350f3b037880c8 (patch) | |
tree | bf75763c08fa8cf3495e4019d3b70dad23fe46df /src/backend/replication/logical/logical.c | |
parent | fee7b77b9000f35e445de9954a8cbf241f181e60 (diff) |
Fix the logical replication timeout during large DDLs.
The DDLs like Refresh Materialized views that generate lots of temporary
data due to rewrite rules may not be processed by output plugins (for
example pgoutput). So, we won't send keep-alive messages for a long time
while processing such commands and that can lead the subscriber side to
timeout. We have previously fixed a similar case for large transactions in
commit f95d53eded where the output plugin filters all or most of the
changes but missed to handle the DDLs.
We decided not to backpatch this as this adds a new callback in the
existing exposed structure and moreover, users can increase the
wal_sender_timeout and wal_receiver_timeout to avoid this problem.
Author: Wang wei, Hou Zhijie
Reviewed-by: Peter Smith, Ashutosh Bapat, Shi yu, Amit Kapila
Discussion: https://postgr.es/m/OS3PR01MB6275478E5D29E4A563302D3D9E2B9@OS3PR01MB6275.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1a58dd76497..c3ec97a0a62 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); +/* callback to update txn's progress */ +static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, + ReorderBufferTXN *txn, + XLogRecPtr lsn); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin); /* @@ -278,6 +283,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; + /* + * Callback to support updating progress during sending data of a + * transaction (and its subtransactions) to the output plugin. + */ + ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -1584,6 +1595,45 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "update_progress_txn"; + state.report_location = lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + ctx->write_xid = txn->xid; + + /* + * Report this change's lsn so replies from clients can give an up-to-date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = lsn; + + ctx->end_xact = false; + + OutputPluginUpdateProgress(ctx, false); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. |