summaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2011-07-19 03:40:03 +0100
committerSimon Riggs <simon@2ndQuadrant.com>2011-07-19 03:40:03 +0100
commit5286105800c7d5902f98f32e11b209c471c0c69c (patch)
tree59a5793296a3af901f864a16748e944edbd900ac /src/backend/replication/walsender.c
parent3d4890c0c5d27dfdf7d1a8816d7bdcdba3c39d21 (diff)
Cascading replication feature for streaming log-based replication.
Standby servers can now have WALSender processes, which can work with either WALReceiver or archive_commands to pass data. Fully updated docs, including new conceptual terms of sending server, upstream and downstream servers. WALSenders terminated when promote to master. Fujii Masao, review, rework and doc rewrite by Simon Riggs
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c108
1 files changed, 83 insertions, 25 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5b3300d23..63a63048dbb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -48,6 +48,7 @@
#include "replication/basebackup.h"
#include "replication/replnodes.h"
#include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -70,6 +71,7 @@ WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
@@ -135,10 +137,7 @@ WalSenderMain(void)
{
MemoryContext walsnd_context;
- if (RecoveryInProgress())
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+ am_cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
@@ -165,6 +164,12 @@ WalSenderMain(void)
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
+ /*
+ * Use the recovery target timeline ID during recovery
+ */
+ if (am_cascading_walsender)
+ ThisTimeLineID = GetRecoveryTargetTLI();
+
/* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote);
@@ -290,7 +295,7 @@ IdentifySystem(void)
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = GetInsertRecPtr();
+ logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
@@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd)
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/*
- * Check that we're logging enough information in the WAL for
- * log-shipping.
+ * We assume here that we're logging enough information in the WAL for
+ * log-shipping, since this is checked in PostmasterMain().
*
- * NOTE: This only checks the current value of wal_level. Even if the
- * current setting is not 'minimal', there can be old WAL in the pg_xlog
- * directory that was created with 'minimal'. So this is not bulletproof,
- * the purpose is just to give a user-friendly error message that hints
- * how to configure the system correctly.
+ * NOTE: wal_level can only change at shutdown, so in most cases it is
+ * difficult for there to be WAL data that we can still see that was written
+ * at wal_level='minimal'.
*/
- if (wal_level == WAL_LEVEL_MINIMAL)
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("standby connections not allowed because wal_level=minimal")));
/*
* When we first start replication the standby will be behind the primary.
@@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void)
SpinLockRelease(&walsnd->mutex);
}
- SyncRepReleaseWaiters();
+ if (!am_cascading_walsender)
+ SyncRepReleaseWaiters();
}
/*
@@ -764,6 +764,8 @@ WalSndLoop(void)
/*
* When SIGUSR2 arrives, we send any outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
+ * This may be a normal termination at shutdown, or a promotion,
+ * the walsender is not sure which.
*/
if (walsender_ready_to_stop && !pq_is_send_pending())
{
@@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg)
}
/*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible.
@@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
{
- XLogRecPtr startRecPtr = recptr;
- char path[MAXPGPATH];
+ char *p;
+ XLogRecPtr recptr;
+ Size nbytes;
uint32 lastRemovedLog;
uint32 lastRemovedSeg;
uint32 log;
uint32 seg;
+retry:
+ p = buf;
+ recptr = startptr;
+ nbytes = count;
+
while (nbytes > 0)
{
uint32 startoff;
@@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
{
+ char path[MAXPGPATH];
+
/* Switch to another logfile segment */
if (sendFile >= 0)
close(sendFile);
@@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
else
segbytes = nbytes;
- readbytes = read(sendFile, buf, segbytes);
+ readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
ereport(ERROR,
(errcode_for_file_access(),
@@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
sendOff += readbytes;
nbytes -= readbytes;
- buf += readbytes;
+ p += readbytes;
}
/*
@@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
* already have been overwritten with new WAL records.
*/
XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startRecPtr, log, seg);
+ XLByteToSeg(startptr, log, seg);
if (log < lastRemovedLog ||
(log == lastRemovedLog && seg <= lastRemovedSeg))
{
@@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
errmsg("requested WAL segment %s has already been removed",
filename)));
}
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with
+ * the file of the same name retrieved from archive. So we always need
+ * to check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendFile >= 0)
+ {
+ close(sendFile);
+ sendFile = -1;
+
+ goto retry;
+ }
+ }
}
/*
@@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
@@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup)
return;
}
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid == 0)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->needreload = true;
+ SpinLockRelease(&walsnd->mutex);
+ }
+}
+
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalSndSigHupHandler(SIGNAL_ARGS)