diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d9f157172b2..1432554d5a7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL; /* per stream context for streaming transactions */ static MemoryContext LogicalStreamingContext = NULL; -WalReceiverConn *wrconn = NULL; +WalReceiverConn *LogRepWorkerWalRcvConn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; @@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextSwitchTo(ApplyMessageContext); - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); if (len != 0) { @@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) MemoryContextReset(ApplyMessageContext); } - len = walrcv_receive(wrconn, &buf, &fd); + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } @@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } /* All done */ - walrcv_endstreaming(wrconn, &tli); + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); } /* @@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) LSN_FORMAT_ARGS(writepos), LSN_FORMAT_ARGS(flushpos)); - walrcv_send(wrconn, reply_message->data, reply_message->len); + walrcv_send(LogRepWorkerWalRcvConn, + reply_message->data, reply_message->len); if (recvpos > last_recvpos) last_recvpos = recvpos; @@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg) origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); - wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, - &err); - if (wrconn == NULL) + LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, + MySubscription->name, &err); + if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); @@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg) * We don't really use the output identify_system for anything but it * does some initializations on the upstream so let's still call it. */ - (void) walrcv_identify_system(wrconn, &startpointTLI); + (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); } /* @@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg) options.startpoint = origin_startpos; options.slotname = myslotname; options.proto.logical.proto_version = - walrcv_server_version(wrconn) >= 140000 ? + walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; /* Start normal logical streaming replication. */ - walrcv_startstreaming(wrconn, &options); + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); |