diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 538 |
1 files changed, 480 insertions, 58 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9a13a98cbc1..a0dddcad813 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.357 2010/01/04 12:50:49 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.358 2010/01/15 09:19:00 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -41,6 +41,8 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -142,6 +144,16 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; /* + * Are we doing recovery from XLOG stream? If so, we recover without using + * offline XLOG archives even though InArchiveRecovery==true. This flag is + * used only in standby mode. + */ +static bool InStreamingRecovery = false; + +/* The current log page is partially-filled, and so needs to be read again? */ +static bool needReread = false; + +/* * Local copy of SharedRecoveryInProgress variable. True actually means "not * known, need to check the shared state". */ @@ -165,7 +177,7 @@ static bool InArchiveRecovery = false; /* Was the last xlog file restored from archive, or local? */ static bool restoredFromArchive = false; -/* options taken from recovery.conf */ +/* options taken from recovery.conf for archive recovery */ static char *recoveryRestoreCommand = NULL; static char *recoveryEndCommand = NULL; static bool recoveryTarget = false; @@ -175,6 +187,11 @@ static TransactionId recoveryTargetXid; static TimestampTz recoveryTargetTime; static TimestampTz recoveryLastXTime = 0; +/* options taken from recovery.conf for XLOG streaming */ +static bool StandbyMode = false; +static char *PrimaryConnInfo = NULL; +char *TriggerFile = NULL; + /* if recoveryStopsHere returns true, it saves actual stop xid/time here */ static TransactionId recoveryStopXid; static TimestampTz recoveryStopTime; @@ -229,6 +246,18 @@ XLogRecPtr XactLastRecEnd = {0, 0}; */ static XLogRecPtr RedoRecPtr; +/* + * RedoStartLSN points to the checkpoint's REDO location which is specified + * in a backup label file, backup history file or control file. In standby + * mode, XLOG streaming usually starts from the position where an invalid + * record was found. But if we fail to read even the initial checkpoint + * record, we use the REDO location instead of the checkpoint location as + * the start position of XLOG streaming. Otherwise we would have to jump + * backwards to the REDO location after reading the checkpoint record, + * because the REDO record can precede the checkpoint record. + */ +static XLogRecPtr RedoStartLSN = {0, 0}; + /*---------- * Shared-memory data structures for XLOG control * @@ -349,6 +378,7 @@ typedef struct XLogCtlData XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ int XLogCacheBlck; /* highest allocated xlog buffer index */ TimeLineID ThisTimeLineID; + TimeLineID RecoveryTargetTLI; /* * SharedRecoveryInProgress indicates if we're still in crash or archive @@ -369,6 +399,8 @@ typedef struct XLogCtlData XLogRecPtr replayEndRecPtr; /* timestamp of last record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* end+1 of the last record replayed */ + XLogRecPtr recoveryLastRecPtr; slock_t info_lck; /* locks shared variables shown above */ } XLogCtlData; @@ -481,12 +513,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb); static bool AdvanceXLInsertBuffer(bool new_segment); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); -static int XLogFileInit(uint32 log, uint32 seg, - bool *use_existent, bool use_lock); static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock); -static int XLogFileOpen(uint32 log, uint32 seg); static int XLogFileRead(uint32 log, uint32 seg, int emode); static void XLogFileClose(void); static bool RestoreArchivedFile(char *path, const char *xlogfname, @@ -497,6 +526,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); +static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode); static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); @@ -513,7 +543,6 @@ static char *str_time(pg_time_t tnow); #ifdef WAL_DEBUG static void xlog_outrec(StringInfo buf, XLogRecord *record); #endif -static void issue_xlog_fsync(void); static void pg_start_backup_callback(int code, Datum arg); static bool read_backup_label(XLogRecPtr *checkPointLoc); static void rm_redo_error_callback(void *arg); @@ -1690,7 +1719,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) */ if (finishing_seg || (xlog_switch && last_iteration)) { - issue_xlog_fsync(); + issue_xlog_fsync(openLogFile, openLogId, openLogSeg); LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ if (XLogArchivingActive()) @@ -1754,7 +1783,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) openLogFile = XLogFileOpen(openLogId, openLogSeg); openLogOff = 0; } - issue_xlog_fsync(); + issue_xlog_fsync(openLogFile, openLogId, openLogSeg); } LogwrtResult.Flush = LogwrtResult.Write; } @@ -2189,7 +2218,7 @@ XLogNeedsFlush(XLogRecPtr record) * take down the system on failure). They will promote to PANIC if we are * in a critical section. */ -static int +int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock) { @@ -2536,7 +2565,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, /* * Open a pre-existing logfile segment for writing. */ -static int +int XLogFileOpen(uint32 log, uint32 seg) { char path[MAXPGPATH]; @@ -2586,7 +2615,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode) XLogFileName(xlogfname, tli, log, seg); - if (InArchiveRecovery) + if (InArchiveRecovery && !InStreamingRecovery) { /* Report recovery progress in PS display */ snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", @@ -2641,12 +2670,13 @@ XLogFileClose(void) /* * WAL segment files will not be re-read in normal operation, so we advise * the OS to release any cached pages. But do not do so if WAL archiving - * is active, because archiver process could use the cache to read the WAL - * segment. Also, don't bother with it if we are using O_DIRECT, since - * the kernel is presumably not caching in that case. + * or streaming is active, because archiver and walsender process could use + * the cache to read the WAL segment. Also, don't bother with it if we + * are using O_DIRECT, since the kernel is presumably not caching in that + * case. */ #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) - if (!XLogArchivingActive() && + if (!XLogIsNeeded() && (get_sync_bit(sync_method) & PG_O_DIRECT) == 0) (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED); #endif @@ -2689,6 +2719,10 @@ RestoreArchivedFile(char *path, const char *xlogfname, uint32 restartLog; uint32 restartSeg; + /* In standby mode, restore_command might not be supplied */ + if (StandbyMode && recoveryRestoreCommand == NULL) + goto not_available; + /* * When doing archive recovery, we always prefer an archived log file even * if a file of the same name exists in XLOGDIR. The reason is that the @@ -2913,6 +2947,7 @@ RestoreArchivedFile(char *path, const char *xlogfname, (errmsg("could not restore file \"%s\" from archive: return code %d", xlogfname, rc))); +not_available: /* * if an archived file is not available, there might still be a version of * this file in XLOGDIR, so return that as the filename to open. @@ -3117,7 +3152,18 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) strspn(xlde->d_name, "0123456789ABCDEF") == 24 && strcmp(xlde->d_name + 8, lastoff + 8) <= 0) { - if (XLogArchiveCheckDone(xlde->d_name)) + /* + * Normally we don't delete old XLOG files during recovery to + * avoid accidentally deleting a file that looks stale due to a + * bug or hardware issue, but in fact contains important data. + * During streaming recovery, however, we will eventually fill the + * disk if we never clean up, so we have to. That's not an issue + * with file-based archive recovery because in that case we + * restore one XLOG file at a time, on-demand, and with a + * different filename that can't be confused with regular XLOG + * files. + */ + if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name)) { snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); @@ -3428,19 +3474,92 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) } /* + * Attempt to fetch an XLOG record. + * + * If RecPtr is not NULL, try to fetch a record at that position. Otherwise + * try to fetch a record just after the last one previously read. + * + * In standby mode, if we failed in reading a valid record and are not doing + * recovery from XLOG stream yet, we ignore the failure and start walreceiver + * process to fetch the record from the primary. Otherwise, returns NULL, + * or fails if emode is PANIC. (emode must be either PANIC or LOG.) + * + * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In + * this case, if we have to start XLOG streaming, we use RedoStartLSN as the + * streaming start position instead of RecPtr. + * + * The record is copied into readRecordBuf, so that on successful return, + * the returned record pointer always points there. + */ +static XLogRecord * +FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) +{ + if (StandbyMode && !InStreamingRecovery) + { + XLogRecord *record; + XLogRecPtr startlsn; + bool haveNextRecord = (nextRecord != NULL); + + /* An invalid record is OK here, so we set emode to DEBUG2 */ + record = ReadRecord(RecPtr, DEBUG2); + if (record != NULL) + return record; + + /* + * Start XLOG streaming if there is no more valid records available + * in the archive. + * + * We need to calculate the start position of XLOG streaming. If we + * read a record in the middle of a segment which doesn't exist in + * pg_xlog, we use the start of the segment as the start position. + * That prevents a broken segment (i.e., with no records in the + * first half of a segment) from being created by XLOG streaming, + * which might cause trouble later on if the segment is e.g + * archived. + */ + startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr; + if (startlsn.xrecoff % XLogSegSize != 0) + { + char xlogpath[MAXPGPATH]; + struct stat stat_buf; + uint32 log; + uint32 seg; + + XLByteToSeg(startlsn, log, seg); + XLogFilePath(xlogpath, recoveryTargetTLI, log, seg); + + if (stat(xlogpath, &stat_buf) != 0) + startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize; + } + RequestXLogStreaming(startlsn, PrimaryConnInfo); + + /* Needs to read the current page again if the next record is in it */ + needReread = haveNextRecord; + nextRecord = NULL; + + InStreamingRecovery = true; + ereport(LOG, + (errmsg("starting streaming recovery at %X/%X", + startlsn.xlogid, startlsn.xrecoff))); + } + + return ReadRecord(RecPtr, emode); +} + +/* * Attempt to read an XLOG record. * * If RecPtr is not NULL, try to read a record at that position. Otherwise * try to read a record just after the last one previously read. * * If no valid record is available, returns NULL, or fails if emode is PANIC. - * (emode must be either PANIC or LOG.) + * (emode must be either PANIC, LOG or DEBUG2.) * * The record is copied into readRecordBuf, so that on successful return, * the returned record pointer always points there. */ static XLogRecord * -ReadRecord(XLogRecPtr *RecPtr, int emode) +ReadRecord(XLogRecPtr *RecPtr, int emode_arg) { XLogRecord *record; char *buffer; @@ -3451,6 +3570,19 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) uint32 targetPageOff; uint32 targetRecOff; uint32 pageHeaderSize; + XLogRecPtr receivedUpto = {0,0}; + bool finished; + int emode; + + /* + * We don't expect any invalid records during streaming recovery: we + * should never hit the end of WAL because we wait for it to be streamed. + * Therefore treat any broken WAL as PANIC, instead of failing over. + */ + if (InStreamingRecovery) + emode = PANIC; + else + emode = emode_arg; if (readBuf == NULL) { @@ -3474,14 +3606,13 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) record = nextRecord; goto got_record; } - /* align old recptr to next page */ - if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0) - tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ); - if (tmpRecPtr.xrecoff >= XLogFileSize) - { - (tmpRecPtr.xlogid)++; - tmpRecPtr.xrecoff = 0; - } + + /* + * Align old recptr to next page if the current page is filled and + * doesn't need to be read again. + */ + if (!needReread) + NextLogPage(tmpRecPtr); /* We will account for page header size below */ } else @@ -3507,6 +3638,21 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) close(readFile); readFile = -1; } + + /* Is the target record ready yet? */ + if (InStreamingRecovery) + { + receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished); + if (finished) + { + if (emode_arg == PANIC) + ereport(PANIC, + (errmsg("streaming recovery ended"))); + else + return NULL; + } + } + XLByteToSeg(*RecPtr, readId, readSeg); if (readFile < 0) { @@ -3539,9 +3685,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) } targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - if (readOff != targetPageOff) + if (readOff != targetPageOff || needReread) { readOff = targetPageOff; + needReread = false; if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) { ereport(emode, @@ -3697,6 +3844,7 @@ got_record:; { /* Need to reassemble record */ XLogContRecord *contrecord; + XLogRecPtr nextpagelsn = *RecPtr; uint32 gotlen = len; memcpy(buffer, record, len); @@ -3704,6 +3852,23 @@ got_record:; buffer += len; for (;;) { + /* Is the next page ready yet? */ + if (InStreamingRecovery) + { + if (gotlen != len) + nextpagelsn.xrecoff += XLOG_BLCKSZ; + NextLogPage(nextpagelsn); + receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished); + if (finished) + { + if (emode_arg == PANIC) + ereport(PANIC, + (errmsg("streaming recovery ended"))); + else + return NULL; + } + } + readOff += XLOG_BLCKSZ; if (readOff >= XLogSegSize) { @@ -3768,6 +3933,21 @@ got_record:; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + pageHeaderSize + MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len); + + /* + * Check whether the current page needs to be read again. If there is no + * unread record in the current page (nextRecord == NULL), obviously we + * don't need to reread it. If we're not in streaming recovery mode yet, + * partially-filled page doesn't need to be reread because it is the + * last valid page. + */ + if (nextRecord != NULL && InStreamingRecovery && + XLByteLE(receivedUpto, EndRecPtr)) + { + nextRecord = NULL; + needReread = true; + } + ReadRecPtr = *RecPtr; /* needn't worry about XLOG SWITCH, it can't cross page boundaries */ return record; @@ -3781,6 +3961,21 @@ got_record:; nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len)); EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); + + /* + * Check whether the current page needs to be read again. If there is no + * unread record in the current page (nextRecord == NULL), obviously we + * don't need to reread it. If we're not in streaming recovery mode yet, + * partially-filled page doesn't need to be reread because it is the last + * valid page. + */ + if (nextRecord != NULL && InStreamingRecovery && + XLByteLE(receivedUpto, EndRecPtr)) + { + nextRecord = NULL; + needReread = true; + } + ReadRecPtr = *RecPtr; memcpy(buffer, record, total_len); @@ -3793,6 +3988,7 @@ got_record:; EndRecPtr.xrecoff += XLogSegSize - 1; EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize; nextRecord = NULL; /* definitely not on same page */ + needReread = false; /* * Pretend that readBuf contains the last page of the segment. This is @@ -4587,6 +4783,16 @@ UpdateControlFile(void) } /* + * Returns the unique system identifier from control file. + */ +uint64 +GetSystemIdentifier(void) +{ + Assert(ControlFile != NULL); + return ControlFile->system_identifier; +} + +/* * Initialization of shared memory for XLOG */ Size @@ -4822,7 +5028,7 @@ str_time(pg_time_t tnow) /* * See if there is a recovery command file (recovery.conf), and if so - * read in parameters for archive recovery. + * read in parameters for archive recovery and XLOG streaming. * * XXX longer term intention is to expand this to * cater for additional parameters and controls @@ -4974,6 +5180,29 @@ readRecoveryCommandFile(void) ereport(LOG, (errmsg("recovery_target_inclusive = %s", tok2))); } + else if (strcmp(tok1, "standby_mode") == 0) + { + if (!parse_bool(tok2, &StandbyMode)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("parameter \"standby_mode\" requires a Boolean value"))); + ereport(LOG, + (errmsg("standby_mode = '%s'", tok2))); + } + else if (strcmp(tok1, "primary_conninfo") == 0) + { + PrimaryConnInfo = pstrdup(tok2); + ereport(LOG, + (errmsg("primary_conninfo = '%s'", + PrimaryConnInfo))); + } + else if (strcmp(tok1, "trigger_file") == 0) + { + TriggerFile = pstrdup(tok2); + ereport(LOG, + (errmsg("trigger_file = '%s'", + TriggerFile))); + } else ereport(FATAL, (errmsg("unrecognized recovery parameter \"%s\"", @@ -4988,10 +5217,10 @@ readRecoveryCommandFile(void) cmdline), errhint("Lines should have the format parameter = 'value'."))); - /* Check that required parameters were supplied */ - if (recoveryRestoreCommand == NULL) + /* If not in standby mode, restore_command must be supplied */ + if (!StandbyMode && recoveryRestoreCommand == NULL) ereport(FATAL, - (errmsg("recovery command file \"%s\" did not specify restore_command", + (errmsg("recovery command file \"%s\" did not specify restore_command nor standby_mode", RECOVERY_COMMAND_FILE))); /* Enable fetching from archive recovery area */ @@ -5452,6 +5681,9 @@ StartupXLOG(void) recoveryTargetTLI, ControlFile->checkPointCopy.ThisTimeLineID))); + /* Save the selected recovery target timeline ID in shared memory */ + XLogCtl->RecoveryTargetTLI = recoveryTargetTLI; + if (read_backup_label(&checkPointLoc)) { /* @@ -5482,6 +5714,7 @@ StartupXLOG(void) * to pg_control is broken, try the next-to-last one. */ checkPointLoc = ControlFile->checkPoint; + RedoStartLSN = ControlFile->checkPointCopy.redo; record = ReadCheckpointRecord(checkPointLoc, 1); if (record != NULL) { @@ -5489,6 +5722,15 @@ StartupXLOG(void) (errmsg("checkpoint record is at %X/%X", checkPointLoc.xlogid, checkPointLoc.xrecoff))); } + else if (InStreamingRecovery) + { + /* + * The last valid checkpoint record required for a streaming + * recovery exists in neither standby nor the primary. + */ + ereport(PANIC, + (errmsg("could not locate a valid checkpoint record"))); + } else { checkPointLoc = ControlFile->prevCheckPoint; @@ -5688,12 +5930,12 @@ StartupXLOG(void) if (XLByteLT(checkPoint.redo, RecPtr)) { /* back up to find the record */ - record = ReadRecord(&(checkPoint.redo), PANIC); + record = FetchRecord(&(checkPoint.redo), PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = ReadRecord(NULL, LOG); + record = FetchRecord(NULL, LOG, false); } if (record != NULL) @@ -5706,9 +5948,10 @@ StartupXLOG(void) /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; - /* initialize shared replayEndRecPtr */ + /* initialize shared replayEndRecPtr and recoveryLastRecPtr */ SpinLockAcquire(&xlogctl->info_lck); xlogctl->replayEndRecPtr = ReadRecPtr; + xlogctl->recoveryLastRecPtr = ReadRecPtr; SpinLockRelease(&xlogctl->info_lck); InRedo = true; @@ -5762,14 +6005,8 @@ StartupXLOG(void) } #endif - /* - * Check if we were requested to re-read config file. - */ - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } + /* Handle interrupt signals of startup process */ + HandleStartupProcInterrupts(); /* * Have we passed our safe starting point? @@ -5841,9 +6078,17 @@ StartupXLOG(void) /* Pop the error context stack */ error_context_stack = errcontext.previous; + /* + * Update shared recoveryLastRecPtr after this record has been + * replayed. + */ + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->recoveryLastRecPtr = EndRecPtr; + SpinLockRelease(&xlogctl->info_lck); + LastRec = ReadRecPtr; - record = ReadRecord(NULL, LOG); + record = FetchRecord(NULL, LOG, false); } while (record != NULL && recoveryContinue); /* @@ -5868,6 +6113,27 @@ StartupXLOG(void) } /* + * If we launched a WAL receiver, it should be gone by now. It will trump + * over the startup checkpoint and subsequent records if it's still alive, + * so be extra sure that it's gone. + */ + if (WalRcvInProgress()) + elog(PANIC, "wal receiver still active"); + + /* + * We are now done reading the xlog from stream. Turn off streaming + * recovery, and restart fetching the files (which would be required + * at end of recovery, e.g., timeline history file) from archive. + */ + if (InStreamingRecovery) + { + /* We are no longer in streaming recovery state */ + InStreamingRecovery = false; + ereport(LOG, + (errmsg("streaming recovery complete"))); + } + + /* * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ @@ -6241,7 +6507,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) return NULL; } - record = ReadRecord(&RecPtr, LOG); + record = FetchRecord(&RecPtr, LOG, true); if (record == NULL) { @@ -6388,6 +6654,26 @@ GetInsertRecPtr(void) } /* + * GetWriteRecPtr -- Returns the current write position. + * + * NOTE: The value returned lags behind the real write position. But, + * an approximation is enough for the current usage of this function. + */ +XLogRecPtr +GetWriteRecPtr(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr recptr; + + SpinLockAcquire(&xlogctl->info_lck); + recptr = xlogctl->LogwrtResult.Write; + SpinLockRelease(&xlogctl->info_lck); + + return recptr; +} + +/* * Get the time of the last xlog segment switch */ pg_time_t @@ -6444,6 +6730,16 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) } /* + * GetRecoveryTargetTLI - get the recovery target timeline ID + */ +TimeLineID +GetRecoveryTargetTLI(void) +{ + /* RecoveryTargetTLI doesn't change so we need no lock to copy it */ + return XLogCtl->RecoveryTargetTLI; +} + +/* * This must be called ONCE during postmaster or standalone-backend shutdown */ void @@ -6917,8 +7213,34 @@ CreateCheckPoint(int flags) smgrpostckpt(); /* - * Delete old log files (those no longer needed even for previous - * checkpoint). + * If there's connected standby servers doing XLOG streaming, don't + * delete XLOG files that have not been streamed to all of them yet. + * This does nothing to prevent them from being deleted when the + * standby is disconnected (e.g because of network problems), but at + * least it avoids an open replication connection from failing because + * of that. + */ + if ((_logId || _logSeg) && MaxWalSenders > 0) + { + XLogRecPtr oldest; + uint32 log; + uint32 seg; + + oldest = GetOldestWALSendPointer(); + if (oldest.xlogid != 0 || oldest.xrecoff != 0) + { + XLByteToSeg(oldest, log, seg); + if (log < _logId || (log == _logId && seg < _logSeg)) + { + _logId = log; + _logSeg = seg; + } + } + } + + /* + * Delete old log files (those no longer needed even for + * previous checkpoint or the standbys in XLOG streaming). */ if (_logId || _logSeg) { @@ -7036,6 +7358,8 @@ CreateRestartPoint(int flags) { XLogRecPtr lastCheckPointRecPtr; CheckPoint lastCheckPoint; + uint32 _logId; + uint32 _logSeg; /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -7106,6 +7430,12 @@ CreateRestartPoint(int flags) CheckPointGuts(lastCheckPoint.redo, flags); /* + * Select point at which we can truncate the xlog, which we base on the + * prior checkpoint's earliest info. + */ + XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg); + + /* * Update pg_control, using current time. Check that it still shows * IN_ARCHIVE_RECOVERY state and an older checkpoint, else do nothing; * this is a quick hack to make sure nothing really bad happens if @@ -7123,6 +7453,34 @@ CreateRestartPoint(int flags) } LWLockRelease(ControlFileLock); + /* Are we doing recovery from XLOG stream? */ + if (!InStreamingRecovery) + InStreamingRecovery = WalRcvInProgress(); + + /* + * Delete old log files (those no longer needed even for previous + * checkpoint/restartpoint) to prevent the disk holding the xlog from + * growing full. We don't need do this during normal recovery, but during + * streaming recovery we have to or the disk will eventually fill up from + * old log files streamed from master. + */ + if (InStreamingRecovery && (_logId || _logSeg)) + { + XLogRecPtr endptr; + + /* Get the current (or recent) end of xlog */ + endptr = GetWalRcvWriteRecPtr(); + + PrevLogSeg(_logId, _logSeg); + RemoveOldXlogFiles(_logId, _logSeg, endptr); + + /* + * Make more log segments if needed. (Do this after recycling old log + * segments, since that may supply some of the needed files.) + */ + PreallocXlogFiles(endptr); + } + /* * Currently, there is no need to truncate pg_subtrans during recovery. If * we did do that, we will need to have called StartupSUBTRANS() already @@ -7495,36 +7853,39 @@ assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source) /* - * Issue appropriate kind of fsync (if any) on the current XLOG output file + * Issue appropriate kind of fsync (if any) for an XLOG output file. + * + * 'fd' is a file descriptor for the XLOG file to be fsync'd. + * 'log' and 'seg' are for error reporting purposes. */ -static void -issue_xlog_fsync(void) +void +issue_xlog_fsync(int fd, uint32 log, uint32 seg) { switch (sync_method) { case SYNC_METHOD_FSYNC: - if (pg_fsync_no_writethrough(openLogFile) != 0) + if (pg_fsync_no_writethrough(fd) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not fsync log file %u, segment %u: %m", - openLogId, openLogSeg))); + log, seg))); break; #ifdef HAVE_FSYNC_WRITETHROUGH case SYNC_METHOD_FSYNC_WRITETHROUGH: - if (pg_fsync_writethrough(openLogFile) != 0) + if (pg_fsync_writethrough(fd) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not fsync write-through log file %u, segment %u: %m", - openLogId, openLogSeg))); + log, seg))); break; #endif #ifdef HAVE_FDATASYNC case SYNC_METHOD_FDATASYNC: - if (pg_fdatasync(openLogFile) != 0) + if (pg_fdatasync(fd) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not fdatasync log file %u, segment %u: %m", - openLogId, openLogSeg))); + log, seg))); break; #endif case SYNC_METHOD_OPEN: @@ -8021,6 +8382,48 @@ pg_current_xlog_insert_location(PG_FUNCTION_ARGS) } /* + * Report the last WAL receive location (same format as pg_start_backup etc) + * + * This is useful for determining how much of WAL is guaranteed to be received + * and synced to disk by walreceiver. + */ +Datum +pg_last_xlog_receive_location(PG_FUNCTION_ARGS) +{ + XLogRecPtr recptr; + char location[MAXFNAMELEN]; + + recptr = GetWalRcvWriteRecPtr(); + + snprintf(location, sizeof(location), "%X/%X", + recptr.xlogid, recptr.xrecoff); + PG_RETURN_TEXT_P(cstring_to_text(location)); +} + +/* + * Report the last WAL replay location (same format as pg_start_backup etc) + * + * This is useful for determining how much of WAL is visible to read-only + * connections during recovery. + */ +Datum +pg_last_xlog_replay_location(PG_FUNCTION_ARGS) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr recptr; + char location[MAXFNAMELEN]; + + SpinLockAcquire(&xlogctl->info_lck); + recptr = xlogctl->recoveryLastRecPtr; + SpinLockRelease(&xlogctl->info_lck); + + snprintf(location, sizeof(location), "%X/%X", + recptr.xlogid, recptr.xrecoff); + PG_RETURN_TEXT_P(cstring_to_text(location)); +} + +/* * Compute an xlog file name and decimal byte offset given a WAL location, * such as is returned by pg_stop_backup() or pg_xlog_switch(). * @@ -8143,12 +8546,12 @@ pg_xlogfile_name(PG_FUNCTION_ARGS) * point, we will fail to restore a consistent database state. * * Returns TRUE if a backup_label was found (and fills the checkpoint - * location into *checkPointLoc); returns FALSE if not. + * location and its REDO location into *checkPointLoc and RedoStartLSN, + * respectively); returns FALSE if not. */ static bool read_backup_label(XLogRecPtr *checkPointLoc) { - XLogRecPtr startpoint; char startxlogfilename[MAXFNAMELEN]; TimeLineID tli; FILE *lfp; @@ -8174,7 +8577,7 @@ read_backup_label(XLogRecPtr *checkPointLoc) * format). */ if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c", - &startpoint.xlogid, &startpoint.xrecoff, &tli, + &RedoStartLSN.xlogid, &RedoStartLSN.xrecoff, &tli, startxlogfilename, &ch) != 5 || ch != '\n') ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -8319,6 +8722,25 @@ StartupProcShutdownHandler(SIGNAL_ARGS) shutdown_requested = true; } +/* Handle SIGHUP and SIGTERM signals of startup process */ +void +HandleStartupProcInterrupts(void) +{ + /* + * Check if we were requested to re-read config file. + */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + /* + * Check if we were requested to exit without finishing recovery. + */ + if (shutdown_requested) + proc_exit(1); +} + /* Main entry point for startup process */ void StartupProcessMain(void) |