diff options
author | Amit Kapila <akapila@postgresql.org> | 2023-02-08 07:58:25 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2023-02-08 07:58:25 +0530 |
commit | 8c58624df46222d4d09c5655d8350f3b037880c8 (patch) | |
tree | bf75763c08fa8cf3495e4019d3b70dad23fe46df /src/backend/replication/pgoutput/pgoutput.c | |
parent | fee7b77b9000f35e445de9954a8cbf241f181e60 (diff) |
Fix the logical replication timeout during large DDLs.
The DDLs like Refresh Materialized views that generate lots of temporary
data due to rewrite rules may not be processed by output plugins (for
example pgoutput). So, we won't send keep-alive messages for a long time
while processing such commands and that can lead the subscriber side to
timeout. We have previously fixed a similar case for large transactions in
commit f95d53eded where the output plugin filters all or most of the
changes but missed to handle the DDLs.
We decided not to backpatch this as this adds a new callback in the
existing exposed structure and moreover, users can increase the
wal_sender_timeout and wal_receiver_timeout to avoid this problem.
Author: Wang wei, Hou Zhijie
Reviewed-by: Peter Smith, Ashutosh Bapat, Shi yu, Amit Kapila
Discussion: https://postgr.es/m/OS3PR01MB6275478E5D29E4A563302D3D9E2B9@OS3PR01MB6275.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 54 |
1 files changed, 6 insertions, 48 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e4938d8888f..73b080060da 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); -static void update_replication_progress(LogicalDecodingContext *ctx, - bool skipped_xact); /* * Only 3 publication actions are used for row filtering ("insert", "update", @@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * from this transaction has been sent to the downstream. */ sent_begin_txn = txndata->sent_begin_txn; - update_replication_progress(ctx, !sent_begin_txn); + OutputPluginUpdateProgress(ctx, !sent_begin_txn); pfree(txndata); txn->output_plugin_private = NULL; @@ -625,7 +623,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -639,7 +637,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; - update_replication_progress(ctx, false); - if (!is_publishable_relation(relation)) return; @@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - if (!data->messages) return; @@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, } } } - -/* - * Try to update progress and send a keepalive message if too many changes were - * processed. - * - * For a large transaction, if we don't send any change to the downstream for a - * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. - * This can happen when all or most of the changes are either not published or - * got filtered out. - */ -static void -update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) -{ - static int changes_count = 0; - - /* - * We don't want to try sending a keepalive message after processing each - * change as that can have overhead. Tests revealed that there is no - * noticeable overhead in doing it after continuously processing 100 or so - * changes. - */ -#define CHANGES_THRESHOLD 100 - - /* - * If we are at the end of transaction LSN, update progress tracking. - * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we - * try to send a keepalive message if required. - */ - if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) - { - OutputPluginUpdateProgress(ctx, skipped_xact); - changes_count = 0; - } -} |