diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/logical.c | 8 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 34 |
2 files changed, 34 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c68c0481f42..93ed2eb368e 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1955,10 +1955,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) + if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 && + rb->memExceededCount <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1966,6 +1967,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamTxns, rb->streamCount, rb->streamBytes, + rb->memExceededCount, rb->totalTxns, rb->totalBytes); @@ -1975,6 +1977,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_txns = rb->streamTxns; repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; + repSlotStat.mem_exceeded_count = rb->memExceededCount; repSlotStat.total_txns = rb->totalTxns; repSlotStat.total_bytes = rb->totalBytes; @@ -1986,6 +1989,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamTxns = 0; rb->streamCount = 0; rb->streamBytes = 0; + rb->memExceededCount = 0; rb->totalTxns = 0; rb->totalBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a5e165fb123..a54434151de 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -390,6 +390,7 @@ ReorderBufferAllocate(void) buffer->streamTxns = 0; buffer->streamCount = 0; buffer->streamBytes = 0; + buffer->memExceededCount = 0; buffer->totalTxns = 0; buffer->totalBytes = 0; @@ -3898,14 +3899,26 @@ static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) { ReorderBufferTXN *txn; + bool update_stats = true; - /* - * Bail out if debug_logical_replication_streaming is buffered and we - * haven't exceeded the memory limit. - */ - if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED && - rb->size < logical_decoding_work_mem * (Size) 1024) + if (rb->size >= logical_decoding_work_mem * (Size) 1024) + { + /* + * Update the statistics as the memory usage has reached the limit. We + * report the statistics update later in this function since we can + * update the slot statistics altogether while streaming or + * serializing transactions in most cases. + */ + rb->memExceededCount += 1; + } + else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED) + { + /* + * Bail out if debug_logical_replication_streaming is buffered and we + * haven't exceeded the memory limit. + */ return; + } /* * If debug_logical_replication_streaming is immediate, loop until there's @@ -3965,8 +3978,17 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) */ Assert(txn->size == 0); Assert(txn->nentries_mem == 0); + + /* + * We've reported the memExceededCount update while streaming or + * serializing the transaction. + */ + update_stats = false; } + if (update_stats) + UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); + /* We must be under the memory limit now. */ Assert(rb->size < logical_decoding_work_mem * (Size) 1024); } |