summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c75
1 files changed, 75 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187a..8288da5277f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"
#include "access/xact.h"
+#include "access/xlogutils.h"
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
+#include "utils/inval.h"
#include "utils/memutils.h"
/* data for errcontext callback */
@@ -1949,3 +1951,76 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->totalTxns = 0;
rb->totalBytes = 0;
}
+
+/*
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn.
+ * Return true if any meaningful/decodable WAL records are encountered,
+ * otherwise false.
+ */
+bool
+LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
+{
+ bool has_pending_wal = false;
+
+ Assert(MyReplicationSlot);
+
+ PG_TRY();
+ {
+ LogicalDecodingContext *ctx;
+
+ /*
+ * Create our decoding context in fast_forward mode, passing start_lsn
+ * as InvalidXLogRecPtr, so that we start processing from the slot's
+ * confirmed_flush.
+ */
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ NIL,
+ true, /* fast_forward */
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
+
+ /*
+ * Start reading at the slot's restart_lsn, which we know points to a
+ * valid record.
+ */
+ XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+ /* Invalidate non-timetravel entries */
+ InvalidateSystemCaches();
+
+ /* Loop until the end of WAL or some changes are processed */
+ while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
+ {
+ XLogRecord *record;
+ char *errm = NULL;
+
+ record = XLogReadRecord(ctx->reader, &errm);
+
+ if (errm)
+ elog(ERROR, "could not find record for logical decoding: %s", errm);
+
+ if (record != NULL)
+ LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+ has_pending_wal = ctx->processing_required;
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Clean up */
+ FreeDecodingContext(ctx);
+ InvalidateSystemCaches();
+ }
+ PG_CATCH();
+ {
+ /* clear all timetravel entries */
+ InvalidateSystemCaches();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ return has_pending_wal;
+}