summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/logical.c8
-rw-r--r--src/backend/replication/logical/reorderbuffer.c34
2 files changed, 34 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c68c0481f42..93ed2eb368e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1955,10 +1955,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
PgStat_StatReplSlotEntry repSlotStat;
/* Nothing to do if we don't have any replication stats to be sent. */
- if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 &&
+ rb->memExceededCount <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
rb,
rb->spillTxns,
rb->spillCount,
@@ -1966,6 +1967,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->streamTxns,
rb->streamCount,
rb->streamBytes,
+ rb->memExceededCount,
rb->totalTxns,
rb->totalBytes);
@@ -1975,6 +1977,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_txns = rb->streamTxns;
repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes;
+ repSlotStat.mem_exceeded_count = rb->memExceededCount;
repSlotStat.total_txns = rb->totalTxns;
repSlotStat.total_bytes = rb->totalBytes;
@@ -1986,6 +1989,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->streamTxns = 0;
rb->streamCount = 0;
rb->streamBytes = 0;
+ rb->memExceededCount = 0;
rb->totalTxns = 0;
rb->totalBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index a5e165fb123..a54434151de 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -390,6 +390,7 @@ ReorderBufferAllocate(void)
buffer->streamTxns = 0;
buffer->streamCount = 0;
buffer->streamBytes = 0;
+ buffer->memExceededCount = 0;
buffer->totalTxns = 0;
buffer->totalBytes = 0;
@@ -3898,14 +3899,26 @@ static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
+ bool update_stats = true;
- /*
- * Bail out if debug_logical_replication_streaming is buffered and we
- * haven't exceeded the memory limit.
- */
- if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
- rb->size < logical_decoding_work_mem * (Size) 1024)
+ if (rb->size >= logical_decoding_work_mem * (Size) 1024)
+ {
+ /*
+ * Update the statistics as the memory usage has reached the limit. We
+ * report the statistics update later in this function since we can
+ * update the slot statistics altogether while streaming or
+ * serializing transactions in most cases.
+ */
+ rb->memExceededCount += 1;
+ }
+ else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED)
+ {
+ /*
+ * Bail out if debug_logical_replication_streaming is buffered and we
+ * haven't exceeded the memory limit.
+ */
return;
+ }
/*
* If debug_logical_replication_streaming is immediate, loop until there's
@@ -3965,8 +3978,17 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
*/
Assert(txn->size == 0);
Assert(txn->nentries_mem == 0);
+
+ /*
+ * We've reported the memExceededCount update while streaming or
+ * serializing the transaction.
+ */
+ update_stats = false;
}
+ if (update_stats)
+ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
+
/* We must be under the memory limit now. */
Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
}