diff options
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r-- | src/backend/replication/logical/proto.c | 217 |
1 files changed, 214 insertions, 3 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 1cf59e0fb0f..13c8c3bd5bb 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) /* fixed fields */ pq_sendint64(out, txn->final_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); pq_sendint32(out, txn->xid); } @@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, /* send fields */ pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); } /* @@ -107,6 +107,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) } /* + * Write BEGIN PREPARE to the output stream. + */ +void +logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE); + + /* fixed fields */ + pq_sendint64(out, txn->final_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.prepare_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction BEGIN PREPARE from the stream. + */ +void +logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data) +{ + /* read fields */ + begin_data->prepare_lsn = pq_getmsgint64(in); + if (begin_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn not set in begin prepare message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin prepare message"); + begin_data->prepare_time = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(begin_data->gid, pq_getmsgstring(in)); +} + +/* + * Write PREPARE to the output stream. + */ +void +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + Assert(rbtxn_prepared(txn)); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.prepare_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in prepare message", flags); + + /* read fields */ + prepare_data->prepare_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_lsn is not set in prepare message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in prepare message"); + prepare_data->prepare_time = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write COMMIT PREPARED to the output stream. + */ +void +logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->xact_time.commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction COMMIT PREPARED from the stream. + */ +void +logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit prepared message", flags); + + /* read fields */ + prepare_data->commit_lsn = pq_getmsgint64(in); + if (prepare_data->commit_lsn == InvalidXLogRecPtr) + elog(ERROR, "commit_lsn is not set in commit prepared message"); + prepare_data->end_lsn = pq_getmsgint64(in); + if (prepare_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->commit_time = pq_getmsgint64(in); + prepare_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(prepare_data->gid, pq_getmsgstring(in)); +} + +/* + * Write ROLLBACK PREPARED to the output stream. + */ +void +logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED); + + /* + * This should only ever happen for two-phase commit transactions, in + * which case we expect to have a valid GID. + */ + Assert(txn->gid != NULL); + + /* send the flags field */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, prepare_end_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); + pq_sendint64(out, txn->xact_time.commit_time); + pq_sendint32(out, txn->xid); + + /* send gid */ + pq_sendstring(out, txn->gid); +} + +/* + * Read transaction ROLLBACK PREPARED from the stream. + */ +void +logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data) +{ + /* read flags */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in rollback prepared message", flags); + + /* read fields */ + rollback_data->prepare_end_lsn = pq_getmsgint64(in); + if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); + rollback_data->rollback_end_lsn = pq_getmsgint64(in); + if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); + rollback_data->prepare_time = pq_getmsgint64(in); + rollback_data->rollback_time = pq_getmsgint64(in); + rollback_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(rollback_data->gid, pq_getmsgstring(in)); +} + +/* * Write ORIGIN to the output stream. */ void @@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, /* send fields */ pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); - pq_sendint64(out, txn->commit_time); + pq_sendint64(out, txn->xact_time.commit_time); } /* |