diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 188 |
1 files changed, 105 insertions, 83 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d2e37fd0086..e337e7e5a6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -5,11 +5,10 @@ * The WAL sender process (walsender) is new as of Postgres 9.0. It takes * charge of XLOG streaming sender in the primary server. At first, it is * started by the postmaster when the walreceiver in the standby server - * connects to the primary server and requests XLOG streaming replication, - * i.e., unlike any auxiliary process, it is not an always-running process. + * connects to the primary server and requests XLOG streaming replication. * It attempts to keep reading XLOG records from the disk and sending them * to the standby server, as long as the connection is alive (i.e., like - * any backend, there is an one to one relationship between a connection + * any backend, there is a one-to-one relationship between a connection * and a walsender process). * * Normal termination is by SIGTERM, which instructs the walsender to @@ -30,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.24 2010/06/03 21:02:12 petere Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.25 2010/06/03 22:17:32 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -44,6 +43,7 @@ #include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "replication/walprotocol.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -80,7 +80,7 @@ static uint32 sendOff = 0; /* * How far have we sent WAL already? This is also advertised in - * MyWalSnd->sentPtr. + * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) */ static XLogRecPtr sentPtr = {0, 0}; @@ -100,19 +100,9 @@ static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); -static bool XLogSend(StringInfo outMsg, bool *caughtup); +static bool XLogSend(char *msgbuf, bool *caughtup); static void CheckClosedConnection(void); -/* - * How much WAL to send in one message? Must be >= XLOG_BLCKSZ. - * - * We don't have a good idea of what a good value would be; there's some - * overhead per message in both walsender and walreceiver, but on the other - * hand sending large batches makes walsender less responsive to signals - * because signals are checked only between messages. 128kB (with - * default 8k blocks) seems like a reasonable guess for now. - */ -#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* Main entry point for walsender process */ int @@ -157,6 +147,9 @@ WalSenderMain(void) return WalSndLoop(); } +/* + * Execute commands from walreceiver, until we enter streaming mode. + */ static void WalSndHandshake(void) { @@ -173,6 +166,13 @@ WalSndHandshake(void) firstchar = pq_getbyte(); /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + + /* * Check for any other interesting events that happened while we * slept. */ @@ -211,7 +211,7 @@ WalSndHandshake(void) /* * Reply with a result set with one row, two columns. - * First col is system ID, and second if timeline ID + * First col is system ID, and second is timeline ID */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, @@ -253,6 +253,7 @@ WalSndHandshake(void) /* Send CommandComplete and ReadyForQuery messages */ EndCommand("SELECT", DestRemote); ReadyForQuery(DestRemote); + /* ReadyForQuery did pq_flush for us */ } else if (sscanf(query_string, "START_REPLICATION %X/%X", &recptr.xlogid, &recptr.xrecoff) == 2) @@ -365,12 +366,17 @@ CheckClosedConnection(void) static int WalSndLoop(void) { - StringInfoData output_message; + char *output_message; bool caughtup = false; - initStringInfo(&output_message); + /* + * Allocate buffer that will be used for each output message. We do this + * just once to reduce palloc overhead. The buffer must be made large + * enough for maximum-sized messages. + */ + output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE); - /* Loop forever */ + /* Loop forever, unless we get an error */ for (;;) { long remain; /* remaining time (us) */ @@ -381,6 +387,7 @@ WalSndLoop(void) */ if (!PostmasterIsAlive(true)) exit(1); + /* Process any requests or signals received recently */ if (got_SIGHUP) { @@ -394,8 +401,8 @@ WalSndLoop(void) */ if (ready_to_stop) { - if (!XLogSend(&output_message, &caughtup)) - goto eof; + if (!XLogSend(output_message, &caughtup)) + break; if (caughtup) shutdown_requested = true; } @@ -435,17 +442,15 @@ WalSndLoop(void) remain -= NAPTIME_PER_CYCLE; } } + /* Attempt to send the log once every loop */ - if (!XLogSend(&output_message, &caughtup)) - goto eof; + if (!XLogSend(output_message, &caughtup)) + break; } - /* can't get here because the above loop never exits */ - return 1; - -eof: - /* + * Get here on send failure. Clean up and exit. + * * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. */ @@ -524,6 +529,9 @@ WalSndKill(int code, Datum arg) /* * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' + * + * XXX probably this should be improved to suck data directly from the + * WAL buffers when possible. */ static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) @@ -634,51 +642,46 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) /* * Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed), - * but not yet sent to the client, and send it. If there is no unsent WAL, - * *caughtup is set to true and nothing is sent, otherwise *caughtup is set - * to false. + * but not yet sent to the client, and send it. + * + * msgbuf is a work area in which the output message is constructed. It's + * passed in just so we can avoid re-palloc'ing the buffer on each cycle. + * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE. + * + * If there is no unsent WAL remaining, *caughtup is set to true, otherwise + * *caughtup is set to false. * * Returns true if OK, false if trouble. */ static bool -XLogSend(StringInfo outMsg, bool *caughtup) +XLogSend(char *msgbuf, bool *caughtup) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; - char activitymsg[50]; - - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = MyWalSnd; + WalDataMessageHeader msghdr; /* Attempt to send all records flushed to the disk already */ SendRqstPtr = GetWriteRecPtr(); /* Quick exit if nothing to do */ - if (!XLByteLT(sentPtr, SendRqstPtr)) + if (XLByteLE(SendRqstPtr, sentPtr)) { *caughtup = true; return true; } - /* - * Otherwise let the caller know that we're not fully caught up. Unless - * there's a huge backlog, we'll be caught up to the current WriteRecPtr - * after we've sent everything below, but more WAL could accumulate while - * we're busy sending. - */ - *caughtup = false; /* - * Figure out how much to send in one message. If there's less than + * Figure out how much to send in one message. If there's no more than * MAX_SEND_SIZE bytes to send, send everything. Otherwise send - * MAX_SEND_SIZE bytes, but round to page boundary. + * MAX_SEND_SIZE bytes, but round to logfile or page boundary. * * The rounding is not only for performance reasons. Walreceiver * relies on the fact that we never split a WAL record across two * messages. Since a long WAL record is split at page boundary into * continuation records, page boundary is always a safe cut-off point. - * We also assume that SendRqstPtr never points in the middle of a WAL + * We also assume that SendRqstPtr never points to the middle of a WAL * record. */ startptr = sentPtr; @@ -694,59 +697,78 @@ XLogSend(StringInfo outMsg, bool *caughtup) endptr = startptr; XLByteAdvance(endptr, MAX_SEND_SIZE); - /* round down to page boundary. */ - endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); - /* if we went beyond SendRqstPtr, back off */ - if (XLByteLT(SendRqstPtr, endptr)) - endptr = SendRqstPtr; - - /* - * OK to read and send the slice. - * - * We don't need to convert the xlogid/xrecoff from host byte order to - * network byte order because the both server can be expected to have - * the same byte order. If they have different byte order, we don't - * reach here. - */ - pq_sendbyte(outMsg, 'w'); - pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); - if (endptr.xlogid != startptr.xlogid) { + /* Don't cross a logfile boundary within one message */ Assert(endptr.xlogid == startptr.xlogid + 1); - nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff; + endptr.xlogid = startptr.xlogid; + endptr.xrecoff = XLogFileSize; + } + + /* if we went beyond SendRqstPtr, back off */ + if (XLByteLE(SendRqstPtr, endptr)) + { + endptr = SendRqstPtr; + *caughtup = true; } else - nbytes = endptr.xrecoff - startptr.xrecoff; + { + /* round down to page boundary. */ + endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); + *caughtup = false; + } - sentPtr = endptr; + nbytes = endptr.xrecoff - startptr.xrecoff; + Assert(nbytes <= MAX_SEND_SIZE); /* - * Read the log directly into the output buffer to prevent extra - * memcpy calls. + * OK to read and send the slice. */ - enlargeStringInfo(outMsg, nbytes); + msgbuf[0] = 'w'; - XLogRead(&outMsg->data[outMsg->len], startptr, nbytes); - outMsg->len += nbytes; - outMsg->data[outMsg->len] = '\0'; + /* + * Read the log directly into the output buffer to avoid extra memcpy + * calls. + */ + XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes); - pq_putmessage('d', outMsg->data, outMsg->len); - resetStringInfo(outMsg); + /* + * We fill the message header last so that the send timestamp is taken + * as late as possible. + */ + msghdr.dataStart = startptr; + msghdr.walEnd = SendRqstPtr; + msghdr.sendTime = GetCurrentTimestamp(); - /* Update shared memory status */ - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); + memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); + + pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); /* Flush pending output */ if (pq_flush()) return false; + sentPtr = endptr; + + /* Update shared memory status */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + /* Report progress of XLOG streaming in PS display */ - snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", - sentPtr.xlogid, sentPtr.xrecoff); - set_ps_display(activitymsg, false); + if (update_process_title) + { + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + sentPtr.xlogid, sentPtr.xrecoff); + set_ps_display(activitymsg, false); + } return true; } |