summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c538
1 files changed, 480 insertions, 58 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9a13a98cbc1..a0dddcad813 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.357 2010/01/04 12:50:49 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.358 2010/01/15 09:19:00 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -41,6 +41,8 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
+#include "replication/walreceiver.h"
+#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -142,6 +144,16 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec;
/*
+ * Are we doing recovery from XLOG stream? If so, we recover without using
+ * offline XLOG archives even though InArchiveRecovery==true. This flag is
+ * used only in standby mode.
+ */
+static bool InStreamingRecovery = false;
+
+/* The current log page is partially-filled, and so needs to be read again? */
+static bool needReread = false;
+
+/*
* Local copy of SharedRecoveryInProgress variable. True actually means "not
* known, need to check the shared state".
*/
@@ -165,7 +177,7 @@ static bool InArchiveRecovery = false;
/* Was the last xlog file restored from archive, or local? */
static bool restoredFromArchive = false;
-/* options taken from recovery.conf */
+/* options taken from recovery.conf for archive recovery */
static char *recoveryRestoreCommand = NULL;
static char *recoveryEndCommand = NULL;
static bool recoveryTarget = false;
@@ -175,6 +187,11 @@ static TransactionId recoveryTargetXid;
static TimestampTz recoveryTargetTime;
static TimestampTz recoveryLastXTime = 0;
+/* options taken from recovery.conf for XLOG streaming */
+static bool StandbyMode = false;
+static char *PrimaryConnInfo = NULL;
+char *TriggerFile = NULL;
+
/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
static TransactionId recoveryStopXid;
static TimestampTz recoveryStopTime;
@@ -229,6 +246,18 @@ XLogRecPtr XactLastRecEnd = {0, 0};
*/
static XLogRecPtr RedoRecPtr;
+/*
+ * RedoStartLSN points to the checkpoint's REDO location which is specified
+ * in a backup label file, backup history file or control file. In standby
+ * mode, XLOG streaming usually starts from the position where an invalid
+ * record was found. But if we fail to read even the initial checkpoint
+ * record, we use the REDO location instead of the checkpoint location as
+ * the start position of XLOG streaming. Otherwise we would have to jump
+ * backwards to the REDO location after reading the checkpoint record,
+ * because the REDO record can precede the checkpoint record.
+ */
+static XLogRecPtr RedoStartLSN = {0, 0};
+
/*----------
* Shared-memory data structures for XLOG control
*
@@ -349,6 +378,7 @@ typedef struct XLogCtlData
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */
TimeLineID ThisTimeLineID;
+ TimeLineID RecoveryTargetTLI;
/*
* SharedRecoveryInProgress indicates if we're still in crash or archive
@@ -369,6 +399,8 @@ typedef struct XLogCtlData
XLogRecPtr replayEndRecPtr;
/* timestamp of last record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* end+1 of the last record replayed */
+ XLogRecPtr recoveryLastRecPtr;
slock_t info_lck; /* locks shared variables shown above */
} XLogCtlData;
@@ -481,12 +513,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
static bool AdvanceXLInsertBuffer(bool new_segment);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
-static int XLogFileInit(uint32 log, uint32 seg,
- bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock);
-static int XLogFileOpen(uint32 log, uint32 seg);
static int XLogFileRead(uint32 log, uint32 seg, int emode);
static void XLogFileClose(void);
static bool RestoreArchivedFile(char *path, const char *xlogfname,
@@ -497,6 +526,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
+static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
@@ -513,7 +543,6 @@ static char *str_time(pg_time_t tnow);
#ifdef WAL_DEBUG
static void xlog_outrec(StringInfo buf, XLogRecord *record);
#endif
-static void issue_xlog_fsync(void);
static void pg_start_backup_callback(int code, Datum arg);
static bool read_backup_label(XLogRecPtr *checkPointLoc);
static void rm_redo_error_callback(void *arg);
@@ -1690,7 +1719,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
*/
if (finishing_seg || (xlog_switch && last_iteration))
{
- issue_xlog_fsync();
+ issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
@@ -1754,7 +1783,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
openLogFile = XLogFileOpen(openLogId, openLogSeg);
openLogOff = 0;
}
- issue_xlog_fsync();
+ issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
}
LogwrtResult.Flush = LogwrtResult.Write;
}
@@ -2189,7 +2218,7 @@ XLogNeedsFlush(XLogRecPtr record)
* take down the system on failure). They will promote to PANIC if we are
* in a critical section.
*/
-static int
+int
XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock)
{
@@ -2536,7 +2565,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
/*
* Open a pre-existing logfile segment for writing.
*/
-static int
+int
XLogFileOpen(uint32 log, uint32 seg)
{
char path[MAXPGPATH];
@@ -2586,7 +2615,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
XLogFileName(xlogfname, tli, log, seg);
- if (InArchiveRecovery)
+ if (InArchiveRecovery && !InStreamingRecovery)
{
/* Report recovery progress in PS display */
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
@@ -2641,12 +2670,13 @@ XLogFileClose(void)
/*
* WAL segment files will not be re-read in normal operation, so we advise
* the OS to release any cached pages. But do not do so if WAL archiving
- * is active, because archiver process could use the cache to read the WAL
- * segment. Also, don't bother with it if we are using O_DIRECT, since
- * the kernel is presumably not caching in that case.
+ * or streaming is active, because archiver and walsender process could use
+ * the cache to read the WAL segment. Also, don't bother with it if we
+ * are using O_DIRECT, since the kernel is presumably not caching in that
+ * case.
*/
#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
- if (!XLogArchivingActive() &&
+ if (!XLogIsNeeded() &&
(get_sync_bit(sync_method) & PG_O_DIRECT) == 0)
(void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
#endif
@@ -2689,6 +2719,10 @@ RestoreArchivedFile(char *path, const char *xlogfname,
uint32 restartLog;
uint32 restartSeg;
+ /* In standby mode, restore_command might not be supplied */
+ if (StandbyMode && recoveryRestoreCommand == NULL)
+ goto not_available;
+
/*
* When doing archive recovery, we always prefer an archived log file even
* if a file of the same name exists in XLOGDIR. The reason is that the
@@ -2913,6 +2947,7 @@ RestoreArchivedFile(char *path, const char *xlogfname,
(errmsg("could not restore file \"%s\" from archive: return code %d",
xlogfname, rc)));
+not_available:
/*
* if an archived file is not available, there might still be a version of
* this file in XLOGDIR, so return that as the filename to open.
@@ -3117,7 +3152,18 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
{
- if (XLogArchiveCheckDone(xlde->d_name))
+ /*
+ * Normally we don't delete old XLOG files during recovery to
+ * avoid accidentally deleting a file that looks stale due to a
+ * bug or hardware issue, but in fact contains important data.
+ * During streaming recovery, however, we will eventually fill the
+ * disk if we never clean up, so we have to. That's not an issue
+ * with file-based archive recovery because in that case we
+ * restore one XLOG file at a time, on-demand, and with a
+ * different filename that can't be confused with regular XLOG
+ * files.
+ */
+ if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name))
{
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
@@ -3428,19 +3474,92 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
}
/*
+ * Attempt to fetch an XLOG record.
+ *
+ * If RecPtr is not NULL, try to fetch a record at that position. Otherwise
+ * try to fetch a record just after the last one previously read.
+ *
+ * In standby mode, if we failed in reading a valid record and are not doing
+ * recovery from XLOG stream yet, we ignore the failure and start walreceiver
+ * process to fetch the record from the primary. Otherwise, returns NULL,
+ * or fails if emode is PANIC. (emode must be either PANIC or LOG.)
+ *
+ * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
+ * this case, if we have to start XLOG streaming, we use RedoStartLSN as the
+ * streaming start position instead of RecPtr.
+ *
+ * The record is copied into readRecordBuf, so that on successful return,
+ * the returned record pointer always points there.
+ */
+static XLogRecord *
+FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
+{
+ if (StandbyMode && !InStreamingRecovery)
+ {
+ XLogRecord *record;
+ XLogRecPtr startlsn;
+ bool haveNextRecord = (nextRecord != NULL);
+
+ /* An invalid record is OK here, so we set emode to DEBUG2 */
+ record = ReadRecord(RecPtr, DEBUG2);
+ if (record != NULL)
+ return record;
+
+ /*
+ * Start XLOG streaming if there is no more valid records available
+ * in the archive.
+ *
+ * We need to calculate the start position of XLOG streaming. If we
+ * read a record in the middle of a segment which doesn't exist in
+ * pg_xlog, we use the start of the segment as the start position.
+ * That prevents a broken segment (i.e., with no records in the
+ * first half of a segment) from being created by XLOG streaming,
+ * which might cause trouble later on if the segment is e.g
+ * archived.
+ */
+ startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
+ if (startlsn.xrecoff % XLogSegSize != 0)
+ {
+ char xlogpath[MAXPGPATH];
+ struct stat stat_buf;
+ uint32 log;
+ uint32 seg;
+
+ XLByteToSeg(startlsn, log, seg);
+ XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
+
+ if (stat(xlogpath, &stat_buf) != 0)
+ startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
+ }
+ RequestXLogStreaming(startlsn, PrimaryConnInfo);
+
+ /* Needs to read the current page again if the next record is in it */
+ needReread = haveNextRecord;
+ nextRecord = NULL;
+
+ InStreamingRecovery = true;
+ ereport(LOG,
+ (errmsg("starting streaming recovery at %X/%X",
+ startlsn.xlogid, startlsn.xrecoff)));
+ }
+
+ return ReadRecord(RecPtr, emode);
+}
+
+/*
* Attempt to read an XLOG record.
*
* If RecPtr is not NULL, try to read a record at that position. Otherwise
* try to read a record just after the last one previously read.
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
- * (emode must be either PANIC or LOG.)
+ * (emode must be either PANIC, LOG or DEBUG2.)
*
* The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there.
*/
static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode)
+ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
{
XLogRecord *record;
char *buffer;
@@ -3451,6 +3570,19 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
uint32 targetPageOff;
uint32 targetRecOff;
uint32 pageHeaderSize;
+ XLogRecPtr receivedUpto = {0,0};
+ bool finished;
+ int emode;
+
+ /*
+ * We don't expect any invalid records during streaming recovery: we
+ * should never hit the end of WAL because we wait for it to be streamed.
+ * Therefore treat any broken WAL as PANIC, instead of failing over.
+ */
+ if (InStreamingRecovery)
+ emode = PANIC;
+ else
+ emode = emode_arg;
if (readBuf == NULL)
{
@@ -3474,14 +3606,13 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
record = nextRecord;
goto got_record;
}
- /* align old recptr to next page */
- if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
- tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
- if (tmpRecPtr.xrecoff >= XLogFileSize)
- {
- (tmpRecPtr.xlogid)++;
- tmpRecPtr.xrecoff = 0;
- }
+
+ /*
+ * Align old recptr to next page if the current page is filled and
+ * doesn't need to be read again.
+ */
+ if (!needReread)
+ NextLogPage(tmpRecPtr);
/* We will account for page header size below */
}
else
@@ -3507,6 +3638,21 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
close(readFile);
readFile = -1;
}
+
+ /* Is the target record ready yet? */
+ if (InStreamingRecovery)
+ {
+ receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
+ if (finished)
+ {
+ if (emode_arg == PANIC)
+ ereport(PANIC,
+ (errmsg("streaming recovery ended")));
+ else
+ return NULL;
+ }
+ }
+
XLByteToSeg(*RecPtr, readId, readSeg);
if (readFile < 0)
{
@@ -3539,9 +3685,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
}
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
- if (readOff != targetPageOff)
+ if (readOff != targetPageOff || needReread)
{
readOff = targetPageOff;
+ needReread = false;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
ereport(emode,
@@ -3697,6 +3844,7 @@ got_record:;
{
/* Need to reassemble record */
XLogContRecord *contrecord;
+ XLogRecPtr nextpagelsn = *RecPtr;
uint32 gotlen = len;
memcpy(buffer, record, len);
@@ -3704,6 +3852,23 @@ got_record:;
buffer += len;
for (;;)
{
+ /* Is the next page ready yet? */
+ if (InStreamingRecovery)
+ {
+ if (gotlen != len)
+ nextpagelsn.xrecoff += XLOG_BLCKSZ;
+ NextLogPage(nextpagelsn);
+ receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
+ if (finished)
+ {
+ if (emode_arg == PANIC)
+ ereport(PANIC,
+ (errmsg("streaming recovery ended")));
+ else
+ return NULL;
+ }
+ }
+
readOff += XLOG_BLCKSZ;
if (readOff >= XLogSegSize)
{
@@ -3768,6 +3933,21 @@ got_record:;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
pageHeaderSize +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
+
+ /*
+ * Check whether the current page needs to be read again. If there is no
+ * unread record in the current page (nextRecord == NULL), obviously we
+ * don't need to reread it. If we're not in streaming recovery mode yet,
+ * partially-filled page doesn't need to be reread because it is the
+ * last valid page.
+ */
+ if (nextRecord != NULL && InStreamingRecovery &&
+ XLByteLE(receivedUpto, EndRecPtr))
+ {
+ nextRecord = NULL;
+ needReread = true;
+ }
+
ReadRecPtr = *RecPtr;
/* needn't worry about XLOG SWITCH, it can't cross page boundaries */
return record;
@@ -3781,6 +3961,21 @@ got_record:;
nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
EndRecPtr.xlogid = RecPtr->xlogid;
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
+
+ /*
+ * Check whether the current page needs to be read again. If there is no
+ * unread record in the current page (nextRecord == NULL), obviously we
+ * don't need to reread it. If we're not in streaming recovery mode yet,
+ * partially-filled page doesn't need to be reread because it is the last
+ * valid page.
+ */
+ if (nextRecord != NULL && InStreamingRecovery &&
+ XLByteLE(receivedUpto, EndRecPtr))
+ {
+ nextRecord = NULL;
+ needReread = true;
+ }
+
ReadRecPtr = *RecPtr;
memcpy(buffer, record, total_len);
@@ -3793,6 +3988,7 @@ got_record:;
EndRecPtr.xrecoff += XLogSegSize - 1;
EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
nextRecord = NULL; /* definitely not on same page */
+ needReread = false;
/*
* Pretend that readBuf contains the last page of the segment. This is
@@ -4587,6 +4783,16 @@ UpdateControlFile(void)
}
/*
+ * Returns the unique system identifier from control file.
+ */
+uint64
+GetSystemIdentifier(void)
+{
+ Assert(ControlFile != NULL);
+ return ControlFile->system_identifier;
+}
+
+/*
* Initialization of shared memory for XLOG
*/
Size
@@ -4822,7 +5028,7 @@ str_time(pg_time_t tnow)
/*
* See if there is a recovery command file (recovery.conf), and if so
- * read in parameters for archive recovery.
+ * read in parameters for archive recovery and XLOG streaming.
*
* XXX longer term intention is to expand this to
* cater for additional parameters and controls
@@ -4974,6 +5180,29 @@ readRecoveryCommandFile(void)
ereport(LOG,
(errmsg("recovery_target_inclusive = %s", tok2)));
}
+ else if (strcmp(tok1, "standby_mode") == 0)
+ {
+ if (!parse_bool(tok2, &StandbyMode))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("parameter \"standby_mode\" requires a Boolean value")));
+ ereport(LOG,
+ (errmsg("standby_mode = '%s'", tok2)));
+ }
+ else if (strcmp(tok1, "primary_conninfo") == 0)
+ {
+ PrimaryConnInfo = pstrdup(tok2);
+ ereport(LOG,
+ (errmsg("primary_conninfo = '%s'",
+ PrimaryConnInfo)));
+ }
+ else if (strcmp(tok1, "trigger_file") == 0)
+ {
+ TriggerFile = pstrdup(tok2);
+ ereport(LOG,
+ (errmsg("trigger_file = '%s'",
+ TriggerFile)));
+ }
else
ereport(FATAL,
(errmsg("unrecognized recovery parameter \"%s\"",
@@ -4988,10 +5217,10 @@ readRecoveryCommandFile(void)
cmdline),
errhint("Lines should have the format parameter = 'value'.")));
- /* Check that required parameters were supplied */
- if (recoveryRestoreCommand == NULL)
+ /* If not in standby mode, restore_command must be supplied */
+ if (!StandbyMode && recoveryRestoreCommand == NULL)
ereport(FATAL,
- (errmsg("recovery command file \"%s\" did not specify restore_command",
+ (errmsg("recovery command file \"%s\" did not specify restore_command nor standby_mode",
RECOVERY_COMMAND_FILE)));
/* Enable fetching from archive recovery area */
@@ -5452,6 +5681,9 @@ StartupXLOG(void)
recoveryTargetTLI,
ControlFile->checkPointCopy.ThisTimeLineID)));
+ /* Save the selected recovery target timeline ID in shared memory */
+ XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
+
if (read_backup_label(&checkPointLoc))
{
/*
@@ -5482,6 +5714,7 @@ StartupXLOG(void)
* to pg_control is broken, try the next-to-last one.
*/
checkPointLoc = ControlFile->checkPoint;
+ RedoStartLSN = ControlFile->checkPointCopy.redo;
record = ReadCheckpointRecord(checkPointLoc, 1);
if (record != NULL)
{
@@ -5489,6 +5722,15 @@ StartupXLOG(void)
(errmsg("checkpoint record is at %X/%X",
checkPointLoc.xlogid, checkPointLoc.xrecoff)));
}
+ else if (InStreamingRecovery)
+ {
+ /*
+ * The last valid checkpoint record required for a streaming
+ * recovery exists in neither standby nor the primary.
+ */
+ ereport(PANIC,
+ (errmsg("could not locate a valid checkpoint record")));
+ }
else
{
checkPointLoc = ControlFile->prevCheckPoint;
@@ -5688,12 +5930,12 @@ StartupXLOG(void)
if (XLByteLT(checkPoint.redo, RecPtr))
{
/* back up to find the record */
- record = ReadRecord(&(checkPoint.redo), PANIC);
+ record = FetchRecord(&(checkPoint.redo), PANIC, false);
}
else
{
/* just have to read next record after CheckPoint */
- record = ReadRecord(NULL, LOG);
+ record = FetchRecord(NULL, LOG, false);
}
if (record != NULL)
@@ -5706,9 +5948,10 @@ StartupXLOG(void)
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
- /* initialize shared replayEndRecPtr */
+ /* initialize shared replayEndRecPtr and recoveryLastRecPtr */
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->replayEndRecPtr = ReadRecPtr;
+ xlogctl->recoveryLastRecPtr = ReadRecPtr;
SpinLockRelease(&xlogctl->info_lck);
InRedo = true;
@@ -5762,14 +6005,8 @@ StartupXLOG(void)
}
#endif
- /*
- * Check if we were requested to re-read config file.
- */
- if (got_SIGHUP)
- {
- got_SIGHUP = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ /* Handle interrupt signals of startup process */
+ HandleStartupProcInterrupts();
/*
* Have we passed our safe starting point?
@@ -5841,9 +6078,17 @@ StartupXLOG(void)
/* Pop the error context stack */
error_context_stack = errcontext.previous;
+ /*
+ * Update shared recoveryLastRecPtr after this record has been
+ * replayed.
+ */
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->recoveryLastRecPtr = EndRecPtr;
+ SpinLockRelease(&xlogctl->info_lck);
+
LastRec = ReadRecPtr;
- record = ReadRecord(NULL, LOG);
+ record = FetchRecord(NULL, LOG, false);
} while (record != NULL && recoveryContinue);
/*
@@ -5868,6 +6113,27 @@ StartupXLOG(void)
}
/*
+ * If we launched a WAL receiver, it should be gone by now. It will trump
+ * over the startup checkpoint and subsequent records if it's still alive,
+ * so be extra sure that it's gone.
+ */
+ if (WalRcvInProgress())
+ elog(PANIC, "wal receiver still active");
+
+ /*
+ * We are now done reading the xlog from stream. Turn off streaming
+ * recovery, and restart fetching the files (which would be required
+ * at end of recovery, e.g., timeline history file) from archive.
+ */
+ if (InStreamingRecovery)
+ {
+ /* We are no longer in streaming recovery state */
+ InStreamingRecovery = false;
+ ereport(LOG,
+ (errmsg("streaming recovery complete")));
+ }
+
+ /*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
*/
@@ -6241,7 +6507,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
return NULL;
}
- record = ReadRecord(&RecPtr, LOG);
+ record = FetchRecord(&RecPtr, LOG, true);
if (record == NULL)
{
@@ -6388,6 +6654,26 @@ GetInsertRecPtr(void)
}
/*
+ * GetWriteRecPtr -- Returns the current write position.
+ *
+ * NOTE: The value returned lags behind the real write position. But,
+ * an approximation is enough for the current usage of this function.
+ */
+XLogRecPtr
+GetWriteRecPtr(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr recptr;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ recptr = xlogctl->LogwrtResult.Write;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return recptr;
+}
+
+/*
* Get the time of the last xlog segment switch
*/
pg_time_t
@@ -6444,6 +6730,16 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
}
/*
+ * GetRecoveryTargetTLI - get the recovery target timeline ID
+ */
+TimeLineID
+GetRecoveryTargetTLI(void)
+{
+ /* RecoveryTargetTLI doesn't change so we need no lock to copy it */
+ return XLogCtl->RecoveryTargetTLI;
+}
+
+/*
* This must be called ONCE during postmaster or standalone-backend shutdown
*/
void
@@ -6917,8 +7213,34 @@ CreateCheckPoint(int flags)
smgrpostckpt();
/*
- * Delete old log files (those no longer needed even for previous
- * checkpoint).
+ * If there's connected standby servers doing XLOG streaming, don't
+ * delete XLOG files that have not been streamed to all of them yet.
+ * This does nothing to prevent them from being deleted when the
+ * standby is disconnected (e.g because of network problems), but at
+ * least it avoids an open replication connection from failing because
+ * of that.
+ */
+ if ((_logId || _logSeg) && MaxWalSenders > 0)
+ {
+ XLogRecPtr oldest;
+ uint32 log;
+ uint32 seg;
+
+ oldest = GetOldestWALSendPointer();
+ if (oldest.xlogid != 0 || oldest.xrecoff != 0)
+ {
+ XLByteToSeg(oldest, log, seg);
+ if (log < _logId || (log == _logId && seg < _logSeg))
+ {
+ _logId = log;
+ _logSeg = seg;
+ }
+ }
+ }
+
+ /*
+ * Delete old log files (those no longer needed even for
+ * previous checkpoint or the standbys in XLOG streaming).
*/
if (_logId || _logSeg)
{
@@ -7036,6 +7358,8 @@ CreateRestartPoint(int flags)
{
XLogRecPtr lastCheckPointRecPtr;
CheckPoint lastCheckPoint;
+ uint32 _logId;
+ uint32 _logSeg;
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
@@ -7106,6 +7430,12 @@ CreateRestartPoint(int flags)
CheckPointGuts(lastCheckPoint.redo, flags);
/*
+ * Select point at which we can truncate the xlog, which we base on the
+ * prior checkpoint's earliest info.
+ */
+ XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
+
+ /*
* Update pg_control, using current time. Check that it still shows
* IN_ARCHIVE_RECOVERY state and an older checkpoint, else do nothing;
* this is a quick hack to make sure nothing really bad happens if
@@ -7123,6 +7453,34 @@ CreateRestartPoint(int flags)
}
LWLockRelease(ControlFileLock);
+ /* Are we doing recovery from XLOG stream? */
+ if (!InStreamingRecovery)
+ InStreamingRecovery = WalRcvInProgress();
+
+ /*
+ * Delete old log files (those no longer needed even for previous
+ * checkpoint/restartpoint) to prevent the disk holding the xlog from
+ * growing full. We don't need do this during normal recovery, but during
+ * streaming recovery we have to or the disk will eventually fill up from
+ * old log files streamed from master.
+ */
+ if (InStreamingRecovery && (_logId || _logSeg))
+ {
+ XLogRecPtr endptr;
+
+ /* Get the current (or recent) end of xlog */
+ endptr = GetWalRcvWriteRecPtr();
+
+ PrevLogSeg(_logId, _logSeg);
+ RemoveOldXlogFiles(_logId, _logSeg, endptr);
+
+ /*
+ * Make more log segments if needed. (Do this after recycling old log
+ * segments, since that may supply some of the needed files.)
+ */
+ PreallocXlogFiles(endptr);
+ }
+
/*
* Currently, there is no need to truncate pg_subtrans during recovery. If
* we did do that, we will need to have called StartupSUBTRANS() already
@@ -7495,36 +7853,39 @@ assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source)
/*
- * Issue appropriate kind of fsync (if any) on the current XLOG output file
+ * Issue appropriate kind of fsync (if any) for an XLOG output file.
+ *
+ * 'fd' is a file descriptor for the XLOG file to be fsync'd.
+ * 'log' and 'seg' are for error reporting purposes.
*/
-static void
-issue_xlog_fsync(void)
+void
+issue_xlog_fsync(int fd, uint32 log, uint32 seg)
{
switch (sync_method)
{
case SYNC_METHOD_FSYNC:
- if (pg_fsync_no_writethrough(openLogFile) != 0)
+ if (pg_fsync_no_writethrough(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fsync log file %u, segment %u: %m",
- openLogId, openLogSeg)));
+ log, seg)));
break;
#ifdef HAVE_FSYNC_WRITETHROUGH
case SYNC_METHOD_FSYNC_WRITETHROUGH:
- if (pg_fsync_writethrough(openLogFile) != 0)
+ if (pg_fsync_writethrough(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fsync write-through log file %u, segment %u: %m",
- openLogId, openLogSeg)));
+ log, seg)));
break;
#endif
#ifdef HAVE_FDATASYNC
case SYNC_METHOD_FDATASYNC:
- if (pg_fdatasync(openLogFile) != 0)
+ if (pg_fdatasync(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fdatasync log file %u, segment %u: %m",
- openLogId, openLogSeg)));
+ log, seg)));
break;
#endif
case SYNC_METHOD_OPEN:
@@ -8021,6 +8382,48 @@ pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
}
/*
+ * Report the last WAL receive location (same format as pg_start_backup etc)
+ *
+ * This is useful for determining how much of WAL is guaranteed to be received
+ * and synced to disk by walreceiver.
+ */
+Datum
+pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr recptr;
+ char location[MAXFNAMELEN];
+
+ recptr = GetWalRcvWriteRecPtr();
+
+ snprintf(location, sizeof(location), "%X/%X",
+ recptr.xlogid, recptr.xrecoff);
+ PG_RETURN_TEXT_P(cstring_to_text(location));
+}
+
+/*
+ * Report the last WAL replay location (same format as pg_start_backup etc)
+ *
+ * This is useful for determining how much of WAL is visible to read-only
+ * connections during recovery.
+ */
+Datum
+pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr recptr;
+ char location[MAXFNAMELEN];
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ recptr = xlogctl->recoveryLastRecPtr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ snprintf(location, sizeof(location), "%X/%X",
+ recptr.xlogid, recptr.xrecoff);
+ PG_RETURN_TEXT_P(cstring_to_text(location));
+}
+
+/*
* Compute an xlog file name and decimal byte offset given a WAL location,
* such as is returned by pg_stop_backup() or pg_xlog_switch().
*
@@ -8143,12 +8546,12 @@ pg_xlogfile_name(PG_FUNCTION_ARGS)
* point, we will fail to restore a consistent database state.
*
* Returns TRUE if a backup_label was found (and fills the checkpoint
- * location into *checkPointLoc); returns FALSE if not.
+ * location and its REDO location into *checkPointLoc and RedoStartLSN,
+ * respectively); returns FALSE if not.
*/
static bool
read_backup_label(XLogRecPtr *checkPointLoc)
{
- XLogRecPtr startpoint;
char startxlogfilename[MAXFNAMELEN];
TimeLineID tli;
FILE *lfp;
@@ -8174,7 +8577,7 @@ read_backup_label(XLogRecPtr *checkPointLoc)
* format).
*/
if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
- &startpoint.xlogid, &startpoint.xrecoff, &tli,
+ &RedoStartLSN.xlogid, &RedoStartLSN.xrecoff, &tli,
startxlogfilename, &ch) != 5 || ch != '\n')
ereport(FATAL,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -8319,6 +8722,25 @@ StartupProcShutdownHandler(SIGNAL_ARGS)
shutdown_requested = true;
}
+/* Handle SIGHUP and SIGTERM signals of startup process */
+void
+HandleStartupProcInterrupts(void)
+{
+ /*
+ * Check if we were requested to re-read config file.
+ */
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+ /*
+ * Check if we were requested to exit without finishing recovery.
+ */
+ if (shutdown_requested)
+ proc_exit(1);
+}
+
/* Main entry point for startup process */
void
StartupProcessMain(void)