summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c10
1 files changed, 10 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61902be3b0e..6b9f24e52d4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -622,6 +622,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -649,6 +650,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.shutdown_cb(ctx);
@@ -684,6 +686,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.begin_cb(ctx, txn);
@@ -715,6 +718,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/* do the actual work: call callback */
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -754,6 +758,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.change_cb(ctx, txn, relation, change);
/* Pop the error context stack */
@@ -794,6 +800,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
@@ -820,6 +828,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -857,6 +866,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,