summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c20
1 files changed, 20 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7a8bf760791..c1bd68011c5 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
+ buffer->streamTxns = 0;
+ buffer->streamCount = 0;
+ buffer->streamBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
Snapshot snapshot_now;
CommandId command_id;
+ Size stream_bytes;
+ bool txn_is_streamed;
/* We can never reach here for a subtransaction. */
Assert(txn->toptxn == NULL);
@@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->snapshot_now = NULL;
}
+ /*
+ * Remember this information to be used later to update stats. We can't
+ * update the stats here as an error while processing the changes would
+ * lead to the accumulation of stats even though we haven't streamed all
+ * the changes.
+ */
+ txn_is_streamed = rbtxn_is_streamed(txn);
+ stream_bytes = txn->total_size;
+
/* Process and send the changes to output plugin. */
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
command_id, true);
+ rb->streamCount += 1;
+ rb->streamBytes += stream_bytes;
+
+ /* Don't consider already streamed transaction. */
+ rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);