summaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c94
1 files changed, 79 insertions, 15 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 869457003aa..b613df4c6a0 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -55,6 +55,7 @@
/* GUC variables */
int wal_receiver_status_interval;
+int wal_receiver_timeout;
bool hot_standby_feedback;
/* libpqreceiver hooks to these when loaded */
@@ -121,7 +122,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(void);
+static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(void);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -170,9 +171,10 @@ WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
-
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
+ TimestampTz last_recv_timestamp;
+ bool ping_sent;
/*
* WalRcv should be set up already (if we are a backend, we inherit this
@@ -282,6 +284,10 @@ WalReceiverMain(void)
MemSet(&reply_message, 0, sizeof(reply_message));
MemSet(&feedback_message, 0, sizeof(feedback_message));
+ /* Initialize the last recv timestamp */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
+
/* Loop until end-of-streaming or error */
for (;;)
{
@@ -316,15 +322,23 @@ WalReceiverMain(void)
/* Wait a while for data to arrive */
if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
+ /* Something was received from master, so reset timeout */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
+
/* Accept the received data, and process it */
XLogWalRcvProcessMsg(type, buf, len);
/* Receive any more data we can without sleeping */
while (walrcv_receive(0, &type, &buf, &len))
+ {
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
XLogWalRcvProcessMsg(type, buf, len);
+ }
/* Let the master know that we received some data. */
- XLogWalRcvSendReply();
+ XLogWalRcvSendReply(false, false);
/*
* If we've written some records, flush them to disk and let the
@@ -335,10 +349,48 @@ WalReceiverMain(void)
else
{
/*
- * We didn't receive anything new, but send a status update to the
- * master anyway, to report any progress in applying WAL.
+ * We didn't receive anything new. If we haven't heard anything
+ * from the server for more than wal_receiver_timeout / 2,
+ * ping the server. Also, if it's been longer than
+ * wal_receiver_status_interval since the last update we sent,
+ * send a status update to the master anyway, to report any
+ * progress in applying WAL.
+ */
+ bool requestReply = false;
+
+ /*
+ * Check if time since last receive from the primary has reached the
+ * configured limit.
*/
- XLogWalRcvSendReply();
+ if (wal_receiver_timeout > 0)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz timeout;
+
+ timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+ wal_receiver_timeout);
+
+ if (now >= timeout)
+ ereport(ERROR,
+ (errmsg("terminating walreceiver due to timeout")));
+
+ /*
+ * We haven't received anything new for half of
+ * wal_receiver_timeout. Ping the server.
+ */
+ if (!ping_sent)
+ {
+ timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+ (wal_receiver_timeout/2));
+ if (now >= timeout)
+ {
+ requestReply = true;
+ ping_sent = true;
+ }
+ }
+ }
+
+ XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback();
}
}
@@ -460,6 +512,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+
+ /* If the primary requested a reply, send one immediately */
+ if (keepalive.replyRequested)
+ XLogWalRcvSendReply(true, false);
break;
}
default:
@@ -609,19 +665,25 @@ XLogWalRcvFlush(bool dying)
/* Also let the master know that we made some progress */
if (!dying)
- {
- XLogWalRcvSendReply();
- XLogWalRcvSendHSFeedback();
- }
+ XLogWalRcvSendReply(false, false);
}
}
/*
- * Send reply message to primary, indicating our current XLOG positions and
- * the current time.
+ * Send reply message to primary, indicating our current XLOG positions, oldest
+ * xmin and the current time.
+ *
+ * If 'force' is not set, the message is only sent if enough time has
+ * passed since last status update to reach wal_receiver_status_interval.
+ * If wal_receiver_status_interval is disabled altogether and 'force' is
+ * false, this is a no-op.
+ *
+ * If 'requestReply' is true, requests the server to reply immediately upon
+ * receiving this message. This is used for heartbeats, when approaching
+ * wal_receiver_timeout.
*/
static void
-XLogWalRcvSendReply(void)
+XLogWalRcvSendReply(bool force, bool requestReply)
{
char buf[sizeof(StandbyReplyMessage) + 1];
TimestampTz now;
@@ -630,7 +692,7 @@ XLogWalRcvSendReply(void)
* If the user doesn't want status to be reported to the master, be sure
* to exit before doing anything at all.
*/
- if (wal_receiver_status_interval <= 0)
+ if (!force && wal_receiver_status_interval <= 0)
return;
/* Get current timestamp. */
@@ -645,7 +707,8 @@ XLogWalRcvSendReply(void)
* this is only for reporting purposes and only on idle systems, that's
* probably OK.
*/
- if (XLByteEQ(reply_message.write, LogstreamResult.Write)
+ if (!force
+ && XLByteEQ(reply_message.write, LogstreamResult.Write)
&& XLByteEQ(reply_message.flush, LogstreamResult.Flush)
&& !TimestampDifferenceExceeds(reply_message.sendTime, now,
wal_receiver_status_interval * 1000))
@@ -656,6 +719,7 @@ XLogWalRcvSendReply(void)
reply_message.flush = LogstreamResult.Flush;
reply_message.apply = GetXLogReplayRecPtr(NULL);
reply_message.sendTime = now;
+ reply_message.replyRequested = requestReply;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
(uint32) (reply_message.write >> 32), (uint32) reply_message.write,