diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7005307dc25..30e35dbd28a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -54,6 +54,9 @@ /* Global variable to indicate if this process is a walreceiver process */ bool am_walreceiver; +/* GUC variable */ +int wal_receiver_status_interval; + /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; walrcv_receive_type walrcv_receive = NULL; @@ -88,6 +91,8 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +static StandbyReplyMessage reply_message; + /* * About SIGTERM handling: * @@ -114,6 +119,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); +static void XLogWalRcvSendReply(void); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -306,12 +312,23 @@ WalReceiverMain(void) while (walrcv_receive(0, &type, &buf, &len)) XLogWalRcvProcessMsg(type, buf, len); + /* Let the master know that we received some data. */ + XLogWalRcvSendReply(); + /* * If we've written some records, flush them to disk and let the * startup process know about them. */ XLogWalRcvFlush(); } + else + { + /* + * We didn't receive anything new, but send a status update to + * the master anyway, to report any progress in applying WAL. + */ + XLogWalRcvSendReply(); + } } } @@ -546,5 +563,60 @@ XLogWalRcvFlush(void) LogstreamResult.Write.xrecoff); set_ps_display(activitymsg, false); } + + /* Also let the master know that we made some progress */ + XLogWalRcvSendReply(); } } + +/* + * Send reply message to primary, indicating our current XLOG positions and + * the current time. + */ +static void +XLogWalRcvSendReply(void) +{ + char buf[sizeof(StandbyReplyMessage) + 1]; + TimestampTz now; + + /* + * If the user doesn't want status to be reported to the master, be sure + * to exit before doing anything at all. + */ + if (wal_receiver_status_interval <= 0) + return; + + /* Get current timestamp. */ + now = GetCurrentTimestamp(); + + /* + * We can compare the write and flush positions to the last message we + * sent without taking any lock, but the apply position requires a spin + * lock, so we don't check that unless something else has changed or 10 + * seconds have passed. This means that the apply log position will + * appear, from the master's point of view, to lag slightly, but since + * this is only for reporting purposes and only on idle systems, that's + * probably OK. + */ + if (XLByteEQ(reply_message.write, LogstreamResult.Write) + && XLByteEQ(reply_message.flush, LogstreamResult.Flush) + && !TimestampDifferenceExceeds(reply_message.sendTime, now, + wal_receiver_status_interval * 1000)) + return; + + /* Construct a new message. */ + reply_message.write = LogstreamResult.Write; + reply_message.flush = LogstreamResult.Flush; + reply_message.apply = GetXLogReplayRecPtr(); + reply_message.sendTime = now; + + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + reply_message.write.xlogid, reply_message.write.xrecoff, + reply_message.flush.xlogid, reply_message.flush.xrecoff, + reply_message.apply.xlogid, reply_message.apply.xrecoff); + + /* Prepend with the message type and send it. */ + buf[0] = 'r'; + memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage)); + walrcv_send(buf, sizeof(StandbyReplyMessage) + 1); +} |