summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/proto.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-07-14 07:33:50 +0530
committerAmit Kapila <akapila@postgresql.org>2021-07-14 07:33:50 +0530
commita8fd13cab0ba815e9925dc9676e6309f699b5f72 (patch)
treebfebac6bfc2d32a9212e33f9090bd700b0316fae /src/backend/replication/logical/proto.c
parent6c9c2831668345122fd0f92280b30f3bbe2dd4e6 (diff)
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the following things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare. * Add a new SUBSCRIPTION option "two_phase" to allow users to enable two-phase transactions. We enable the two_phase once the initial data sync is over. We however must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. The streaming option is not allowed with this new two_phase option. This can be done as a separate patch. We don't allow to toggle two_phase option of a subscription because it can lead to an inconsistent replica. For the same reason, we don't allow to refresh the publication once the two_phase is enabled for a subscription unless copy_data option is false. Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r--src/backend/replication/logical/proto.c217
1 files changed, 214 insertions, 3 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1cf59e0fb0f..13c8c3bd5bb 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);
}
@@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*
@@ -107,6 +107,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
}
/*
+ * Write BEGIN PREPARE to the output stream.
+ */
+void
+logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
+
+ /* fixed fields */
+ pq_sendint64(out, txn->final_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction BEGIN PREPARE from the stream.
+ */
+void
+logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
+{
+ /* read fields */
+ begin_data->prepare_lsn = pq_getmsgint64(in);
+ if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn not set in begin prepare message");
+ begin_data->end_lsn = pq_getmsgint64(in);
+ if (begin_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn not set in begin prepare message");
+ begin_data->prepare_time = pq_getmsgint64(in);
+ begin_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(begin_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+ Assert(rbtxn_prepared(txn));
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+ /* read fields */
+ prepare_data->prepare_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn is not set in prepare message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in prepare message");
+ prepare_data->prepare_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write COMMIT PREPARED to the output stream.
+ */
+void
+logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction COMMIT PREPARED from the stream.
+ */
+void
+logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
+
+ /* read fields */
+ prepare_data->commit_lsn = pq_getmsgint64(in);
+ if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "commit_lsn is not set in commit prepared message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in commit prepared message");
+ prepare_data->commit_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write ROLLBACK PREPARED to the output stream.
+ */
+void
+logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_end_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction ROLLBACK PREPARED from the stream.
+ */
+void
+logicalrep_read_rollback_prepared(StringInfo in,
+ LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
+
+ /* read fields */
+ rollback_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
+ rollback_data->rollback_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
+ rollback_data->prepare_time = pq_getmsgint64(in);
+ rollback_data->rollback_time = pq_getmsgint64(in);
+ rollback_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(rollback_data->gid, pq_getmsgstring(in));
+}
+
+/*
* Write ORIGIN to the output stream.
*/
void
@@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*