summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-07-14 07:33:50 +0530
committerAmit Kapila <akapila@postgresql.org>2021-07-14 07:33:50 +0530
commita8fd13cab0ba815e9925dc9676e6309f699b5f72 (patch)
treebfebac6bfc2d32a9212e33f9090bd700b0316fae /src/backend/replication/logical/worker.c
parent6c9c2831668345122fd0f92280b30f3bbe2dd4e6 (diff)
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the following things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare. * Add a new SUBSCRIPTION option "two_phase" to allow users to enable two-phase transactions. We enable the two_phase once the initial data sync is over. We however must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. The streaming option is not allowed with this new two_phase option. This can be done as a separate patch. We don't allow to toggle two_phase option of a subscription because it can lead to an inconsistent replica. For the same reason, we don't allow to refresh the publication once the two_phase is enabled for a subscription unless copy_data option is false. Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c347
1 files changed, 343 insertions, 4 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5fc620c7f19..b9a7a7ffbb3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -49,6 +49,79 @@
* a new way to pass filenames to BufFile APIs so that we are allowed to open
* the file we desired across multiple stream-open calls for the same
* transaction.
+ *
+ * TWO_PHASE TRANSACTIONS
+ * ----------------------
+ * Two phase transactions are replayed at prepare and then committed or
+ * rolled back at commit prepared and rollback prepared respectively. It is
+ * possible to have a prepared transaction that arrives at the apply worker
+ * when the tablesync is busy doing the initial copy. In this case, the apply
+ * worker skips all the prepared operations [e.g. inserts] while the tablesync
+ * is still busy (see the condition of should_apply_changes_for_rel). The
+ * tablesync worker might not get such a prepared transaction because say it
+ * was prior to the initial consistent point but might have got some later
+ * commits. Now, the tablesync worker will exit without doing anything for the
+ * prepared transaction skipped by the apply worker as the sync location for it
+ * will be already ahead of the apply worker's current location. This would lead
+ * to an "empty prepare", because later when the apply worker does the commit
+ * prepare, there is nothing in it (the inserts were skipped earlier).
+ *
+ * To avoid this, and similar prepare confusions the subscription's two_phase
+ * commit is enabled only after the initial sync is over. The two_phase option
+ * has been implemented as a tri-state with values DISABLED, PENDING, and
+ * ENABLED.
+ *
+ * Even if the user specifies they want a subscription with two_phase = on,
+ * internally it will start with a tri-state of PENDING which only becomes
+ * ENABLED after all tablesync initializations are completed - i.e. when all
+ * tablesync workers have reached their READY state. In other words, the value
+ * PENDING is only a temporary state for subscription start-up.
+ *
+ * Until the two_phase is properly available (ENABLED) the subscription will
+ * behave as if two_phase = off. When the apply worker detects that all
+ * tablesyncs have become READY (while the tri-state was PENDING) it will
+ * restart the apply worker process. This happens in
+ * process_syncing_tables_for_apply.
+ *
+ * When the (re-started) apply worker finds that all tablesyncs are READY for a
+ * two_phase tri-state of PENDING it start streaming messages with the
+ * two_phase option which in turn enables the decoding of two-phase commits at
+ * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
+ * Now, it is possible that during the time we have not enabled two_phase, the
+ * publisher (replication server) would have skipped some prepares but we
+ * ensure that such prepares are sent along with commit prepare, see
+ * ReorderBufferFinishPrepared.
+ *
+ * If the subscription has no tables then a two_phase tri-state PENDING is
+ * left unchanged. This lets the user still do an ALTER TABLE REFRESH
+ * PUBLICATION which might otherwise be disallowed (see below).
+ *
+ * If ever a user needs to be aware of the tri-state value, they can fetch it
+ * from the pg_subscription catalog (see column subtwophasestate).
+ *
+ * We don't allow to toggle two_phase option of a subscription because it can
+ * lead to an inconsistent replica. Consider, initially, it was on and we have
+ * received some prepare then we turn it off, now at commit time the server
+ * will send the entire transaction data along with the commit. With some more
+ * analysis, we can allow changing this option from off to on but not sure if
+ * that alone would be useful.
+ *
+ * Finally, to avoid problems mentioned in previous paragraphs from any
+ * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
+ * to 'off' and then again back to 'on') there is a restriction for
+ * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
+ * the two_phase tri-state is ENABLED, except when copy_data = false.
+ *
+ * We can get prepare of the same GID more than once for the genuine cases
+ * where we have defined multiple subscriptions for publications on the same
+ * server and prepared transaction has operations on tables subscribed to those
+ * subscriptions. For such cases, if we use the GID sent by publisher one of
+ * the prepares will be successful and others will fail, in which case the
+ * server will send them again. Now, this can lead to a deadlock if user has
+ * set synchronous_standby_names for all the subscriptions on subscriber. To
+ * avoid such deadlocks, we generate a unique GID (consisting of the
+ * subscription oid and the xid of the prepared transaction) for each prepare
+ * transaction on the subscriber.
*-------------------------------------------------------------------------
*/
@@ -59,6 +132,7 @@
#include "access/table.h"
#include "access/tableam.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
@@ -256,6 +330,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
LogicalRepTupleData *newtup,
CmdType operation);
+/* Compute GID for two_phase transactions */
+static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
+
+
/*
* Should this worker apply changes for given relation.
*
@@ -784,6 +862,185 @@ apply_handle_commit(StringInfo s)
}
/*
+ * Handle BEGIN PREPARE message.
+ */
+static void
+apply_handle_begin_prepare(StringInfo s)
+{
+ LogicalRepPreparedTxnData begin_data;
+
+ /* Tablesync should never receive prepare. */
+ if (am_tablesync_worker())
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+
+ logicalrep_read_begin_prepare(s, &begin_data);
+
+ remote_final_lsn = begin_data.prepare_lsn;
+
+ in_remote_transaction = true;
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+ LogicalRepPreparedTxnData prepare_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_prepare(s, &prepare_data);
+
+ if (prepare_data.prepare_lsn != remote_final_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
+ LSN_FORMAT_ARGS(remote_final_lsn))));
+
+ /*
+ * Compute unique GID for two_phase transactions. We don't use GID of
+ * prepared transaction sent by server as that can lead to deadlock when
+ * we have multiple subscriptions from same node point to publications on
+ * the same node. See comments atop worker.c
+ */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+ gid, sizeof(gid));
+
+ /*
+ * Unlike commit, here, we always prepare the transaction even though no
+ * change has happened in this transaction. It is done this way because at
+ * commit prepared time, we won't know whether we have skipped preparing a
+ * transaction because of no change.
+ *
+ * XXX, We can optimize such that at commit prepared time, we first check
+ * whether we have prepared the transaction or not but that doesn't seem
+ * worthwhile because such cases shouldn't be common.
+ */
+ begin_replication_step();
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+ PrepareTransactionBlock(gid);
+ end_replication_step();
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
+
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(prepare_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a COMMIT PREPARED of a previously PREPARED transaction.
+ */
+static void
+apply_handle_commit_prepared(StringInfo s)
+{
+ LogicalRepCommitPreparedTxnData prepare_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_commit_prepared(s, &prepare_data);
+
+ /* Compute GID for two_phase transactions. */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+ gid, sizeof(gid));
+
+ /* There is no transaction when COMMIT PREPARED is called */
+ begin_replication_step();
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.commit_time;
+
+ FinishPreparedTransaction(gid, true);
+ end_replication_step();
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(prepare_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
+ */
+static void
+apply_handle_rollback_prepared(StringInfo s)
+{
+ LogicalRepRollbackPreparedTxnData rollback_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_rollback_prepared(s, &rollback_data);
+
+ /* Compute GID for two_phase transactions. */
+ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
+ gid, sizeof(gid));
+
+ /*
+ * It is possible that we haven't received prepare because it occurred
+ * before walsender reached a consistent point or the two_phase was still
+ * not enabled by that time, so in such cases, we need to skip rollback
+ * prepared.
+ */
+ if (LookupGXact(gid, rollback_data.prepare_end_lsn,
+ rollback_data.prepare_time))
+ {
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
+ replorigin_session_origin_timestamp = rollback_data.rollback_time;
+
+ /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
+ begin_replication_step();
+ FinishPreparedTransaction(gid, false);
+ end_replication_step();
+ CommitTransactionCommand();
+ }
+
+ pgstat_report_stat(false);
+
+ store_flush_position(rollback_data.rollback_end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(rollback_data.rollback_end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
* Handle ORIGIN message.
*
* TODO, support tracking of multiple origins
@@ -2060,6 +2317,22 @@ apply_dispatch(StringInfo s)
case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s);
return;
+
+ case LOGICAL_REP_MSG_BEGIN_PREPARE:
+ apply_handle_begin_prepare(s);
+ return;
+
+ case LOGICAL_REP_MSG_PREPARE:
+ apply_handle_prepare(s);
+ return;
+
+ case LOGICAL_REP_MSG_COMMIT_PREPARED:
+ apply_handle_commit_prepared(s);
+ return;
+
+ case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+ apply_handle_rollback_prepared(s);
+ return;
}
ereport(ERROR,
@@ -2539,6 +2812,9 @@ maybe_reread_subscription(void)
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
+ /* two-phase should not be altered */
+ Assert(newsub->twophasestate == MySubscription->twophasestate);
+
/*
* Exit if any parameter that affects the remote connection was changed.
* The launcher will start a new worker.
@@ -3040,6 +3316,24 @@ cleanup_subxact_info()
subxact_data.nsubxacts_max = 0;
}
+/*
+ * Form the prepared transaction GID for two_phase transactions.
+ *
+ * Return the GID in the supplied buffer.
+ */
+static void
+TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
+{
+ Assert(subid != InvalidRepOriginId);
+
+ if (!TransactionIdIsValid(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid two-phase transaction ID")));
+
+ snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -3050,6 +3344,7 @@ ApplyWorkerMain(Datum main_arg)
XLogRecPtr origin_startpos;
char *myslotname;
WalRcvStreamOptions options;
+ int server_version;
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
@@ -3208,15 +3503,59 @@ ApplyWorkerMain(Datum main_arg)
options.logical = true;
options.startpoint = origin_startpos;
options.slotname = myslotname;
+
+ server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
- walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
- LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
+ server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+ server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+ LOGICALREP_PROTO_VERSION_NUM;
+
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
+ options.proto.logical.twophase = false;
+
+ if (!am_tablesync_worker())
+ {
+ /*
+ * Even when the two_phase mode is requested by the user, it remains
+ * as the tri-state PENDING until all tablesyncs have reached READY
+ * state. Only then, can it become ENABLED.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ {
+ /* Start streaming with two_phase enabled */
+ options.proto.logical.twophase = true;
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- /* Start normal logical streaming replication. */
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ StartTransactionCommand();
+ UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+ MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ CommitTransactionCommand();
+ }
+ else
+ {
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ }
+
+ ereport(DEBUG1,
+ (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
+ MySubscription->name,
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+ "?")));
+ }
+ else
+ {
+ /* Start normal logical streaming replication. */
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ }
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);