summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-07-29 15:51:45 +0530
committerAmit Kapila <akapila@postgresql.org>2021-07-29 15:51:45 +0530
commit91f9861242cd7dcf28fae216b1d8b47551c9159d (patch)
tree20c15db5314a5809714068b84b172453131e81e6 /src/backend/replication/logical/worker.c
parent454ae15d10ea2d11669b69e82c98fbd03126fd69 (diff)
Refactor to make common functions in proto.c and worker.c.
This is a non-functional change only to refactor code to extract some replication logic into static functions. This is done as preparation for the 2PC streaming patch which also shares this common logic. Author: Peter Smith Reviewed-By: Amit Kapila Discussion: https://postgr.es/m/CAHut+PuiSA8AiLcE2N5StzSKs46SQEP_vDOUD5fX2XCVtfZ7mQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c101
1 files changed, 63 insertions, 38 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b9a7a7ffbb3..3f499b11f72 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
+/* Common streaming function to apply all the spooled messages */
+static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/*
* Should this worker apply changes for given relation.
@@ -885,13 +887,46 @@ apply_handle_begin_prepare(StringInfo s)
}
/*
+ * Common function to prepare the GID.
+ */
+static void
+apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
+{
+ char gid[GIDSIZE];
+
+ /*
+ * Compute unique GID for two_phase transactions. We don't use GID of
+ * prepared transaction sent by server as that can lead to deadlock when
+ * we have multiple subscriptions from same node point to publications on
+ * the same node. See comments atop worker.c
+ */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
+ gid, sizeof(gid));
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data->end_lsn;
+ replorigin_session_origin_timestamp = prepare_data->prepare_time;
+
+ PrepareTransactionBlock(gid);
+}
+
+/*
* Handle PREPARE message.
*/
static void
apply_handle_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
- char gid[GIDSIZE];
logicalrep_read_prepare(s, &prepare_data);
@@ -903,15 +938,6 @@ apply_handle_prepare(StringInfo s)
LSN_FORMAT_ARGS(remote_final_lsn))));
/*
- * Compute unique GID for two_phase transactions. We don't use GID of
- * prepared transaction sent by server as that can lead to deadlock when
- * we have multiple subscriptions from same node point to publications on
- * the same node. See comments atop worker.c
- */
- TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
- gid, sizeof(gid));
-
- /*
* Unlike commit, here, we always prepare the transaction even though no
* change has happened in this transaction. It is done this way because at
* commit prepared time, we won't know whether we have skipped preparing a
@@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
*/
begin_replication_step();
- /*
- * BeginTransactionBlock is necessary to balance the EndTransactionBlock
- * called within the PrepareTransactionBlock below.
- */
- BeginTransactionBlock();
- CommitTransactionCommand(); /* Completes the preceding Begin command. */
-
- /*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
- */
- replorigin_session_origin_lsn = prepare_data.end_lsn;
- replorigin_session_origin_timestamp = prepare_data.prepare_time;
+ apply_handle_prepare_internal(&prepare_data);
- PrepareTransactionBlock(gid);
end_replication_step();
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
}
/*
- * Handle STREAM COMMIT message.
+ * Common spoolfile processing.
*/
static void
-apply_handle_stream_commit(StringInfo s)
+apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
{
- TransactionId xid;
StringInfoData s2;
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- LogicalRepCommitData commit_data;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
- if (in_streamed_transaction)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("STREAM COMMIT message without STREAM STOP")));
-
- xid = logicalrep_read_stream_commit(s, &commit_data);
-
- elog(DEBUG1, "received commit for streamed transaction %u", xid);
-
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)
MemoryContextSwitchTo(oldcxt);
- remote_final_lsn = commit_data.commit_lsn;
+ remote_final_lsn = lsn;
/*
* Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
+ return;
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ TransactionId xid;
+ LogicalRepCommitData commit_data;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+ xid = logicalrep_read_stream_commit(s, &commit_data);
+
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+ apply_spooled_messages(xid, commit_data.commit_lsn);
+
apply_handle_commit_internal(s, &commit_data);
/* unlink the files with serialized changes and subxact info */