summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/logical.c19
-rw-r--r--src/backend/replication/logical/logicalfuncs.c19
-rw-r--r--src/backend/replication/logical/reorderbuffer.c15
3 files changed, 37 insertions, 16 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7e363a7c05b..c68c0481f42 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -2082,7 +2082,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
bool *found_consistent_snapshot)
{
LogicalDecodingContext *ctx;
- ResourceOwner old_resowner = CurrentResourceOwner;
+ ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
XLogRecPtr retlsn;
Assert(moveto != InvalidXLogRecPtr);
@@ -2141,21 +2141,24 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
* might still have critical updates to do.
*/
if (record)
+ {
LogicalDecodingProcessRecord(ctx, ctx->reader);
+ /*
+ * We used to have bugs where logical decoding would fail to
+ * preserve the resource owner. That's important here, so
+ * verify that that doesn't happen anymore. XXX this could be
+ * removed once it's been battle-tested.
+ */
+ Assert(CurrentResourceOwner == old_resowner);
+ }
+
CHECK_FOR_INTERRUPTS();
}
if (found_consistent_snapshot && DecodingContextReady(ctx))
*found_consistent_snapshot = true;
- /*
- * Logical decoding could have clobbered CurrentResourceOwner during
- * transaction management, so restore the executor's value. (This is
- * a kluge, but it's not worth cleaning up right now.)
- */
- CurrentResourceOwner = old_resowner;
-
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
{
LogicalConfirmReceivedLocation(moveto);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index ca53caac2f2..25f890ddeed 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -107,7 +107,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
XLogRecPtr end_of_wal;
XLogRecPtr wait_for_wal_lsn;
LogicalDecodingContext *ctx;
- ResourceOwner old_resowner = CurrentResourceOwner;
+ ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
ArrayType *arr;
Size ndim;
List *options = NIL;
@@ -263,8 +263,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* store the description into our tuplestore.
*/
if (record != NULL)
+ {
LogicalDecodingProcessRecord(ctx, ctx->reader);
+ /*
+ * We used to have bugs where logical decoding would fail to
+ * preserve the resource owner. Verify that that doesn't
+ * happen anymore. XXX this could be removed once it's been
+ * battle-tested.
+ */
+ Assert(CurrentResourceOwner == old_resowner);
+ }
+
/* check limits */
if (upto_lsn != InvalidXLogRecPtr &&
upto_lsn <= ctx->reader->EndRecPtr)
@@ -276,13 +286,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
}
/*
- * Logical decoding could have clobbered CurrentResourceOwner during
- * transaction management, so restore the executor's value. (This is
- * a kluge, but it's not worth cleaning up right now.)
- */
- CurrentResourceOwner = old_resowner;
-
- /*
* Next time, start where we left off. (Hunting things, the family
* business..)
*/
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 34cf05668ae..4736f993c37 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2215,6 +2215,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
{
bool using_subtxn;
MemoryContext ccxt = CurrentMemoryContext;
+ ResourceOwner cowner = CurrentResourceOwner;
ReorderBufferIterTXNState *volatile iterstate = NULL;
volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
ReorderBufferChange *volatile specinsert = NULL;
@@ -2692,7 +2693,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
if (using_subtxn)
+ {
RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(ccxt);
+ CurrentResourceOwner = cowner;
+ }
/*
* We are here due to one of the four reasons: 1. Decoding an
@@ -2751,7 +2756,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
if (using_subtxn)
+ {
RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(ccxt);
+ CurrentResourceOwner = cowner;
+ }
/*
* The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
@@ -3244,6 +3253,8 @@ ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
SharedInvalidationMessage *invalidations)
{
bool use_subtxn = IsTransactionOrTransactionBlock();
+ MemoryContext ccxt = CurrentMemoryContext;
+ ResourceOwner cowner = CurrentResourceOwner;
int i;
if (use_subtxn)
@@ -3262,7 +3273,11 @@ ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
LocalExecuteInvalidationMessage(&invalidations[i]);
if (use_subtxn)
+ {
RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(ccxt);
+ CurrentResourceOwner = cowner;
+ }
}
/*