diff options
author | Vadim B. Mikheev <vadim4o@yahoo.com> | 1999-05-07 01:23:11 +0000 |
---|---|---|
committer | Vadim B. Mikheev <vadim4o@yahoo.com> | 1999-05-07 01:23:11 +0000 |
commit | 122abf3af3e0519cfddcfdfd0dfb8eb0c4aca61c (patch) | |
tree | df5da77d26dc9f7337be57fe614030ee6a7b0330 /src/backend/storage/lmgr/lock.c | |
parent | 86bc1da2628322c25190a15f1b6a433237aa1a45 (diff) |
Fix LMGR for MVCC.
Get rid of Extend lock mode.
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 251 |
1 files changed, 113 insertions, 138 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index c39138ff129..c1996fe0713 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.49 1999/04/30 17:03:04 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.50 1999/05/07 01:23:03 vadim Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -50,8 +50,7 @@ #include "utils/trace.h" #include "utils/ps_status.h" -static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, - TransactionId xid); +static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode); /* * lockDebugRelation can be used to trace unconditionally a single relation, @@ -143,12 +142,14 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, #endif /* !LOCK_MGR_DEBUG */ static char *lock_types[] = { - "", - "WRITE", - "READ", - "WRITE INTENT", - "READ INTENT", - "EXTEND" + "INVALID", + "AccessShareLock", + "RowShareLock", + "RowExclusiveLock", + "ShareLock", + "ShareRowExclusiveLock", + "ExclusiveLock", + "AccessExclusiveLock" }; SPINLOCK LockMgrLock; /* in Shmem or created in @@ -631,12 +632,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) /* -------------------- * If I'm the only one holding a lock, then there - * cannot be a conflict. Need to subtract one from the - * lock's count since we just bumped the count up by 1 - * above. + * cannot be a conflict. The same is true if we already + * hold this lock. * -------------------- */ - if (result->nHolding == lock->nActive) + if (result->nHolding == lock->nActive || result->holders[lockmode] != 0) { result->holders[lockmode]++; result->nHolding++; @@ -647,7 +647,39 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) return TRUE; } - status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result); + /* + * If lock requested conflicts with locks requested by waiters... + */ + if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) + { + int i = 1; + + /* + * If I don't hold locks or my locks don't conflict + * with waiters then force to sleep. + */ + if (result->nHolding > 0) + { + for ( ; i <= lockMethodTable->ctl->numLockModes; i++) + { + if (result->holders[i] > 0 && + lockMethodTable->ctl->conflictTab[i] & lock->waitMask) + break; /* conflict */ + } + } + + if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes) + { + XID_PRINT("LockAcquire: higher priority proc waiting", + result); + status = STATUS_FOUND; + } + else + status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result); + } + else + status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result); + if (status == STATUS_OK) GrantLock(lock, lockmode); else if (status == STATUS_FOUND) @@ -680,7 +712,25 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) return FALSE; } #endif - status = WaitOnLock(lockmethod, lock, lockmode, xid); + /* + * Construct bitmask of locks we hold before going to sleep. + */ + MyProc->holdLock = 0; + if (result->nHolding > 0) + { + int i, + tmpMask = 2; + + for (i = 1; i <= lockMethodTable->ctl->numLockModes; + i++, tmpMask <<= 1) + { + if (result->holders[i] > 0) + MyProc->holdLock |= tmpMask; + } + Assert(MyProc->holdLock != 0); + } + + status = WaitOnLock(lockmethod, lock, lockmode); /* * Check the xid entry status, in case something in the ipc @@ -712,10 +762,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) * determining whether or not any new lock acquired conflicts with * the old ones. * - * For example, if I am already holding a WRITE_INTENT lock, - * there will not be a conflict with my own READ_LOCK. If I - * don't consider the intent lock when checking for conflicts, - * I find no conflict. * ---------------------------- */ int @@ -812,32 +858,6 @@ LockResolveConflicts(LOCKMETHOD lockmethod, } Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0)); - /* - * We can control runtime this option. Default is lockReadPriority=0 - */ - if (!lockReadPriority) - { - /* ------------------------ - * If someone with a greater priority is waiting for the lock, - * do not continue and share the lock, even if we can. - * Don't do this if the process already has some locks, because - * this could hold up other people waiting on our locks, causing - * a priority inversion. bjm - * ------------------------ - */ - int myprio = LockMethodTable[lockmethod]->ctl->prio[lockmode]; - PROC_QUEUE *waitQueue = &(lock->waitProcs); - PROC *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev); - - if (SHMQueueEmpty(&MyProc->lockQueue) && waitQueue->size && - topproc->prio > myprio) - { - XID_PRINT("LockResolveConflicts: higher priority proc waiting", - result); - return STATUS_FOUND; - } - } - /* ---------------------------- * first check for global conflicts: If no locks conflict * with mine, then I get the lock. @@ -909,12 +929,10 @@ GrantLock(LOCK *lock, LOCKMODE lockmode) } static int -WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, - TransactionId xid) +WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode) { PROC_QUEUE *waitQueue = &(lock->waitProcs); LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod]; - int prio = lockMethodTable->ctl->prio[lockmode]; char old_status[64], new_status[64]; @@ -934,11 +952,9 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, strcat(new_status, " waiting"); PS_SET_STATUS(new_status); if (ProcSleep(waitQueue, - lockMethodTable->ctl->masterLock, + lockMethodTable->ctl, lockmode, - prio, - lock, - xid) != NO_ERROR) + lock) != NO_ERROR) { /* ------------------- * This could have happend as a result of a deadlock, @@ -952,12 +968,16 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode); Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0)); Assert(lock->nActive <= lock->nHolding); + if (lock->activeHolders[lockmode] == lock->holders[lockmode]) + lock->waitMask &= BITS_OFF[lockmode]; SpinRelease(lockMethodTable->ctl->masterLock); elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction"); /* not reached */ } + if (lock->activeHolders[lockmode] == lock->holders[lockmode]) + lock->waitMask &= BITS_OFF[lockmode]; PS_SET_STATUS(old_status); LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode); return STATUS_OK; @@ -1129,6 +1149,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) lock->nActive--; lock->activeHolders[lockmode]--; +#ifdef NOT_USED /* -------------------------- * If there are still active locks of the type I just released, no one * should be woken up. Whoever is asleep will still conflict @@ -1138,6 +1159,19 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) if (lock->activeHolders[lockmode]) wakeupNeeded = false; else +#endif + /* + * Above is not valid any more (due to MVCC lock modes). + * Actually we should compare activeHolders[lockmode] with + * number of waiters holding lock of this type and try to + * wakeup only if these numbers are equal (and lock released + * conflicts with locks requested by waiters). For the moment + * we only check the last condition. + */ + if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) + wakeupNeeded = true; + + if (!(lock->activeHolders[lockmode])) { /* change the conflict mask. No more of this lock type. */ lock->mask &= BITS_OFF[lockmode]; @@ -1199,12 +1233,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) if (wakeupNeeded) { - /* -------------------------- - * Wake the first waiting process and grant him the lock if it - * doesn't conflict. The woken process must record the lock - * himself. - * -------------------------- - */ ProcLockWakeup(&(lock->waitProcs), lockmethod, lock); } else @@ -1275,6 +1303,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) for (;;) { + bool wakeupNeeded = false; /* * Sometimes the queue appears to be messed up. @@ -1380,6 +1409,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) &&(lock->activeHolders[i] >= 0)); if (!lock->activeHolders[i]) lock->mask &= BITS_OFF[i]; + /* + * Read comments in LockRelease + */ + if (!wakeupNeeded && xidLook->holders[i] > 0 && + lockMethodTable->ctl->conflictTab[i] & lock->waitMask) + wakeupNeeded = true; } lock->nHolding -= xidLook->nHolding; lock->nActive -= xidLook->nHolding; @@ -1444,14 +1479,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) return FALSE; } } - else + else if (wakeupNeeded) { - /* -------------------- - * Wake the first waiting process and grant him the lock if it - * doesn't conflict. The woken process must record the lock - * him/herself. - * -------------------- - */ waitQueue = &(lock->waitProcs); ProcLockWakeup(waitQueue, lockmethod, lock); } @@ -1534,46 +1563,22 @@ LockingDisabled() bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) { - int done; - XIDLookupEnt *xidLook = NULL; - XIDLookupEnt *tmp = NULL; - SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); - LOCK *lock; + int done; + XIDLookupEnt *xidLook = NULL; + XIDLookupEnt *tmp = NULL; + SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); + LOCK *lock; - LOCKMETHODTABLE *lockMethodTable; - XIDLookupEnt *result, - item; - HTAB *xidTable; - bool found; - - static PROC *checked_procs[MAXBACKENDS]; - static int nprocs; - static bool MyNHolding; + static PROC *checked_procs[MAXBACKENDS]; + static int nprocs; /* initialize at start of recursion */ if (skip_check) { checked_procs[0] = MyProc; nprocs = 1; - - lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD]; - xidTable = lockMethodTable->xidHash; - - MemSet(&item, 0, XID_TAGSIZE); - TransactionIdStore(MyProc->xid, &item.tag.xid); - item.tag.lock = MAKE_OFFSET(findlock); -#ifdef NOT_USED - item.tag.pid = pid; -#endif - - if (!(result = (XIDLookupEnt *) - hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found) - { - elog(NOTICE, "LockAcquire: xid table corrupted"); - return true; - } - MyNHolding = result->nHolding; } + if (SHMQueueEmpty(lockQueue)) return false; @@ -1583,12 +1588,6 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) for (;;) { - /* --------------------------- - * XXX Here we assume the shared memory queue is circular and - * that we know its internal structure. Should have some sort of - * macros to allow one to walk it. mer 20 July 1991 - * --------------------------- - */ done = (xidLook->queue.next == end); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); @@ -1613,45 +1612,21 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) proc = (PROC *) MAKE_PTR(waitQueue->links.prev); for (i = 0; i < waitQueue->size; i++) { + /* + * If I hold some locks on findlock and another proc + * waits on it holding locks too - check if we are + * waiting one another. + */ if (proc != MyProc && - lock == findlock && /* skip_check also true */ - MyNHolding) /* I already hold some lock on it */ + lock == findlock && /* skip_check also true */ + MyProc->holdLock) { - - /* - * For findlock's wait queue, we are interested in - * procs who are blocked waiting for a write-lock on - * the table we are waiting on, and already hold a - * lock on it. We first check to see if there is an - * escalation deadlock, where we hold a readlock and - * want a writelock, and someone else holds readlock - * on the same table, and wants a writelock. - * - * Basically, the test is, "Do we both hold some lock on - * findlock, and we are both waiting in the lock - * queue?" bjm - */ + LOCKMETHODCTL *lockctl = + LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; Assert(skip_check); - Assert(MyProc->prio >= 2); - - lockMethodTable = LockMethodTable[1]; - xidTable = lockMethodTable->xidHash; - - MemSet(&item, 0, XID_TAGSIZE); - TransactionIdStore(proc->xid, &item.tag.xid); - item.tag.lock = MAKE_OFFSET(findlock); -#ifdef NOT_USED - item.tag.pid = pid; -#endif - - if (!(result = (XIDLookupEnt *) - hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found) - { - elog(NOTICE, "LockAcquire: xid table corrupted"); - return true; - } - if (result->nHolding) + if (lockctl->conflictTab[MyProc->token] & proc->holdLock && + lockctl->conflictTab[proc->token] & MyProc->holdLock) return true; } |