diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 133 |
1 files changed, 40 insertions, 93 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index e7614bd515a..eb7293f2f33 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - TransactionId xid, Oid dboid, - TimestampTz commit_time, - int nsubxacts, TransactionId *sub_xids, - int ninval_msgs, SharedInvalidationMessage *msg); -static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, - TransactionId xid, TransactionId *sub_xids, int nsubxacts); + xl_xact_parsed_commit *parsed, TransactionId xid); +static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_abort *parsed, TransactionId xid); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -188,7 +185,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; XLogReaderState *r = buf->record; - uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; /* no point in doing anything yet, data could not be decoded anyway */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) @@ -197,87 +194,41 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) switch (info) { case XLOG_XACT_COMMIT: - { - xl_xact_commit *xlrec; - TransactionId *subxacts = NULL; - SharedInvalidationMessage *invals = NULL; - - xlrec = (xl_xact_commit *) XLogRecGetData(r); - - subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - - DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId, - xlrec->xact_time, - xlrec->nsubxacts, subxacts, - xlrec->nmsgs, invals); - - break; - } case XLOG_XACT_COMMIT_PREPARED: { - xl_xact_commit_prepared *prec; xl_xact_commit *xlrec; - TransactionId *subxacts; - SharedInvalidationMessage *invals = NULL; - - /* Prepared commits contain a normal commit record... */ - prec = (xl_xact_commit_prepared *) XLogRecGetData(r); - xlrec = &prec->crec; + xl_xact_parsed_commit parsed; + TransactionId xid; - subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - - DecodeCommit(ctx, buf, prec->xid, xlrec->dbId, - xlrec->xact_time, - xlrec->nsubxacts, subxacts, - xlrec->nmsgs, invals); - - break; - } - case XLOG_XACT_COMMIT_COMPACT: - { - xl_xact_commit_compact *xlrec; + xlrec = (xl_xact_commit *) XLogRecGetData(r); + ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); - xlrec = (xl_xact_commit_compact *) XLogRecGetData(r); + if (!TransactionIdIsValid(parsed.twophase_xid)) + xid = XLogRecGetXid(r); + else + xid = parsed.twophase_xid; - DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid, - xlrec->xact_time, - xlrec->nsubxacts, xlrec->subxacts, - 0, NULL); + DecodeCommit(ctx, buf, &parsed, xid); break; } case XLOG_XACT_ABORT: - { - xl_xact_abort *xlrec; - TransactionId *sub_xids; - - xlrec = (xl_xact_abort *) XLogRecGetData(r); - - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - - DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r), - sub_xids, xlrec->nsubxacts); - break; - } case XLOG_XACT_ABORT_PREPARED: { - xl_xact_abort_prepared *prec; xl_xact_abort *xlrec; - TransactionId *sub_xids; + xl_xact_parsed_abort parsed; + TransactionId xid; - /* prepared abort contain a normal commit abort... */ - prec = (xl_xact_abort_prepared *) XLogRecGetData(r); - xlrec = &prec->arec; + xlrec = (xl_xact_abort *) XLogRecGetData(r); + ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + if (!TransactionIdIsValid(parsed.twophase_xid)) + xid = XLogRecGetXid(r); + else + xid = parsed.twophase_xid; - /* r->xl_xid is committed in a separate record */ - DecodeAbort(ctx, buf->origptr, prec->xid, - sub_xids, xlrec->nsubxacts); + DecodeAbort(ctx, buf, &parsed, xid); break; } - case XLOG_XACT_ASSIGNMENT: { xl_xact_assignment *xlrec; @@ -477,10 +428,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - TransactionId xid, Oid dboid, - TimestampTz commit_time, - int nsubxacts, TransactionId *sub_xids, - int ninval_msgs, SharedInvalidationMessage *msgs) + xl_xact_parsed_commit *parsed, TransactionId xid) { int i; @@ -489,15 +437,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * transaction's contents, since the various caches need to always be * consistent. */ - if (ninval_msgs > 0) + if (parsed->nmsgs > 0) { ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, - ninval_msgs, msgs); + parsed->nmsgs, parsed->msgs); ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); } SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, - nsubxacts, sub_xids); + parsed->nsubxacts, parsed->subxacts); /* ---- * Check whether we are interested in this specific transaction, and tell @@ -524,12 +472,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * --- */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (dboid != InvalidOid && dboid != ctx->slot->data.database)) + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database)) { - for (i = 0; i < nsubxacts; i++) + for (i = 0; i < parsed->nsubxacts; i++) { - ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr); - sub_xids++; + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); } ReorderBufferForget(ctx->reorder, xid, buf->origptr); @@ -537,16 +484,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* tell the reorderbuffer about the surviving subtransactions */ - for (i = 0; i < nsubxacts; i++) + for (i = 0; i < parsed->nsubxacts; i++) { - ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids, + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], buf->origptr, buf->endptr); - sub_xids++; } /* replay actions of all transaction + subtransactions in order */ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time); + parsed->xact_time); } /* @@ -554,20 +500,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * snapbuild.c and reorderbuffer.c */ static void -DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, - TransactionId *sub_xids, int nsubxacts) +DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_abort *parsed, TransactionId xid) { int i; - SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids); + SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid, + parsed->nsubxacts, parsed->subxacts); - for (i = 0; i < nsubxacts; i++) + for (i = 0; i < parsed->nsubxacts; i++) { - ReorderBufferAbort(ctx->reorder, *sub_xids, lsn); - sub_xids++; + ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], + buf->record->EndRecPtr); } - ReorderBufferAbort(ctx->reorder, xid, lsn); + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); } /* |