diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187a..8288da5277f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -29,6 +29,7 @@ #include "postgres.h" #include "access/xact.h" +#include "access/xlogutils.h" #include "access/xlog_internal.h" #include "fmgr.h" #include "miscadmin.h" @@ -41,6 +42,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/inval.h" #include "utils/memutils.h" /* data for errcontext callback */ @@ -1949,3 +1951,76 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->totalTxns = 0; rb->totalBytes = 0; } + +/* + * Read up to the end of WAL starting from the decoding slot's restart_lsn. + * Return true if any meaningful/decodable WAL records are encountered, + * otherwise false. + */ +bool +LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) +{ + bool has_pending_wal = false; + + Assert(MyReplicationSlot); + + PG_TRY(); + { + LogicalDecodingContext *ctx; + + /* + * Create our decoding context in fast_forward mode, passing start_lsn + * as InvalidXLogRecPtr, so that we start processing from the slot's + * confirmed_flush. + */ + ctx = CreateDecodingContext(InvalidXLogRecPtr, + NIL, + true, /* fast_forward */ + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); + + /* + * Start reading at the slot's restart_lsn, which we know points to a + * valid record. + */ + XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); + + /* Invalidate non-timetravel entries */ + InvalidateSystemCaches(); + + /* Loop until the end of WAL or some changes are processed */ + while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal) + { + XLogRecord *record; + char *errm = NULL; + + record = XLogReadRecord(ctx->reader, &errm); + + if (errm) + elog(ERROR, "could not find record for logical decoding: %s", errm); + + if (record != NULL) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + has_pending_wal = ctx->processing_required; + + CHECK_FOR_INTERRUPTS(); + } + + /* Clean up */ + FreeDecodingContext(ctx); + InvalidateSystemCaches(); + } + PG_CATCH(); + { + /* clear all timetravel entries */ + InvalidateSystemCaches(); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return has_pending_wal; +} |