diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 94 |
1 files changed, 79 insertions, 15 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 869457003aa..b613df4c6a0 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -55,6 +55,7 @@ /* GUC variables */ int wal_receiver_status_interval; +int wal_receiver_timeout; bool hot_standby_feedback; /* libpqreceiver hooks to these when loaded */ @@ -121,7 +122,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); -static void XLogWalRcvSendReply(void); +static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(void); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -170,9 +171,10 @@ WalReceiverMain(void) { char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; - /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + TimestampTz last_recv_timestamp; + bool ping_sent; /* * WalRcv should be set up already (if we are a backend, we inherit this @@ -282,6 +284,10 @@ WalReceiverMain(void) MemSet(&reply_message, 0, sizeof(reply_message)); MemSet(&feedback_message, 0, sizeof(feedback_message)); + /* Initialize the last recv timestamp */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + /* Loop until end-of-streaming or error */ for (;;) { @@ -316,15 +322,23 @@ WalReceiverMain(void) /* Wait a while for data to arrive */ if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { + /* Something was received from master, so reset timeout */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + /* Accept the received data, and process it */ XLogWalRcvProcessMsg(type, buf, len); /* Receive any more data we can without sleeping */ while (walrcv_receive(0, &type, &buf, &len)) + { + last_recv_timestamp = 
GetCurrentTimestamp(); + ping_sent = false; XLogWalRcvProcessMsg(type, buf, len); + } /* Let the master know that we received some data. */ - XLogWalRcvSendReply(); + XLogWalRcvSendReply(false, false); /* * If we've written some records, flush them to disk and let the @@ -335,10 +349,48 @@ WalReceiverMain(void) else { /* - * We didn't receive anything new, but send a status update to the - * master anyway, to report any progress in applying WAL. + * We didn't receive anything new. If we haven't heard anything + * from the server for more than wal_receiver_timeout / 2, + * ping the server. Also, if it's been longer than + * wal_receiver_status_interval since the last update we sent, + * send a status update to the master anyway, to report any + * progress in applying WAL. + */ + bool requestReply = false; + + /* + * Check if time since last receive from the master has reached the + * configured limit. */ - XLogWalRcvSendReply(); + if (wal_receiver_timeout > 0) + { + TimestampTz now = GetCurrentTimestamp(); + TimestampTz timeout; + + timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, + wal_receiver_timeout); + + if (now >= timeout) + ereport(ERROR, + (errmsg("terminating walreceiver due to timeout"))); + + /* + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. 
+ */ + if (!ping_sent) + { + timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, + (wal_receiver_timeout/2)); + if (now >= timeout) + { + requestReply = true; + ping_sent = true; + } + } + } + + XLogWalRcvSendReply(requestReply, requestReply); XLogWalRcvSendHSFeedback(); } } @@ -460,6 +512,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage)); ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime); + + /* If the primary requested a reply, send one immediately */ + if (keepalive.replyRequested) + XLogWalRcvSendReply(true, false); break; } default: @@ -609,19 +665,25 @@ XLogWalRcvFlush(bool dying) /* Also let the master know that we made some progress */ if (!dying) - { - XLogWalRcvSendReply(); - XLogWalRcvSendHSFeedback(); - } + XLogWalRcvSendReply(false, false); } } /* - * Send reply message to primary, indicating our current XLOG positions and - * the current time. + * Send reply message to primary, indicating our current XLOG positions, oldest + * xmin and the current time. + * + * If 'force' is not set, the message is only sent if enough time has + * passed since last status update to reach wal_receiver_status_interval. + * If wal_receiver_status_interval is disabled altogether and 'force' is + * false, this is a no-op. + * + * If 'requestReply' is true, requests the server to reply immediately upon + * receiving this message. This is used for heartbeats, when approaching + * wal_receiver_timeout. */ static void -XLogWalRcvSendReply(void) +XLogWalRcvSendReply(bool force, bool requestReply) { char buf[sizeof(StandbyReplyMessage) + 1]; TimestampTz now; @@ -630,7 +692,7 @@ XLogWalRcvSendReply(void) * If the user doesn't want status to be reported to the master, be sure * to exit before doing anything at all. */ - if (wal_receiver_status_interval <= 0) + if (!force && wal_receiver_status_interval <= 0) return; /* Get current timestamp. 
*/ @@ -645,7 +707,8 @@ XLogWalRcvSendReply(void) * this is only for reporting purposes and only on idle systems, that's * probably OK. */ - if (XLByteEQ(reply_message.write, LogstreamResult.Write) + if (!force + && XLByteEQ(reply_message.write, LogstreamResult.Write) && XLByteEQ(reply_message.flush, LogstreamResult.Flush) && !TimestampDifferenceExceeds(reply_message.sendTime, now, wal_receiver_status_interval * 1000)) @@ -656,6 +719,7 @@ XLogWalRcvSendReply(void) reply_message.flush = LogstreamResult.Flush; reply_message.apply = GetXLogReplayRecPtr(NULL); reply_message.sendTime = now; + reply_message.replyRequested = requestReply; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", (uint32) (reply_message.write >> 32), (uint32) reply_message.write, |