summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c13
1 files changed, 9 insertions, 4 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 13935915382..61588d626f6 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
+ SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
if (need_full_snapshot)
slot->effective_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
@@ -445,13 +447,14 @@ void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
XLogRecPtr startptr;
+ ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = ctx->slot->data.restart_lsn;
+ startptr = slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32) (ctx->slot->data.restart_lsn >> 32),
- (uint32) ctx->slot->data.restart_lsn);
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
CHECK_FOR_INTERRUPTS();
}
- ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockAcquire(&slot->mutex);
+ slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ SpinLockRelease(&slot->mutex);
}
/*