summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/proto.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r--src/backend/replication/logical/proto.c217
1 files changed, 214 insertions, 3 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1cf59e0fb0f..13c8c3bd5bb 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);
}
@@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*
@@ -107,6 +107,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
}
/*
+ * Write BEGIN PREPARE to the output stream.
+ */
+void
+logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
+
+ /* fixed fields */
+ pq_sendint64(out, txn->final_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction BEGIN PREPARE from the stream.
+ */
+void
+logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
+{
+ /* read fields */
+ begin_data->prepare_lsn = pq_getmsgint64(in);
+ if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn not set in begin prepare message");
+ begin_data->end_lsn = pq_getmsgint64(in);
+ if (begin_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn not set in begin prepare message");
+ begin_data->prepare_time = pq_getmsgint64(in);
+ begin_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(begin_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+ Assert(rbtxn_prepared(txn));
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+ /* read fields */
+ prepare_data->prepare_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn is not set in prepare message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in prepare message");
+ prepare_data->prepare_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write COMMIT PREPARED to the output stream.
+ */
+void
+logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction COMMIT PREPARED from the stream.
+ */
+void
+logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
+
+ /* read fields */
+ prepare_data->commit_lsn = pq_getmsgint64(in);
+ if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "commit_lsn is not set in commit prepared message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in commit prepared message");
+ prepare_data->commit_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write ROLLBACK PREPARED to the output stream.
+ */
+void
+logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_end_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction ROLLBACK PREPARED from the stream.
+ */
+void
+logicalrep_read_rollback_prepared(StringInfo in,
+ LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
+
+ /* read fields */
+ rollback_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
+ rollback_data->rollback_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
+ rollback_data->prepare_time = pq_getmsgint64(in);
+ rollback_data->rollback_time = pq_getmsgint64(in);
+ rollback_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(rollback_data->gid, pq_getmsgstring(in));
+}
+
+/*
* Write ORIGIN to the output stream.
*/
void
@@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*