Diffstat (limited to 'src/backend/storage/buffer')
-rw-r--r--  src/backend/storage/buffer/bufmgr.c   | 215
-rw-r--r--  src/backend/storage/buffer/freelist.c | 149
2 files changed, 231 insertions(+), 133 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fe470de63f2..ca62b9cdcf0 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -512,12 +512,12 @@ static BlockNumber ExtendBufferedRelShared(BufferManagerRelation bmr,
BlockNumber extend_upto,
Buffer *buffers,
uint32 *extended_by);
-static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
+static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
+ bool skip_if_not_valid);
static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf);
static void UnpinBufferNoOwner(BufferDesc *buf);
static void BufferSync(int flags);
-static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
static int SyncOneBuffer(int buf_id, bool skip_recently_used,
WritebackContext *wb_context);
static void WaitIO(BufferDesc *buf);
@@ -533,6 +533,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress);
static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
+static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
+ IOObject io_object, IOContext io_context);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
static void FindAndDropRelationBuffers(RelFileLocator rlocator,
@@ -685,7 +687,6 @@ ReadRecentBuffer(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockN
BufferDesc *bufHdr;
BufferTag tag;
uint32 buf_state;
- bool have_private_ref;
Assert(BufferIsValid(recent_buffer));
@@ -713,38 +714,24 @@ ReadRecentBuffer(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockN
else
{
bufHdr = GetBufferDescriptor(recent_buffer - 1);
- have_private_ref = GetPrivateRefCount(recent_buffer) > 0;
/*
- * Do we already have this buffer pinned with a private reference? If
- * so, it must be valid and it is safe to check the tag without
- * locking. If not, we have to lock the header first and then check.
+ * Is it still valid and holding the right tag? We do an unlocked tag
+ * comparison first, to make it unlikely that we'll increment the
+ * usage counter of the wrong buffer, if someone calls us with a very
+ * out-of-date recent_buffer. Then we'll check it again if we get the
+ * pin.
*/
- if (have_private_ref)
- buf_state = pg_atomic_read_u32(&bufHdr->state);
- else
- buf_state = LockBufHdr(bufHdr);
-
- if ((buf_state & BM_VALID) && BufferTagsEqual(&tag, &bufHdr->tag))
+ if (BufferTagsEqual(&tag, &bufHdr->tag) &&
+ PinBuffer(bufHdr, NULL, true))
{
- /*
- * It's now safe to pin the buffer. We can't pin first and ask
- * questions later, because it might confuse code paths like
- * InvalidateBuffer() if we pinned a random non-matching buffer.
- */
- if (have_private_ref)
- PinBuffer(bufHdr, NULL); /* bump pin count */
- else
- PinBuffer_Locked(bufHdr); /* pin for first time */
-
- pgBufferUsage.shared_blks_hit++;
-
- return true;
+ if (BufferTagsEqual(&tag, &bufHdr->tag))
+ {
+ pgBufferUsage.shared_blks_hit++;
+ return true;
+ }
+ UnpinBuffer(bufHdr);
}
-
- /* If we locked the header above, now unlock. */
- if (!have_private_ref)
- UnlockBufHdr(bufHdr, buf_state);
}
return false;
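
The hunk above replaces the old "pin first under the header spinlock" logic with a check/pin/recheck pattern: an unlocked tag comparison cheaply rejects stale hints, the new skip_if_not_valid mode of PinBuffer() refuses to pin a buffer that is not BM_VALID, and the tag is compared once more after the pin has made it stable. Reassembled from the added lines (a condensed sketch, not a literal excerpt):

	if (BufferTagsEqual(&tag, &bufHdr->tag) &&	/* unlocked; may be stale */
		PinBuffer(bufHdr, NULL, true))			/* pins only if BM_VALID */
	{
		/* the pin prevents the buffer from being recycled under us */
		if (BufferTagsEqual(&tag, &bufHdr->tag))
		{
			pgBufferUsage.shared_blks_hit++;
			return true;
		}
		UnpinBuffer(bufHdr);	/* recycled between check and pin; give up */
	}
	return false;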
@@ -1080,7 +1067,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
* already valid.)
*/
if (!isLocalBuf)
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
/* Set BM_VALID, terminate IO, and wake up any waiters */
if (isLocalBuf)
@@ -2036,7 +2023,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
buf = GetBufferDescriptor(existing_buf_id);
- valid = PinBuffer(buf, strategy);
+ valid = PinBuffer(buf, strategy, false);
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(newPartitionLock);
@@ -2098,7 +2085,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
existing_buf_hdr = GetBufferDescriptor(existing_buf_id);
- valid = PinBuffer(existing_buf_hdr, strategy);
+ valid = PinBuffer(existing_buf_hdr, strategy, false);
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(newPartitionLock);
@@ -2338,8 +2325,8 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
bool from_ring;
/*
- * Ensure, while the spinlock's not yet held, that there's a free refcount
- * entry, and a resource owner slot for the pin.
+ * Ensure, before we pin a victim buffer, that there's a free refcount
+ * entry and resource owner slot for the pin.
*/
ReservePrivateRefCountEntry();
ResourceOwnerEnlarge(CurrentResourceOwner);
@@ -2348,17 +2335,12 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
again:
/*
- * Select a victim buffer. The buffer is returned with its header
- * spinlock still held!
+ * Select a victim buffer. The buffer is returned pinned and owned by
+ * this backend.
*/
buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
buf = BufferDescriptorGetBuffer(buf_hdr);
- Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
-
- /* Pin the buffer and then release the buffer spinlock */
- PinBuffer_Locked(buf_hdr);
-
/*
* We shouldn't have any other pins for this buffer.
*/
@@ -2736,7 +2718,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
* Pin the existing buffer before releasing the partition lock,
* preventing it from being evicted.
*/
- valid = PinBuffer(existing_hdr, strategy);
+ valid = PinBuffer(existing_hdr, strategy, false);
LWLockRelease(partition_lock);
UnpinBuffer(victim_buf_hdr);
@@ -2837,7 +2819,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
}
if (lock)
- LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
TerminateBufferIO(buf_hdr, false, BM_VALID, true, false);
}
@@ -2850,14 +2832,40 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
}
/*
- * BufferIsExclusiveLocked
+ * BufferIsLockedByMe
+ *
+ * Checks if this backend has the buffer locked in any mode.
+ *
+ * Buffer must be pinned.
+ */
+bool
+BufferIsLockedByMe(Buffer buffer)
+{
+ BufferDesc *bufHdr;
+
+ Assert(BufferIsPinned(buffer));
+
+ if (BufferIsLocal(buffer))
+ {
+ /* Content locks are not maintained for local buffers. */
+ return true;
+ }
+ else
+ {
+ bufHdr = GetBufferDescriptor(buffer - 1);
+ return LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr));
+ }
+}
+
+/*
+ * BufferIsLockedByMeInMode
*
- * Checks if buffer is exclusive-locked.
+ * Checks if this backend has the buffer locked in the specified mode.
*
* Buffer must be pinned.
*/
bool
-BufferIsExclusiveLocked(Buffer buffer)
+BufferIsLockedByMeInMode(Buffer buffer, int mode)
{
BufferDesc *bufHdr;
@@ -2870,9 +2878,23 @@ BufferIsExclusiveLocked(Buffer buffer)
}
else
{
+ LWLockMode lw_mode;
+
+ switch (mode)
+ {
+ case BUFFER_LOCK_EXCLUSIVE:
+ lw_mode = LW_EXCLUSIVE;
+ break;
+ case BUFFER_LOCK_SHARE:
+ lw_mode = LW_SHARED;
+ break;
+ default:
+ pg_unreachable();
+ }
+
bufHdr = GetBufferDescriptor(buffer - 1);
return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
- LW_EXCLUSIVE);
+ lw_mode);
}
}
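
The two helpers let callers assert content-lock state without reaching for the LWLock API directly; a typical use, matching the Assert() conversions later in this diff:

	Assert(BufferIsPinned(buffer));		/* both helpers require a pin */
	Assert(BufferIsLockedByMe(buffer));	/* held in any mode */
	Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));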
@@ -2901,8 +2923,7 @@ BufferIsDirty(Buffer buffer)
else
{
bufHdr = GetBufferDescriptor(buffer - 1);
- Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
- LW_EXCLUSIVE));
+ Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
}
return pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY;
@@ -2936,8 +2957,7 @@ MarkBufferDirty(Buffer buffer)
bufHdr = GetBufferDescriptor(buffer - 1);
Assert(BufferIsPinned(buffer));
- Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
- LW_EXCLUSIVE));
+ Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
old_buf_state = pg_atomic_read_u32(&bufHdr->state);
for (;;)
@@ -3035,10 +3055,13 @@ ReleaseAndReadBuffer(Buffer buffer,
* must have been done already.
*
* Returns true if buffer is BM_VALID, else false. This provision allows
- * some callers to avoid an extra spinlock cycle.
+ * some callers to avoid an extra spinlock cycle. If skip_if_not_valid is
+ * true, then a false return value also indicates that the buffer was
+ * (recently) invalid and has not been pinned.
*/
static bool
-PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
+PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
+ bool skip_if_not_valid)
{
Buffer b = BufferDescriptorGetBuffer(buf);
bool result;
@@ -3054,11 +3077,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
uint32 buf_state;
uint32 old_buf_state;
- ref = NewPrivateRefCountEntry(b);
-
old_buf_state = pg_atomic_read_u32(&buf->state);
for (;;)
{
+ if (unlikely(skip_if_not_valid && !(old_buf_state & BM_VALID)))
+ return false;
+
if (old_buf_state & BM_LOCKED)
old_buf_state = WaitBufHdrUnlocked(buf);
@@ -3088,6 +3112,8 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
{
result = (buf_state & BM_VALID) != 0;
+ TrackNewBufferPin(b);
+
/*
* Assume that we acquired a buffer pin for the purposes of
* Valgrind buffer client checks (even in !result case) to
@@ -3118,11 +3144,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
* cannot meddle with that.
*/
result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
+
+ Assert(ref->refcount > 0);
+ ref->refcount++;
+ ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
}
- ref->refcount++;
- Assert(ref->refcount > 0);
- ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
return result;
}
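
Reassembled, the lock-free arm of PinBuffer() is a conventional compare-and-swap loop over the buffer state word; pg_atomic_compare_exchange_u32() refreshes old_buf_state on failure, so a lost race simply retries. A condensed sketch, with the usage-count maintenance and Valgrind bookkeeping omitted:

	old_buf_state = pg_atomic_read_u32(&buf->state);
	for (;;)
	{
		uint32		buf_state = old_buf_state;

		/* new: refuse to pin a (recently) invalid buffer */
		if (unlikely(skip_if_not_valid && !(buf_state & BM_VALID)))
			return false;

		/* can't change the state word while the header spinlock is held */
		if (buf_state & BM_LOCKED)
		{
			old_buf_state = WaitBufHdrUnlocked(buf);
			continue;
		}

		buf_state += BUF_REFCOUNT_ONE;	/* the pin itself */

		if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
										   buf_state))
		{
			result = (buf_state & BM_VALID) != 0;
			TrackNewBufferPin(b);	/* replaces the open-coded bookkeeping */
			break;
		}
		/* CAS failed: old_buf_state now holds the current value; retry */
	}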
@@ -3151,8 +3178,6 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
static void
PinBuffer_Locked(BufferDesc *buf)
{
- Buffer b;
- PrivateRefCountEntry *ref;
uint32 buf_state;
/*
@@ -3177,12 +3202,7 @@ PinBuffer_Locked(BufferDesc *buf)
buf_state += BUF_REFCOUNT_ONE;
UnlockBufHdr(buf, buf_state);
- b = BufferDescriptorGetBuffer(buf);
-
- ref = NewPrivateRefCountEntry(b);
- ref->refcount++;
-
- ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
+ TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
}
/*
@@ -3265,7 +3285,10 @@ UnpinBufferNoOwner(BufferDesc *buf)
*/
VALGRIND_MAKE_MEM_NOACCESS(BufHdrGetBlock(buf), BLCKSZ);
- /* I'd better not still hold the buffer content lock */
+ /*
+ * I'd better not still hold the buffer content lock. Can't use
+ * BufferIsLockedByMe(), as that asserts the buffer is pinned.
+ */
Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
/*
@@ -3297,6 +3320,17 @@ UnpinBufferNoOwner(BufferDesc *buf)
}
}
+inline void
+TrackNewBufferPin(Buffer buf)
+{
+ PrivateRefCountEntry *ref;
+
+ ref = NewPrivateRefCountEntry(buf);
+ ref->refcount++;
+
+ ResourceOwnerRememberBuffer(CurrentResourceOwner, buf);
+}
+
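The new TrackNewBufferPin() centralizes the private-refcount and resource-owner bookkeeping that PinBuffer() and PinBuffer_Locked() used to open-code, but it must not fail after the pin is already visible in the state word. Callers therefore reserve the bookkeeping space up front, as GetVictimBuffer() does above (sketch of the pattern, not a literal excerpt):

	/* before any code path that might pin: make tracking infallible */
	ReservePrivateRefCountEntry();
	ResourceOwnerEnlarge(CurrentResourceOwner);

	/* ... then, immediately after a successful pinning CAS: */
	TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
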
#define ST_SORT sort_checkpoint_bufferids
#define ST_ELEMENT_TYPE CkptSortItem
#define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b)
@@ -3327,7 +3361,7 @@ BufferSync(int flags)
Oid last_tsid;
binaryheap *ts_heap;
int i;
- int mask = BM_DIRTY;
+ uint32 mask = BM_DIRTY;
WritebackContext wb_context;
/*
@@ -3935,11 +3969,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
* buffer is clean by the time we've locked it.)
*/
PinBuffer_Locked(bufHdr);
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-
- FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
- LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+ FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
tag = bufHdr->tag;
@@ -4387,6 +4418,19 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
}
/*
+ * Convenience wrapper around FlushBuffer() that locks/unlocks the buffer
+ * before/after calling FlushBuffer().
+ */
+static void
+FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
+ IOObject io_object, IOContext io_context)
+{
+ LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
+ FlushBuffer(buf, reln, io_object, io_context);
+ LWLockRelease(BufferDescriptorGetContentLock(buf));
+}
+
+/*
* RelationGetNumberOfBlocksInFork
* Determines the current number of pages in the specified relation fork.
*
@@ -4975,9 +5019,7 @@ FlushRelationBuffers(Relation rel)
(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{
PinBuffer_Locked(bufHdr);
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
- FlushBuffer(bufHdr, srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
- LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+ FlushUnlockedBuffer(bufHdr, srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
UnpinBuffer(bufHdr);
}
else
@@ -5072,9 +5114,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{
PinBuffer_Locked(bufHdr);
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
- FlushBuffer(bufHdr, srelent->srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
- LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+ FlushUnlockedBuffer(bufHdr, srelent->srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
UnpinBuffer(bufHdr);
}
else
@@ -5300,9 +5340,7 @@ FlushDatabaseBuffers(Oid dbid)
(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{
PinBuffer_Locked(bufHdr);
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
- FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
- LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+ FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
UnpinBuffer(bufHdr);
}
else
@@ -5326,7 +5364,7 @@ FlushOneBuffer(Buffer buffer)
bufHdr = GetBufferDescriptor(buffer - 1);
- Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
+ Assert(BufferIsLockedByMe(buffer));
FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
}
@@ -5417,7 +5455,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
Assert(GetPrivateRefCount(buffer) > 0);
/* here, either share or exclusive lock is OK */
- Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
+ Assert(BufferIsLockedByMe(buffer));
/*
* This routine might get called many times on the same page, if we are
@@ -5900,8 +5938,7 @@ IsBufferCleanupOK(Buffer buffer)
bufHdr = GetBufferDescriptor(buffer - 1);
/* caller must hold exclusive lock on buffer */
- Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
- LW_EXCLUSIVE));
+ Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
buf_state = LockBufHdr(bufHdr);
@@ -6250,7 +6287,7 @@ LockBufHdr(BufferDesc *desc)
* Obviously the buffer could be locked by the time the value is returned, so
* this is primarily useful in CAS style loops.
*/
-static uint32
+pg_noinline uint32
WaitBufHdrUnlocked(BufferDesc *buf)
{
SpinDelayStatus delayStatus;
@@ -6577,10 +6614,8 @@ EvictUnpinnedBufferInternal(BufferDesc *desc, bool *buffer_flushed)
/* If it was dirty, try to clean it once. */
if (buf_state & BM_DIRTY)
{
- LWLockAcquire(BufferDescriptorGetContentLock(desc), LW_SHARED);
- FlushBuffer(desc, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+ FlushUnlockedBuffer(desc, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
*buffer_flushed = true;
- LWLockRelease(BufferDescriptorGetContentLock(desc));
}
/* This will return false if it becomes dirty or someone else pins it. */
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 7d59a92bd1a..7fe34d3ef4c 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -159,13 +159,16 @@ ClockSweepTick(void)
* StrategyGetBuffer
*
* Called by the bufmgr to get the next candidate buffer to use in
- * BufferAlloc(). The only hard requirement BufferAlloc() has is that
+ * GetVictimBuffer(). The only hard requirement GetVictimBuffer() has is that
* the selected buffer must not currently be pinned by anyone.
*
* strategy is a BufferAccessStrategy object, or NULL for default strategy.
*
- * To ensure that no one else can pin the buffer before we do, we must
- * return the buffer with the buffer header spinlock still held.
+ * It is the caller's responsibility to ensure the buffer ownership can be
+ * tracked via TrackNewBufferPin().
+ *
+ * The buffer is pinned and marked as owned, using TrackNewBufferPin(),
+ * before returning.
*/
BufferDesc *
StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring)
@@ -173,7 +176,6 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
BufferDesc *buf;
int bgwprocno;
int trycounter;
- uint32 local_buf_state; /* to avoid repeated (de-)referencing */
*from_ring = false;
@@ -228,44 +230,79 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
trycounter = NBuffers;
for (;;)
{
+ uint32 old_buf_state;
+ uint32 local_buf_state;
+
buf = GetBufferDescriptor(ClockSweepTick());
/*
- * If the buffer is pinned or has a nonzero usage_count, we cannot use
- * it; decrement the usage_count (unless pinned) and keep scanning.
+ * Check whether the buffer can be used and pin it if so. Do this
+ * using a CAS loop, to avoid having to lock the buffer header.
*/
- local_buf_state = LockBufHdr(buf);
-
- if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
+ old_buf_state = pg_atomic_read_u32(&buf->state);
+ for (;;)
{
+ local_buf_state = old_buf_state;
+
+ /*
+ * If the buffer is pinned or has a nonzero usage_count, we cannot
+ * use it; decrement the usage_count (unless pinned) and keep
+ * scanning.
+ */
+
+ if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0)
+ {
+ if (--trycounter == 0)
+ {
+ /*
+ * We've scanned all the buffers without making any state
+ * changes, so all the buffers are pinned (or were when we
+ * looked at them). We could hope that someone will free
+ * one eventually, but it's probably better to fail than
+ * to risk getting stuck in an infinite loop.
+ */
+ elog(ERROR, "no unpinned buffers available");
+ }
+ break;
+ }
+
+ if (unlikely(local_buf_state & BM_LOCKED))
+ {
+ old_buf_state = WaitBufHdrUnlocked(buf);
+ continue;
+ }
+
if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
{
local_buf_state -= BUF_USAGECOUNT_ONE;
- trycounter = NBuffers;
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ local_buf_state))
+ {
+ trycounter = NBuffers;
+ break;
+ }
}
else
{
- /* Found a usable buffer */
- if (strategy != NULL)
- AddBufferToRing(strategy, buf);
- *buf_state = local_buf_state;
- return buf;
+ /* pin the buffer if the CAS succeeds */
+ local_buf_state += BUF_REFCOUNT_ONE;
+
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ local_buf_state))
+ {
+ /* Found a usable buffer */
+ if (strategy != NULL)
+ AddBufferToRing(strategy, buf);
+ *buf_state = local_buf_state;
+
+ TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
+
+ return buf;
+ }
}
}
- else if (--trycounter == 0)
- {
- /*
- * We've scanned all the buffers without making any state changes,
- * so all the buffers are pinned (or were when we looked at them).
- * We could hope that someone will free one eventually, but it's
- * probably better to fail than to risk getting stuck in an
- * infinite loop.
- */
- UnlockBufHdr(buf, local_buf_state);
- elog(ERROR, "no unpinned buffers available");
- }
- UnlockBufHdr(buf, local_buf_state);
}
}
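
Reassembled, each clock tick in the new StrategyGetBuffer() makes one of three moves on the candidate's state word, none of which takes the header spinlock (condensed sketch; trycounter exhaustion and the BM_LOCKED wait are handled as in the hunk above):

	old_buf_state = pg_atomic_read_u32(&buf->state);
	for (;;)
	{
		uint32		local_buf_state = old_buf_state;

		if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0)
			break;			/* pinned by someone else: advance the sweep */

		if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
			local_buf_state -= BUF_USAGECOUNT_ONE;	/* age it, keep scanning */
		else
			local_buf_state += BUF_REFCOUNT_ONE;	/* free: take the pin */

		if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
										   local_buf_state))
			break;			/* on a pin, TrackNewBufferPin() then return */
		/* CAS lost a race: old_buf_state was refreshed; retry this buffer */
	}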
@@ -614,13 +651,15 @@ FreeAccessStrategy(BufferAccessStrategy strategy)
* GetBufferFromRing -- returns a buffer from the ring, or NULL if the
* ring is empty / not usable.
*
- * The bufhdr spin lock is held on the returned buffer.
+ * The buffer is pinned and marked as owned, using TrackNewBufferPin(), before
+ * returning.
*/
static BufferDesc *
GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
{
BufferDesc *buf;
Buffer bufnum;
+ uint32 old_buf_state;
uint32 local_buf_state; /* to avoid repeated (de-)referencing */
@@ -637,24 +676,48 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
if (bufnum == InvalidBuffer)
return NULL;
+ buf = GetBufferDescriptor(bufnum - 1);
+
/*
- * If the buffer is pinned we cannot use it under any circumstances.
- *
- * If usage_count is 0 or 1 then the buffer is fair game (we expect 1,
- * since our own previous usage of the ring element would have left it
- * there, but it might've been decremented by clock-sweep since then). A
- * higher usage_count indicates someone else has touched the buffer, so we
- * shouldn't re-use it.
+ * Check whether the buffer can be used and pin it if so. Do this using a
+ * CAS loop, to avoid having to lock the buffer header.
*/
- buf = GetBufferDescriptor(bufnum - 1);
- local_buf_state = LockBufHdr(buf);
- if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
- && BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1)
+ old_buf_state = pg_atomic_read_u32(&buf->state);
+ for (;;)
{
- *buf_state = local_buf_state;
- return buf;
+ local_buf_state = old_buf_state;
+
+ /*
+ * If the buffer is pinned we cannot use it under any circumstances.
+ *
+ * If usage_count is 0 or 1 then the buffer is fair game (we expect 1,
+ * since our own previous usage of the ring element would have left it
+ * there, but it might've been decremented by clock-sweep since then).
+ * A higher usage_count indicates someone else has touched the buffer,
+ * so we shouldn't re-use it.
+ */
+ if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0
+ || BUF_STATE_GET_USAGECOUNT(local_buf_state) > 1)
+ break;
+
+ if (unlikely(local_buf_state & BM_LOCKED))
+ {
+ old_buf_state = WaitBufHdrUnlocked(buf);
+ continue;
+ }
+
+ /* pin the buffer if the CAS succeeds */
+ local_buf_state += BUF_REFCOUNT_ONE;
+
+ if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+ local_buf_state))
+ {
+ *buf_state = local_buf_state;
+
+ TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
+ return buf;
+ }
}
- UnlockBufHdr(buf, local_buf_state);
/*
* Tell caller to allocate a new buffer with the normal allocation