summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-04-16 07:34:43 +0530
committerAmit Kapila <akapila@postgresql.org>2021-04-16 07:34:43 +0530
commitf5fc2f5b23d1b1dff60f8ca5dc211161df47eda4 (patch)
tree440595580ef83eac4609e3cd74c2fe43bc73f28b /src
parent1bf946bd43e545b86e567588b791311fe4e36a8c (diff)
Add information of total data processed to replication slot stats.
This adds the statistics about total transactions count and total transaction data logically sent to the decoding output plugin from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats. Suggested-by: Andres Freund Author: Vignesh C and Amit Kapila Reviewed-by: Sawada Masahiko, Amit Kapila Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_views.sql2
-rw-r--r--src/backend/postmaster/pgstat.c6
-rw-r--r--src/backend/replication/logical/logical.c18
-rw-r--r--src/backend/replication/logical/reorderbuffer.c21
-rw-r--r--src/backend/utils/adt/pgstatfuncs.c8
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat6
-rw-r--r--src/include/pgstat.h4
-rw-r--r--src/include/replication/reorderbuffer.h7
-rw-r--r--src/test/regress/expected/rules.out4
10 files changed, 63 insertions, 15 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 451db2ee0a0..6d78b335908 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 666ce95d083..e1ec7d8b7d6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
msg.m_stream_txns = repSlotStat->stream_txns;
msg.m_stream_count = repSlotStat->stream_count;
msg.m_stream_bytes = repSlotStat->stream_bytes;
+ msg.m_total_txns = repSlotStat->total_txns;
+ msg.m_total_bytes = repSlotStat->total_bytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].stream_txns += msg->m_stream_txns;
replSlotStats[idx].stream_count += msg->m_stream_count;
replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+ replSlotStats[idx].total_txns += msg->m_total_txns;
+ replSlotStats[idx].total_bytes += msg->m_total_bytes;
}
}
@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
replSlotStats[i].stream_txns = 0;
replSlotStats[i].stream_count = 0;
replSlotStats[i].stream_bytes = 0;
+ replSlotStats[i].total_txns = 0;
+ replSlotStats[i].total_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 68e210ce12b..35b0c676412 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1775,21 +1775,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
ReorderBuffer *rb = ctx->reorder;
PgStat_ReplSlotStats repSlotStat;
- /*
- * Nothing to do if we haven't spilled or streamed anything since the last
- * time the stats has been sent.
- */
- if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+ /* Nothing to do if we don't have any replication stats to be sent. */
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
(long long) rb->spillBytes,
(long long) rb->streamTxns,
(long long) rb->streamCount,
- (long long) rb->streamBytes);
+ (long long) rb->streamBytes,
+ (long long) rb->totalTxns,
+ (long long) rb->totalBytes);
namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
repSlotStat.spill_txns = rb->spillTxns;
@@ -1798,12 +1797,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_txns = rb->streamTxns;
repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes;
+ repSlotStat.total_txns = rb->totalTxns;
+ repSlotStat.total_bytes = rb->totalBytes;
pgstat_report_replslot(&repSlotStat);
+
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
rb->streamTxns = 0;
rb->streamCount = 0;
rb->streamBytes = 0;
+ rb->totalTxns = 0;
+ rb->totalBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 52d06285a21..5cb484f0323 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
buffer->streamTxns = 0;
buffer->streamCount = 0;
buffer->streamBytes = 0;
+ buffer->totalTxns = 0;
+ buffer->totalBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node);
+ /*
+ * Update the total bytes processed before releasing the current set
+ * of changes and restoring the new set of changes.
+ */
+ rb->totalBytes += rb->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno))
{
@@ -2364,6 +2371,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
iterstate = NULL;
/*
+ * Update total transaction count and total transaction bytes
+ * processed. Ensure to not count the streamed transaction multiple
+ * times.
+ *
+ * Note that the statistics computation has to be done after
+ * ReorderBufferIterTXNFinish as it releases the serialized change
+ * which we have already accounted in ReorderBufferIterTXNNext.
+ */
+ if (!rbtxn_is_streamed(txn))
+ rb->totalTxns++;
+
+ rb->totalBytes += rb->size;
+
+ /*
* Done with current changes, send the last message for this set of
* changes depending upon streaming mode.
*/
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 521ba736143..2680190a402 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[4] = Int64GetDatum(s->stream_txns);
values[5] = Int64GetDatum(s->stream_count);
values[6] = Int64GetDatum(s->stream_bytes);
+ values[7] = Int64GetDatum(s->total_txns);
+ values[8] = Int64GetDatum(s->total_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[7] = true;
+ nulls[9] = true;
else
- values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 87e9596da56..904b0c97ec2 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202104151
+#define CATALOG_VERSION_NO 202104161
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f4957653ae6..591753fe817 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5315,9 +5315,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10',
proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 8e11215058e..2aeb3cded4d 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_stream_txns;
PgStat_Counter m_stream_count;
PgStat_Counter m_stream_bytes;
+ PgStat_Counter m_total_txns;
+ PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot;
/* ----------
@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter stream_txns;
PgStat_Counter stream_count;
PgStat_Counter stream_bytes;
+ PgStat_Counter total_txns;
+ PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6ab..bfab8303ee7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,13 @@ struct ReorderBuffer
int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data streamed */
+
+ /*
+ * Statistics about all the transactions sent to the decoding output
+ * plugin
+ */
+ int64 totalTxns; /* total number of transactions sent */
+ int64 totalBytes; /* total amount of data sent */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 186e6c966c6..6399f3feef8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,