diff options
-rw-r--r-- | src/backend/replication/logical/logical.c | 19 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 19 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 15 |
3 files changed, 37 insertions, 16 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7e363a7c05b..c68c0481f42 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -2082,7 +2082,7 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot) { LogicalDecodingContext *ctx; - ResourceOwner old_resowner = CurrentResourceOwner; + ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner; XLogRecPtr retlsn; Assert(moveto != InvalidXLogRecPtr); @@ -2141,21 +2141,24 @@ LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, * might still have critical updates to do. */ if (record) + { LogicalDecodingProcessRecord(ctx, ctx->reader); + /* + * We used to have bugs where logical decoding would fail to + * preserve the resource owner. That's important here, so + * verify that that doesn't happen anymore. XXX this could be + * removed once it's been battle-tested. + */ + Assert(CurrentResourceOwner == old_resowner); + } + CHECK_FOR_INTERRUPTS(); } if (found_consistent_snapshot && DecodingContextReady(ctx)) *found_consistent_snapshot = true; - /* - * Logical decoding could have clobbered CurrentResourceOwner during - * transaction management, so restore the executor's value. (This is - * a kluge, but it's not worth cleaning up right now.) - */ - CurrentResourceOwner = old_resowner; - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) { LogicalConfirmReceivedLocation(moveto); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index ca53caac2f2..25f890ddeed 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -107,7 +107,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin XLogRecPtr end_of_wal; XLogRecPtr wait_for_wal_lsn; LogicalDecodingContext *ctx; - ResourceOwner old_resowner = CurrentResourceOwner; + ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner; ArrayType *arr; Size ndim; List *options = NIL; @@ -263,8 +263,18 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * store the description into our tuplestore. */ if (record != NULL) + { LogicalDecodingProcessRecord(ctx, ctx->reader); + /* + * We used to have bugs where logical decoding would fail to + * preserve the resource owner. Verify that that doesn't + * happen anymore. XXX this could be removed once it's been + * battle-tested. + */ + Assert(CurrentResourceOwner == old_resowner); + } + /* check limits */ if (upto_lsn != InvalidXLogRecPtr && upto_lsn <= ctx->reader->EndRecPtr) @@ -276,13 +286,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin } /* - * Logical decoding could have clobbered CurrentResourceOwner during - * transaction management, so restore the executor's value. (This is - * a kluge, but it's not worth cleaning up right now.) - */ - CurrentResourceOwner = old_resowner; - - /* * Next time, start where we left off. (Hunting things, the family * business..) */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 34cf05668ae..4736f993c37 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2215,6 +2215,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, { bool using_subtxn; MemoryContext ccxt = CurrentMemoryContext; + ResourceOwner cowner = CurrentResourceOwner; ReorderBufferIterTXNState *volatile iterstate = NULL; volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; ReorderBufferChange *volatile specinsert = NULL; @@ -2692,7 +2693,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } if (using_subtxn) + { RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(ccxt); + CurrentResourceOwner = cowner; + } /* * We are here due to one of the four reasons: 1. Decoding an @@ -2751,7 +2756,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } if (using_subtxn) + { RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(ccxt); + CurrentResourceOwner = cowner; + } /* * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent @@ -3244,6 +3253,8 @@ ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations) { bool use_subtxn = IsTransactionOrTransactionBlock(); + MemoryContext ccxt = CurrentMemoryContext; + ResourceOwner cowner = CurrentResourceOwner; int i; if (use_subtxn) @@ -3262,7 +3273,11 @@ ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, LocalExecuteInvalidationMessage(&invalidations[i]); if (use_subtxn) + { RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(ccxt); + CurrentResourceOwner = cowner; + } } /* |