summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c234
1 files changed, 187 insertions, 47 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fc30a52d496..ec40c0b7c42 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -302,11 +302,6 @@ static bool doPageWrites;
* so it's a plain spinlock. The other locks are held longer (potentially
* over I/O operations), so we use LWLocks for them. These locks are:
*
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping. If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
* WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
* XLogFlush).
*
@@ -473,21 +468,37 @@ typedef struct XLogCtlData
pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
/*
- * Latest initialized page in the cache (last byte position + 1).
+ * First initialized page in the cache (first byte position).
+ */
+ XLogRecPtr InitializedFrom;
+
+ /*
+ * Latest reserved for inititalization page in the cache (last byte
+ * position + 1).
*
- * To change the identity of a buffer (and InitializedUpTo), you need to
- * hold WALBufMappingLock. To change the identity of a buffer that's
+ * To change the identity of a buffer, you need to advance
+ * InitializeReserved first. To change the identity of a buffer that's
* still dirty, the old page needs to be written out first, and for that
* you need WALWriteLock, and you need to ensure that there are no
* in-progress insertions to the page by calling
* WaitXLogInsertionsToFinish().
*/
- XLogRecPtr InitializedUpTo;
+ pg_atomic_uint64 InitializeReserved;
+
+ /*
+ * Latest initialized page in the cache (last byte position + 1).
+ *
+ * InitializedUpTo is updated after the buffer initialization. After
+ * update, waiters got notification using InitializedUpToCondVar.
+ */
+ pg_atomic_uint64 InitializedUpTo;
+ ConditionVariable InitializedUpToCondVar;
/*
* These values do not change after startup, although the pointed-to pages
- * and xlblocks values certainly do. xlblocks values are protected by
- * WALBufMappingLock.
+ * and xlblocks values certainly do. xlblocks values are changed
+ * lock-free according to the check for the xlog write position and are
+ * accompanied by changes of InitializeReserved and InitializedUpTo.
*/
char *pages; /* buffers for unwritten XLOG pages */
pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -810,9 +821,9 @@ XLogInsertRecord(XLogRecData *rdata,
* fullPageWrites from changing until the insertion is finished.
*
* Step 2 can usually be done completely in parallel. If the required WAL
- * page is not initialized yet, you have to grab WALBufMappingLock to
- * initialize it, but the WAL writer tries to do that ahead of insertions
- * to avoid that from happening in the critical path.
+ * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
+ * which will ensure it is initialized. But the WAL writer tries to do that
+ * ahead of insertions to avoid that from happening in the critical path.
*
*----------
*/
@@ -1991,32 +2002,79 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr;
XLogRecPtr NewPageBeginPtr;
XLogPageHeader NewPage;
+ XLogRecPtr ReservedPtr;
int npages pg_attribute_unused() = 0;
- LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
/*
- * Now that we have the lock, check if someone initialized the page
- * already.
+ * We must run the loop below inside the critical section as we expect
+ * XLogCtl->InitializedUpTo to eventually keep up. The most of callers
+ * already run inside the critical section. Except for WAL writer, which
+ * passed 'opportunistic == true', and therefore we don't perform
+ * operations that could error out.
+ *
+ * Start an explicit critical section anyway though.
+ */
+ Assert(CritSectionCount > 0 || opportunistic);
+ START_CRIT_SECTION();
+
+ /*--
+ * Loop till we get all the pages in WAL buffer before 'upto' reserved for
+ * initialization. Multiple process can initialize different buffers with
+ * this loop in parallel as following.
+ *
+ * 1. Reserve page for initialization using XLogCtl->InitializeReserved.
+ * 2. Initialize the reserved page.
+ * 3. Attempt to advance XLogCtl->InitializedUpTo,
*/
- while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+ ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+ while (upto >= ReservedPtr || opportunistic)
{
- nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+ Assert(ReservedPtr % XLOG_BLCKSZ == 0);
/*
- * Get ending-offset of the buffer page we need to replace (this may
- * be zero if the buffer hasn't been used yet). Fall through if it's
- * already written out.
+ * Get ending-offset of the buffer page we need to replace.
+ *
+ * We don't lookup into xlblocks, but rather calculate position we
+ * must wait to be written. If it was written, xlblocks will have this
+ * position (or uninitialized)
*/
- OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
- if (LogwrtResult.Write < OldPageRqstPtr)
+ if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers)
+ OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
+ else
+ OldPageRqstPtr = InvalidXLogRecPtr;
+
+ if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
{
/*
- * Nope, got work to do. If we just want to pre-initialize as much
- * as we can without flushing, give up now.
+ * If we just want to pre-initialize as much as we can without
+ * flushing, give up now.
*/
- if (opportunistic)
- break;
+ upto = ReservedPtr - 1;
+ break;
+ }
+
+ /*
+ * Attempt to reserve the page for initialization. Failure means that
+ * this page got reserved by another process.
+ */
+ if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+ &ReservedPtr,
+ ReservedPtr + XLOG_BLCKSZ))
+ continue;
+
+ /*
+ * Wait till page gets correctly initialized up to OldPageRqstPtr.
+ */
+ nextidx = XLogRecPtrToBufIdx(ReservedPtr);
+ while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr)
+ ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+ ConditionVariableCancelSleep();
+ Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr);
+
+ /* Fall through if it's already written out. */
+ if (LogwrtResult.Write < OldPageRqstPtr)
+ {
+ /* Nope, got work to do. */
/* Advance shared memory write request position */
SpinLockAcquire(&XLogCtl->info_lck);
@@ -2031,14 +2089,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
RefreshXLogWriteResult(LogwrtResult);
if (LogwrtResult.Write < OldPageRqstPtr)
{
- /*
- * Must acquire write lock. Release WALBufMappingLock first,
- * to make sure that all insertions that we need to wait for
- * can finish (up to this same position). Otherwise we risk
- * deadlock.
- */
- LWLockRelease(WALBufMappingLock);
-
WaitXLogInsertionsToFinish(OldPageRqstPtr);
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,9 +2110,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
pgWalUsage.wal_buffers_full++;
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
}
- /* Re-acquire WALBufMappingLock and retry */
- LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
- continue;
}
}
@@ -2070,11 +2117,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
* Now the next buffer slot is free and we can set it up to be the
* next output page.
*/
- NewPageBeginPtr = XLogCtl->InitializedUpTo;
+ NewPageBeginPtr = ReservedPtr;
NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
- Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
-
NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
/*
@@ -2138,12 +2183,100 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
*/
pg_write_barrier();
+ /*-----
+ * Update the value of XLogCtl->xlblocks[nextidx] and try to advance
+ * XLogCtl->InitializedUpTo in a lock-less manner.
+ *
+ * First, let's provide a formal proof of the algorithm. Let it be 'n'
+ * process with the following variables in shared memory:
+ * f - an array of 'n' boolean flags,
+ * v - atomic integer variable.
+ *
+ * Also, let
+ * i - a number of a process,
+ * j - local integer variable,
+ * CAS(var, oldval, newval) - compare-and-swap atomic operation
+ * returning true on success,
+ * write_barrier()/read_barrier() - memory barriers.
+ *
+ * The pseudocode for each process is the following.
+ *
+ * j := i
+ * f[i] := true
+ * write_barrier()
+ * while CAS(v, j, j + 1):
+ * j := j + 1
+ * read_barrier()
+ * if not f[j]:
+ * break
+ *
+ * Let's prove that v eventually reaches the value of n.
+ * 1. Prove by contradiction. Assume v doesn't reach n and stucks
+ * on k, where k < n.
+ * 2. Process k attempts CAS(v, k, k + 1). 1). If, as we assumed, v
+ * gets stuck at k, then this CAS operation must fail. Therefore,
+ * v < k when process k attempts CAS(v, k, k + 1).
+ * 3. If, as we assumed, v gets stuck at k, then the value k of v
+ * must be achieved by some process m, where m < k. The process
+ * m must observe f[k] == false. Otherwise, it will later attempt
+ * CAS(v, k, k + 1) with success.
+ * 4. Therefore, corresponding read_barrier() (while j == k) on
+ * process m happend before write_barrier() of process k. But then
+ * process k attempts CAS(v, k, k + 1) after process m successfully
+ * incremented v to k, and that CAS operation must succeed.
+ * That leads to a contradiction. So, there is no such k (k < n)
+ * where v gets stuck. Q.E.D.
+ *
+ * To apply this proof to the code below, we assume
+ * XLogCtl->InitializedUpTo will play the role of v with XLOG_BLCKSZ
+ * granularity. We also assume setting XLogCtl->xlblocks[nextidx] to
+ * NewPageEndPtr to play the role of setting f[i] to true. Also, note
+ * that processes can't concurrently map different xlog locations to
+ * the same nextidx because we previously requested that
+ * XLogCtl->InitializedUpTo >= OldPageRqstPtr. So, a xlog buffer can
+ * be taken for initialization only once the previous initialization
+ * takes effect on XLogCtl->InitializedUpTo.
+ */
+
pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
- XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+ pg_write_barrier();
+
+ while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+ {
+ NewPageBeginPtr = NewPageEndPtr;
+ NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+ nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+ pg_read_barrier();
+
+ if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+ {
+ /*
+ * Page at nextidx wasn't initialized yet, so we cann't move
+ * InitializedUpto further. It will be moved by backend which
+ * will initialize nextidx.
+ */
+ ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+ break;
+ }
+ }
npages++;
}
- LWLockRelease(WALBufMappingLock);
+
+ END_CRIT_SECTION();
+
+ /*
+ * All the pages in WAL buffer before 'upto' were reserved for
+ * initialization. However, some pages might be reserved by concurrent
+ * processes. Wait till they finish initialization.
+ */
+ while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+ ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+ ConditionVariableCancelSleep();
+
+ pg_read_barrier();
#ifdef WAL_DEBUG
if (XLOG_DEBUG && npages > 0)
@@ -5071,6 +5204,10 @@ XLOGShmemInit(void)
pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+ pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+ pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+ ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
}
/*
@@ -6090,7 +6227,8 @@ StartupXLOG(void)
memset(page + len, 0, XLOG_BLCKSZ - len);
pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
- XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+ pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+ XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr;
}
else
{
@@ -6099,8 +6237,10 @@ StartupXLOG(void)
* let the first attempt to insert a log record to initialize the next
* buffer.
*/
- XLogCtl->InitializedUpTo = EndOfLog;
+ pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
+ XLogCtl->InitializedFrom = EndOfLog;
}
+ pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
/*
* Update local and shared status. This is OK to do without any locks