From 57891c256c345451b91ddb67a8e2a9c5eca9ae1c Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Mon, 30 Oct 2023 14:36:21 +0530 Subject: Add STREAM_START/STREAM_STOP for transactional messages during decoding. In test_decoding module, when skip_empty_xacts option was specified, add stream_start/stop for streaming transactional messages. This makes the handling of transactional messages stream consistent irrespective of whether skip_empty_xacts option was specified. Commit 26dd0284b9 made a similar change for non-streaming messages but forgot to update the streaming cases. Author: Peter Smith Reviewed-by: Amit Kapila Discussion: http://postgr.es/m/OS0PR01MB5716AEBD2988F8F5E9D5985794DFA@OS0PR01MB5716.jpnprd01.prod.outlook.com --- contrib/test_decoding/test_decoding.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'contrib/test_decoding/test_decoding.c') diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index ab870d9e4dc..288fd0bb4ab 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -944,6 +944,19 @@ pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message) { + /* Output stream start if we haven't yet for transactional messages. */ + if (transactional) + { + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; + } + OutputPluginPrepareWrite(ctx, true); if (transactional) -- cgit v1.2.3