summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c44
1 files changed, 23 insertions, 21 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c2e5e3abf82..0c0c3717391 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -94,11 +94,27 @@ void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
XLogRecordBuffer buf;
+ TransactionId txid;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
buf.record = record;
+ txid = XLogRecGetTopXid(record);
+
+ /*
+ * If the top-level xid is valid, we need to assign the subxact to the
+ * top-level xact. We need to do this for all records, hence we do it
+ * before the switch.
+ */
+ if (TransactionIdIsValid(txid))
+ {
+ ReorderBufferAssignChild(ctx->reorder,
+ txid,
+ record->decoded_record->xl_xid,
+ buf.origptr);
+ }
+
/* cast so we get a warning when new rmgrs are added */
switch ((RmgrId) XLogRecGetRmid(record))
{
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If the snapshot isn't yet fully built, we cannot decode anything, so
* bail out.
- *
- * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
- * when the snapshot is being built: it is possible to get later records
- * that require subxids to be properly assigned.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
- info != XLOG_XACT_ASSIGNMENT)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
}
case XLOG_XACT_ASSIGNMENT:
- {
- xl_xact_assignment *xlrec;
- int i;
- TransactionId *sub_xid;
- xlrec = (xl_xact_assignment *) XLogRecGetData(r);
-
- sub_xid = &xlrec->xsub[0];
-
- for (i = 0; i < xlrec->nsubxacts; i++)
- {
- ReorderBufferAssignChild(reorder, xlrec->xtop,
- *(sub_xid++), buf->origptr);
- }
- break;
- }
+ /*
+ * We assign subxact to the toplevel xact while processing each
+ * record if required. So, we don't need to do anything here.
+ * See LogicalDecodingProcessRecord.
+ */
+ break;
case XLOG_XACT_PREPARE:
/*