summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walreceiver.c17
-rw-r--r--src/backend/replication/walsender.c188
-rw-r--r--src/include/replication/walprotocol.h53
-rw-r--r--src/include/replication/walreceiver.h3
4 files changed, 169 insertions, 92 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ecb2c3a6d39..b31cfb4147d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.10 2010/04/20 22:55:03 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.11 2010/06/03 22:17:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -41,6 +41,7 @@
#include "access/xlog_internal.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@@ -393,18 +394,18 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
case 'w': /* WAL records */
{
- XLogRecPtr recptr;
+ WalDataMessageHeader msghdr;
- if (len < sizeof(XLogRecPtr))
+ if (len < sizeof(WalDataMessageHeader))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid WAL message received from primary")));
+ /* memcpy is required here for alignment reasons */
+ memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+ buf += sizeof(WalDataMessageHeader);
+ len -= sizeof(WalDataMessageHeader);
- memcpy(&recptr, buf, sizeof(XLogRecPtr));
- buf += sizeof(XLogRecPtr);
- len -= sizeof(XLogRecPtr);
-
- XLogWalRcvWrite(buf, len, recptr);
+ XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
default:
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d2e37fd0086..e337e7e5a6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -5,11 +5,10 @@
* The WAL sender process (walsender) is new as of Postgres 9.0. It takes
* charge of XLOG streaming sender in the primary server. At first, it is
* started by the postmaster when the walreceiver in the standby server
- * connects to the primary server and requests XLOG streaming replication,
- * i.e., unlike any auxiliary process, it is not an always-running process.
+ * connects to the primary server and requests XLOG streaming replication.
* It attempts to keep reading XLOG records from the disk and sending them
* to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is an one to one relationship between a connection
+ * any backend, there is a one-to-one relationship between a connection
* and a walsender process).
*
* Normal termination is by SIGTERM, which instructs the walsender to
@@ -30,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.24 2010/06/03 21:02:12 petere Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.25 2010/06/03 22:17:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -44,6 +43,7 @@
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "replication/walprotocol.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -80,7 +80,7 @@ static uint32 sendOff = 0;
/*
* How far have we sent WAL already? This is also advertised in
- * MyWalSnd->sentPtr.
+ * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = {0, 0};
@@ -100,19 +100,9 @@ static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(StringInfo outMsg, bool *caughtup);
+static bool XLogSend(char *msgbuf, bool *caughtup);
static void CheckClosedConnection(void);
-/*
- * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
- *
- * We don't have a good idea of what a good value would be; there's some
- * overhead per message in both walsender and walreceiver, but on the other
- * hand sending large batches makes walsender less responsive to signals
- * because signals are checked only between messages. 128kB (with
- * default 8k blocks) seems like a reasonable guess for now.
- */
-#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
/* Main entry point for walsender process */
int
@@ -157,6 +147,9 @@ WalSenderMain(void)
return WalSndLoop();
}
+/*
+ * Execute commands from walreceiver, until we enter streaming mode.
+ */
static void
WalSndHandshake(void)
{
@@ -173,6 +166,13 @@ WalSndHandshake(void)
firstchar = pq_getbyte();
/*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ exit(1);
+
+ /*
* Check for any other interesting events that happened while we
* slept.
*/
@@ -211,7 +211,7 @@ WalSndHandshake(void)
/*
* Reply with a result set with one row, two columns.
- * First col is system ID, and second if timeline ID
+ * First col is system ID, and second is timeline ID
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
@@ -253,6 +253,7 @@ WalSndHandshake(void)
/* Send CommandComplete and ReadyForQuery messages */
EndCommand("SELECT", DestRemote);
ReadyForQuery(DestRemote);
+ /* ReadyForQuery did pq_flush for us */
}
else if (sscanf(query_string, "START_REPLICATION %X/%X",
&recptr.xlogid, &recptr.xrecoff) == 2)
@@ -365,12 +366,17 @@ CheckClosedConnection(void)
static int
WalSndLoop(void)
{
- StringInfoData output_message;
+ char *output_message;
bool caughtup = false;
- initStringInfo(&output_message);
+ /*
+ * Allocate buffer that will be used for each output message. We do this
+ * just once to reduce palloc overhead. The buffer must be made large
+ * enough for maximum-sized messages.
+ */
+ output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
- /* Loop forever */
+ /* Loop forever, unless we get an error */
for (;;)
{
long remain; /* remaining time (us) */
@@ -381,6 +387,7 @@ WalSndLoop(void)
*/
if (!PostmasterIsAlive(true))
exit(1);
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
@@ -394,8 +401,8 @@ WalSndLoop(void)
*/
if (ready_to_stop)
{
- if (!XLogSend(&output_message, &caughtup))
- goto eof;
+ if (!XLogSend(output_message, &caughtup))
+ break;
if (caughtup)
shutdown_requested = true;
}
@@ -435,17 +442,15 @@ WalSndLoop(void)
remain -= NAPTIME_PER_CYCLE;
}
}
+
/* Attempt to send the log once every loop */
- if (!XLogSend(&output_message, &caughtup))
- goto eof;
+ if (!XLogSend(output_message, &caughtup))
+ break;
}
- /* can't get here because the above loop never exits */
- return 1;
-
-eof:
-
/*
+ * Get here on send failure. Clean up and exit.
+ *
* Reset whereToSendOutput to prevent ereport from attempting to send any
* more messages to the standby.
*/
@@ -524,6 +529,9 @@ WalSndKill(int code, Datum arg)
/*
* Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
*/
static void
XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
@@ -634,51 +642,46 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
/*
* Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
- * but not yet sent to the client, and send it. If there is no unsent WAL,
- * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
- * to false.
+ * but not yet sent to the client, and send it.
+ *
+ * msgbuf is a work area in which the output message is constructed. It's
+ * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ *
+ * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
+ * *caughtup is set to false.
*
* Returns true if OK, false if trouble.
*/
static bool
-XLogSend(StringInfo outMsg, bool *caughtup)
+XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
- char activitymsg[50];
-
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
+ WalDataMessageHeader msghdr;
/* Attempt to send all records flushed to the disk already */
SendRqstPtr = GetWriteRecPtr();
/* Quick exit if nothing to do */
- if (!XLByteLT(sentPtr, SendRqstPtr))
+ if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
return true;
}
- /*
- * Otherwise let the caller know that we're not fully caught up. Unless
- * there's a huge backlog, we'll be caught up to the current WriteRecPtr
- * after we've sent everything below, but more WAL could accumulate while
- * we're busy sending.
- */
- *caughtup = false;
/*
- * Figure out how much to send in one message. If there's less than
+ * Figure out how much to send in one message. If there's no more than
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
- * MAX_SEND_SIZE bytes, but round to page boundary.
+ * MAX_SEND_SIZE bytes, but round to logfile or page boundary.
*
* The rounding is not only for performance reasons. Walreceiver
* relies on the fact that we never split a WAL record across two
* messages. Since a long WAL record is split at page boundary into
* continuation records, page boundary is always a safe cut-off point.
- * We also assume that SendRqstPtr never points in the middle of a WAL
+ * We also assume that SendRqstPtr never points to the middle of a WAL
* record.
*/
startptr = sentPtr;
@@ -694,59 +697,78 @@ XLogSend(StringInfo outMsg, bool *caughtup)
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
- /* round down to page boundary. */
- endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
- /* if we went beyond SendRqstPtr, back off */
- if (XLByteLT(SendRqstPtr, endptr))
- endptr = SendRqstPtr;
-
- /*
- * OK to read and send the slice.
- *
- * We don't need to convert the xlogid/xrecoff from host byte order to
- * network byte order because the both server can be expected to have
- * the same byte order. If they have different byte order, we don't
- * reach here.
- */
- pq_sendbyte(outMsg, 'w');
- pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
-
if (endptr.xlogid != startptr.xlogid)
{
+ /* Don't cross a logfile boundary within one message */
Assert(endptr.xlogid == startptr.xlogid + 1);
- nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
+ endptr.xlogid = startptr.xlogid;
+ endptr.xrecoff = XLogFileSize;
+ }
+
+ /* if we went beyond SendRqstPtr, back off */
+ if (XLByteLE(SendRqstPtr, endptr))
+ {
+ endptr = SendRqstPtr;
+ *caughtup = true;
}
else
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ {
+ /* round down to page boundary. */
+ endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+ *caughtup = false;
+ }
- sentPtr = endptr;
+ nbytes = endptr.xrecoff - startptr.xrecoff;
+ Assert(nbytes <= MAX_SEND_SIZE);
/*
- * Read the log directly into the output buffer to prevent extra
- * memcpy calls.
+ * OK to read and send the slice.
*/
- enlargeStringInfo(outMsg, nbytes);
+ msgbuf[0] = 'w';
- XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
- outMsg->len += nbytes;
- outMsg->data[outMsg->len] = '\0';
+ /*
+ * Read the log directly into the output buffer to avoid extra memcpy
+ * calls.
+ */
+ XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
- pq_putmessage('d', outMsg->data, outMsg->len);
- resetStringInfo(outMsg);
+ /*
+ * We fill the message header last so that the send timestamp is taken
+ * as late as possible.
+ */
+ msghdr.dataStart = startptr;
+ msghdr.walEnd = SendRqstPtr;
+ msghdr.sendTime = GetCurrentTimestamp();
- /* Update shared memory status */
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
+ memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+
+ pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
/* Flush pending output */
if (pq_flush())
return false;
+ sentPtr = endptr;
+
+ /* Update shared memory status */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
/* Report progress of XLOG streaming in PS display */
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
- set_ps_display(activitymsg, false);
+ if (update_process_title)
+ {
+ char activitymsg[50];
+
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff);
+ set_ps_display(activitymsg, false);
+ }
return true;
}
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
new file mode 100644
index 00000000000..15025a277c0
--- /dev/null
+++ b/src/include/replication/walprotocol.h
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * walprotocol.h
+ * Definitions relevant to the streaming WAL transmission protocol.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ * $PostgreSQL: pgsql/src/include/replication/walprotocol.h,v 1.1 2010/06/03 22:17:32 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _WALPROTOCOL_H
+#define _WALPROTOCOL_H
+
+#include "access/xlogdefs.h"
+#include "utils/timestamp.h"
+
+
+/*
+ * Header for a WAL data message (message type 'w'). This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * The header is followed by actual WAL data. Note that the data length is
+ * not specified in the header --- it's just whatever remains in the message.
+ *
+ * walEnd and sendTime are not essential data, but are provided in case
+ * the receiver wants to adjust its behavior depending on how far behind
+ * it is.
+ */
+typedef struct
+{
+ /* WAL start location of the data included in this message */
+ XLogRecPtr dataStart;
+
+ /* Current end of WAL on the sender */
+ XLogRecPtr walEnd;
+
+ /* Sender's system clock at the time of transmission */
+ TimestampTz sendTime;
+} WalDataMessageHeader;
+
+/*
+ * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
+ *
+ * We don't have a good idea of what a good value would be; there's some
+ * overhead per message in both walsender and walreceiver, but on the other
+ * hand sending large batches makes walsender less responsive to signals
+ * because signals are checked only between messages. 128kB (with
+ * default 8k blocks) seems like a reasonable guess for now.
+ */
+#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
+
+#endif /* _WALPROTOCOL_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 4300b80b278..5dcaeba3f33 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -5,7 +5,7 @@
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.8 2010/02/26 02:01:27 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.9 2010/06/03 22:17:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -14,6 +14,7 @@
#include "access/xlogdefs.h"
#include "storage/spin.h"
+#include "pgtime.h"
extern bool am_walreceiver;