summaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r--  src/backend/storage/lmgr/lock.c  55
1 files changed, 52 insertions, 3 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 4f0ad27e6d1..f7e947af936 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
* LOCKACQUIRE_ALREADY_HELD incremented count for lock already held
+ * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear
*
* In the normal case where dontWait=false and the caller doesn't need to
* distinguish a freshly acquired lock from one already taken earlier in
@@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag,
bool sessionLock,
bool dontWait)
{
- return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+ return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+ true, NULL);
}
/*
@@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag,
* caller to note that the lock table is full and then begin taking
* extreme action to reduce the number of other lock holders before
* retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
+ * table entry if a lock is successfully acquired, or NULL if not.
*/
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
- bool reportMemoryError)
+ bool reportMemoryError,
+ LOCALLOCK **locallockp)
{
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
@@ -769,6 +775,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
locallock->holdsStrongLockCount = FALSE;
+ locallock->lockCleared = false;
locallock->lockOwners = NULL; /* in case next line fails */
locallock->lockOwners = (LOCALLOCKOWNER *)
MemoryContextAlloc(TopMemoryContext,
@@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
}
hashcode = locallock->hashcode;
+ if (locallockp)
+ *locallockp = locallock;
+
/*
* If we already hold the lock, we can just increase the count locally.
+ *
+ * If lockCleared is already set, caller need not worry about absorbing
+ * sinval messages related to the lock's object.
*/
if (locallock->nLocks > 0)
{
GrantLockLocal(locallock, owner);
- return LOCKACQUIRE_ALREADY_HELD;
+ if (locallock->lockCleared)
+ return LOCKACQUIRE_ALREADY_CLEAR;
+ else
+ return LOCKACQUIRE_ALREADY_HELD;
}
/*
@@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
hashcode))
{
AbortStrongLockAcquire();
+ if (locallock->nLocks == 0)
+ RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
{
AbortStrongLockAcquire();
LWLockRelease(partitionLock);
+ if (locallock->nLocks == 0)
+ RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
return LOCKACQUIRE_NOT_AVAIL;
}
@@ -1646,6 +1672,20 @@ GrantAwaitedLock(void)
}
/*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+ Assert(locallock->nLocks > 0);
+ locallock->lockCleared = true;
+}
+
+/*
* WaitOnLock -- wait to acquire a lock
*
* Caller must have set MyProc->heldLocks to reflect locks already held
@@ -1912,6 +1952,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
if (locallock->nLocks > 0)
return TRUE;
+ /*
+ * At this point we can no longer suppose we are clear of invalidation
+ * messages related to this lock. Although we'll delete the LOCALLOCK
+ * object before any intentional return from this routine, it seems worth
+ * the trouble to explicitly reset lockCleared right now, just in case
+ * some error prevents us from deleting the LOCALLOCK.
+ */
+ locallock->lockCleared = false;
+
/* Attempt fast release of any lock eligible for the fast path. */
if (EligibleForRelationFastPath(locktag, lockmode) &&
FastPathLocalUseCount > 0)