summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c133
1 files changed, 40 insertions, 93 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e7614bd515a..eb7293f2f33 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msg);
-static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
- TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+ xl_xact_parsed_commit *parsed, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -188,7 +185,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
XLogReaderState *r = buf->record;
- uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
/* no point in doing anything yet, data could not be decoded anyway */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
@@ -197,87 +194,41 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
switch (info)
{
case XLOG_XACT_COMMIT:
- {
- xl_xact_commit *xlrec;
- TransactionId *subxacts = NULL;
- SharedInvalidationMessage *invals = NULL;
-
- xlrec = (xl_xact_commit *) XLogRecGetData(r);
-
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
case XLOG_XACT_COMMIT_PREPARED:
{
- xl_xact_commit_prepared *prec;
xl_xact_commit *xlrec;
- TransactionId *subxacts;
- SharedInvalidationMessage *invals = NULL;
-
- /* Prepared commits contain a normal commit record... */
- prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
- xlrec = &prec->crec;
+ xl_xact_parsed_commit parsed;
+ TransactionId xid;
- subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
- DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
- xlrec->xact_time,
- xlrec->nsubxacts, subxacts,
- xlrec->nmsgs, invals);
-
- break;
- }
- case XLOG_XACT_COMMIT_COMPACT:
- {
- xl_xact_commit_compact *xlrec;
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
+ ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
- xlrec->xact_time,
- xlrec->nsubxacts, xlrec->subxacts,
- 0, NULL);
+ DecodeCommit(ctx, buf, &parsed, xid);
break;
}
case XLOG_XACT_ABORT:
- {
- xl_xact_abort *xlrec;
- TransactionId *sub_xids;
-
- xlrec = (xl_xact_abort *) XLogRecGetData(r);
-
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
- DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
- sub_xids, xlrec->nsubxacts);
- break;
- }
case XLOG_XACT_ABORT_PREPARED:
{
- xl_xact_abort_prepared *prec;
xl_xact_abort *xlrec;
- TransactionId *sub_xids;
+ xl_xact_parsed_abort parsed;
+ TransactionId xid;
- /* prepared abort contain a normal commit abort... */
- prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
- xlrec = &prec->arec;
+ xlrec = (xl_xact_abort *) XLogRecGetData(r);
+ ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
- /* r->xl_xid is committed in a separate record */
- DecodeAbort(ctx, buf->origptr, prec->xid,
- sub_xids, xlrec->nsubxacts);
+ DecodeAbort(ctx, buf, &parsed, xid);
break;
}
-
case XLOG_XACT_ASSIGNMENT:
{
xl_xact_assignment *xlrec;
@@ -477,10 +428,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msgs)
+ xl_xact_parsed_commit *parsed, TransactionId xid)
{
int i;
@@ -489,15 +437,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* transaction's contents, since the various caches need to always be
* consistent.
*/
- if (ninval_msgs > 0)
+ if (parsed->nmsgs > 0)
{
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- ninval_msgs, msgs);
+ parsed->nmsgs, parsed->msgs);
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
- nsubxacts, sub_xids);
+ parsed->nsubxacts, parsed->subxacts);
/* ----
* Check whether we are interested in this specific transaction, and tell
@@ -524,12 +472,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (dboid != InvalidOid && dboid != ctx->slot->data.database))
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
{
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
- sub_xids++;
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
}
ReorderBufferForget(ctx->reorder, xid, buf->origptr);
@@ -537,16 +484,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/* tell the reorderbuffer about the surviving subtransactions */
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
buf->origptr, buf->endptr);
- sub_xids++;
}
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time);
+ parsed->xact_time);
}
/*
@@ -554,20 +500,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* snapbuild.c and reorderbuffer.c
*/
static void
-DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
- TransactionId *sub_xids, int nsubxacts)
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid)
{
int i;
- SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
+ SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
+ parsed->nsubxacts, parsed->subxacts);
- for (i = 0; i < nsubxacts; i++)
+ for (i = 0; i < parsed->nsubxacts; i++)
{
- ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
- sub_xids++;
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
}
- ReorderBufferAbort(ctx->reorder, xid, lsn);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
}
/*