Diffstat (limited to 'src/backend/storage')
-rw-r--r--  src/backend/storage/buffer/README        |  44
-rw-r--r--  src/backend/storage/buffer/buf_init.c    |   9
-rw-r--r--  src/backend/storage/buffer/bufmgr.c      |  35
-rw-r--r--  src/backend/storage/buffer/freelist.c    | 127
-rw-r--r--  src/backend/storage/buffer/localbuf.c    |   2
-rw-r--r--  src/backend/storage/ipc/dsm_registry.c   |  12
-rw-r--r--  src/backend/storage/lmgr/lwlock.c        | 200
7 files changed, 130 insertions(+), 299 deletions(-)
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
index a182fcd660c..119f31b5d65 100644
--- a/src/backend/storage/buffer/README
+++ b/src/backend/storage/buffer/README
@@ -128,11 +128,11 @@ independently. If it is necessary to lock more than one partition at a time,
they must be locked in partition-number order to avoid risk of deadlock.
* A separate system-wide spinlock, buffer_strategy_lock, provides mutual
-exclusion for operations that access the buffer free list or select
-buffers for replacement. A spinlock is used here rather than a lightweight
-lock for efficiency; no other locks of any sort should be acquired while
-buffer_strategy_lock is held. This is essential to allow buffer replacement
-to happen in multiple backends with reasonable concurrency.
+exclusion for operations that select buffers for replacement. A spinlock is
+used here rather than a lightweight lock for efficiency; no other locks of any
+sort should be acquired while buffer_strategy_lock is held. This is essential
+to allow buffer replacement to happen in multiple backends with reasonable
+concurrency.
* Each buffer header contains a spinlock that must be taken when examining
or changing fields of that buffer header. This allows operations such as
@@ -158,18 +158,8 @@ unset by sleeping on the buffer's condition variable.
Normal Buffer Replacement Strategy
----------------------------------
-There is a "free list" of buffers that are prime candidates for replacement.
-In particular, buffers that are completely free (contain no valid page) are
-always in this list. We could also throw buffers into this list if we
-consider their pages unlikely to be needed soon; however, the current
-algorithm never does that. The list is singly-linked using fields in the
-buffer headers; we maintain head and tail pointers in global variables.
-(Note: although the list links are in the buffer headers, they are
-considered to be protected by the buffer_strategy_lock, not the buffer-header
-spinlocks.) To choose a victim buffer to recycle when there are no free
-buffers available, we use a simple clock-sweep algorithm, which avoids the
-need to take system-wide locks during common operations. It works like
-this:
+To choose a victim buffer to recycle, we use a simple clock-sweep algorithm. It
+works like this:
Each buffer header contains a usage counter, which is incremented (up to a
small limit value) whenever the buffer is pinned. (This requires only the
@@ -184,20 +174,14 @@ The algorithm for a process that needs to obtain a victim buffer is:
1. Obtain buffer_strategy_lock.
-2. If buffer free list is nonempty, remove its head buffer. Release
-buffer_strategy_lock. If the buffer is pinned or has a nonzero usage count,
-it cannot be used; ignore it go back to step 1. Otherwise, pin the buffer,
-and return it.
+2. Select the buffer pointed to by nextVictimBuffer, and circularly advance
+nextVictimBuffer for next time. Release buffer_strategy_lock.
-3. Otherwise, the buffer free list is empty. Select the buffer pointed to by
-nextVictimBuffer, and circularly advance nextVictimBuffer for next time.
-Release buffer_strategy_lock.
-
-4. If the selected buffer is pinned or has a nonzero usage count, it cannot
+3. If the selected buffer is pinned or has a nonzero usage count, it cannot
be used. Decrement its usage count (if nonzero), reacquire
buffer_strategy_lock, and return to step 3 to examine the next buffer.
-5. Pin the selected buffer, and return.
+4. Pin the selected buffer, and return.
(Note that if the selected buffer is dirty, we will have to write it out
before we can recycle it; if someone else pins the buffer meanwhile we will
@@ -211,9 +195,9 @@ Buffer Ring Replacement Strategy
When running a query that needs to access a large number of pages just once,
such as VACUUM or a large sequential scan, a different strategy is used.
A page that has been touched only by such a scan is unlikely to be needed
-again soon, so instead of running the normal clock sweep algorithm and
+again soon, so instead of running the normal clock-sweep algorithm and
blowing out the entire buffer cache, a small ring of buffers is allocated
-using the normal clock sweep algorithm and those buffers are reused for the
+using the normal clock-sweep algorithm and those buffers are reused for the
whole scan. This also implies that much of the write traffic caused by such
a statement will be done by the backend itself and not pushed off onto other
processes.
@@ -234,7 +218,7 @@ the ring strategy effectively degrades to the normal strategy.
VACUUM uses a ring like sequential scans, however, the size of this ring is
controlled by the vacuum_buffer_usage_limit GUC. Dirty pages are not removed
-from the ring. Instead, WAL is flushed if needed to allow reuse of the
+from the ring. Instead, the WAL is flushed if needed to allow reuse of the
buffers. Before introducing the buffer ring strategy in 8.3, VACUUM's buffers
were sent to the freelist, which was effectively a buffer ring of 1 buffer,
resulting in excessive WAL flushing.
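
The clock-sweep procedure the README now describes can be summarized in a short
sketch. This is a simplified illustration, not the actual bufmgr/freelist code:
the Buffer struct, pin_count, and usage_count fields are stand-ins, and
buffer-header locking, dirty-page writeback, and the trycounter that errors out
when every buffer is pinned are all omitted.

    /* Minimal single-threaded model of clock-sweep victim selection. */
    #include <stdint.h>

    typedef struct Buffer
    {
        uint32_t    pin_count;      /* nonzero while some backend has it pinned */
        uint32_t    usage_count;    /* bumped on pin, capped at a small limit */
    } Buffer;

    #define NBUFFERS 16384

    static Buffer   buffers[NBUFFERS];
    static uint32_t next_victim;    /* the "clock hand"; only ever advances */

    /* Returns the index of a buffer that may be recycled, pinning it. */
    static int
    choose_victim(void)
    {
        for (;;)
        {
            uint32_t    idx = next_victim % NBUFFERS;
            Buffer     *buf = &buffers[idx];

            next_victim++;          /* step 2: advance the hand for next time */

            if (buf->pin_count == 0 && buf->usage_count == 0)
            {
                buf->pin_count = 1; /* step 4: pin the selected buffer */
                return (int) idx;
            }

            /* step 3: pinned or recently used; decay usage and keep sweeping */
            if (buf->usage_count > 0)
                buf->usage_count--;
        }
    }
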
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index ed1dc488a42..6fd3a6bbac5 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -128,20 +128,11 @@ BufferManagerShmemInit(void)
pgaio_wref_clear(&buf->io_wref);
- /*
- * Initially link all the buffers together as unused. Subsequent
- * management of this list is done by freelist.c.
- */
- buf->freeNext = i + 1;
-
LWLockInitialize(BufferDescriptorGetContentLock(buf),
LWTRANCHE_BUFFER_CONTENT);
ConditionVariableInit(BufferDescriptorGetIOCV(buf));
}
-
- /* Correct last entry of linked list */
- GetBufferDescriptor(NBuffers - 1)->freeNext = FREENEXT_END_OF_LIST;
}
/* Init other shared buffer-management stuff */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 350cc0402aa..fe470de63f2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2094,12 +2094,6 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
UnpinBuffer(victim_buf_hdr);
- /*
- * The victim buffer we acquired previously is clean and unused, let
- * it be found again quickly
- */
- StrategyFreeBuffer(victim_buf_hdr);
-
/* remaining code should match code at top of routine */
existing_buf_hdr = GetBufferDescriptor(existing_buf_id);
@@ -2158,8 +2152,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
}
/*
- * InvalidateBuffer -- mark a shared buffer invalid and return it to the
- * freelist.
+ * InvalidateBuffer -- mark a shared buffer invalid.
*
* The buffer header spinlock must be held at entry. We drop it before
* returning. (This is sane because the caller must have locked the
@@ -2257,11 +2250,6 @@ retry:
* Done with mapping lock.
*/
LWLockRelease(oldPartitionLock);
-
- /*
- * Insert the buffer at the head of the list of free buffers.
- */
- StrategyFreeBuffer(buf);
}
/*
@@ -2679,11 +2667,6 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
{
BufferDesc *buf_hdr = GetBufferDescriptor(buffers[i] - 1);
- /*
- * The victim buffer we acquired previously is clean and unused,
- * let it be found again quickly
- */
- StrategyFreeBuffer(buf_hdr);
UnpinBuffer(buf_hdr);
}
@@ -2756,12 +2739,6 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
valid = PinBuffer(existing_hdr, strategy);
LWLockRelease(partition_lock);
-
- /*
- * The victim buffer we acquired previously is clean and unused,
- * let it be found again quickly
- */
- StrategyFreeBuffer(victim_buf_hdr);
UnpinBuffer(victim_buf_hdr);
buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
@@ -3608,7 +3585,7 @@ BufferSync(int flags)
* This is called periodically by the background writer process.
*
* Returns true if it's appropriate for the bgwriter process to go into
- * low-power hibernation mode. (This happens if the strategy clock sweep
+ * low-power hibernation mode. (This happens if the strategy clock-sweep
* has been "lapped" and no buffer allocations have occurred recently,
* or if the bgwriter has been effectively disabled by setting
* bgwriter_lru_maxpages to 0.)
@@ -3658,8 +3635,8 @@ BgBufferSync(WritebackContext *wb_context)
uint32 new_recent_alloc;
/*
- * Find out where the freelist clock sweep currently is, and how many
- * buffer allocations have happened since our last call.
+ * Find out where the clock-sweep currently is, and how many buffer
+ * allocations have happened since our last call.
*/
strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
@@ -3679,8 +3656,8 @@ BgBufferSync(WritebackContext *wb_context)
/*
* Compute strategy_delta = how many buffers have been scanned by the
- * clock sweep since last time. If first time through, assume none. Then
- * see if we are still ahead of the clock sweep, and if so, how many
+ * clock-sweep since last time. If first time through, assume none. Then
+ * see if we are still ahead of the clock-sweep, and if so, how many
* buffers we could scan before we'd catch up with it and "lap" it. Note:
* weird-looking coding of xxx_passes comparisons are to avoid bogus
* behavior when the passes counts wrap around.
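
The strategy_delta arithmetic these comments refer to boils down to a small
calculation. A minimal sketch, with illustrative names only, of how the
distance travelled by the clock-sweep is reconstructed from two
(passes, buffer id) snapshots, staying correct when the passes counter wraps:

    #include <stdint.h>

    static long
    sweep_distance(uint32_t prev_passes, int prev_buf_id,
                   uint32_t cur_passes, int cur_buf_id,
                   int nbuffers)
    {
        /* wraparound-safe number of complete passes since the last call */
        int32_t     passes_delta = (int32_t) (cur_passes - prev_passes);
        long        delta = cur_buf_id - prev_buf_id;

        delta += (long) passes_delta * nbuffers;
        return delta;           /* buffers scanned by the clock-sweep */
    }
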
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 01909be0272..7d59a92bd1a 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -33,25 +33,17 @@ typedef struct
slock_t buffer_strategy_lock;
/*
- * Clock sweep hand: index of next buffer to consider grabbing. Note that
+	 * Clock-sweep hand: index of next buffer to consider grabbing. Note that
* this isn't a concrete buffer - we only ever increase the value. So, to
* get an actual buffer, it needs to be used modulo NBuffers.
*/
pg_atomic_uint32 nextVictimBuffer;
- int firstFreeBuffer; /* Head of list of unused buffers */
- int lastFreeBuffer; /* Tail of list of unused buffers */
-
- /*
- * NOTE: lastFreeBuffer is undefined when firstFreeBuffer is -1 (that is,
- * when the list is empty)
- */
-
/*
* Statistics. These counters should be wide enough that they can't
* overflow during a single bgwriter cycle.
*/
- uint32 completePasses; /* Complete cycles of the clock sweep */
+ uint32 completePasses; /* Complete cycles of the clock-sweep */
pg_atomic_uint32 numBufferAllocs; /* Buffers allocated since last reset */
/*
@@ -164,23 +156,6 @@ ClockSweepTick(void)
}
/*
- * have_free_buffer -- a lockless check to see if there is a free buffer in
- * buffer pool.
- *
- * If the result is true that will become stale once free buffers are moved out
- * by other operations, so the caller who strictly want to use a free buffer
- * should not call this.
- */
-bool
-have_free_buffer(void)
-{
- if (StrategyControl->firstFreeBuffer >= 0)
- return true;
- else
- return false;
-}
-
-/*
* StrategyGetBuffer
*
* Called by the bufmgr to get the next candidate buffer to use in
@@ -249,69 +224,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
*/
pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1);
- /*
- * First check, without acquiring the lock, whether there's buffers in the
- * freelist. Since we otherwise don't require the spinlock in every
- * StrategyGetBuffer() invocation, it'd be sad to acquire it here -
- * uselessly in most cases. That obviously leaves a race where a buffer is
- * put on the freelist but we don't see the store yet - but that's pretty
- * harmless, it'll just get used during the next buffer acquisition.
- *
- * If there's buffers on the freelist, acquire the spinlock to pop one
- * buffer of the freelist. Then check whether that buffer is usable and
- * repeat if not.
- *
- * Note that the freeNext fields are considered to be protected by the
- * buffer_strategy_lock not the individual buffer spinlocks, so it's OK to
- * manipulate them without holding the spinlock.
- */
- if (StrategyControl->firstFreeBuffer >= 0)
- {
- while (true)
- {
- /* Acquire the spinlock to remove element from the freelist */
- SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
-
- if (StrategyControl->firstFreeBuffer < 0)
- {
- SpinLockRelease(&StrategyControl->buffer_strategy_lock);
- break;
- }
-
- buf = GetBufferDescriptor(StrategyControl->firstFreeBuffer);
- Assert(buf->freeNext != FREENEXT_NOT_IN_LIST);
-
- /* Unconditionally remove buffer from freelist */
- StrategyControl->firstFreeBuffer = buf->freeNext;
- buf->freeNext = FREENEXT_NOT_IN_LIST;
-
- /*
- * Release the lock so someone else can access the freelist while
- * we check out this buffer.
- */
- SpinLockRelease(&StrategyControl->buffer_strategy_lock);
-
- /*
- * If the buffer is pinned or has a nonzero usage_count, we cannot
- * use it; discard it and retry. (This can only happen if VACUUM
- * put a valid buffer in the freelist and then someone else used
- * it before we got to it. It's probably impossible altogether as
- * of 8.3, but we'd better check anyway.)
- */
- local_buf_state = LockBufHdr(buf);
- if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
- && BUF_STATE_GET_USAGECOUNT(local_buf_state) == 0)
- {
- if (strategy != NULL)
- AddBufferToRing(strategy, buf);
- *buf_state = local_buf_state;
- return buf;
- }
- UnlockBufHdr(buf, local_buf_state);
- }
- }
-
- /* Nothing on the freelist, so run the "clock sweep" algorithm */
+ /* Use the "clock sweep" algorithm to find a free buffer */
trycounter = NBuffers;
for (;;)
{
@@ -357,29 +270,6 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
}
/*
- * StrategyFreeBuffer: put a buffer on the freelist
- */
-void
-StrategyFreeBuffer(BufferDesc *buf)
-{
- SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
-
- /*
- * It is possible that we are told to put something in the freelist that
- * is already in it; don't screw up the list if so.
- */
- if (buf->freeNext == FREENEXT_NOT_IN_LIST)
- {
- buf->freeNext = StrategyControl->firstFreeBuffer;
- if (buf->freeNext < 0)
- StrategyControl->lastFreeBuffer = buf->buf_id;
- StrategyControl->firstFreeBuffer = buf->buf_id;
- }
-
- SpinLockRelease(&StrategyControl->buffer_strategy_lock);
-}
-
-/*
* StrategySyncStart -- tell BgBufferSync where to start syncing
*
* The result is the buffer index of the best buffer to sync first.
@@ -504,14 +394,7 @@ StrategyInitialize(bool init)
SpinLockInit(&StrategyControl->buffer_strategy_lock);
- /*
- * Grab the whole linked list of free buffers for our strategy. We
- * assume it was previously set up by BufferManagerShmemInit().
- */
- StrategyControl->firstFreeBuffer = 0;
- StrategyControl->lastFreeBuffer = NBuffers - 1;
-
- /* Initialize the clock sweep pointer */
+ /* Initialize the clock-sweep pointer */
pg_atomic_init_u32(&StrategyControl->nextVictimBuffer, 0);
/* Clear statistics */
@@ -759,7 +642,7 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
*
* If usage_count is 0 or 1 then the buffer is fair game (we expect 1,
* since our own previous usage of the ring element would have left it
- * there, but it might've been decremented by clock sweep since then). A
+ * there, but it might've been decremented by clock-sweep since then). A
* higher usage_count indicates someone else has touched the buffer, so we
* shouldn't re-use it.
*/
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 3c0d20f4659..04fef13409b 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -229,7 +229,7 @@ GetLocalVictimBuffer(void)
ResourceOwnerEnlarge(CurrentResourceOwner);
/*
- * Need to get a new buffer. We use a clock sweep algorithm (essentially
+ * Need to get a new buffer. We use a clock-sweep algorithm (essentially
* the same as what freelist.c does now...)
*/
trycounter = NLocBuffer;
diff --git a/src/backend/storage/ipc/dsm_registry.c b/src/backend/storage/ipc/dsm_registry.c
index ca12815f4a8..97130925106 100644
--- a/src/backend/storage/ipc/dsm_registry.c
+++ b/src/backend/storage/ipc/dsm_registry.c
@@ -299,8 +299,7 @@ GetNamedDSA(const char *name, bool *found)
entry->type = DSMR_ENTRY_TYPE_DSA;
/* Initialize the LWLock tranche for the DSA. */
- state->tranche = LWLockNewTrancheId();
- LWLockRegisterTranche(state->tranche, name);
+ state->tranche = LWLockNewTrancheId(name);
/* Initialize the DSA. */
ret = dsa_create(state->tranche);
@@ -321,9 +320,6 @@ GetNamedDSA(const char *name, bool *found)
ereport(ERROR,
(errmsg("requested DSA already attached to current process")));
- /* Initialize existing LWLock tranche for the DSA. */
- LWLockRegisterTranche(state->tranche, name);
-
/* Attach to existing DSA. */
ret = dsa_attach(state->handle);
dsa_pin_mapping(ret);
@@ -378,8 +374,7 @@ GetNamedDSHash(const char *name, const dshash_parameters *params, bool *found)
entry->type = DSMR_ENTRY_TYPE_DSH;
/* Initialize the LWLock tranche for the hash table. */
- dsh_state->tranche = LWLockNewTrancheId();
- LWLockRegisterTranche(dsh_state->tranche, name);
+ dsh_state->tranche = LWLockNewTrancheId(name);
/* Initialize the DSA for the hash table. */
dsa = dsa_create(dsh_state->tranche);
@@ -409,9 +404,6 @@ GetNamedDSHash(const char *name, const dshash_parameters *params, bool *found)
ereport(ERROR,
(errmsg("requested DSHash already attached to current process")));
- /* Initialize existing LWLock tranche for the hash table. */
- LWLockRegisterTranche(dsh_state->tranche, name);
-
/* Attach to existing DSA for the hash table. */
dsa = dsa_attach(dsh_state->dsa_handle);
dsa_pin_mapping(dsa);
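
For reference, a sketch of the calling convention this hunk moves to: the
tranche name goes straight into LWLockNewTrancheId(), and no per-process
LWLockRegisterTranche() call is needed because the name now lives in shared
memory. The function name create_named_area and the tranche name
"my_extension_area" are hypothetical; error handling and the registry
bookkeeping done by dsm_registry.c are omitted.

    #include "postgres.h"
    #include "storage/lwlock.h"
    #include "utils/dsa.h"

    static dsa_area *
    create_named_area(void)
    {
        int         tranche;
        dsa_area   *area;

        /* allocates a tranche ID and stores the name in shared memory */
        tranche = LWLockNewTrancheId("my_extension_area");

        /* the DSA's internal LWLocks are created in that tranche */
        area = dsa_create(tranche);
        return area;
    }
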
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index a4aecd1fbc3..fcbac5213a5 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -126,8 +126,8 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0,
* in lwlocklist.h. We absorb the names of these tranches, too.
*
* 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
- * or LWLockRegisterTranche. The names of these that are known in the current
- * process appear in LWLockTrancheNames[].
+ * or LWLockNewTrancheId. These names are stored in shared memory and can be
+ * accessed via LWLockTrancheNames.
*
* All these names are user-visible as wait event names, so choose with care
* ... and do not forget to update the documentation's list of wait events.
@@ -146,11 +146,12 @@ StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
/*
* This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
- * stores the names of all dynamically-created tranches known to the current
- * process. Any unused entries in the array will contain NULL.
+ * points to the shared memory locations of the names of all
+ * dynamically-created tranches. Backends inherit the pointer by fork from the
+ * postmaster (except in the EXEC_BACKEND case, where we have special measures
+ * to pass it down).
*/
-static const char **LWLockTrancheNames = NULL;
-static int LWLockTrancheNamesAllocated = 0;
+char **LWLockTrancheNames = NULL;
/*
* This points to the main array of LWLocks in shared memory. Backends inherit
@@ -184,20 +185,22 @@ typedef struct NamedLWLockTrancheRequest
} NamedLWLockTrancheRequest;
static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
-static int NamedLWLockTrancheRequestsAllocated = 0;
/*
- * NamedLWLockTrancheRequests is both the valid length of the request array,
- * and the length of the shared-memory NamedLWLockTrancheArray later on.
- * This variable and NamedLWLockTrancheArray are non-static so that
- * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
+ * NamedLWLockTrancheRequests is the valid length of the request array. This
+ * variable is non-static so that postmaster.c can copy it to child processes
+ * in EXEC_BACKEND builds.
*/
int NamedLWLockTrancheRequests = 0;
-/* points to data in shared memory: */
-NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
+/* shared memory counter of registered tranches */
int *LWLockCounter = NULL;
+/* backend-local counter of registered tranches */
+static int LocalLWLockCounter;
+
+#define MAX_NAMED_TRANCHES 256
+
static void InitializeLWLocks(void);
static inline void LWLockReportWaitStart(LWLock *lock);
static inline void LWLockReportWaitEnd(void);
@@ -392,31 +395,28 @@ Size
LWLockShmemSize(void)
{
Size size;
- int i;
int numLocks = NUM_FIXED_LWLOCKS;
/* Calculate total number of locks needed in the main array. */
numLocks += NumLWLocksForNamedTranches();
- /* Space for dynamic allocation counter, plus room for alignment. */
- size = sizeof(int) + LWLOCK_PADDED_SIZE;
+ /* Space for dynamic allocation counter. */
+ size = MAXALIGN(sizeof(int));
- /* Space for the LWLock array. */
- size = add_size(size, mul_size(numLocks, sizeof(LWLockPadded)));
-
- /* space for named tranches. */
- size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
+ /* Space for named tranches. */
+ size = add_size(size, mul_size(MAX_NAMED_TRANCHES, sizeof(char *)));
+ size = add_size(size, mul_size(MAX_NAMED_TRANCHES, NAMEDATALEN));
- /* space for name of each tranche. */
- for (i = 0; i < NamedLWLockTrancheRequests; i++)
- size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
+ /* Space for the LWLock array, plus room for cache line alignment. */
+ size = add_size(size, LWLOCK_PADDED_SIZE);
+ size = add_size(size, mul_size(numLocks, sizeof(LWLockPadded)));
return size;
}
/*
* Allocate shmem space for the main LWLock array and all tranches and
- * initialize it. We also register extension LWLock tranches here.
+ * initialize it.
*/
void
CreateLWLocks(void)
@@ -432,7 +432,16 @@ CreateLWLocks(void)
/* Initialize the dynamic-allocation counter for tranches */
LWLockCounter = (int *) ptr;
*LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
- ptr += sizeof(int);
+ ptr += MAXALIGN(sizeof(int));
+
+ /* Initialize tranche names */
+ LWLockTrancheNames = (char **) ptr;
+ ptr += MAX_NAMED_TRANCHES * sizeof(char *);
+ for (int i = 0; i < MAX_NAMED_TRANCHES; i++)
+ {
+ LWLockTrancheNames[i] = ptr;
+ ptr += NAMEDATALEN;
+ }
/* Ensure desired alignment of LWLock array */
ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
@@ -441,11 +450,6 @@ CreateLWLocks(void)
/* Initialize all LWLocks */
InitializeLWLocks();
}
-
- /* Register named extension LWLock tranches in the current process. */
- for (int i = 0; i < NamedLWLockTrancheRequests; i++)
- LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
- NamedLWLockTrancheArray[i].trancheName);
}
/*
@@ -454,7 +458,6 @@ CreateLWLocks(void)
static void
InitializeLWLocks(void)
{
- int numNamedLocks = NumLWLocksForNamedTranches();
int id;
int i;
int j;
@@ -485,32 +488,18 @@ InitializeLWLocks(void)
*/
if (NamedLWLockTrancheRequests > 0)
{
- char *trancheNames;
-
- NamedLWLockTrancheArray = (NamedLWLockTranche *)
- &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
-
- trancheNames = (char *) NamedLWLockTrancheArray +
- (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
for (i = 0; i < NamedLWLockTrancheRequests; i++)
{
NamedLWLockTrancheRequest *request;
- NamedLWLockTranche *tranche;
- char *name;
+ int tranche;
request = &NamedLWLockTrancheRequestArray[i];
- tranche = &NamedLWLockTrancheArray[i];
-
- name = trancheNames;
- trancheNames += strlen(request->tranche_name) + 1;
- strcpy(name, request->tranche_name);
- tranche->trancheId = LWLockNewTrancheId();
- tranche->trancheName = name;
+ tranche = LWLockNewTrancheId(request->tranche_name);
for (j = 0; j < request->num_lwlocks; j++, lock++)
- LWLockInitialize(&lock->lock, tranche->trancheId);
+ LWLockInitialize(&lock->lock, tranche);
}
}
}
@@ -562,59 +551,47 @@ GetNamedLWLockTranche(const char *tranche_name)
}
/*
- * Allocate a new tranche ID.
+ * Allocate a new tranche ID with the provided name.
*/
int
-LWLockNewTrancheId(void)
+LWLockNewTrancheId(const char *name)
{
int result;
- /* We use the ShmemLock spinlock to protect LWLockCounter */
- SpinLockAcquire(ShmemLock);
- result = (*LWLockCounter)++;
- SpinLockRelease(ShmemLock);
+ if (!name)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("tranche name cannot be NULL")));
- return result;
-}
+ if (strlen(name) >= NAMEDATALEN)
+ ereport(ERROR,
+ (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("tranche name too long"),
+ errdetail("LWLock tranche names must be no longer than %d bytes.",
+ NAMEDATALEN - 1)));
-/*
- * Register a dynamic tranche name in the lookup table of the current process.
- *
- * This routine will save a pointer to the tranche name passed as an argument,
- * so the name should be allocated in a backend-lifetime context
- * (shared memory, TopMemoryContext, static constant, or similar).
- *
- * The tranche name will be user-visible as a wait event name, so try to
- * use a name that fits the style for those.
- */
-void
-LWLockRegisterTranche(int tranche_id, const char *tranche_name)
-{
- /* This should only be called for user-defined tranches. */
- if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
- return;
-
- /* Convert to array index. */
- tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;
+ /*
+ * We use the ShmemLock spinlock to protect LWLockCounter and
+ * LWLockTrancheNames.
+ */
+ SpinLockAcquire(ShmemLock);
- /* If necessary, create or enlarge array. */
- if (tranche_id >= LWLockTrancheNamesAllocated)
+ if (*LWLockCounter - LWTRANCHE_FIRST_USER_DEFINED >= MAX_NAMED_TRANCHES)
{
- int newalloc;
+ SpinLockRelease(ShmemLock);
+ ereport(ERROR,
+ (errmsg("maximum number of tranches already registered"),
+ errdetail("No more than %d tranches may be registered.",
+ MAX_NAMED_TRANCHES)));
+ }
- newalloc = pg_nextpower2_32(Max(8, tranche_id + 1));
+ result = (*LWLockCounter)++;
+ LocalLWLockCounter = *LWLockCounter;
+ strlcpy(LWLockTrancheNames[result - LWTRANCHE_FIRST_USER_DEFINED], name, NAMEDATALEN);
- if (LWLockTrancheNames == NULL)
- LWLockTrancheNames = (const char **)
- MemoryContextAllocZero(TopMemoryContext,
- newalloc * sizeof(char *));
- else
- LWLockTrancheNames =
- repalloc0_array(LWLockTrancheNames, const char *, LWLockTrancheNamesAllocated, newalloc);
- LWLockTrancheNamesAllocated = newalloc;
- }
+ SpinLockRelease(ShmemLock);
- LWLockTrancheNames[tranche_id] = tranche_name;
+ return result;
}
/*
@@ -633,10 +610,23 @@ void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
NamedLWLockTrancheRequest *request;
+ static int NamedLWLockTrancheRequestsAllocated;
if (!process_shmem_requests_in_progress)
elog(FATAL, "cannot request additional LWLocks outside shmem_request_hook");
+ if (!tranche_name)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("tranche name cannot be NULL")));
+
+ if (strlen(tranche_name) >= NAMEDATALEN)
+ ereport(ERROR,
+ (errcode(ERRCODE_NAME_TOO_LONG),
+ errmsg("tranche name too long"),
+ errdetail("LWLock tranche names must be no longer than %d bytes.",
+ NAMEDATALEN - 1)));
+
if (NamedLWLockTrancheRequestArray == NULL)
{
NamedLWLockTrancheRequestsAllocated = 16;
@@ -657,7 +647,6 @@ RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
}
request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
- Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
request->num_lwlocks = num_lwlocks;
NamedLWLockTrancheRequests++;
@@ -669,6 +658,9 @@ RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
void
LWLockInitialize(LWLock *lock, int tranche_id)
{
+ /* verify the tranche_id is valid */
+ (void) GetLWTrancheName(tranche_id);
+
pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
pg_atomic_init_u32(&lock->nwaiters, 0);
@@ -710,15 +702,27 @@ GetLWTrancheName(uint16 trancheId)
return BuiltinTrancheNames[trancheId];
/*
- * It's an extension tranche, so look in LWLockTrancheNames[]. However,
- * it's possible that the tranche has never been registered in the current
- * process, in which case give up and return "extension".
+ * We only ever add new entries to LWLockTrancheNames, so most lookups can
+ * avoid taking the spinlock as long as the backend-local counter
+ * (LocalLWLockCounter) is greater than the requested tranche ID. Else,
+ * we need to first update the backend-local counter with ShmemLock held
+ * before attempting the lookup again. In practice, the latter case is
+ * probably rare.
*/
- trancheId -= LWTRANCHE_FIRST_USER_DEFINED;
+ if (trancheId >= LocalLWLockCounter)
+ {
+ SpinLockAcquire(ShmemLock);
+ LocalLWLockCounter = *LWLockCounter;
+ SpinLockRelease(ShmemLock);
- if (trancheId >= LWLockTrancheNamesAllocated ||
- LWLockTrancheNames[trancheId] == NULL)
- return "extension";
+ if (trancheId >= LocalLWLockCounter)
+ elog(ERROR, "tranche %d is not registered", trancheId);
+ }
+
+ /*
+ * It's an extension tranche, so look in LWLockTrancheNames.
+ */
+ trancheId -= LWTRANCHE_FIRST_USER_DEFINED;
return LWLockTrancheNames[trancheId];
}
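
For context, a sketch of the extension-facing pattern that
RequestNamedLWLockTranche() and GetNamedLWLockTranche() serve (modelled on
pg_stat_statements); the hook functions and the "my_extension" tranche name
are hypothetical, and the _PG_init() wiring of the hooks is omitted. With the
checks added above, a NULL or over-long tranche name is now rejected with an
error at request time instead of relying on the removed Assert.

    #include "postgres.h"
    #include "storage/lwlock.h"

    static LWLock *my_lock = NULL;

    /* registered as shmem_request_hook in _PG_init() (wiring not shown) */
    static void
    my_shmem_request(void)
    {
        /* names longer than NAMEDATALEN - 1 bytes are now rejected here */
        RequestNamedLWLockTranche("my_extension", 1);
    }

    /* registered as shmem_startup_hook in _PG_init() (wiring not shown) */
    static void
    my_shmem_startup(void)
    {
        /* the locks were created by CreateLWLocks(); look them up by name */
        my_lock = &(GetNamedLWLockTranche("my_extension")[0].lock);
    }
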