diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 88 |
1 files changed, 56 insertions, 32 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4a7515fb13e..2af38f1cbea 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -82,7 +82,7 @@ static bool replication_started = false; /* Started streaming yet? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ -int replication_timeout = 60 * 1000; /* maximum time to send one +int wal_sender_timeout = 60 * 1000; /* maximum time to send one * WAL data message */ /* * State for WalSndWakeupRequest @@ -103,15 +103,20 @@ static uint32 sendOff = 0; */ static XLogRecPtr sentPtr = 0; +/* Buffer for processing reply messages. */ +static StringInfoData reply_message; /* - * Buffer for processing reply messages. + * Buffer for constructing outgoing messages. + * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes) */ -static StringInfoData reply_message; +static char *output_message; /* * Timestamp of the last receipt of the reply from the standby. */ static TimestampTz last_reply_timestamp; +/* Have we sent a heartbeat message asking for reply, since last reply? */ +static bool ping_sent = false; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; @@ -126,14 +131,14 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS); static void WalSndLoop(void) __attribute__((noreturn)); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); -static void XLogSend(char *msgbuf, bool *caughtup); +static void XLogSend(bool *caughtup); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); -static void WalSndKeepalive(char *msgbuf); +static void WalSndKeepalive(bool requestReply); /* Initialize walsender process before entering the main command loop */ @@ -465,7 +470,10 @@ ProcessRepliesIfAny(void) * Save the last reply timestamp if we've received at least one reply. */ if (received) + { last_reply_timestamp = GetCurrentTimestamp(); + ping_sent = false; + } } /* @@ -527,6 +535,10 @@ ProcessStandbyReplyMessage(void) (uint32) (reply.flush >> 32), (uint32) reply.flush, (uint32) (reply.apply >> 32), (uint32) reply.apply); + /* Send a reply if the standby requested one. */ + if (reply.replyRequested) + WalSndKeepalive(false); + /* * Update shared state for this WalSender process based on reply data from * standby. @@ -620,7 +632,6 @@ ProcessStandbyHSFeedbackMessage(void) static void WalSndLoop(void) { - char *output_message; bool caughtup = false; /* @@ -638,6 +649,7 @@ WalSndLoop(void) /* Initialize the last reply timestamp */ last_reply_timestamp = GetCurrentTimestamp(); + ping_sent = false; /* Loop forever, unless we get an error */ for (;;) @@ -672,7 +684,7 @@ WalSndLoop(void) * caught up. */ if (!pq_is_send_pending()) - XLogSend(output_message, &caughtup); + XLogSend(&caughtup); else caughtup = false; @@ -708,7 +720,7 @@ WalSndLoop(void) if (walsender_ready_to_stop) { /* ... let's just be real sure we're caught up ... */ - XLogSend(output_message, &caughtup); + XLogSend(&caughtup); if (caughtup && !pq_is_send_pending()) { /* Inform the standby that XLOG streaming is done */ @@ -738,23 +750,34 @@ WalSndLoop(void) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - else if (MyWalSnd->sendKeepalive) + else if (wal_sender_timeout > 0 && !ping_sent) { - WalSndKeepalive(output_message); - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - break; + /* + * If half of wal_sender_timeout has lapsed without receiving + * any reply from standby, send a keep-alive message to standby + * requesting an immediate reply. + */ + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + if (GetCurrentTimestamp() >= timeout) + { + WalSndKeepalive(true); + ping_sent = true; + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + break; + } } /* Determine time until replication timeout */ - if (replication_timeout > 0) + if (wal_sender_timeout > 0) { timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, - replication_timeout); - sleeptime = 1 + (replication_timeout / 10); + wal_sender_timeout); + sleeptime = 1 + (wal_sender_timeout / 10); } - /* Sleep until something happens or replication timeout */ + /* Sleep until something happens or we time out */ ImmediateInterruptOK = true; CHECK_FOR_INTERRUPTS(); WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, @@ -766,8 +789,7 @@ WalSndLoop(void) * possibility that the client replied just as we reached the * timeout ... he's supposed to reply *before* that. */ - if (replication_timeout > 0 && - GetCurrentTimestamp() >= timeout) + if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout) { /* * Since typically expiration of replication timeout means @@ -1016,15 +1038,11 @@ retry: * but not yet sent to the client, and buffer it in the libpq output * buffer. * - * msgbuf is a work area in which the output message is constructed. It's - * passed in just so we can avoid re-palloc'ing the buffer on each cycle. - * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE. - * * If there is no unsent WAL remaining, *caughtup is set to true, otherwise * *caughtup is set to false. */ static void -XLogSend(char *msgbuf, bool *caughtup) +XLogSend(bool *caughtup) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; @@ -1107,13 +1125,13 @@ XLogSend(char *msgbuf, bool *caughtup) /* * OK to read and send the slice. */ - msgbuf[0] = 'w'; + output_message[0] = 'w'; /* * Read the log directly into the output buffer to avoid extra memcpy * calls. */ - XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); + XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); /* * We fill the message header last so that the send timestamp is taken as @@ -1123,9 +1141,9 @@ XLogSend(char *msgbuf, bool *caughtup) msghdr.walEnd = SendRqstPtr; msghdr.sendTime = GetCurrentTimestamp(); - memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); + memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader)); - pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); + pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes); sentPtr = endptr; @@ -1492,21 +1510,27 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) return (Datum) 0; } +/* + * This function is used to send keepalive message to standby. + * If requestReply is set, sets a flag in the message requesting the standby + * to send a message back to us, for heartbeat purposes. + */ static void -WalSndKeepalive(char *msgbuf) +WalSndKeepalive(bool requestReply) { PrimaryKeepaliveMessage keepalive_message; /* Construct a new message */ keepalive_message.walEnd = sentPtr; keepalive_message.sendTime = GetCurrentTimestamp(); + keepalive_message.replyRequested = requestReply; elog(DEBUG2, "sending replication keepalive"); /* Prepend with the message type and send it. */ - msgbuf[0] = 'k'; - memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage)); - pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1); + output_message[0] = 'k'; + memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage)); + pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1); } /* |