diff options
Diffstat (limited to 'src/backend/access')
| -rw-r--r-- | src/backend/access/transam/xlog.c | 453 | 
1 file changed, 285 insertions, 168 deletions
| diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9208bc21d46..c8ac97fbf7f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -506,12 +506,18 @@ static XLogwrtResult LogwrtResult = {0, 0};  /*   * Codes indicating where we got a WAL file from during recovery, or where - * to attempt to get one.  These are chosen so that they can be OR'd together - * in a bitmask state variable. + * to attempt to get one.   */ -#define XLOG_FROM_ARCHIVE		(1<<0)	/* Restored using restore_command */ -#define XLOG_FROM_PG_XLOG		(1<<1)	/* Existing file in pg_xlog */ -#define XLOG_FROM_STREAM		(1<<2)	/* Streamed from master */ +typedef enum +{ +	XLOG_FROM_ANY = 0,		/* request to read WAL from any source */ +	XLOG_FROM_ARCHIVE,		/* restored using restore_command */ +	XLOG_FROM_PG_XLOG,		/* existing file in pg_xlog */ +	XLOG_FROM_STREAM,		/* streamed from master */ +} XLogSource; + +/* human-readable names for XLogSources, for debugging output */ +static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" };  /*   * openLogFile is -1 or a kernel FD for an open log file segment. @@ -536,22 +542,28 @@ static XLogSegNo readSegNo = 0;  static uint32 readOff = 0;  static uint32 readLen = 0;  static bool	readFileHeaderValidated = false; -static int	readSource = 0;		/* XLOG_FROM_* code */ +static XLogSource readSource = 0;		/* XLOG_FROM_* code */  /* - * Keeps track of which sources we've tried to read the current WAL - * record from and failed. + * Keeps track of which source we're currently reading from. This is + * different from readSource in that this is always set, even when we don't + * currently have a WAL file open. If lastSourceFailed is set, our last + * attempt to read from currentSource failed, and we should try another source + * next.   
*/ -static int	failedSources = 0;	/* OR of XLOG_FROM_* codes */ +static XLogSource currentSource = 0;	/* XLOG_FROM_* code */ +static bool	lastSourceFailed = false;  /*   * These variables track when we last obtained some WAL data to process,   * and where we got it from.  (XLogReceiptSource is initially the same as   * readSource, but readSource gets reset to zero when we don't have data - * to process right now.) + * to process right now.  It is also different from currentSource, which + * also changes when we try to read from a source and fail, while + * XLogReceiptSource tracks where we last successfully read some WAL.)   */  static TimestampTz XLogReceiptTime = 0; -static int	XLogReceiptSource = 0;		/* XLOG_FROM_* code */ +static XLogSource XLogReceiptSource = 0;	/* XLOG_FROM_* code */  /* Buffer for currently read page (XLOG_BLCKSZ bytes) */  static char *readBuf = NULL; @@ -605,7 +617,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,  					   bool use_lock);  static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,  			 int source, bool notexistOk); -static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources); +static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);  static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,  			 bool randAccess);  static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, @@ -2551,7 +2563,7 @@ XLogFileOpen(XLogSegNo segno)  /*   * Open a logfile segment for reading (during recovery).   * - * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive. + * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.   * Otherwise, it's assumed to be already available in pg_xlog.   */  static int @@ -2697,7 +2709,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,   * This version searches for the segment with any TLI listed in expectedTLIs.   
*/  static int -XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources) +XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)  {  	char		path[MAXPGPATH];  	ListCell   *cell; @@ -2720,7 +2732,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)  		if (tli < curFileTLI)  			break;				/* don't bother looking at too-old TLIs */ -		if (sources & XLOG_FROM_ARCHIVE) +		if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)  		{  			fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);  			if (fd != -1) @@ -2730,7 +2742,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)  			}  		} -		if (sources & XLOG_FROM_PG_XLOG) +		if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)  		{  			fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);  			if (fd != -1) @@ -3332,7 +3344,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)  	}  	/* This is the first try to read this page. */ -	failedSources = 0; +	lastSourceFailed = false;  retry:  	/* Read the page containing the record */  	if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) @@ -3545,7 +3557,7 @@ retry:  	return record;  next_record_is_invalid: -	failedSources |= readSource; +	lastSourceFailed = true;  	if (readFile >= 0)  	{ @@ -9162,7 +9174,7 @@ CancelBackup(void)   * In standby mode, if after a successful return of XLogPageRead() the   * caller finds the record it's interested in to be broken, it should   * ereport the error with the level determined by - * emode_for_corrupt_record(), and then set "failedSources |= readSource" + * emode_for_corrupt_record(), and then set lastSourceFailed   * and call XLogPageRead() again with the same arguments. This lets   * XLogPageRead() to try fetching the record from another source, or to   * sleep and retry. 
@@ -9180,7 +9192,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,  	targetRecOff = (*RecPtr) % XLOG_BLCKSZ;  	/* Fast exit if we have read the record in the current buffer already */ -	if (failedSources == 0 && targetSegNo == readSegNo && +	if (!lastSourceFailed && targetSegNo == readSegNo &&  		targetPageOff == readOff && targetRecOff < readLen)  		return true; @@ -9227,17 +9239,18 @@ retry:  			/* In archive or crash recovery. */  			if (readFile < 0)  			{ -				int			sources; +				int			source;  				/* Reset curFileTLI if random fetch. */  				if (randAccess)  					curFileTLI = 0; -				sources = XLOG_FROM_PG_XLOG;  				if (InArchiveRecovery) -					sources |= XLOG_FROM_ARCHIVE; +					source = XLOG_FROM_ANY; +				else +					source = XLOG_FROM_PG_XLOG; -				readFile = XLogFileReadAnyTLI(readSegNo, emode, sources); +				readFile = XLogFileReadAnyTLI(readSegNo, emode, source);  				if (readFile < 0)  					return false;  			} @@ -9326,7 +9339,7 @@ retry:  	return true;  next_record_is_invalid: -	failedSources |= readSource; +	lastSourceFailed = true;  	if (readFile >= 0)  		close(readFile); @@ -9366,185 +9379,289 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,  							bool fetching_ckpt)  {  	static pg_time_t last_fail_time = 0; +	pg_time_t now; + +	/*------- +	 * Standby mode is implemented by a state machine: +	 * +	 * 1. Read from archive	(XLOG_FROM_ARCHIVE) +	 * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG) +	 * 3. Check trigger file +	 * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM) +	 * 5. Rescan timelines +	 * 6. Sleep 5 seconds, and loop back to 1. +	 * +	 * Failure to read from the current source advances the state machine to +	 * the next state. In addition, successfully reading a file from pg_xlog +	 * moves the state machine from state 2 back to state 1 (we always prefer +	 * files in the archive over files in pg_xlog). +	 * +	 * 'currentSource' indicates the current state. 
There are no currentSource +	 * values for "check trigger", "rescan timelines", and "sleep" states, +	 * those actions are taken when reading from the previous source fails, as +	 * part of advancing to the next state. +	 *------- +	 */ +	if (currentSource == 0) +		currentSource = XLOG_FROM_ARCHIVE;  	for (;;)  	{ -		if (WalRcvInProgress()) +		int		oldSource = currentSource; + +		/* +		 * First check if we failed to read from the current source, and +		 * advance the state machine if so. The failure to read might've +		 * happened outside this function, e.g when a CRC check fails on a +		 * record, or within this loop. +		 */ +		if (lastSourceFailed)  		{ -			bool		havedata; -			/* -			 * If we find an invalid record in the WAL streamed from master, -			 * something is seriously wrong. There's little chance that the -			 * problem will just go away, but PANIC is not good for -			 * availability either, especially in hot standby mode. -			 * Disconnect, and retry from archive/pg_xlog again. The WAL in -			 * the archive should be identical to what was streamed, so it's -			 * unlikely that it helps, but one can hope... -			 */ -			if (failedSources & XLOG_FROM_STREAM) +			switch (currentSource)  			{ -				ShutdownWalRcv(); -				continue; -			} +				case XLOG_FROM_ARCHIVE: +					currentSource = XLOG_FROM_PG_XLOG; +					break; -			/* -			 * Walreceiver is active, so see if new data has arrived. -			 * -			 * We only advance XLogReceiptTime when we obtain fresh WAL from -			 * walreceiver and observe that we had already processed -			 * everything before the most recent "chunk" that it flushed to -			 * disk.  In steady state where we are keeping up with the -			 * incoming data, XLogReceiptTime will be updated on each cycle. -			 * When we are behind, XLogReceiptTime will not advance, so the -			 * grace time allotted to conflicting queries will decrease. 
-			 */ -			if (XLByteLT(RecPtr, receivedUpto)) -				havedata = true; -			else -			{ -				XLogRecPtr	latestChunkStart; +				case XLOG_FROM_PG_XLOG: +					/* +					 * Check to see if the trigger file exists. Note that we do +					 * this only after failure, so when you create the trigger +					 * file, we still finish replaying as much as we can from +					 * archive and pg_xlog before failover. +					 */ +					if (CheckForStandbyTrigger()) +						return false; -				receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); -				if (XLByteLT(RecPtr, receivedUpto)) -				{ -					havedata = true; -					if (!XLByteLT(RecPtr, latestChunkStart)) +					/* +					 * If primary_conninfo is set, launch walreceiver to try to +					 * stream the missing WAL. +					 * +					 * If fetching_ckpt is TRUE, RecPtr points to the initial +					 * checkpoint location. In that case, we use RedoStartLSN +					 * as the streaming start position instead of RecPtr, so +					 * that when we later jump backwards to start redo at +					 * RedoStartLSN, we will have the logs streamed already. +					 */ +					if (PrimaryConnInfo)  					{ -						XLogReceiptTime = GetCurrentTimestamp(); -						SetCurrentChunkStartTime(XLogReceiptTime); +						XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr; + +						RequestXLogStreaming(ptr, PrimaryConnInfo);  					} -				} -				else -					havedata = false; -			} -			if (havedata) -			{ -				/* -				 * Great, streamed far enough.  Open the file if it's not open -				 * already.  Use XLOG_FROM_STREAM so that source info is set -				 * correctly and XLogReceiptTime isn't changed. -				 */ -				if (readFile < 0) -				{ -					readFile = XLogFileRead(readSegNo, PANIC, -											recoveryTargetTLI, -											XLOG_FROM_STREAM, false); -					Assert(readFile >= 0); -				} -				else -				{ -					/* just make sure source info is correct... 
*/ -					readSource = XLOG_FROM_STREAM; -					XLogReceiptSource = XLOG_FROM_STREAM; -				} -				break; -			} +					/* +					 * Move to XLOG_FROM_STREAM state in either case. We'll get +					 * immediate failure if we didn't launch walreceiver, and +					 * move on to the next state. +					 */ +					currentSource = XLOG_FROM_STREAM; +					break; -			/* -			 * Data not here yet, so check for trigger then sleep for five -			 * seconds like in the WAL file polling case below. -			 */ -			if (CheckForStandbyTrigger()) -				return false; +				case XLOG_FROM_STREAM: +					/* +					 * Failure while streaming. Most likely, we got here because +					 * streaming replication was terminated, or promotion was +					 * triggered. But we also get here if we find an invalid +					 * record in the WAL streamed from master, in which case +					 * something is seriously wrong. There's little chance that +					 * the problem will just go away, but PANIC is not good for +					 * availability either, especially in hot standby mode. So, +					 * we treat that the same as disconnection, and retry from +					 * archive/pg_xlog again. The WAL in the archive should be +					 * identical to what was streamed, so it's unlikely that it +					 * helps, but one can hope... +					 */ +					/* +					 * Before we leave XLOG_FROM_STREAM state, make sure that +					 * walreceiver is not running, so that it won't overwrite +					 * any WAL that we restore from archive. +					 */ +					if (WalRcvInProgress()) +						ShutdownWalRcv(); -			/* -			 * Wait for more WAL to arrive, or timeout to be reached -			 */ -			WaitLatch(&XLogCtl->recoveryWakeupLatch, -					  WL_LATCH_SET | WL_TIMEOUT, -					  5000L); -			ResetLatch(&XLogCtl->recoveryWakeupLatch); +					/* +					 * Before we sleep, re-scan for possible new timelines if +					 * we were requested to recover to the latest timeline. 
+					 */ +					if (recoveryTargetIsLatest) +					{ +						if (rescanLatestTimeLine()) +						{ +							currentSource = XLOG_FROM_ARCHIVE; +							break; +						} +					} + +					/* +					 * XLOG_FROM_STREAM is the last state in our state machine, +					 * so we've exhausted all the options for obtaining the +					 * requested WAL. We're going to loop back and retry from +					 * the archive, but if it hasn't been long since last +					 * attempt, sleep 5 seconds to avoid busy-waiting. +					 */ +					now = (pg_time_t) time(NULL); +					if ((now - last_fail_time) < 5) +					{ +						pg_usleep(1000000L * (5 - (now - last_fail_time))); +						now = (pg_time_t) time(NULL); +					} +					last_fail_time = now; +					currentSource = XLOG_FROM_ARCHIVE; +					break; + +				default: +					elog(ERROR, "unexpected WAL source %d", currentSource); +			}  		} -		else +		else if (currentSource == XLOG_FROM_PG_XLOG)  		{  			/* -			 * WAL receiver is not active. Poll the archive. +			 * We just successfully read a file in pg_xlog. We prefer files +			 * in the archive over ones in pg_xlog, so try the next file +			 * again from the archive first.  			 */ -			int			sources; -			pg_time_t	now; +			currentSource = XLOG_FROM_ARCHIVE; +		} -			if (readFile >= 0) -			{ -				close(readFile); -				readFile = -1; -			} -			/* Reset curFileTLI if random fetch. */ -			if (randAccess) -				curFileTLI = 0; +		if (currentSource != oldSource) +			elog(LOG, "switched WAL source from %s to %s after %s", +				 xlogSourceNames[oldSource], xlogSourceNames[currentSource], +				 lastSourceFailed ? "failure" : "success"); + +		/* +		 * We've now handled possible failure. Try to read from the chosen +		 * source. +		 */ +		lastSourceFailed = false; + +		switch (currentSource) +		{ +			case XLOG_FROM_ARCHIVE: +			case XLOG_FROM_PG_XLOG: +				/* Close any old file we might have open. 
*/ +				if (readFile >= 0) +				{ +					close(readFile); +					readFile = -1; +				} +				/* Reset curFileTLI if random fetch. */ +				if (randAccess) +					curFileTLI = 0; -			/* -			 * Try to restore the file from archive, or read an existing file -			 * from pg_xlog. -			 */ -			sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; -			if (!(sources & ~failedSources)) -			{  				/* -				 * We've exhausted all options for retrieving the file. Retry. +				 * Try to restore the file from archive, or read an existing +				 * file from pg_xlog.  				 */ -				failedSources = 0; +				readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource); +				if (readFile >= 0) +					return true;	/* success! */  				/* -				 * Before we sleep, re-scan for possible new timelines if we -				 * were requested to recover to the latest timeline. +				 * Nope, not found in archive or pg_xlog.  				 */ -				if (recoveryTargetIsLatest) -				{ -					if (rescanLatestTimeLine()) -						continue; -				} +				lastSourceFailed = true; +				break; + +			case XLOG_FROM_STREAM: +			{ +				bool		havedata;  				/* -				 * If it hasn't been long since last attempt, sleep to avoid -				 * busy-waiting. +				 * Check if WAL receiver is still active.  				 */ -				now = (pg_time_t) time(NULL); -				if ((now - last_fail_time) < 5) +				if (!WalRcvInProgress())  				{ -					pg_usleep(1000000L * (5 - (now - last_fail_time))); -					now = (pg_time_t) time(NULL); +					lastSourceFailed = true; +					break;  				} -				last_fail_time = now;  				/* -				 * If primary_conninfo is set, launch walreceiver to try to -				 * stream the missing WAL, before retrying to restore from -				 * archive/pg_xlog. +				 * Walreceiver is active, so see if new data has arrived.  				 * -				 * If fetching_ckpt is TRUE, RecPtr points to the initial -				 * checkpoint location. 
In that case, we use RedoStartLSN as -				 * the streaming start position instead of RecPtr, so that -				 * when we later jump backwards to start redo at RedoStartLSN, -				 * we will have the logs streamed already. +				 * We only advance XLogReceiptTime when we obtain fresh WAL +				 * from walreceiver and observe that we had already processed +				 * everything before the most recent "chunk" that it flushed to +				 * disk.  In steady state where we are keeping up with the +				 * incoming data, XLogReceiptTime will be updated on each cycle. +				 * When we are behind, XLogReceiptTime will not advance, so the +				 * grace time allotted to conflicting queries will decrease.  				 */ -				if (PrimaryConnInfo) +				if (XLByteLT(RecPtr, receivedUpto)) +					havedata = true; +				else  				{ -					XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr; +					XLogRecPtr	latestChunkStart; -					RequestXLogStreaming(ptr, PrimaryConnInfo); -					continue; +					receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); +					if (XLByteLT(RecPtr, receivedUpto)) +					{ +						havedata = true; +						if (!XLByteLT(RecPtr, latestChunkStart)) +						{ +							XLogReceiptTime = GetCurrentTimestamp(); +							SetCurrentChunkStartTime(XLogReceiptTime); +						} +					} +					else +						havedata = false; +				} +				if (havedata) +				{ +					/* +					 * Great, streamed far enough.  Open the file if it's not +					 * open already.  Use XLOG_FROM_STREAM so that source info +					 * is set correctly and XLogReceiptTime isn't changed. +					 */ +					if (readFile < 0) +					{ +						readFile = XLogFileRead(readSegNo, PANIC, +												recoveryTargetTLI, +												XLOG_FROM_STREAM, false); +						Assert(readFile >= 0); +					} +					else +					{ +						/* just make sure source info is correct... 
*/ +						readSource = XLOG_FROM_STREAM; +						XLogReceiptSource = XLOG_FROM_STREAM; +						return true; +					} +					break;  				} -			} -			/* Don't try to read from a source that just failed */ -			sources &= ~failedSources; -			readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources); -			if (readFile >= 0) -				break; -			/* -			 * Nope, not found in archive and/or pg_xlog. -			 */ -			failedSources |= sources; +				/* +				 * Data not here yet. Check for trigger, then wait for +				 * walreceiver to wake us up when new WAL arrives. +				 */ +				if (CheckForStandbyTrigger()) +				{ +					/* +					 * Note that we don't "return false" immediately here. +					 * After being triggered, we still want to replay all the +					 * WAL that was already streamed. It's in pg_xlog now, so +					 * we just treat this as a failure, and the state machine +					 * will move on to replay the streamed WAL from pg_xlog, +					 * and then recheck the trigger and exit replay. +					 */ +					lastSourceFailed = true; +					break; +				} -			/* -			 * Check to see if the trigger file exists. Note that we do this -			 * only after failure, so when you create the trigger file, we -			 * still finish replaying as much as we can from archive and -			 * pg_xlog before failover. -			 */ -			if (CheckForStandbyTrigger()) -				return false; +				/* +				 * Wait for more WAL to arrive. Time out after 5 seconds, like +				 * when polling the archive, to react to a trigger file +				 * promptly. +				 */ +				WaitLatch(&XLogCtl->recoveryWakeupLatch, +						  WL_LATCH_SET | WL_TIMEOUT, +						  5000L); +				ResetLatch(&XLogCtl->recoveryWakeupLatch); +				break; +			} + +			default: +				elog(ERROR, "unexpected WAL source %d", currentSource);  		}  		/* @@ -9554,7 +9671,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,  		HandleStartupProcInterrupts();  	} -	return true; +	return false; 	/* not reached */  }  /* | 
