summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_views.sql1
-rw-r--r--src/backend/replication/logical/logical.c8
-rw-r--r--src/backend/replication/logical/reorderbuffer.c34
-rw-r--r--src/backend/utils/activity/pgstat_replslot.c1
-rw-r--r--src/backend/utils/adt/pgstatfuncs.c19
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat6
-rw-r--r--src/include/pgstat.h1
-rw-r--r--src/include/replication/reorderbuffer.h3
-rw-r--r--src/test/regress/expected/rules.out3
10 files changed, 57 insertions, 21 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c94f1f05f52..fcc86fd43be 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1063,6 +1063,7 @@ CREATE VIEW pg_stat_replication_slots AS
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.mem_exceeded_count,
s.total_txns,
s.total_bytes,
s.stats_reset
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c68c0481f42..93ed2eb368e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1955,10 +1955,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
PgStat_StatReplSlotEntry repSlotStat;
/* Nothing to do if we don't have any replication stats to be sent. */
- if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 &&
+ rb->memExceededCount <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
rb,
rb->spillTxns,
rb->spillCount,
@@ -1966,6 +1967,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->streamTxns,
rb->streamCount,
rb->streamBytes,
+ rb->memExceededCount,
rb->totalTxns,
rb->totalBytes);
@@ -1975,6 +1977,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_txns = rb->streamTxns;
repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes;
+ repSlotStat.mem_exceeded_count = rb->memExceededCount;
repSlotStat.total_txns = rb->totalTxns;
repSlotStat.total_bytes = rb->totalBytes;
@@ -1986,6 +1989,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->streamTxns = 0;
rb->streamCount = 0;
rb->streamBytes = 0;
+ rb->memExceededCount = 0;
rb->totalTxns = 0;
rb->totalBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index a5e165fb123..a54434151de 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -390,6 +390,7 @@ ReorderBufferAllocate(void)
buffer->streamTxns = 0;
buffer->streamCount = 0;
buffer->streamBytes = 0;
+ buffer->memExceededCount = 0;
buffer->totalTxns = 0;
buffer->totalBytes = 0;
@@ -3898,14 +3899,26 @@ static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
+ bool update_stats = true;
- /*
- * Bail out if debug_logical_replication_streaming is buffered and we
- * haven't exceeded the memory limit.
- */
- if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
- rb->size < logical_decoding_work_mem * (Size) 1024)
+ if (rb->size >= logical_decoding_work_mem * (Size) 1024)
+ {
+ /*
+ * Update the statistics as the memory usage has reached the limit. We
+ * report the statistics update later in this function since we can
+ * update the slot statistics altogether while streaming or
+ * serializing transactions in most cases.
+ */
+ rb->memExceededCount += 1;
+ }
+ else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED)
+ {
+ /*
+ * Bail out if debug_logical_replication_streaming is buffered and we
+ * haven't exceeded the memory limit.
+ */
return;
+ }
/*
* If debug_logical_replication_streaming is immediate, loop until there's
@@ -3965,8 +3978,17 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
*/
Assert(txn->size == 0);
Assert(txn->nentries_mem == 0);
+
+ /*
+ * We've reported the memExceededCount update while streaming or
+ * serializing the transaction.
+ */
+ update_stats = false;
}
+ if (update_stats)
+ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
+
/* We must be under the memory limit now. */
Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
}
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index ccfb11c49bf..d210c261ac6 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -94,6 +94,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
REPLSLOT_ACC(stream_txns);
REPLSLOT_ACC(stream_count);
REPLSLOT_ACC(stream_bytes);
+ REPLSLOT_ACC(mem_exceeded_count);
REPLSLOT_ACC(total_txns);
REPLSLOT_ACC(total_bytes);
#undef REPLSLOT_ACC
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3802a4cb888..1fe33df2756 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2121,7 +2121,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
text *slotname_text = PG_GETARG_TEXT_P(0);
NameData slotname;
TupleDesc tupdesc;
@@ -2146,11 +2146,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -2173,13 +2175,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
values[4] = Int64GetDatum(slotent->stream_txns);
values[5] = Int64GetDatum(slotent->stream_count);
values[6] = Int64GetDatum(slotent->stream_bytes);
- values[7] = Int64GetDatum(slotent->total_txns);
- values[8] = Int64GetDatum(slotent->total_bytes);
+ values[7] = Int64GetDatum(slotent->mem_exceeded_count);
+ values[8] = Int64GetDatum(slotent->total_txns);
+ values[9] = Int64GetDatum(slotent->total_bytes);
if (slotent->stat_reset_timestamp == 0)
- nulls[9] = true;
+ nulls[10] = true;
else
- values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+ values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index af54211f330..a98c6d6d820 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202510082
+#define CATALOG_VERSION_NO 202510083
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6bb31892d1d..25687eaecea 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5691,9 +5691,9 @@
{ oid => '6169', descr => 'statistics: information about replication slot',
proname => 'pg_stat_get_replication_slot', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'text',
- proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+ proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slot' },
{ oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index d24bf864a22..bc8077cbae6 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -395,6 +395,7 @@ typedef struct PgStat_StatReplSlotEntry
PgStat_Counter stream_txns;
PgStat_Counter stream_count;
PgStat_Counter stream_bytes;
+ PgStat_Counter mem_exceeded_count;
PgStat_Counter total_txns;
PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 91dc7e5e448..3cbe106a3c7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -690,6 +690,9 @@ struct ReorderBuffer
int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data decoded */
+ /* Number of times the logical_decoding_work_mem limit has been reached */
+ int64 memExceededCount;
+
/*
* Statistics about all the transactions sent to the decoding output
* plugin
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 8859a5a885f..d67af144d69 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2140,11 +2140,12 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.mem_exceeded_count,
s.total_txns,
s.total_bytes,
s.stats_reset
FROM pg_replication_slots r,
- LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+ LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset)
WHERE (r.datoid IS NOT NULL);
pg_stat_slru| SELECT name,
blks_zeroed,