diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 234 |
1 files changed, 187 insertions, 47 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index fc30a52d496..ec40c0b7c42 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -302,11 +302,6 @@ static bool doPageWrites; * so it's a plain spinlock. The other locks are held longer (potentially * over I/O operations), so we use LWLocks for them. These locks are: * - * WALBufMappingLock: must be held to replace a page in the WAL buffer cache. - * It is only held while initializing and changing the mapping. If the - * contents of the buffer being replaced haven't been written yet, the mapping - * lock is released while the write is done, and reacquired afterwards. - * * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or * XLogFlush). * @@ -473,21 +468,37 @@ typedef struct XLogCtlData pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */ /* - * Latest initialized page in the cache (last byte position + 1). + * First initialized page in the cache (first byte position). + */ + XLogRecPtr InitializedFrom; + + /* + * Latest reserved for inititalization page in the cache (last byte + * position + 1). * - * To change the identity of a buffer (and InitializedUpTo), you need to - * hold WALBufMappingLock. To change the identity of a buffer that's + * To change the identity of a buffer, you need to advance + * InitializeReserved first. To change the identity of a buffer that's * still dirty, the old page needs to be written out first, and for that * you need WALWriteLock, and you need to ensure that there are no * in-progress insertions to the page by calling * WaitXLogInsertionsToFinish(). */ - XLogRecPtr InitializedUpTo; + pg_atomic_uint64 InitializeReserved; + + /* + * Latest initialized page in the cache (last byte position + 1). + * + * InitializedUpTo is updated after the buffer initialization. After + * update, waiters got notification using InitializedUpToCondVar. + */ + pg_atomic_uint64 InitializedUpTo; + ConditionVariable InitializedUpToCondVar; /* * These values do not change after startup, although the pointed-to pages - * and xlblocks values certainly do. xlblocks values are protected by - * WALBufMappingLock. + * and xlblocks values certainly do. xlblocks values are changed + * lock-free according to the check for the xlog write position and are + * accompanied by changes of InitializeReserved and InitializedUpTo. */ char *pages; /* buffers for unwritten XLOG pages */ pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ @@ -810,9 +821,9 @@ XLogInsertRecord(XLogRecData *rdata, * fullPageWrites from changing until the insertion is finished. * * Step 2 can usually be done completely in parallel. If the required WAL - * page is not initialized yet, you have to grab WALBufMappingLock to - * initialize it, but the WAL writer tries to do that ahead of insertions - * to avoid that from happening in the critical path. + * page is not initialized yet, you have to go through AdvanceXLInsertBuffer, + * which will ensure it is initialized. But the WAL writer tries to do that + * ahead of insertions to avoid that from happening in the critical path. * *---------- */ @@ -1991,32 +2002,79 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr; XLogRecPtr NewPageBeginPtr; XLogPageHeader NewPage; + XLogRecPtr ReservedPtr; int npages pg_attribute_unused() = 0; - LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); - /* - * Now that we have the lock, check if someone initialized the page - * already. + * We must run the loop below inside the critical section as we expect + * XLogCtl->InitializedUpTo to eventually keep up. The most of callers + * already run inside the critical section. Except for WAL writer, which + * passed 'opportunistic == true', and therefore we don't perform + * operations that could error out. + * + * Start an explicit critical section anyway though. + */ + Assert(CritSectionCount > 0 || opportunistic); + START_CRIT_SECTION(); + + /*-- + * Loop till we get all the pages in WAL buffer before 'upto' reserved for + * initialization. Multiple process can initialize different buffers with + * this loop in parallel as following. + * + * 1. Reserve page for initialization using XLogCtl->InitializeReserved. + * 2. Initialize the reserved page. + * 3. Attempt to advance XLogCtl->InitializedUpTo, */ - while (upto >= XLogCtl->InitializedUpTo || opportunistic) + ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved); + while (upto >= ReservedPtr || opportunistic) { - nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo); + Assert(ReservedPtr % XLOG_BLCKSZ == 0); /* - * Get ending-offset of the buffer page we need to replace (this may - * be zero if the buffer hasn't been used yet). Fall through if it's - * already written out. + * Get ending-offset of the buffer page we need to replace. + * + * We don't lookup into xlblocks, but rather calculate position we + * must wait to be written. If it was written, xlblocks will have this + * position (or uninitialized) */ - OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]); - if (LogwrtResult.Write < OldPageRqstPtr) + if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers) + OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers; + else + OldPageRqstPtr = InvalidXLogRecPtr; + + if (LogwrtResult.Write < OldPageRqstPtr && opportunistic) { /* - * Nope, got work to do. If we just want to pre-initialize as much - * as we can without flushing, give up now. + * If we just want to pre-initialize as much as we can without + * flushing, give up now. */ - if (opportunistic) - break; + upto = ReservedPtr - 1; + break; + } + + /* + * Attempt to reserve the page for initialization. Failure means that + * this page got reserved by another process. + */ + if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved, + &ReservedPtr, + ReservedPtr + XLOG_BLCKSZ)) + continue; + + /* + * Wait till page gets correctly initialized up to OldPageRqstPtr. + */ + nextidx = XLogRecPtrToBufIdx(ReservedPtr); + while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr) + ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT); + ConditionVariableCancelSleep(); + Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr); + + /* Fall through if it's already written out. */ + if (LogwrtResult.Write < OldPageRqstPtr) + { + /* Nope, got work to do. */ /* Advance shared memory write request position */ SpinLockAcquire(&XLogCtl->info_lck); @@ -2031,14 +2089,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) RefreshXLogWriteResult(LogwrtResult); if (LogwrtResult.Write < OldPageRqstPtr) { - /* - * Must acquire write lock. Release WALBufMappingLock first, - * to make sure that all insertions that we need to wait for - * can finish (up to this same position). Otherwise we risk - * deadlock. - */ - LWLockRelease(WALBufMappingLock); - WaitXLogInsertionsToFinish(OldPageRqstPtr); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); @@ -2060,9 +2110,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) pgWalUsage.wal_buffers_full++; TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE(); } - /* Re-acquire WALBufMappingLock and retry */ - LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); - continue; } } @@ -2070,11 +2117,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) * Now the next buffer slot is free and we can set it up to be the * next output page. */ - NewPageBeginPtr = XLogCtl->InitializedUpTo; + NewPageBeginPtr = ReservedPtr; NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; - Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx); - NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); /* @@ -2138,12 +2183,100 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) */ pg_write_barrier(); + /*----- + * Update the value of XLogCtl->xlblocks[nextidx] and try to advance + * XLogCtl->InitializedUpTo in a lock-less manner. + * + * First, let's provide a formal proof of the algorithm. Let it be 'n' + * process with the following variables in shared memory: + * f - an array of 'n' boolean flags, + * v - atomic integer variable. + * + * Also, let + * i - a number of a process, + * j - local integer variable, + * CAS(var, oldval, newval) - compare-and-swap atomic operation + * returning true on success, + * write_barrier()/read_barrier() - memory barriers. + * + * The pseudocode for each process is the following. + * + * j := i + * f[i] := true + * write_barrier() + * while CAS(v, j, j + 1): + * j := j + 1 + * read_barrier() + * if not f[j]: + * break + * + * Let's prove that v eventually reaches the value of n. + * 1. Prove by contradiction. Assume v doesn't reach n and stucks + * on k, where k < n. + * 2. Process k attempts CAS(v, k, k + 1). 1). If, as we assumed, v + * gets stuck at k, then this CAS operation must fail. Therefore, + * v < k when process k attempts CAS(v, k, k + 1). + * 3. If, as we assumed, v gets stuck at k, then the value k of v + * must be achieved by some process m, where m < k. The process + * m must observe f[k] == false. Otherwise, it will later attempt + * CAS(v, k, k + 1) with success. + * 4. Therefore, corresponding read_barrier() (while j == k) on + * process m happend before write_barrier() of process k. But then + * process k attempts CAS(v, k, k + 1) after process m successfully + * incremented v to k, and that CAS operation must succeed. + * That leads to a contradiction. So, there is no such k (k < n) + * where v gets stuck. Q.E.D. + * + * To apply this proof to the code below, we assume + * XLogCtl->InitializedUpTo will play the role of v with XLOG_BLCKSZ + * granularity. We also assume setting XLogCtl->xlblocks[nextidx] to + * NewPageEndPtr to play the role of setting f[i] to true. Also, note + * that processes can't concurrently map different xlog locations to + * the same nextidx because we previously requested that + * XLogCtl->InitializedUpTo >= OldPageRqstPtr. So, a xlog buffer can + * be taken for initialization only once the previous initialization + * takes effect on XLogCtl->InitializedUpTo. + */ + pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr); - XLogCtl->InitializedUpTo = NewPageEndPtr; + + pg_write_barrier(); + + while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr)) + { + NewPageBeginPtr = NewPageEndPtr; + NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; + nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr); + + pg_read_barrier(); + + if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr) + { + /* + * Page at nextidx wasn't initialized yet, so we cann't move + * InitializedUpto further. It will be moved by backend which + * will initialize nextidx. + */ + ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar); + break; + } + } npages++; } - LWLockRelease(WALBufMappingLock); + + END_CRIT_SECTION(); + + /* + * All the pages in WAL buffer before 'upto' were reserved for + * initialization. However, some pages might be reserved by concurrent + * processes. Wait till they finish initialization. + */ + while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo)) + ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT); + ConditionVariableCancelSleep(); + + pg_read_barrier(); #ifdef WAL_DEBUG if (XLOG_DEBUG && npages > 0) @@ -5071,6 +5204,10 @@ XLOGShmemInit(void) pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr); + + pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr); + pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr); + ConditionVariableInit(&XLogCtl->InitializedUpToCondVar); } /* @@ -6090,7 +6227,8 @@ StartupXLOG(void) memset(page + len, 0, XLOG_BLCKSZ - len); pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ); - XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ; + pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ); + XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr; } else { @@ -6099,8 +6237,10 @@ StartupXLOG(void) * let the first attempt to insert a log record to initialize the next * buffer. */ - XLogCtl->InitializedUpTo = EndOfLog; + pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog); + XLogCtl->InitializedFrom = EndOfLog; } + pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo)); /* * Update local and shared status. This is OK to do without any locks |