diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/access/transam/xlog.c | 216 | ||||
| -rw-r--r-- | src/backend/postmaster/walwriter.c | 7 | ||||
| -rw-r--r-- | src/include/access/xlog.h | 1 | ||||
| -rw-r--r-- | src/include/access/xlogdefs.h | 1 | 
4 files changed, 215 insertions, 10 deletions
| diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6345d0746c0..29604851218 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -709,6 +709,18 @@ typedef struct XLogCtlData  	XLogRecPtr	lastFpwDisableRecPtr;  	slock_t		info_lck;		/* locks shared variables shown above */ + +	/* +	 * Variables used to track segment-boundary-crossing WAL records.  See +	 * RegisterSegmentBoundary.  Protected by segtrack_lck. +	 */ +	XLogSegNo	lastNotifiedSeg; +	XLogSegNo	earliestSegBoundary; +	XLogRecPtr	earliestSegBoundaryEndPtr; +	XLogSegNo	latestSegBoundary; +	XLogRecPtr	latestSegBoundaryEndPtr; + +	slock_t		segtrack_lck;	/* locks shared variables shown above */  } XLogCtlData;  static XLogCtlData *XLogCtl = NULL; @@ -899,6 +911,7 @@ static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecP  static void RemoveXlogFile(const char *segname, XLogRecPtr lastredoptr, XLogRecPtr endptr);  static void UpdateLastRemovedPtr(char *filename);  static void ValidateXLOGDirectoryStructure(void); +static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);  static void CleanupBackupHistory(void);  static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);  static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, @@ -1129,23 +1142,56 @@ XLogInsertRecord(XLogRecData *rdata,  	END_CRIT_SECTION();  	/* -	 * Update shared LogwrtRqst.Write, if we crossed page boundary. +	 * If we crossed page boundary, update LogwrtRqst.Write; if we crossed +	 * segment boundary, register that and wake up walwriter.  	 */  	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)  	{ +		XLogSegNo	StartSeg; +		XLogSegNo	EndSeg; + +		XLByteToSeg(StartPos, StartSeg, wal_segment_size); +		XLByteToSeg(EndPos, EndSeg, wal_segment_size); + +		/* +		 * Register our crossing the segment boundary if that occurred. +		 * +		 * Note that we did not use XLByteToPrevSeg() for determining the +		 * ending segment.  This is so that a record that fits perfectly into +		 * the end of the segment causes the latter to get marked ready for +		 * archival immediately. +		 */ +		if (StartSeg != EndSeg && XLogArchivingActive()) +			RegisterSegmentBoundary(EndSeg, EndPos); + +		/* +		 * Advance LogwrtRqst.Write so that it includes new block(s). +		 * +		 * We do this after registering the segment boundary so that the +		 * comparison with the flushed pointer below can use the latest value +		 * known globally. +		 */  		SpinLockAcquire(&XLogCtl->info_lck); -		/* advance global request to include new block(s) */  		if (XLogCtl->LogwrtRqst.Write < EndPos)  			XLogCtl->LogwrtRqst.Write = EndPos;  		/* update local result copy while I have the chance */  		LogwrtResult = XLogCtl->LogwrtResult;  		SpinLockRelease(&XLogCtl->info_lck); + +		/* +		 * There's a chance that the record was already flushed to disk and we +		 * missed marking segments as ready for archive.  If this happens, we +		 * nudge the WALWriter, which will take care of notifying segments as +		 * needed. +		 */ +		if (StartSeg != EndSeg && XLogArchivingActive() && +			LogwrtResult.Flush >= EndPos && ProcGlobal->walwriterLatch) +			SetLatch(ProcGlobal->walwriterLatch);  	}  	/*  	 * If this was an XLOG_SWITCH record, flush the record and the empty -	 * padding space that fills the rest of the segment, and perform -	 * end-of-segment actions (eg, notifying archiver). +	 * padding space that fills the rest of the segment.  	 */  	if (isLogSwitch)  	{ @@ -2388,6 +2434,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)  	/* We should always be inside a critical section here */  	Assert(CritSectionCount > 0); +	Assert(LWLockHeldByMe(WALWriteLock));  	/*  	 * Update local LogwrtResult (caller probably did this already, but...) @@ -2524,11 +2571,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)  			 * later. Doing it here ensures that one and only one backend will  			 * perform this fsync.  			 * -			 * This is also the right place to notify the Archiver that the -			 * segment is ready to copy to archival storage, and to update the -			 * timer for archive_timeout, and to signal for a checkpoint if -			 * too many logfile segments have been used since the last -			 * checkpoint. +			 * If WAL archiving is active, we attempt to notify the archiver +			 * of any segments that are now ready for archival. +			 * +			 * This is also the right place to update the timer for +			 * archive_timeout and to signal for a checkpoint if too many +			 * logfile segments have been used since the last checkpoint.  			 */  			if (finishing_seg)  			{ @@ -2540,7 +2588,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)  				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */  				if (XLogArchivingActive()) -					XLogArchiveNotifySeg(openLogSegNo); +					NotifySegmentsReadyForArchive(LogwrtResult.Flush);  				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);  				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush; @@ -2627,6 +2675,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)  			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;  		SpinLockRelease(&XLogCtl->info_lck);  	} + +	if (XLogArchivingActive()) +		NotifySegmentsReadyForArchive(LogwrtResult.Flush);  }  /* @@ -4228,6 +4279,129 @@ ValidateXLOGDirectoryStructure(void)  }  /* + * RegisterSegmentBoundary + * + * WAL records that are split across a segment boundary require special + * treatment for archiving: the initial segment must not be archived until + * the end segment has been flushed, in case we crash before we have + * the chance to flush the end segment (because after recovery we would + * overwrite that WAL record with a different one, and so the file we + * archived no longer represents truth.)  This also applies to streaming + * physical replication. + * + * To handle this, we keep track of the LSN of WAL records that cross + * segment boundaries.  Two such are sufficient: the ones with the + * earliest and the latest end pointers we know about, since the flush + * position advances monotonically.  WAL record writers register + * boundary-crossing records here, which is used by .ready file creation + * to delay until the end segment is known flushed. + */ +static void +RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos) +{ +	XLogSegNo	segno PG_USED_FOR_ASSERTS_ONLY; + +	/* verify caller computed segment number correctly */ +	AssertArg((XLByteToSeg(endpos, segno, wal_segment_size), segno == seg)); + +	SpinLockAcquire(&XLogCtl->segtrack_lck); + +	/* +	 * If no segment boundaries are registered, store the new segment boundary +	 * in earliestSegBoundary.  Otherwise, store the greater segment +	 * boundaries in latestSegBoundary. +	 */ +	if (XLogCtl->earliestSegBoundary == MaxXLogSegNo) +	{ +		XLogCtl->earliestSegBoundary = seg; +		XLogCtl->earliestSegBoundaryEndPtr = endpos; +	} +	else if (seg > XLogCtl->earliestSegBoundary && +			 (XLogCtl->latestSegBoundary == MaxXLogSegNo || +			  seg > XLogCtl->latestSegBoundary)) +	{ +		XLogCtl->latestSegBoundary = seg; +		XLogCtl->latestSegBoundaryEndPtr = endpos; +	} + +	SpinLockRelease(&XLogCtl->segtrack_lck); +} + +/* + * NotifySegmentsReadyForArchive + * + * Mark segments as ready for archival, given that it is safe to do so. + * This function is idempotent. + */ +void +NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr) +{ +	XLogSegNo	latest_boundary_seg; +	XLogSegNo	last_notified; +	XLogSegNo	flushed_seg; +	XLogSegNo	seg; +	bool		keep_latest; + +	XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size); + +	SpinLockAcquire(&XLogCtl->segtrack_lck); + +	if (XLogCtl->latestSegBoundary <= flushed_seg && +		XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr) +	{ +		latest_boundary_seg = XLogCtl->latestSegBoundary; +		keep_latest = false; +	} +	else if (XLogCtl->earliestSegBoundary <= flushed_seg && +			 XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr) +	{ +		latest_boundary_seg = XLogCtl->earliestSegBoundary; +		keep_latest = true; +	} +	else +	{ +		SpinLockRelease(&XLogCtl->segtrack_lck); +		return; +	} + +	last_notified = XLogCtl->lastNotifiedSeg; + +	/* +	 * Update shared memory and discard segment boundaries that are no longer +	 * needed. +	 * +	 * It is safe to update shared memory before we attempt to create the +	 * .ready files.  If our calls to XLogArchiveNotifySeg() fail, +	 * RemoveOldXlogFiles() will retry it as needed. +	 */ +	if (last_notified < latest_boundary_seg - 1) +		XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1; + +	if (keep_latest) +	{ +		XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary; +		XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr; +	} +	else +	{ +		XLogCtl->earliestSegBoundary = MaxXLogSegNo; +		XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr; +	} + +	XLogCtl->latestSegBoundary = MaxXLogSegNo; +	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr; + +	SpinLockRelease(&XLogCtl->segtrack_lck); + +	/* +	 * Notify archiver about segments that are ready for archival (by creating +	 * the corresponding .ready files). +	 */ +	for (seg = last_notified + 1; seg < latest_boundary_seg; seg++) +		XLogArchiveNotifySeg(seg); +} + +/*   * Remove previous backup history files.  This also retries creation of   * .ready files for any backup history files for which XLogArchiveNotify   * failed earlier. @@ -5112,8 +5286,16 @@ XLOGShmemInit(void)  	SpinLockInit(&XLogCtl->Insert.insertpos_lck);  	SpinLockInit(&XLogCtl->info_lck); +	SpinLockInit(&XLogCtl->segtrack_lck);  	SpinLockInit(&XLogCtl->ulsn_lck);  	InitSharedLatch(&XLogCtl->recoveryWakeupLatch); + +	/* Initialize stuff for marking segments as ready for archival. */ +	XLogCtl->lastNotifiedSeg = MaxXLogSegNo; +	XLogCtl->earliestSegBoundary = MaxXLogSegNo; +	XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr; +	XLogCtl->latestSegBoundary = MaxXLogSegNo; +	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;  }  /* @@ -7606,6 +7788,20 @@ StartupXLOG(void)  	XLogCtl->LogwrtRqst.Flush = EndOfLog;  	/* +	 * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file. +	 */ +	if (XLogArchivingActive()) +	{ +		XLogSegNo	EndOfLogSeg; + +		XLByteToSeg(EndOfLog, EndOfLogSeg, wal_segment_size); + +		SpinLockAcquire(&XLogCtl->segtrack_lck); +		XLogCtl->lastNotifiedSeg = EndOfLogSeg - 1; +		SpinLockRelease(&XLogCtl->segtrack_lck); +	} + +	/*  	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE  	 * record before resource manager writes cleanup WAL records or checkpoint  	 * record is written. diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index a6fdba3f413..6cf6434fee4 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -257,6 +257,13 @@ WalWriterMain(void)  		}  		/* +		 * Notify the archiver of any WAL segments that are ready.  We do this +		 * here to handle a race condition where WAL is flushed to disk prior +		 * to registering the segment boundary. +		 */ +		NotifySegmentsReadyForArchive(GetFlushRecPtr()); + +		/*  		 * Do what we're here for; then, if XLogBackgroundFlush() found useful  		 * work to do, reset hibernation counter.  		 */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0bed7e337ce..910200337b1 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -322,6 +322,7 @@ extern XLogRecPtr GetInsertRecPtr(void);  extern XLogRecPtr GetFlushRecPtr(void);  extern XLogRecPtr GetLastImportantRecPtr(void);  extern void RemovePromoteSignalFiles(void); +extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);  extern bool CheckPromoteSignal(void);  extern void WakeupRecovery(void); diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index daded3dca05..6088ac87c74 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -39,6 +39,7 @@ typedef uint64 XLogRecPtr;   * XLogSegNo - physical log file sequence number.   */  typedef uint64 XLogSegNo; +#define MaxXLogSegNo	((XLogSegNo) 0xFFFFFFFFFFFFFFFF)  /*   * TimeLineID (TLI) - identifies different database histories to prevent | 
