diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2010-01-27 15:27:51 +0000 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2010-01-27 15:27:51 +0000 |
commit | 1bb2558046cc8b8cb0c8f5563e8d32b3b120c9ec (patch) | |
tree | 61f6838c88e2a60a3f133d10c2e6352129aaf321 /src/backend/replication/walreceiverfuncs.c | |
parent | ab13d1e925691c37bb6fa13cb748867c5ae2a91c (diff) |
Make standby server continuously retry restoring the next WAL segment with
restore_command, if the connection to the primary server is lost. This
ensures that the standby can recover automatically, if the connection is
lost for a long time and standby falls behind so much that the required
WAL segments have been archived and deleted in the master.
This also makes standby_mode useful without streaming replication; the
server will keep retrying restore_command every few seconds until the
trigger file is found. That's the same basic functionality pg_standby
offers, but without the bells and whistles.
To implement that, refactor the ReadRecord/FetchRecord functions. The
FetchRecord() function introduced in the original streaming replication
patch is removed, and all the retry logic is now in a new function called
XLogReadPage(). XLogReadPage() is now responsible for executing
restore_command, launching walreceiver, and waiting for new WAL to arrive
from primary, as required.
This also changes the life cycle of walreceiver. When launched, it now only
tries to connect to the master once, and exits if the connection fails, or
is lost during streaming for any reason. The startup process detects the
death, and re-launches walreceiver if necessary.
Diffstat (limited to 'src/backend/replication/walreceiverfuncs.c')
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 172 |
1 files changed, 69 insertions, 103 deletions
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index c1d7b558874..4fb132dcd4e 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -18,6 +18,8 @@ #include <sys/types.h> #include <sys/stat.h> +#include <sys/time.h> +#include <time.h> #include <unistd.h> #include <signal.h> @@ -30,8 +32,11 @@ WalRcvData *WalRcv = NULL; -static bool CheckForStandbyTrigger(void); -static void ShutdownWalRcv(void); +/* + * How long to wait for walreceiver to start up after requesting + * postmaster to launch it. In seconds. + */ +#define WALRCV_STARTUP_TIMEOUT 10 /* Report shared memory space needed by WalRcvShmemInit */ Size @@ -62,7 +67,7 @@ WalRcvShmemInit(void) /* Initialize the data structures */ MemSet(WalRcv, 0, WalRcvShmemSize()); - WalRcv->walRcvState = WALRCV_NOT_STARTED; + WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); } @@ -73,90 +78,51 @@ WalRcvInProgress(void) /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; WalRcvState state; + pg_time_t startTime; SpinLockAcquire(&walrcv->mutex); - state = walrcv->walRcvState; - SpinLockRelease(&walrcv->mutex); - if (state == WALRCV_RUNNING || state == WALRCV_STOPPING) - return true; - else - return false; -} - -/* - * Wait for the XLOG record at given position to become available. - * - * 'recptr' indicates the byte position which caller wants to read the - * XLOG record up to. The byte position actually written and flushed - * by walreceiver is returned. It can be higher than the requested - * location, and the caller can safely read up to that point without - * calling WaitNextXLogAvailable() again. - * - * If WAL streaming is ended (because a trigger file is found), *finished - * is set to true and function returns immediately. The returned position - * can be lower than requested in that case. - * - * Called by the startup process during streaming recovery. - */ -XLogRecPtr -WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished) -{ - static XLogRecPtr receivedUpto = {0, 0}; - - *finished = false; + state = walrcv->walRcvState; + startTime = walrcv->startTime; - /* Quick exit if already known available */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; + SpinLockRelease(&walrcv->mutex); - for (;;) + /* + * If it has taken too long for walreceiver to start up, give up. + * Setting the state to STOPPED ensures that if walreceiver later + * does start up after all, it will see that it's not supposed to be + * running and die without doing anything. + */ + if (state == WALRCV_STARTING) { - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* Update local status */ - SpinLockAcquire(&walrcv->mutex); - receivedUpto = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); + pg_time_t now = (pg_time_t) time(NULL); - /* If available already, leave here */ - if (XLByteLT(recptr, receivedUpto)) - return receivedUpto; - - /* Check to see if the trigger file exists */ - if (CheckForStandbyTrigger()) + if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { - *finished = true; - return receivedUpto; - } + SpinLockAcquire(&walrcv->mutex); - pg_usleep(100000L); /* 100ms */ - - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); + if (walrcv->walRcvState == WALRCV_STARTING) + state = walrcv->walRcvState = WALRCV_STOPPED; - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (!PostmasterIsAlive(true)) - exit(1); + SpinLockRelease(&walrcv->mutex); + } } + + if (state != WALRCV_STOPPED) + return true; + else + return false; } /* - * Stop walreceiver and wait for it to die. + * Stop walreceiver (if running) and wait for it to die. */ -static void +void ShutdownWalRcv(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - pid_t walrcvpid; + pid_t walrcvpid = 0; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -164,15 +130,25 @@ ShutdownWalRcv(void) * restart itself. */ SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->walRcvState == WALRCV_RUNNING); - walrcv->walRcvState = WALRCV_STOPPING; - walrcvpid = walrcv->pid; + switch(walrcv->walRcvState) + { + case WALRCV_STOPPED: + break; + case WALRCV_STARTING: + walrcv->walRcvState = WALRCV_STOPPED; + break; + + case WALRCV_RUNNING: + walrcv->walRcvState = WALRCV_STOPPING; + /* fall through */ + case WALRCV_STOPPING: + walrcvpid = walrcv->pid; + break; + } SpinLockRelease(&walrcv->mutex); /* - * Pid can be 0, if no walreceiver process is active right now. - * Postmaster should restart it, and when it does, it will see the - * STOPPING state. + * Signal walreceiver process if it was still running. */ if (walrcvpid != 0) kill(walrcvpid, SIGTERM); @@ -194,30 +170,6 @@ ShutdownWalRcv(void) } /* - * Check to see if the trigger file exists. If it does, request postmaster - * to shut down walreceiver and wait for it to exit, and remove the trigger - * file. - */ -static bool -CheckForStandbyTrigger(void) -{ - struct stat stat_buf; - - if (TriggerFile == NULL) - return false; - - if (stat(TriggerFile, &stat_buf) == 0) - { - ereport(LOG, - (errmsg("trigger file found: %s", TriggerFile))); - ShutdownWalRcv(); - unlink(TriggerFile); - return true; - } - return false; -} - -/* * Request postmaster to start walreceiver. * * recptr indicates the position where streaming should begin, and conninfo @@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + pg_time_t now = (pg_time_t) time(NULL); - Assert(walrcv->walRcvState == WALRCV_NOT_STARTED); + /* + * We always start at the beginning of the segment. + * That prevents a broken segment (i.e., with no records in the + * first half of a segment) from being created by XLOG streaming, + * which might cause trouble later on if the segment is e.g + * archived. + */ + if (recptr.xrecoff % XLogSegSize != 0) + recptr.xrecoff -= recptr.xrecoff % XLogSegSize; + + /* It better be stopped before we try to restart it */ + Assert(walrcv->walRcvState == WALRCV_STOPPED); - /* locking is just pro forma here; walreceiver isn't started yet */ SpinLockAcquire(&walrcv->mutex); - walrcv->receivedUpto = recptr; if (conninfo != NULL) strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else walrcv->conninfo[0] = '\0'; - walrcv->walRcvState = WALRCV_RUNNING; + walrcv->walRcvState = WALRCV_STARTING; + walrcv->startTime = now; + + walrcv->receivedUpto = recptr; SpinLockRelease(&walrcv->mutex); SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); @@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void) return recptr; } + |