summaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c72
1 files changed, 72 insertions, 0 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7005307dc25..30e35dbd28a 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -54,6 +54,9 @@
/* Global variable to indicate if this process is a walreceiver process */
bool am_walreceiver;
+/* GUC variable */
+int wal_receiver_status_interval;
+
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
@@ -88,6 +91,8 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+static StandbyReplyMessage reply_message;
+
/*
* About SIGTERM handling:
*
@@ -114,6 +119,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
+static void XLogWalRcvSendReply(void);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -306,12 +312,23 @@ WalReceiverMain(void)
while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
+ /* Let the master know that we received some data. */
+ XLogWalRcvSendReply();
+
/*
* If we've written some records, flush them to disk and let the
* startup process know about them.
*/
XLogWalRcvFlush();
}
+ else
+ {
+ /*
+ * We didn't receive anything new, but send a status update to
+ * the master anyway, to report any progress in applying WAL.
+ */
+ XLogWalRcvSendReply();
+ }
}
}
@@ -546,5 +563,60 @@ XLogWalRcvFlush(void)
LogstreamResult.Write.xrecoff);
set_ps_display(activitymsg, false);
}
+
+ /* Also let the master know that we made some progress */
+ XLogWalRcvSendReply();
}
}
+
+/*
+ * Send reply message to primary, indicating our current XLOG positions and
+ * the current time.
+ */
+static void
+XLogWalRcvSendReply(void)
+{
+ char buf[sizeof(StandbyReplyMessage) + 1];
+ TimestampTz now;
+
+ /*
+ * If the user doesn't want status to be reported to the master, be sure
+ * to exit before doing anything at all.
+ */
+ if (wal_receiver_status_interval <= 0)
+ return;
+
+ /* Get current timestamp. */
+ now = GetCurrentTimestamp();
+
+ /*
+ * We can compare the write and flush positions to the last message we
+ * sent without taking any lock, but the apply position requires a spin
+ * lock, so we don't check that unless something else has changed or 10
+ * seconds have passed. This means that the apply log position will
+ * appear, from the master's point of view, to lag slightly, but since
+ * this is only for reporting purposes and only on idle systems, that's
+ * probably OK.
+ */
+ if (XLByteEQ(reply_message.write, LogstreamResult.Write)
+ && XLByteEQ(reply_message.flush, LogstreamResult.Flush)
+ && !TimestampDifferenceExceeds(reply_message.sendTime, now,
+ wal_receiver_status_interval * 1000))
+ return;
+
+ /* Construct a new message. */
+ reply_message.write = LogstreamResult.Write;
+ reply_message.flush = LogstreamResult.Flush;
+ reply_message.apply = GetXLogReplayRecPtr();
+ reply_message.sendTime = now;
+
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+ reply_message.write.xlogid, reply_message.write.xrecoff,
+ reply_message.flush.xlogid, reply_message.flush.xrecoff,
+ reply_message.apply.xlogid, reply_message.apply.xrecoff);
+
+ /* Prepend with the message type and send it. */
+ buf[0] = 'r';
+ memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
+ walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+}