diff options
Diffstat (limited to 'src/backend/storage/lmgr/proc.c')
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 242 |
1 files changed, 139 insertions, 103 deletions
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6b5159dbf04..406aa35f7c6 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -48,7 +48,7 @@ * This is so that we can support more backends. (system-wide semaphore * sets run out pretty fast.) -ay 4/95 * - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $ */ #include "postgres.h" @@ -74,13 +74,14 @@ #include <sys/sem.h> #endif +#include "access/xact.h" #include "storage/proc.h" void HandleDeadLock(SIGNAL_ARGS); static void ProcFreeAllSemaphores(void); -static bool GetOffWaitqueue(PROC *); +static bool GetOffWaitQueue(PROC *); int DeadlockTimeout = 1000; @@ -300,50 +301,76 @@ InitProcess(void) /* ----------------------- * get process off any wait queue it might be on + * + * NB: this does not remove the process' holder object, nor the lock object, + * even though their holder counts might now have gone to zero. That will + * happen during a subsequent LockReleaseAll call, which we expect will happen + * during transaction cleanup. (Removal of a proc from its wait queue by + * this routine can only happen if we are aborting the transaction.) * ----------------------- */ static bool -GetOffWaitqueue(PROC *proc) +GetOffWaitQueue(PROC *proc) { - bool getoffed = false; + bool gotoff = false; LockLockTable(); if (proc->links.next != INVALID_OFFSET) { - int lockmode = proc->token; - LOCK *waitLock = proc->waitLock; + LOCK *waitLock = proc->waitLock; + LOCKMODE lockmode = proc->waitLockMode; + /* Remove proc from lock's wait queue */ Assert(waitLock); Assert(waitLock->waitProcs.size > 0); SHMQueueDelete(&(proc->links)); --waitLock->waitProcs.size; + + /* Undo increments of holder counts by waiting process */ Assert(waitLock->nHolding > 0); Assert(waitLock->nHolding > proc->waitLock->nActive); --waitLock->nHolding; Assert(waitLock->holders[lockmode] > 0); --waitLock->holders[lockmode]; + /* don't forget to clear waitMask bit if appropriate */ if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode]) waitLock->waitMask &= ~(1 << lockmode); - ProcLockWakeup(&(waitLock->waitProcs), LOCK_LOCKMETHOD(*waitLock), waitLock); - getoffed = true; + + /* Clean up the proc's own state */ + SHMQueueElemInit(&(proc->links)); + proc->waitLock = NULL; + proc->waitHolder = NULL; + + /* See if any other waiters can be woken up now */ + ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock); + + gotoff = true; } - SHMQueueElemInit(&(proc->links)); UnlockLockTable(); - return getoffed; + return gotoff; } /* - * ProcReleaseLocks() -- release all locks associated with current transaction + * ProcReleaseLocks() -- release locks associated with current transaction + * at transaction commit or abort + * + * At commit, we release only locks tagged with the current transaction's XID, + * leaving those marked with XID 0 (ie, session locks) undisturbed. At abort, + * we release all locks including XID 0, because we need to clean up after + * a failure. This logic will need extension if we ever support nested + * transactions. * + * Note that user locks are not released in either case. */ void -ProcReleaseLocks() +ProcReleaseLocks(bool isCommit) { if (!MyProc) return; - LockReleaseAll(DEFAULT_LOCKMETHOD, &MyProc->lockQueue); - GetOffWaitqueue(MyProc); + GetOffWaitQueue(MyProc); + LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc, + !isCommit, GetCurrentTransactionId()); } /* @@ -384,47 +411,47 @@ static void ProcKill(int exitStatus, Datum pid) { PROC *proc; - SHMEM_OFFSET location; /* -------------------- * If this is a FATAL exit the postmaster will have to kill all the - * existing backends and reinitialize shared memory. So all we don't + * existing backends and reinitialize shared memory. So we don't * need to do anything here. * -------------------- */ if (exitStatus != 0) return; - ShmemPIDLookup(MyProcPid, &location); - if (location == INVALID_OFFSET) - return; - - proc = (PROC *) MAKE_PTR(location); + if ((int) pid == MyProcPid) + { + proc = MyProc; + MyProc = NULL; + } + else + { + /* This path is dead code at the moment ... */ + SHMEM_OFFSET location = INVALID_OFFSET; - Assert(proc == MyProc || (int)pid != MyProcPid); + ShmemPIDLookup((int) pid, &location); + if (location == INVALID_OFFSET) + return; + proc = (PROC *) MAKE_PTR(location); + } - MyProc = NULL; + Assert(proc); - /* --------------- - * Assume one lock table. - * --------------- - */ + /* Release any spinlocks the proc is holding */ ProcReleaseSpins(proc); - LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue); -#ifdef USER_LOCKS + /* Get the proc off any wait queue it might be on */ + GetOffWaitQueue(proc); - /* - * Assume we have a second lock table. - */ - LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue); -#endif + /* Remove from the standard lock table */ + LockReleaseAll(DEFAULT_LOCKMETHOD, proc, true, InvalidTransactionId); - /* ---------------- - * get off the wait queue - * ---------------- - */ - GetOffWaitqueue(proc); +#ifdef USER_LOCKS + /* Remove from the user lock table */ + LockReleaseAll(USER_LOCKMETHOD, proc, true, InvalidTransactionId); +#endif } /* @@ -488,10 +515,10 @@ SetWaitingForLock(bool waiting) } if (QueryCancel) /* cancel request pending */ { - if (GetOffWaitqueue(MyProc)) + if (GetOffWaitQueue(MyProc)) { lockWaiting = false; - elog(ERROR, "Query cancel requested while waiting lock"); + elog(ERROR, "Query cancel requested while waiting for lock"); } } } @@ -519,8 +546,8 @@ LockWaitCancel(void) set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM); #endif /* __BEOS__ */ - if (GetOffWaitqueue(MyProc)) - elog(ERROR, "Query cancel requested while waiting lock"); + if (GetOffWaitQueue(MyProc)) + elog(ERROR, "Query cancel requested while waiting for lock"); } /* @@ -538,18 +565,19 @@ LockWaitCancel(void) * NOTES: The process queue is now a priority queue for locking. */ int -ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ - LOCKMETHODCTL *lockctl, - int token, /* lockmode */ - LOCK *lock) +ProcSleep(LOCKMETHODCTL *lockctl, + LOCKMODE lockmode, + LOCK *lock, + HOLDER *holder) { - int i; + PROC_QUEUE *waitQueue = &(lock->waitProcs); SPINLOCK spinlock = lockctl->masterLock; - PROC *proc; - int myMask = (1 << token); + int myMask = (1 << lockmode); int waitMask = lock->waitMask; + PROC *proc; + int i; int aheadHolders[MAX_LOCKMODES]; - bool selfConflict = (lockctl->conflictTab[token] & myMask), + bool selfConflict = (lockctl->conflictTab[lockmode] & myMask), prevSame = false; #ifndef __BEOS__ struct itimerval timeval, @@ -558,26 +586,28 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ bigtime_t time_interval; #endif - MyProc->token = token; MyProc->waitLock = lock; + MyProc->waitHolder = holder; + MyProc->waitLockMode = lockmode; + /* We assume the caller set up MyProc->holdLock */ proc = (PROC *) MAKE_PTR(waitQueue->links.prev); /* if we don't conflict with any waiter - be first in queue */ - if (!(lockctl->conflictTab[token] & waitMask)) + if (!(lockctl->conflictTab[lockmode] & waitMask)) goto ins; for (i = 1; i < MAX_LOCKMODES; i++) aheadHolders[i] = lock->activeHolders[i]; - (aheadHolders[token])++; + (aheadHolders[lockmode])++; for (i = 0; i < waitQueue->size; i++) { /* am I waiting for him ? */ - if (lockctl->conflictTab[token] & proc->holdLock) + if (lockctl->conflictTab[lockmode] & proc->holdLock) { /* is he waiting for me ? */ - if (lockctl->conflictTab[proc->token] & MyProc->holdLock) + if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock) { /* Yes, report deadlock failure */ MyProc->errType = STATUS_ERROR; @@ -586,10 +616,10 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ /* being waiting for him - go past */ } /* if he waits for me */ - else if (lockctl->conflictTab[proc->token] & MyProc->holdLock) + else if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock) break; /* if conflicting locks requested */ - else if (lockctl->conflictTab[proc->token] & myMask) + else if (lockctl->conflictTab[proc->waitLockMode] & myMask) { /* @@ -604,13 +634,13 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ * Last attempt to don't move any more: if we don't conflict with * rest waiters in queue. */ - else if (!(lockctl->conflictTab[token] & waitMask)) + else if (!(lockctl->conflictTab[lockmode] & waitMask)) break; - prevSame = (proc->token == token); - (aheadHolders[proc->token])++; - if (aheadHolders[proc->token] == lock->holders[proc->token]) - waitMask &= ~(1 << proc->token); + prevSame = (proc->waitLockMode == lockmode); + (aheadHolders[proc->waitLockMode])++; + if (aheadHolders[proc->waitLockMode] == lock->holders[proc->waitLockMode]) + waitMask &= ~(1 << proc->waitLockMode); proc = (PROC *) MAKE_PTR(proc->links.prev); } @@ -692,10 +722,8 @@ ins:; rt:; -#ifdef LOCK_DEBUG - /* Just to get meaningful debug messages from DumpLocks() */ - MyProc->waitLock = (LOCK *) NULL; -#endif + MyProc->waitLock = NULL; + MyProc->waitHolder = NULL; return MyProc->errType; } @@ -704,7 +732,7 @@ rt:; /* * ProcWakeup -- wake up a process by releasing its private semaphore. * - * remove the process from the wait queue and set its links invalid. + * Also remove the process from the wait queue and set its links invalid. * RETURN: the next process in the wait queue. */ PROC * @@ -720,9 +748,9 @@ ProcWakeup(PROC *proc, int errType) retProc = (PROC *) MAKE_PTR(proc->links.prev); - /* you have to update waitLock->waitProcs.size yourself */ SHMQueueDelete(&(proc->links)); SHMQueueElemInit(&(proc->links)); + (proc->waitLock->waitProcs.size)--; proc->errType = errType; @@ -736,65 +764,70 @@ ProcWakeup(PROC *proc, int errType) * released. */ int -ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) +ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock) { + PROC_QUEUE *queue = &(lock->waitProcs); PROC *proc; - int count = 0; - int last_locktype = 0; + int awoken = 0; + LOCKMODE last_lockmode = 0; int queue_size = queue->size; - Assert(queue->size >= 0); + Assert(queue_size >= 0); - if (!queue->size) + if (!queue_size) return STATUS_NOT_FOUND; proc = (PROC *) MAKE_PTR(queue->links.prev); - while ((queue_size--) && (proc)) - { - /* - * This proc will conflict as the previous one did, don't even - * try. - */ - if (proc->token == last_locktype) - continue; + while (queue_size-- > 0) + { + if (proc->waitLockMode == last_lockmode) + { + /* + * This proc will conflict as the previous one did, don't even + * try. + */ + goto nextProc; + } /* * Does this proc conflict with locks held by others ? */ if (LockResolveConflicts(lockmethod, + proc->waitLockMode, lock, - proc->token, - proc->xid, - (XIDLookupEnt *) NULL) != STATUS_OK) + proc->waitHolder, + proc, + NULL) != STATUS_OK) { - if (count != 0) + /* Yes. Quit if we already awoke at least one process. */ + if (awoken != 0) break; - last_locktype = proc->token; - continue; + /* Otherwise, see if any later waiters can be awoken. */ + last_lockmode = proc->waitLockMode; + goto nextProc; } /* - * there was a waiting process, grant it the lock before waking it - * up. This will prevent another process from seizing the lock - * between the time we release the lock master (spinlock) and the - * time that the awoken process begins executing again. + * OK to wake up this sleeping process. */ - GrantLock(lock, proc->token); + GrantLock(lock, proc->waitHolder, proc->waitLockMode); + proc = ProcWakeup(proc, NO_ERROR); + awoken++; /* - * ProcWakeup removes proc from the lock waiting process queue and - * returns the next proc in chain. + * ProcWakeup removes proc from the lock's waiting process queue + * and returns the next proc in chain; don't use prev link. */ + continue; - count++; - queue->size--; - proc = ProcWakeup(proc, NO_ERROR); +nextProc: + proc = (PROC *) MAKE_PTR(proc->links.prev); } Assert(queue->size >= 0); - if (count) + if (awoken) return STATUS_OK; else { @@ -802,9 +835,10 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) #ifdef LOCK_DEBUG if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) { - elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", MAKE_OFFSET(lock)); + elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", + MAKE_OFFSET(lock)); if (Debug_deadlocks) - DumpAllLocks(); + DumpAllLocks(); } #endif return STATUS_NOT_FOUND; @@ -872,10 +906,12 @@ HandleDeadLock(SIGNAL_ARGS) */ mywaitlock = MyProc->waitLock; Assert(mywaitlock->waitProcs.size > 0); - lockWaiting = false; --mywaitlock->waitProcs.size; SHMQueueDelete(&(MyProc->links)); SHMQueueElemInit(&(MyProc->links)); + MyProc->waitLock = NULL; + MyProc->waitHolder = NULL; + lockWaiting = false; /* ------------------ * Unlock my semaphore so that the interrupted ProcSleep() call can finish. |