-rw-r--r--  contrib/pg_buffercache/pg_buffercache_pages.c |   7
-rw-r--r--  contrib/pg_prewarm/autoprewarm.c              |   2
-rw-r--r--  src/backend/storage/buffer/bufmgr.c           | 203
-rw-r--r--  src/backend/storage/buffer/freelist.c         |   2
-rw-r--r--  src/include/storage/buf_internals.h           | 119
-rw-r--r--  src/test/modules/test_aio/test_aio.c          |  10
6 files changed, 224 insertions, 119 deletions
diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c
index 3df04c98959..ab790533ff6 100644
--- a/contrib/pg_buffercache/pg_buffercache_pages.c
+++ b/contrib/pg_buffercache/pg_buffercache_pages.c
@@ -220,7 +220,7 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
 			else
 				fctx->record[i].isvalid = false;
 
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 		}
 	}
 
@@ -460,7 +460,6 @@ pg_buffercache_numa_pages(PG_FUNCTION_ARGS)
 		{
 			char	   *buffptr = (char *) BufferGetBlock(i + 1);
 			BufferDesc *bufHdr;
-			uint32		buf_state;
 			uint32		bufferid;
 			int32		page_num;
 			char	   *startptr_buff,
@@ -471,9 +470,9 @@ pg_buffercache_numa_pages(PG_FUNCTION_ARGS)
 			bufHdr = GetBufferDescriptor(i);
 
 			/* Lock each buffer header before inspecting. */
-			buf_state = LockBufHdr(bufHdr);
+			LockBufHdr(bufHdr);
 			bufferid = BufferDescriptorGetBuffer(bufHdr);
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 
 			/* start of the first page of this buffer */
 			startptr_buff = (char *) TYPEALIGN_DOWN(os_page_size, buffptr);
diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 8b68dafc261..5ba1240d51f 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -730,7 +730,7 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged)
 			++num_blocks;
 		}
 
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(bufHdr);
	}
 
 	snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE);
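The two contrib hunks above are representative of the mechanical part of this change: callers that leave the state word untouched no longer pass buf_state back at unlock. A minimal sketch of the new calling convention (the helper function is hypothetical; LockBufHdr(), UnlockBufHdr() and BufferDescriptorGetBuffer() are the real APIs):

#include "postgres.h"
#include "storage/buf_internals.h"

/*
 * Hypothetical helper: fetch the buffer id under the header lock.  Under
 * the old API this would have been
 *     buf_state = LockBufHdr(bufHdr); ... UnlockBufHdr(bufHdr, buf_state);
 */
static Buffer
peek_buffer_id(BufferDesc *bufHdr)
{
    Buffer      result;

    LockBufHdr(bufHdr);         /* still returns the locked state word */
    result = BufferDescriptorGetBuffer(bufHdr);
    UnlockBufHdr(bufHdr);       /* no buf_state argument anymore */

    return result;
}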
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f830f2c6ff3..4b9fd76294d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1990,6 +1990,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	Buffer		victim_buffer;
 	BufferDesc *victim_buf_hdr;
 	uint32		victim_buf_state;
+	uint32		set_bits = 0;
 
 	/* Make sure we will have room to remember the buffer pin */
 	ResourceOwnerEnlarge(CurrentResourceOwner);
@@ -2116,11 +2117,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	 * checkpoints, except for their "init" forks, which need to be treated
 	 * just like permanent relations.
 	 */
-	victim_buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+	set_bits |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
 	if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
-		victim_buf_state |= BM_PERMANENT;
+		set_bits |= BM_PERMANENT;
 
-	UnlockBufHdr(victim_buf_hdr, victim_buf_state);
+	UnlockBufHdrExt(victim_buf_hdr, victim_buf_state,
+					set_bits, 0, 0);
 
 	LWLockRelease(newPartitionLock);
 
@@ -2160,9 +2162,7 @@ InvalidateBuffer(BufferDesc *buf)
 	/* Save the original buffer tag before dropping the spinlock */
 	oldTag = buf->tag;
 
-	buf_state = pg_atomic_read_u32(&buf->state);
-	Assert(buf_state & BM_LOCKED);
-	UnlockBufHdr(buf, buf_state);
+	UnlockBufHdr(buf);
 
 	/*
 	 * Need to compute the old tag's hashcode and partition lock ID. XXX is it
@@ -2186,7 +2186,7 @@ retry:
 	/* If it's changed while we were waiting for lock, do nothing */
 	if (!BufferTagsEqual(&buf->tag, &oldTag))
 	{
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdr(buf);
 		LWLockRelease(oldPartitionLock);
 		return;
 	}
@@ -2203,7 +2203,7 @@ retry:
 	 */
 	if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
 	{
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdr(buf);
 		LWLockRelease(oldPartitionLock);
 		/* safety check: should definitely not be our *own* pin */
 		if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
@@ -2218,8 +2218,11 @@ retry:
 	 */
 	oldFlags = buf_state & BUF_FLAG_MASK;
 	ClearBufferTag(&buf->tag);
-	buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
-	UnlockBufHdr(buf, buf_state);
+
+	UnlockBufHdrExt(buf, buf_state,
+					0,
+					BUF_FLAG_MASK | BUF_USAGECOUNT_MASK,
+					0);
 
 	/*
 	 * Remove the buffer from the lookup hashtable, if it was in there.
@@ -2279,7 +2282,7 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
 	{
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
 
-		UnlockBufHdr(buf_hdr, buf_state);
+		UnlockBufHdr(buf_hdr);
 		LWLockRelease(partition_lock);
 
 		return false;
@@ -2293,8 +2296,10 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
 	 * tag (see e.g. FlushDatabaseBuffers()).
 	 */
 	ClearBufferTag(&buf_hdr->tag);
-	buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
-	UnlockBufHdr(buf_hdr, buf_state);
+	UnlockBufHdrExt(buf_hdr, buf_state,
+					0,
+					BUF_FLAG_MASK | BUF_USAGECOUNT_MASK,
+					0);
 
 	Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
 
@@ -2303,6 +2308,7 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
 
 	LWLockRelease(partition_lock);
 
+	buf_state = pg_atomic_read_u32(&buf_hdr->state);
 	Assert(!(buf_state & (BM_DIRTY | BM_VALID | BM_TAG_VALID)));
 	Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
 	Assert(BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf_hdr->state)) > 0);
@@ -2393,7 +2399,7 @@ again:
 	/* Read the LSN while holding buffer header lock */
 	buf_state = LockBufHdr(buf_hdr);
 	lsn = BufferGetLSN(buf_hdr);
-	UnlockBufHdr(buf_hdr, buf_state);
+	UnlockBufHdr(buf_hdr);
 
 	if (XLogNeedsFlush(lsn) &&
 		StrategyRejectBuffer(strategy, buf_hdr, from_ring))
@@ -2739,15 +2745,13 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 		 */
 		do
 		{
-			uint32		buf_state = LockBufHdr(existing_hdr);
-
-			buf_state &= ~BM_VALID;
-			UnlockBufHdr(existing_hdr, buf_state);
+			pg_atomic_fetch_and_u32(&existing_hdr->state, ~BM_VALID);
 		} while (!StartBufferIO(existing_hdr, true, false));
 	}
 	else
 	{
 		uint32		buf_state;
+		uint32		set_bits = 0;
 
 		buf_state = LockBufHdr(victim_buf_hdr);
 
@@ -2757,11 +2761,13 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 
 		victim_buf_hdr->tag = tag;
 
-		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+		set_bits |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
 		if (bmr.relpersistence == RELPERSISTENCE_PERMANENT || fork == INIT_FORKNUM)
-			buf_state |= BM_PERMANENT;
+			set_bits |= BM_PERMANENT;
 
-		UnlockBufHdr(victim_buf_hdr, buf_state);
+		UnlockBufHdrExt(victim_buf_hdr, buf_state,
+						set_bits, 0,
+						0);
 
 		LWLockRelease(partition_lock);
 
@@ -2954,6 +2960,10 @@ MarkBufferDirty(Buffer buffer)
 	Assert(BufferIsPinned(buffer));
 	Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
 
+	/*
+	 * NB: We have to wait for the buffer header spinlock to be not held, as
+	 * TerminateBufferIO() relies on the spinlock.
+	 */
 	old_buf_state = pg_atomic_read_u32(&bufHdr->state);
 	for (;;)
 	{
@@ -3078,6 +3088,10 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
 		if (unlikely(skip_if_not_valid && !(old_buf_state & BM_VALID)))
 			return false;
 
+		/*
+		 * We're not allowed to increase the refcount while the buffer
+		 * header spinlock is held. Wait for the lock to be released.
+		 */
 		if (old_buf_state & BM_LOCKED)
 			old_buf_state = WaitBufHdrUnlocked(buf);
 
@@ -3164,7 +3178,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
 static void
 PinBuffer_Locked(BufferDesc *buf)
 {
-	uint32		buf_state;
+	uint32		old_buf_state;
 
 	/*
 	 * As explained, We don't expect any preexisting pins. That allows us to
@@ -3176,10 +3190,10 @@ PinBuffer_Locked(BufferDesc *buf)
 	 * Since we hold the buffer spinlock, we can update the buffer state and
 	 * release the lock in one operation.
 	 */
-	buf_state = pg_atomic_read_u32(&buf->state);
-	Assert(buf_state & BM_LOCKED);
-	buf_state += BUF_REFCOUNT_ONE;
-	UnlockBufHdr(buf, buf_state);
+	old_buf_state = pg_atomic_read_u32(&buf->state);
+
+	UnlockBufHdrExt(buf, old_buf_state,
+					0, 0, 1);
 
 	TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
 }
@@ -3214,12 +3228,13 @@ WakePinCountWaiter(BufferDesc *buf)
 		/* we just released the last pin other than the waiter's */
 		int			wait_backend_pgprocno = buf->wait_backend_pgprocno;
 
-		buf_state &= ~BM_PIN_COUNT_WAITER;
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdrExt(buf, buf_state,
+						0, BM_PIN_COUNT_WAITER,
+						0);
 		ProcSendSignal(wait_backend_pgprocno);
 	}
 	else
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdr(buf);
 }
 
 /*
@@ -3275,6 +3290,10 @@ UnpinBufferNoOwner(BufferDesc *buf)
 		 *
 		 * Since buffer spinlock holder can update status using just write,
 		 * it's not safe to use atomic decrement here; thus use a CAS loop.
+		 *
+		 * TODO: The above requirement does not hold anymore, in a future
+		 * commit this will be rewritten to release the pin in a single atomic
+		 * operation.
 		 */
 		old_buf_state = pg_atomic_read_u32(&buf->state);
 		for (;;)
@@ -3388,6 +3407,7 @@ BufferSync(int flags)
 	for (buf_id = 0; buf_id < NBuffers; buf_id++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+		uint32		set_bits = 0;
 
 		/*
 		 * Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3399,7 +3419,7 @@ BufferSync(int flags)
 		{
 			CkptSortItem *item;
 
-			buf_state |= BM_CHECKPOINT_NEEDED;
+			set_bits = BM_CHECKPOINT_NEEDED;
 
 			item = &CkptBufferIds[num_to_scan++];
 			item->buf_id = buf_id;
@@ -3409,7 +3429,9 @@ BufferSync(int flags)
 			item->blockNum = bufHdr->tag.blockNum;
 		}
 
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdrExt(bufHdr, buf_state,
+						set_bits, 0,
+						0);
 
 		/* Check for barrier events in case NBuffers is large. */
 		if (ProcSignalBarrierPending)
@@ -3948,14 +3970,14 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	else if (skip_recently_used)
 	{
 		/* Caller told us not to write recently-used buffers */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(bufHdr);
 		return result;
 	}
 
 	if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 	{
 		/* It's clean, so nothing to do */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(bufHdr);
 		return result;
 	}
 
@@ -4324,8 +4346,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	recptr = BufferGetLSN(buf);
 
 	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	buf_state &= ~BM_JUST_DIRTIED;
-	UnlockBufHdr(buf, buf_state);
+	UnlockBufHdrExt(buf, buf_state,
+					0, BM_JUST_DIRTIED,
+					0);
 
 	/*
 	 * Force XLOG flush up to buffer's LSN. This implements the basic WAL
@@ -4501,7 +4524,6 @@ BufferGetLSNAtomic(Buffer buffer)
 	char	   *page = BufferGetPage(buffer);
 	BufferDesc *bufHdr;
 	XLogRecPtr	lsn;
-	uint32		buf_state;
 
 	/*
 	 * If we don't need locking for correctness, fastpath out.
@@ -4514,9 +4536,9 @@ BufferGetLSNAtomic(Buffer buffer)
 	Assert(BufferIsPinned(buffer));
 
 	bufHdr = GetBufferDescriptor(buffer - 1);
-	buf_state = LockBufHdr(bufHdr);
+	LockBufHdr(bufHdr);
 	lsn = PageGetLSN(page);
-	UnlockBufHdr(bufHdr, buf_state);
+	UnlockBufHdr(bufHdr);
 
 	return lsn;
 }
@@ -4617,7 +4639,6 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	for (i = 0; i < NBuffers; i++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(i);
-		uint32		buf_state;
 
 		/*
 		 * We can make this a tad faster by prechecking the buffer tag before
@@ -4638,7 +4659,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 		if (!BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator.locator))
 			continue;
 
-		buf_state = LockBufHdr(bufHdr);
+		LockBufHdr(bufHdr);
 
 		for (j = 0; j < nforks; j++)
 		{
@@ -4651,7 +4672,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 			}
 		}
 		if (j >= nforks)
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 	}
 }
@@ -4780,7 +4801,6 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators)
 	{
 		RelFileLocator *rlocator = NULL;
 		BufferDesc *bufHdr = GetBufferDescriptor(i);
-		uint32		buf_state;
 
 		/*
 		 * As in DropRelationBuffers, an unlocked precheck should be safe and
@@ -4814,11 +4834,11 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators)
 		if (rlocator == NULL)
 			continue;
 
-		buf_state = LockBufHdr(bufHdr);
+		LockBufHdr(bufHdr);
 		if (BufTagMatchesRelFileLocator(&bufHdr->tag, rlocator))
 			InvalidateBuffer(bufHdr);	/* releases spinlock */
 		else
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 	}
 
 	pfree(locators);
@@ -4848,7 +4868,6 @@ FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum,
 		LWLock	   *bufPartitionLock;	/* buffer partition lock for it */
 		int			buf_id;
 		BufferDesc *bufHdr;
-		uint32		buf_state;
 
 		/* create a tag so we can lookup the buffer */
 		InitBufferTag(&bufTag, &rlocator, forkNum, curBlock);
@@ -4873,14 +4892,14 @@ FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum,
 		 * evicted by some other backend loading blocks for a different
 		 * relation after we release lock on the BufMapping table.
 		 */
-		buf_state = LockBufHdr(bufHdr);
+		LockBufHdr(bufHdr);
 		if (BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
 			BufTagGetForkNum(&bufHdr->tag) == forkNum &&
 			bufHdr->tag.blockNum >= firstDelBlock)
 			InvalidateBuffer(bufHdr);	/* releases spinlock */
 		else
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 	}
 }
@@ -4908,7 +4927,6 @@ DropDatabaseBuffers(Oid dbid)
 	for (i = 0; i < NBuffers; i++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(i);
-		uint32		buf_state;
 
 		/*
 		 * As in DropRelationBuffers, an unlocked precheck should be safe and
@@ -4917,11 +4935,11 @@ DropDatabaseBuffers(Oid dbid)
 		if (bufHdr->tag.dbOid != dbid)
 			continue;
 
-		buf_state = LockBufHdr(bufHdr);
+		LockBufHdr(bufHdr);
 		if (bufHdr->tag.dbOid == dbid)
 			InvalidateBuffer(bufHdr);	/* releases spinlock */
 		else
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 	}
 }
@@ -5018,7 +5036,7 @@ FlushRelationBuffers(Relation rel)
 			UnpinBuffer(bufHdr);
 		}
 		else
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 	}
 }
@@ -5113,7 +5131,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
 			UnpinBuffer(bufHdr);
 		}
 		else
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 	}
 
 	pfree(srels);
@@ -5339,7 +5357,7 @@ FlushDatabaseBuffers(Oid dbid)
 			UnpinBuffer(bufHdr);
 		}
 		else
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 	}
 }
@@ -5549,8 +5567,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 			PageSetLSN(page, lsn);
 		}
 
-		buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdrExt(bufHdr, buf_state,
+						BM_DIRTY | BM_JUST_DIRTIED,
+						0, 0);
 
 		if (delayChkptFlags)
 			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
@@ -5581,6 +5600,7 @@ UnlockBuffers(void)
 	if (buf)
 	{
 		uint32		buf_state;
+		uint32		unset_bits = 0;
 
 		buf_state = LockBufHdr(buf);
 
@@ -5590,9 +5610,11 @@ UnlockBuffers(void)
 		 */
 		if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
 			buf->wait_backend_pgprocno == MyProcNumber)
-			buf_state &= ~BM_PIN_COUNT_WAITER;
+			unset_bits = BM_PIN_COUNT_WAITER;
 
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdrExt(buf, buf_state,
+						0, unset_bits,
+						0);
 
 		PinCountWaitBuf = NULL;
 	}
@@ -5710,6 +5732,7 @@ LockBufferForCleanup(Buffer buffer)
 	for (;;)
 	{
 		uint32		buf_state;
+		uint32		unset_bits = 0;
 
 		/* Try to acquire lock */
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -5719,7 +5742,7 @@ LockBufferForCleanup(Buffer buffer)
 		if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
 		{
 			/* Successfully acquired exclusive lock with pincount 1 */
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 
 			/*
 			 * Emit the log message if recovery conflict on buffer pin was
@@ -5742,14 +5765,15 @@ LockBufferForCleanup(Buffer buffer)
 		/* Failed, so mark myself as waiting for pincount 1 */
 		if (buf_state & BM_PIN_COUNT_WAITER)
 		{
-			UnlockBufHdr(bufHdr, buf_state);
+			UnlockBufHdr(bufHdr);
 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 			elog(ERROR, "multiple backends attempting to wait for pincount 1");
 		}
 		bufHdr->wait_backend_pgprocno = MyProcNumber;
 		PinCountWaitBuf = bufHdr;
-		buf_state |= BM_PIN_COUNT_WAITER;
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdrExt(bufHdr, buf_state,
+						BM_PIN_COUNT_WAITER, 0,
+						0);
 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
 		/* Wait to be signaled by UnpinBuffer() */
@@ -5811,8 +5835,11 @@ LockBufferForCleanup(Buffer buffer)
 		buf_state = LockBufHdr(bufHdr);
 		if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
 			bufHdr->wait_backend_pgprocno == MyProcNumber)
-			buf_state &= ~BM_PIN_COUNT_WAITER;
-		UnlockBufHdr(bufHdr, buf_state);
+			unset_bits |= BM_PIN_COUNT_WAITER;
+
+		UnlockBufHdrExt(bufHdr, buf_state,
+						0, unset_bits,
+						0);
 
 		PinCountWaitBuf = NULL;
 		/* Loop back and try again */
 	}
@@ -5889,12 +5916,12 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 	if (refcount == 1)
 	{
 		/* Successfully acquired exclusive lock with pincount 1 */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(bufHdr);
 		return true;
 	}
 
 	/* Failed, so release the lock */
-	UnlockBufHdr(bufHdr, buf_state);
+	UnlockBufHdr(bufHdr);
 	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 	return false;
 }
@@ -5941,11 +5968,11 @@ IsBufferCleanupOK(Buffer buffer)
 	if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
 	{
 		/* pincount is OK. */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(bufHdr);
 		return true;
 	}
 
-	UnlockBufHdr(bufHdr, buf_state);
+	UnlockBufHdr(bufHdr);
 	return false;
 }
@@ -5983,7 +6010,7 @@ WaitIO(BufferDesc *buf)
 		 * clearing the wref while it's being read.
 		 */
 		iow = buf->io_wref;
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdr(buf);
 
 		/* no IO in progress, we don't need to wait */
 		if (!(buf_state & BM_IO_IN_PROGRESS))
@@ -6051,7 +6078,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdr(buf);
 		if (nowait)
 			return false;
 		WaitIO(buf);
@@ -6062,12 +6089,13 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	/* Check if someone else already did the I/O */
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
-		UnlockBufHdr(buf, buf_state);
+		UnlockBufHdr(buf);
 		return false;
 	}
 
-	buf_state |= BM_IO_IN_PROGRESS;
-	UnlockBufHdr(buf, buf_state);
+	UnlockBufHdrExt(buf, buf_state,
+					BM_IO_IN_PROGRESS, 0,
+					0);
 
 	ResourceOwnerRememberBufferIO(CurrentResourceOwner,
 								  BufferDescriptorGetBuffer(buf));
@@ -6100,28 +6128,31 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
 				  bool forget_owner, bool release_aio)
 {
 	uint32		buf_state;
+	uint32		unset_flag_bits = 0;
+	int			refcount_change = 0;
 
 	buf_state = LockBufHdr(buf);
 
 	Assert(buf_state & BM_IO_IN_PROGRESS);
-	buf_state &= ~BM_IO_IN_PROGRESS;
+	unset_flag_bits |= BM_IO_IN_PROGRESS;
 
 	/* Clear earlier errors, if this IO failed, it'll be marked again */
-	buf_state &= ~BM_IO_ERROR;
+	unset_flag_bits |= BM_IO_ERROR;
 
 	if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
-		buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
+		unset_flag_bits |= BM_DIRTY | BM_CHECKPOINT_NEEDED;
 
 	if (release_aio)
 	{
 		/* release ownership by the AIO subsystem */
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
-		buf_state -= BUF_REFCOUNT_ONE;
+		refcount_change = -1;
 		pgaio_wref_clear(&buf->io_wref);
 	}
 
-	buf_state |= set_flag_bits;
-	UnlockBufHdr(buf, buf_state);
+	buf_state = UnlockBufHdrExt(buf, buf_state,
+								set_flag_bits, unset_flag_bits,
+								refcount_change);
 
 	if (forget_owner)
 		ResourceOwnerForgetBufferIO(CurrentResourceOwner,
@@ -6166,12 +6197,12 @@ AbortBufferIO(Buffer buffer)
 	if (!(buf_state & BM_VALID))
 	{
 		Assert(!(buf_state & BM_DIRTY));
-		UnlockBufHdr(buf_hdr, buf_state);
+		UnlockBufHdr(buf_hdr);
 	}
 	else
 	{
 		Assert(buf_state & BM_DIRTY);
-		UnlockBufHdr(buf_hdr, buf_state);
+		UnlockBufHdr(buf_hdr);
 
 		/* Issue notice if this is not the first failure... */
 		if (buf_state & BM_IO_ERROR)
@@ -6593,14 +6624,14 @@ EvictUnpinnedBufferInternal(BufferDesc *desc, bool *buffer_flushed)
 	if ((buf_state & BM_VALID) == 0)
 	{
-		UnlockBufHdr(desc, buf_state);
+		UnlockBufHdr(desc);
 		return false;
 	}
 
 	/* Check that it's not pinned already. */
 	if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
 	{
-		UnlockBufHdr(desc, buf_state);
+		UnlockBufHdr(desc);
 		return false;
 	}
@@ -6754,7 +6785,7 @@ EvictRelUnpinnedBuffers(Relation rel, int32 *buffers_evicted,
 		if ((buf_state & BM_VALID) == 0 ||
 			!BufTagMatchesRelFileLocator(&desc->tag, &rel->rd_locator))
 		{
-			UnlockBufHdr(desc, buf_state);
+			UnlockBufHdr(desc);
 			continue;
 		}
@@ -6852,13 +6883,15 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
 		 *
 		 * This pin is released again in TerminateBufferIO().
 		 */
-		buf_state += BUF_REFCOUNT_ONE;
 		buf_hdr->io_wref = io_ref;
 
 		if (is_temp)
+		{
+			buf_state += BUF_REFCOUNT_ONE;
 			pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+		}
 		else
-			UnlockBufHdr(buf_hdr, buf_state);
+			UnlockBufHdrExt(buf_hdr, buf_state, 0, 0, 1);
 
 		/*
 		 * Ensure the content lock that prevents buffer modifications while
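The TerminateBufferIO() rework above is the motivating case for the extended unlock: clearing the I/O flags, ORing in the caller's flags, and optionally dropping the AIO pin now happen in the same atomic transition that releases the lock. A condensed sketch of that shape (the BM_* flags are real; the function itself is hypothetical):

/* Hypothetical caller: finish an I/O, adjusting flags and pin at unlock. */
static void
finish_io(BufferDesc *buf, uint32 set_flag_bits, bool release_aio)
{
    uint32      buf_state = LockBufHdr(buf);

    Assert(buf_state & BM_IO_IN_PROGRESS);

    /*
     * One CAS clears BM_IO_IN_PROGRESS and BM_IO_ERROR, ORs in the caller's
     * flags, optionally drops the AIO pin, and releases BM_LOCKED.
     */
    UnlockBufHdrExt(buf, buf_state,
                    set_flag_bits,
                    BM_IO_IN_PROGRESS | BM_IO_ERROR,
                    release_aio ? -1 : 0);
}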
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 7fe34d3ef4c..53668b92400 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -266,6 +266,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 				break;
 		}
 
+		/* See equivalent code in PinBuffer() */
 		if (unlikely(local_buf_state & BM_LOCKED))
 		{
 			old_buf_state = WaitBufHdrUnlocked(buf);
@@ -700,6 +701,7 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 			|| BUF_STATE_GET_USAGECOUNT(local_buf_state) > 1)
 			break;
 
+		/* See equivalent code in PinBuffer() */
 		if (unlikely(local_buf_state & BM_LOCKED))
 		{
 			old_buf_state = WaitBufHdrUnlocked(buf);
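The "equivalent code in PinBuffer()" that the two new freelist.c comments point at is a CAS loop that must not modify the state word while BM_LOCKED is set. A condensed sketch of the pattern, simplified to only increment the refcount (PinBuffer() itself also maintains the usage count):

/* Sketch of the pin-time CAS loop; not the full PinBuffer() logic. */
static void
pin_buffer_sketch(BufferDesc *buf)
{
    uint32      old_buf_state = pg_atomic_read_u32(&buf->state);

    for (;;)
    {
        uint32      buf_state;

        /* must not CAS while another backend holds the header spinlock */
        if (old_buf_state & BM_LOCKED)
            old_buf_state = WaitBufHdrUnlocked(buf);

        buf_state = old_buf_state + BUF_REFCOUNT_ONE;

        /* on failure, old_buf_state is refreshed to the current value */
        if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
                                           buf_state))
            break;
    }
}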
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index c1206a46aba..5400c56a965 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -211,28 +211,36 @@ BufMappingPartitionLockByIndex(uint32 index)
 /*
  * BufferDesc -- shared descriptor/state data for a single shared buffer.
  *
- * Note: Buffer header lock (BM_LOCKED flag) must be held to examine or change
- * tag, state or wait_backend_pgprocno fields. In general, buffer header lock
- * is a spinlock which is combined with flags, refcount and usagecount into
- * single atomic variable. This layout allow us to do some operations in a
- * single atomic operation, without actually acquiring and releasing spinlock;
- * for instance, increase or decrease refcount. buf_id field never changes
- * after initialization, so does not need locking. The LWLock can take care
- * of itself. The buffer header lock is *not* used to control access to the
- * data in the buffer!
+ * The state of the buffer is controlled by the, drumroll, state variable. It
+ * only may be modified using atomic operations. The state variable combines
+ * various flags, the buffer's refcount and usage count. See comment above
+ * BUF_REFCOUNT_BITS for details about the division. This layout allow us to
+ * do some operations in a single atomic operation, without actually acquiring
+ * and releasing the spinlock; for instance, increasing or decreasing the
+ * refcount.
  *
- * It's assumed that nobody changes the state field while buffer header lock
- * is held. Thus buffer header lock holder can do complex updates of the
- * state variable in single write, simultaneously with lock release (cleaning
- * BM_LOCKED flag). On the other hand, updating of state without holding
- * buffer header lock is restricted to CAS, which ensures that BM_LOCKED flag
- * is not set. Atomic increment/decrement, OR/AND etc. are not allowed.
+ * One of the aforementioned flags is BM_LOCKED, used to implement the buffer
+ * header lock. See the following paragraphs, as well as the documentation for
+ * individual fields, for more details.
 *
- * An exception is that if we have the buffer pinned, its tag can't change
- * underneath us, so we can examine the tag without locking the buffer header.
- * Also, in places we do one-time reads of the flags without bothering to
- * lock the buffer header; this is generally for situations where we don't
- * expect the flag bit being tested to be changing.
+ * The identity of the buffer (BufferDesc.tag) can only be changed by the
+ * backend holding the buffer header lock.
+ *
+ * If the lock is held by another backend, neither additional buffer pins may
+ * be established (we would like to relax this eventually), nor can flags be
+ * set/cleared. These operations either need to acquire the buffer header
+ * spinlock, or need to use a CAS loop, waiting for the lock to be released if
+ * it is held. However, existing buffer pins may be released while the buffer
+ * header spinlock is held, using an atomic subtraction.
+ *
+ * The LWLock can take care of itself. The buffer header lock is *not* used
+ * to control access to the data in the buffer!
+ *
+ * If we have the buffer pinned, its tag can't change underneath us, so we can
+ * examine the tag without locking the buffer header. Also, in places we do
+ * one-time reads of the flags without bothering to lock the buffer header;
+ * this is generally for situations where we don't expect the flag bit being
+ * tested to be changing.
 *
 * We can't physically remove items from a disk page if another backend has
 * the buffer pinned. Hence, a backend may need to wait for all other pins
@@ -256,13 +264,29 @@ BufMappingPartitionLockByIndex(uint32 index)
 */
 typedef struct BufferDesc
 {
-	BufferTag	tag;			/* ID of page contained in buffer */
-	int			buf_id;			/* buffer's index number (from 0) */
+	/*
+	 * ID of page contained in buffer. The buffer header spinlock needs to be
+	 * held to modify this field.
+	 */
+	BufferTag	tag;
+
+	/*
+	 * Buffer's index number (from 0). The field never changes after
+	 * initialization, so does not need locking.
+	 */
+	int			buf_id;
 
-	/* state of the tag, containing flags, refcount and usagecount */
+	/*
+	 * State of the buffer, containing flags, refcount and usagecount. See
+	 * BUF_* and BM_* defines at the top of this file.
+	 */
 	pg_atomic_uint32 state;
 
-	int			wait_backend_pgprocno;	/* backend of pin-count waiter */
+	/*
+	 * Backend of pin-count waiter. The buffer header spinlock needs to be
+	 * held to modify this field.
+	 */
+	int			wait_backend_pgprocno;
 
 	PgAioWaitRef io_wref;		/* set iff AIO is in progress */
 	LWLock		content_lock;	/* to lock access to buffer contents */
@@ -364,11 +388,52 @@ BufferDescriptorGetContentLock(const BufferDesc *bdesc)
 */
 extern uint32 LockBufHdr(BufferDesc *desc);
 
+/*
+ * Unlock the buffer header.
+ *
+ * This can only be used if the caller did not modify BufferDesc.state. To
+ * set/unset flag bits or change the refcount use UnlockBufHdrExt().
+ */
 static inline void
-UnlockBufHdr(BufferDesc *desc, uint32 buf_state)
+UnlockBufHdr(BufferDesc *desc)
 {
-	pg_write_barrier();
-	pg_atomic_write_u32(&desc->state, buf_state & (~BM_LOCKED));
+	Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED);
+
+	pg_atomic_fetch_sub_u32(&desc->state, BM_LOCKED);
+}
+
+/*
+ * Unlock the buffer header, while atomically adding the flags in set_bits,
+ * unsetting the ones in unset_bits and changing the refcount by
+ * refcount_change.
+ *
+ * Note that this approach would not work for usagecount, since we need to cap
+ * the usagecount at BM_MAX_USAGE_COUNT.
+ */
+static inline uint32
+UnlockBufHdrExt(BufferDesc *desc, uint32 old_buf_state,
+				uint32 set_bits, uint32 unset_bits,
+				int refcount_change)
+{
+	for (;;)
+	{
+		uint32		buf_state = old_buf_state;
+
+		Assert(buf_state & BM_LOCKED);
+
+		buf_state |= set_bits;
+		buf_state &= ~unset_bits;
+		buf_state &= ~BM_LOCKED;
+
+		if (refcount_change != 0)
+			buf_state += BUF_REFCOUNT_ONE * refcount_change;
+
+		if (pg_atomic_compare_exchange_u32(&desc->state, &old_buf_state,
+										   buf_state))
+		{
+			return old_buf_state;
+		}
+	}
 }
 
 extern uint32 WaitBufHdrUnlocked(BufferDesc *buf);
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index c55cf6c0aac..d7eadeab256 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -310,6 +310,7 @@ create_toy_buffer(Relation rel, BlockNumber blkno)
 	BufferDesc *buf_hdr;
 	uint32		buf_state;
 	bool		was_pinned = false;
+	uint32		unset_bits = 0;
 
 	/* place buffer in shared buffers without erroring out */
 	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
@@ -334,12 +335,17 @@ create_toy_buffer(Relation rel, BlockNumber blkno)
 	if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
 		was_pinned = true;
 	else
-		buf_state &= ~(BM_VALID | BM_DIRTY);
+		unset_bits |= BM_VALID | BM_DIRTY;
 
 	if (RelationUsesLocalBuffers(rel))
+	{
+		buf_state &= ~unset_bits;
 		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
+	}
 	else
-		UnlockBufHdr(buf_hdr, buf_state);
+	{
+		UnlockBufHdrExt(buf_hdr, buf_state, 0, unset_bits, 0);
+	}
 
 	if (was_pinned)
 		elog(ERROR, "toy buffer %d was already pinned",
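To see the state arithmetic of UnlockBufHdrExt() in isolation, here is a standalone model using C11 atomics instead of PostgreSQL's pg_atomic wrappers; the MODEL_* constants are simplified stand-ins, not the real BUF_*/BM_* bit layout:

#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define MODEL_BM_LOCKED     (1u << 31)  /* stand-in for BM_LOCKED */
#define MODEL_BM_VALID      (1u << 30)  /* stand-in flag bit */
#define MODEL_REFCOUNT_ONE  1u          /* refcount in the low bits (simplified) */

/* Mirror of UnlockBufHdrExt(): one CAS applies set/unset/refcount and unlocks. */
static uint32_t
model_unlock_ext(_Atomic uint32_t *state, uint32_t old_state,
                 uint32_t set_bits, uint32_t unset_bits, int refcount_change)
{
    for (;;)
    {
        uint32_t    new_state = old_state;

        assert(new_state & MODEL_BM_LOCKED);

        new_state |= set_bits;
        new_state &= ~unset_bits;
        new_state &= ~MODEL_BM_LOCKED;
        new_state += MODEL_REFCOUNT_ONE * (uint32_t) refcount_change;

        /* on failure old_state is refreshed; retry against the new value */
        if (atomic_compare_exchange_strong(state, &old_state, new_state))
            return old_state;
    }
}

int
main(void)
{
    _Atomic uint32_t state = MODEL_BM_LOCKED | 3;   /* locked, refcount 3 */

    /* set VALID, clear nothing, drop one pin, release the lock */
    model_unlock_ext(&state, atomic_load(&state), MODEL_BM_VALID, 0, -1);
    printf("state = 0x%x\n", (unsigned) atomic_load(&state)); /* 0x40000002 */
    return 0;
}

The CAS loop is what makes the new rule in buf_internals.h workable: because pin releases may now happen via plain atomic subtraction even while BM_LOCKED is held, the lock holder can no longer assume the state word is unchanged since LockBufHdr() and must retry until its compare-and-exchange succeeds.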
