summaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c88
1 files changed, 56 insertions, 32 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4a7515fb13e..2af38f1cbea 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -82,7 +82,7 @@ static bool replication_started = false; /* Started streaming yet? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
-int replication_timeout = 60 * 1000; /* maximum time to send one
+int wal_sender_timeout = 60 * 1000; /* maximum time to send one
* WAL data message */
/*
* State for WalSndWakeupRequest
@@ -103,15 +103,20 @@ static uint32 sendOff = 0;
*/
static XLogRecPtr sentPtr = 0;
+/* Buffer for processing reply messages. */
+static StringInfoData reply_message;
/*
- * Buffer for processing reply messages.
+ * Buffer for constructing outgoing messages.
+ * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes)
*/
-static StringInfoData reply_message;
+static char *output_message;
/*
* Timestamp of the last receipt of the reply from the standby.
*/
static TimestampTz last_reply_timestamp;
+/* Have we sent a heartbeat message asking for a reply since the last reply? */
+static bool ping_sent = false;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
@@ -126,14 +131,14 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS);
static void WalSndLoop(void) __attribute__((noreturn));
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
-static void XLogSend(char *msgbuf, bool *caughtup);
+static void XLogSend(bool *caughtup);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
-static void WalSndKeepalive(char *msgbuf);
+static void WalSndKeepalive(bool requestReply);
/* Initialize walsender process before entering the main command loop */
@@ -465,7 +470,10 @@ ProcessRepliesIfAny(void)
* Save the last reply timestamp if we've received at least one reply.
*/
if (received)
+ {
last_reply_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
+ }
}
/*
@@ -527,6 +535,10 @@ ProcessStandbyReplyMessage(void)
(uint32) (reply.flush >> 32), (uint32) reply.flush,
(uint32) (reply.apply >> 32), (uint32) reply.apply);
+ /* Send a reply if the standby requested one. */
+ if (reply.replyRequested)
+ WalSndKeepalive(false);
+
/*
* Update shared state for this WalSender process based on reply data from
* standby.
@@ -620,7 +632,6 @@ ProcessStandbyHSFeedbackMessage(void)
static void
WalSndLoop(void)
{
- char *output_message;
bool caughtup = false;
/*
@@ -638,6 +649,7 @@ WalSndLoop(void)
/* Initialize the last reply timestamp */
last_reply_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
/* Loop forever, unless we get an error */
for (;;)
@@ -672,7 +684,7 @@ WalSndLoop(void)
* caught up.
*/
if (!pq_is_send_pending())
- XLogSend(output_message, &caughtup);
+ XLogSend(&caughtup);
else
caughtup = false;
@@ -708,7 +720,7 @@ WalSndLoop(void)
if (walsender_ready_to_stop)
{
/* ... let's just be real sure we're caught up ... */
- XLogSend(output_message, &caughtup);
+ XLogSend(&caughtup);
if (caughtup && !pq_is_send_pending())
{
/* Inform the standby that XLOG streaming is done */
@@ -738,23 +750,34 @@ WalSndLoop(void)
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- else if (MyWalSnd->sendKeepalive)
+ else if (wal_sender_timeout > 0 && !ping_sent)
{
- WalSndKeepalive(output_message);
- /* Try to flush pending output to the client */
- if (pq_flush_if_writable() != 0)
- break;
+ /*
+ * If half of wal_sender_timeout has elapsed without receiving
+ * any reply from the standby, send a keepalive message to the
+ * standby requesting an immediate reply.
+ */
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout / 2);
+ if (GetCurrentTimestamp() >= timeout)
+ {
+ WalSndKeepalive(true);
+ ping_sent = true;
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ break;
+ }
}
/* Determine time until replication timeout */
- if (replication_timeout > 0)
+ if (wal_sender_timeout > 0)
{
timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
- replication_timeout);
- sleeptime = 1 + (replication_timeout / 10);
+ wal_sender_timeout);
+ sleeptime = 1 + (wal_sender_timeout / 10);
}
- /* Sleep until something happens or replication timeout */
+ /* Sleep until something happens or we time out */
ImmediateInterruptOK = true;
CHECK_FOR_INTERRUPTS();
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
@@ -766,8 +789,7 @@ WalSndLoop(void)
* possibility that the client replied just as we reached the
* timeout ... he's supposed to reply *before* that.
*/
- if (replication_timeout > 0 &&
- GetCurrentTimestamp() >= timeout)
+ if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
{
/*
* Since typically expiration of replication timeout means
@@ -1016,15 +1038,11 @@ retry:
* but not yet sent to the client, and buffer it in the libpq output
* buffer.
*
- * msgbuf is a work area in which the output message is constructed. It's
- * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
- * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
- *
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*/
static void
-XLogSend(char *msgbuf, bool *caughtup)
+XLogSend(bool *caughtup)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
@@ -1107,13 +1125,13 @@ XLogSend(char *msgbuf, bool *caughtup)
/*
* OK to read and send the slice.
*/
- msgbuf[0] = 'w';
+ output_message[0] = 'w';
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
- XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+ XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
/*
* We fill the message header last so that the send timestamp is taken as
@@ -1123,9 +1141,9 @@ XLogSend(char *msgbuf, bool *caughtup)
msghdr.walEnd = SendRqstPtr;
msghdr.sendTime = GetCurrentTimestamp();
- memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+ memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader));
- pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+ pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes);
sentPtr = endptr;
@@ -1492,21 +1510,27 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
return (Datum) 0;
}
+/*
+ * Send a keepalive message to the standby.
+ * If requestReply is true, the message carries a flag asking the standby
+ * to send a reply back to us, for heartbeat purposes.
+ */
static void
-WalSndKeepalive(char *msgbuf)
+WalSndKeepalive(bool requestReply)
{
PrimaryKeepaliveMessage keepalive_message;
/* Construct a new message */
keepalive_message.walEnd = sentPtr;
keepalive_message.sendTime = GetCurrentTimestamp();
+ keepalive_message.replyRequested = requestReply;
elog(DEBUG2, "sending replication keepalive");
/* Prepend with the message type and send it. */
- msgbuf[0] = 'k';
- memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
- pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+ output_message[0] = 'k';
+ memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+ pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1);
}
/*