summaryrefslogtreecommitdiff
path: root/src/backend/storage/lmgr/proc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/lmgr/proc.c')
-rw-r--r--src/backend/storage/lmgr/proc.c242
1 files changed, 139 insertions, 103 deletions
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 6b5159dbf04..406aa35f7c6 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -48,7 +48,7 @@
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $
*/
#include "postgres.h"
@@ -74,13 +74,14 @@
#include <sys/sem.h>
#endif
+#include "access/xact.h"
#include "storage/proc.h"
void HandleDeadLock(SIGNAL_ARGS);
static void ProcFreeAllSemaphores(void);
-static bool GetOffWaitqueue(PROC *);
+static bool GetOffWaitQueue(PROC *);
int DeadlockTimeout = 1000;
@@ -300,50 +301,76 @@ InitProcess(void)
/* -----------------------
* get process off any wait queue it might be on
+ *
+ * NB: this does not remove the process' holder object, nor the lock object,
+ * even though their holder counts might now have gone to zero. That will
+ * happen during a subsequent LockReleaseAll call, which we expect will happen
+ * during transaction cleanup. (Removal of a proc from its wait queue by
+ * this routine can only happen if we are aborting the transaction.)
* -----------------------
*/
static bool
-GetOffWaitqueue(PROC *proc)
+GetOffWaitQueue(PROC *proc)
{
- bool getoffed = false;
+ bool gotoff = false;
LockLockTable();
if (proc->links.next != INVALID_OFFSET)
{
- int lockmode = proc->token;
- LOCK *waitLock = proc->waitLock;
+ LOCK *waitLock = proc->waitLock;
+ LOCKMODE lockmode = proc->waitLockMode;
+ /* Remove proc from lock's wait queue */
Assert(waitLock);
Assert(waitLock->waitProcs.size > 0);
SHMQueueDelete(&(proc->links));
--waitLock->waitProcs.size;
+
+ /* Undo increments of holder counts by waiting process */
Assert(waitLock->nHolding > 0);
Assert(waitLock->nHolding > proc->waitLock->nActive);
--waitLock->nHolding;
Assert(waitLock->holders[lockmode] > 0);
--waitLock->holders[lockmode];
+ /* don't forget to clear waitMask bit if appropriate */
if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode])
waitLock->waitMask &= ~(1 << lockmode);
- ProcLockWakeup(&(waitLock->waitProcs), LOCK_LOCKMETHOD(*waitLock), waitLock);
- getoffed = true;
+
+ /* Clean up the proc's own state */
+ SHMQueueElemInit(&(proc->links));
+ proc->waitLock = NULL;
+ proc->waitHolder = NULL;
+
+ /* See if any other waiters can be woken up now */
+ ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
+
+ gotoff = true;
}
- SHMQueueElemInit(&(proc->links));
UnlockLockTable();
- return getoffed;
+ return gotoff;
}
/*
- * ProcReleaseLocks() -- release all locks associated with current transaction
+ * ProcReleaseLocks() -- release locks associated with current transaction
+ * at transaction commit or abort
+ *
+ * At commit, we release only locks tagged with the current transaction's XID,
+ * leaving those marked with XID 0 (ie, session locks) undisturbed. At abort,
+ * we release all locks including XID 0, because we need to clean up after
+ * a failure. This logic will need extension if we ever support nested
+ * transactions.
*
+ * Note that user locks are not released in either case.
*/
void
-ProcReleaseLocks()
+ProcReleaseLocks(bool isCommit)
{
if (!MyProc)
return;
- LockReleaseAll(DEFAULT_LOCKMETHOD, &MyProc->lockQueue);
- GetOffWaitqueue(MyProc);
+ GetOffWaitQueue(MyProc);
+ LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc,
+ !isCommit, GetCurrentTransactionId());
}
/*
@@ -384,47 +411,47 @@ static void
ProcKill(int exitStatus, Datum pid)
{
PROC *proc;
- SHMEM_OFFSET location;
/* --------------------
* If this is a FATAL exit the postmaster will have to kill all the
- * existing backends and reinitialize shared memory. So all we don't
+ * existing backends and reinitialize shared memory. So we don't
* need to do anything here.
* --------------------
*/
if (exitStatus != 0)
return;
- ShmemPIDLookup(MyProcPid, &location);
- if (location == INVALID_OFFSET)
- return;
-
- proc = (PROC *) MAKE_PTR(location);
+ if ((int) pid == MyProcPid)
+ {
+ proc = MyProc;
+ MyProc = NULL;
+ }
+ else
+ {
+ /* This path is dead code at the moment ... */
+ SHMEM_OFFSET location = INVALID_OFFSET;
- Assert(proc == MyProc || (int)pid != MyProcPid);
+ ShmemPIDLookup((int) pid, &location);
+ if (location == INVALID_OFFSET)
+ return;
+ proc = (PROC *) MAKE_PTR(location);
+ }
- MyProc = NULL;
+ Assert(proc);
- /* ---------------
- * Assume one lock table.
- * ---------------
- */
+ /* Release any spinlocks the proc is holding */
ProcReleaseSpins(proc);
- LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
-#ifdef USER_LOCKS
+ /* Get the proc off any wait queue it might be on */
+ GetOffWaitQueue(proc);
- /*
- * Assume we have a second lock table.
- */
- LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
-#endif
+ /* Remove from the standard lock table */
+ LockReleaseAll(DEFAULT_LOCKMETHOD, proc, true, InvalidTransactionId);
- /* ----------------
- * get off the wait queue
- * ----------------
- */
- GetOffWaitqueue(proc);
+#ifdef USER_LOCKS
+ /* Remove from the user lock table */
+ LockReleaseAll(USER_LOCKMETHOD, proc, true, InvalidTransactionId);
+#endif
}
/*
@@ -488,10 +515,10 @@ SetWaitingForLock(bool waiting)
}
if (QueryCancel) /* cancel request pending */
{
- if (GetOffWaitqueue(MyProc))
+ if (GetOffWaitQueue(MyProc))
{
lockWaiting = false;
- elog(ERROR, "Query cancel requested while waiting lock");
+ elog(ERROR, "Query cancel requested while waiting for lock");
}
}
}
@@ -519,8 +546,8 @@ LockWaitCancel(void)
set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM);
#endif /* __BEOS__ */
- if (GetOffWaitqueue(MyProc))
- elog(ERROR, "Query cancel requested while waiting lock");
+ if (GetOffWaitQueue(MyProc))
+ elog(ERROR, "Query cancel requested while waiting for lock");
}
/*
@@ -538,18 +565,19 @@ LockWaitCancel(void)
* NOTES: The process queue is now a priority queue for locking.
*/
int
-ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
- LOCKMETHODCTL *lockctl,
- int token, /* lockmode */
- LOCK *lock)
+ProcSleep(LOCKMETHODCTL *lockctl,
+ LOCKMODE lockmode,
+ LOCK *lock,
+ HOLDER *holder)
{
- int i;
+ PROC_QUEUE *waitQueue = &(lock->waitProcs);
SPINLOCK spinlock = lockctl->masterLock;
- PROC *proc;
- int myMask = (1 << token);
+ int myMask = (1 << lockmode);
int waitMask = lock->waitMask;
+ PROC *proc;
+ int i;
int aheadHolders[MAX_LOCKMODES];
- bool selfConflict = (lockctl->conflictTab[token] & myMask),
+ bool selfConflict = (lockctl->conflictTab[lockmode] & myMask),
prevSame = false;
#ifndef __BEOS__
struct itimerval timeval,
@@ -558,26 +586,28 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
bigtime_t time_interval;
#endif
- MyProc->token = token;
MyProc->waitLock = lock;
+ MyProc->waitHolder = holder;
+ MyProc->waitLockMode = lockmode;
+ /* We assume the caller set up MyProc->holdLock */
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/* if we don't conflict with any waiter - be first in queue */
- if (!(lockctl->conflictTab[token] & waitMask))
+ if (!(lockctl->conflictTab[lockmode] & waitMask))
goto ins;
for (i = 1; i < MAX_LOCKMODES; i++)
aheadHolders[i] = lock->activeHolders[i];
- (aheadHolders[token])++;
+ (aheadHolders[lockmode])++;
for (i = 0; i < waitQueue->size; i++)
{
/* am I waiting for him ? */
- if (lockctl->conflictTab[token] & proc->holdLock)
+ if (lockctl->conflictTab[lockmode] & proc->holdLock)
{
/* is he waiting for me ? */
- if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock)
{
/* Yes, report deadlock failure */
MyProc->errType = STATUS_ERROR;
@@ -586,10 +616,10 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
/* being waiting for him - go past */
}
/* if he waits for me */
- else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ else if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock)
break;
/* if conflicting locks requested */
- else if (lockctl->conflictTab[proc->token] & myMask)
+ else if (lockctl->conflictTab[proc->waitLockMode] & myMask)
{
/*
@@ -604,13 +634,13 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
* Last attempt to don't move any more: if we don't conflict with
* rest waiters in queue.
*/
- else if (!(lockctl->conflictTab[token] & waitMask))
+ else if (!(lockctl->conflictTab[lockmode] & waitMask))
break;
- prevSame = (proc->token == token);
- (aheadHolders[proc->token])++;
- if (aheadHolders[proc->token] == lock->holders[proc->token])
- waitMask &= ~(1 << proc->token);
+ prevSame = (proc->waitLockMode == lockmode);
+ (aheadHolders[proc->waitLockMode])++;
+ if (aheadHolders[proc->waitLockMode] == lock->holders[proc->waitLockMode])
+ waitMask &= ~(1 << proc->waitLockMode);
proc = (PROC *) MAKE_PTR(proc->links.prev);
}
@@ -692,10 +722,8 @@ ins:;
rt:;
-#ifdef LOCK_DEBUG
- /* Just to get meaningful debug messages from DumpLocks() */
- MyProc->waitLock = (LOCK *) NULL;
-#endif
+ MyProc->waitLock = NULL;
+ MyProc->waitHolder = NULL;
return MyProc->errType;
}
@@ -704,7 +732,7 @@ rt:;
/*
* ProcWakeup -- wake up a process by releasing its private semaphore.
*
- * remove the process from the wait queue and set its links invalid.
+ * Also remove the process from the wait queue and set its links invalid.
* RETURN: the next process in the wait queue.
*/
PROC *
@@ -720,9 +748,9 @@ ProcWakeup(PROC *proc, int errType)
retProc = (PROC *) MAKE_PTR(proc->links.prev);
- /* you have to update waitLock->waitProcs.size yourself */
SHMQueueDelete(&(proc->links));
SHMQueueElemInit(&(proc->links));
+ (proc->waitLock->waitProcs.size)--;
proc->errType = errType;
@@ -736,65 +764,70 @@ ProcWakeup(PROC *proc, int errType)
* released.
*/
int
-ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
+ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
{
+ PROC_QUEUE *queue = &(lock->waitProcs);
PROC *proc;
- int count = 0;
- int last_locktype = 0;
+ int awoken = 0;
+ LOCKMODE last_lockmode = 0;
int queue_size = queue->size;
- Assert(queue->size >= 0);
+ Assert(queue_size >= 0);
- if (!queue->size)
+ if (!queue_size)
return STATUS_NOT_FOUND;
proc = (PROC *) MAKE_PTR(queue->links.prev);
- while ((queue_size--) && (proc))
- {
- /*
- * This proc will conflict as the previous one did, don't even
- * try.
- */
- if (proc->token == last_locktype)
- continue;
+ while (queue_size-- > 0)
+ {
+ if (proc->waitLockMode == last_lockmode)
+ {
+ /*
+ * This proc will conflict as the previous one did, don't even
+ * try.
+ */
+ goto nextProc;
+ }
/*
* Does this proc conflict with locks held by others ?
*/
if (LockResolveConflicts(lockmethod,
+ proc->waitLockMode,
lock,
- proc->token,
- proc->xid,
- (XIDLookupEnt *) NULL) != STATUS_OK)
+ proc->waitHolder,
+ proc,
+ NULL) != STATUS_OK)
{
- if (count != 0)
+ /* Yes. Quit if we already awoke at least one process. */
+ if (awoken != 0)
break;
- last_locktype = proc->token;
- continue;
+ /* Otherwise, see if any later waiters can be awoken. */
+ last_lockmode = proc->waitLockMode;
+ goto nextProc;
}
/*
- * there was a waiting process, grant it the lock before waking it
- * up. This will prevent another process from seizing the lock
- * between the time we release the lock master (spinlock) and the
- * time that the awoken process begins executing again.
+ * OK to wake up this sleeping process.
*/
- GrantLock(lock, proc->token);
+ GrantLock(lock, proc->waitHolder, proc->waitLockMode);
+ proc = ProcWakeup(proc, NO_ERROR);
+ awoken++;
/*
- * ProcWakeup removes proc from the lock waiting process queue and
- * returns the next proc in chain.
+ * ProcWakeup removes proc from the lock's waiting process queue
+ * and returns the next proc in chain; don't use prev link.
*/
+ continue;
- count++;
- queue->size--;
- proc = ProcWakeup(proc, NO_ERROR);
+nextProc:
+ proc = (PROC *) MAKE_PTR(proc->links.prev);
}
Assert(queue->size >= 0);
- if (count)
+ if (awoken)
return STATUS_OK;
else
{
@@ -802,9 +835,10 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
#ifdef LOCK_DEBUG
if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
{
- elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", MAKE_OFFSET(lock));
+ elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
+ MAKE_OFFSET(lock));
if (Debug_deadlocks)
- DumpAllLocks();
+ DumpAllLocks();
}
#endif
return STATUS_NOT_FOUND;
@@ -872,10 +906,12 @@ HandleDeadLock(SIGNAL_ARGS)
*/
mywaitlock = MyProc->waitLock;
Assert(mywaitlock->waitProcs.size > 0);
- lockWaiting = false;
--mywaitlock->waitProcs.size;
SHMQueueDelete(&(MyProc->links));
SHMQueueElemInit(&(MyProc->links));
+ MyProc->waitLock = NULL;
+ MyProc->waitHolder = NULL;
+ lockWaiting = false;
/* ------------------
* Unlock my semaphore so that the interrupted ProcSleep() call can finish.