From 8e90ec5580d5345fef31005d7cc2215ba2125070 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 29 Oct 2020 09:11:51 +0530 Subject: Track statistics for streaming of changes from ReorderBuffer. This adds the statistics about transactions streamed to the decoding output plugin from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats and call pg_stat_reset_replication_slot to reset the stats of a particular slot. Users can pass NULL in pg_stat_reset_replication_slot to reset stats of all the slots. Commit 9868167500 has added the basic infrastructure to capture the stats of slot and this commit extends the statistics collector to track additional information about slots. Bump the catversion as we have added new columns in the catalog entry. Author: Ajin Cherian and Amit Kapila Reviewed-by: Sawada Masahiko and Dilip Kumar Discussion: https://postgr.es/m/CAA4eK1+chpEomLzgSoky-D31qev19AmECNiEAietPQUGEFhtVA@mail.gmail.com --- src/backend/replication/logical/reorderbuffer.c | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) (limited to 'src/backend/replication/logical/reorderbuffer.c') diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7a8bf760791..c1bd68011c5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -346,6 +346,9 @@ ReorderBufferAllocate(void) buffer->spillTxns = 0; buffer->spillCount = 0; buffer->spillBytes = 0; + buffer->streamTxns = 0; + buffer->streamCount = 0; + buffer->streamBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { Snapshot snapshot_now; CommandId command_id; + Size stream_bytes; + bool txn_is_streamed; /* We can never reach here for a subtransaction. */ Assert(txn->toptxn == NULL); @@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->snapshot_now = NULL; } + /* + * Remember this information to be used later to update stats. We can't + * update the stats here as an error while processing the changes would + * lead to the accumulation of stats even though we haven't streamed all + * the changes. + */ + txn_is_streamed = rbtxn_is_streamed(txn); + stream_bytes = txn->total_size; + /* Process and send the changes to output plugin. */ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); + rb->streamCount += 1; + rb->streamBytes += stream_bytes; + + /* Don't consider already streamed transaction. */ + rb->streamTxns += (txn_is_streamed) ? 0 : 1; + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); -- cgit v1.2.3