summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c58
1 files changed, 35 insertions, 23 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0c0c3717391..f3a1c31a292 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* We assign subxact to the toplevel xact while processing each
- * record if required. So, we don't need to do anything here.
- * See LogicalDecodingProcessRecord.
+ * record if required. So, we don't need to do anything here. See
+ * LogicalDecodingProcessRecord.
*/
break;
+ case XLOG_XACT_INVALIDATIONS:
+ {
+ TransactionId xid;
+ xl_xact_invals *invals;
+
+ xid = XLogRecGetXid(r);
+ invals = (xl_xact_invals *) XLogRecGetData(r);
+
+ /*
+ * Execute the invalidations for xid-less transactions,
+ * otherwise, accumulate them so that they can be processed at
+ * the commit time.
+ */
+ if (TransactionIdIsValid(xid))
+ {
+ if (!ctx->fast_forward)
+ ReorderBufferAddInvalidations(reorder, xid,
+ buf->origptr,
+ invals->nmsgs,
+ invals->msgs);
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
+ buf->origptr);
+ }
+ else if ((!ctx->fast_forward))
+ ReorderBufferImmediateInvalidation(ctx->reorder,
+ invals->nmsgs,
+ invals->msgs);
+ }
+ break;
case XLOG_XACT_PREPARE:
/*
@@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_STANDBY_LOCK:
break;
case XLOG_INVALIDATIONS:
- {
- xl_invalidations *invalidations =
- (xl_invalidations *) XLogRecGetData(r);
- if (!ctx->fast_forward)
- ReorderBufferImmediateInvalidation(ctx->reorder,
- invalidations->nmsgs,
- invalidations->msgs);
- }
+ /*
+ * We are processing the invalidations at the command level via
+ * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
+ */
break;
default:
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
@@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
commit_time = parsed->origin_timestamp;
}
- /*
- * Process invalidation messages, even if we're not interested in the
- * transaction's contents, since the various caches need to always be
- * consistent.
- */
- if (parsed->nmsgs > 0)
- {
- if (!ctx->fast_forward)
- ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- parsed->nmsgs, parsed->msgs);
- ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
- }
-
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
parsed->nsubxacts, parsed->subxacts);