diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 0c0c3717391..f3a1c31a292 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * We assign subxact to the toplevel xact while processing each - * record if required. So, we don't need to do anything here. - * See LogicalDecodingProcessRecord. + * record if required. So, we don't need to do anything here. See + * LogicalDecodingProcessRecord. */ break; + case XLOG_XACT_INVALIDATIONS: + { + TransactionId xid; + xl_xact_invals *invals; + + xid = XLogRecGetXid(r); + invals = (xl_xact_invals *) XLogRecGetData(r); + + /* + * Execute the invalidations for xid-less transactions, + * otherwise, accumulate them so that they can be processed at + * the commit time. + */ + if (TransactionIdIsValid(xid)) + { + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(reorder, xid, + buf->origptr, + invals->nmsgs, + invals->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, + buf->origptr); + } + else if ((!ctx->fast_forward)) + ReorderBufferImmediateInvalidation(ctx->reorder, + invals->nmsgs, + invals->msgs); + } + break; case XLOG_XACT_PREPARE: /* @@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_STANDBY_LOCK: break; case XLOG_INVALIDATIONS: - { - xl_invalidations *invalidations = - (xl_invalidations *) XLogRecGetData(r); - if (!ctx->fast_forward) - ReorderBufferImmediateInvalidation(ctx->reorder, - invalidations->nmsgs, - invalidations->msgs); - } + /* + * We are processing the invalidations at the command level via + * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here. + */ break; default: elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info); @@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, commit_time = parsed->origin_timestamp; } - /* - * Process invalidation messages, even if we're not interested in the - * transaction's contents, since the various caches need to always be - * consistent. - */ - if (parsed->nmsgs > 0) - { - if (!ctx->fast_forward) - ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, - parsed->nmsgs, parsed->msgs); - ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); - } - SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, parsed->nsubxacts, parsed->subxacts); |