summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2022-05-11 10:41:24 +0530
committerAmit Kapila <akapila@postgresql.org>2022-05-11 10:41:24 +0530
commit55558df2374167af38534df988ec3b9d0b0019f0 (patch)
treebc24f74a12194ff6b3467152e9911d83c468c77e /src/backend/replication/logical/logical.c
parentb9d70ef34b833fd63e3ff9f049dbbcdfda9dc9fe (diff)
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c10
1 files changed, 10 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61902be3b0e..6b9f24e52d4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -622,6 +622,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -649,6 +650,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.shutdown_cb(ctx);
@@ -684,6 +686,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.begin_cb(ctx, txn);
@@ -715,6 +718,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/* do the actual work: call callback */
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -754,6 +758,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.change_cb(ctx, txn, relation, change);
/* Pop the error context stack */
@@ -794,6 +800,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
@@ -820,6 +828,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -857,6 +866,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,