summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c23
1 files changed, 12 insertions, 11 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d9f157172b2..1432554d5a7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
bool MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextSwitchTo(ApplyMessageContext);
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
if (len != 0)
{
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
MemoryContextReset(ApplyMessageContext);
}
- len = walrcv_receive(wrconn, &buf, &fd);
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
/* All done */
- walrcv_endstreaming(wrconn, &tli);
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
/*
@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
LSN_FORMAT_ARGS(writepos),
LSN_FORMAT_ARGS(flushpos));
- walrcv_send(wrconn, reply_message->data, reply_message->len);
+ walrcv_send(LogRepWorkerWalRcvConn,
+ reply_message->data, reply_message->len);
if (recvpos > last_recvpos)
last_recvpos = recvpos;
@@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg)
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
- wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
- &err);
- if (wrconn == NULL)
+ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+ MySubscription->name, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
@@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg)
* We don't really use the output identify_system for anything but it
* does some initializations on the upstream so let's still call it.
*/
- (void) walrcv_identify_system(wrconn, &startpointTLI);
+ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
}
/*
@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg)
options.startpoint = origin_startpos;
options.slotname = myslotname;
options.proto.logical.proto_version =
- walrcv_server_version(wrconn) >= 140000 ?
+ walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical streaming replication. */
- walrcv_startstreaming(wrconn, &options);
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);