diff options
Diffstat (limited to 'src/backend/storage/lmgr/proc.c')
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 243 |
1 files changed, 78 insertions, 165 deletions
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 00d26dc0f68..e2adf493ca9 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -15,8 +15,6 @@ /* * Interface (a): * ProcSleep(), ProcWakeup(), - * ProcQueueAlloc() -- create a shm queue for sleeping processes - * ProcQueueInit() -- create a queue without allocing memory * * Waiting for a lock causes the backend to be put to sleep. Whoever releases * the lock wakes the process up again (and gives it an error code so it knows @@ -173,10 +171,10 @@ InitProcGlobal(void) * Initialize the data structures. */ ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; - ProcGlobal->freeProcs = NULL; - ProcGlobal->autovacFreeProcs = NULL; - ProcGlobal->bgworkerFreeProcs = NULL; - ProcGlobal->walsenderFreeProcs = NULL; + dlist_init(&ProcGlobal->freeProcs); + dlist_init(&ProcGlobal->autovacFreeProcs); + dlist_init(&ProcGlobal->bgworkerFreeProcs); + dlist_init(&ProcGlobal->walsenderFreeProcs); ProcGlobal->startupBufferPinWaitBufId = -1; ProcGlobal->walwriterLatch = NULL; ProcGlobal->checkpointerLatch = NULL; @@ -214,6 +212,8 @@ InitProcGlobal(void) for (i = 0; i < TotalProcs; i++) { + PGPROC *proc = &procs[i]; + /* Common initialization for all PGPROCs, regardless of type. */ /* @@ -223,11 +223,11 @@ InitProcGlobal(void) */ if (i < MaxBackends + NUM_AUXILIARY_PROCS) { - procs[i].sem = PGSemaphoreCreate(); - InitSharedLatch(&(procs[i].procLatch)); - LWLockInitialize(&(procs[i].fpInfoLock), LWTRANCHE_LOCK_FASTPATH); + proc->sem = PGSemaphoreCreate(); + InitSharedLatch(&(proc->procLatch)); + LWLockInitialize(&(proc->fpInfoLock), LWTRANCHE_LOCK_FASTPATH); } - procs[i].pgprocno = i; + proc->pgprocno = i; /* * Newly created PGPROCs for normal backends, autovacuum and bgworkers @@ -240,46 +240,42 @@ InitProcGlobal(void) if (i < MaxConnections) { /* PGPROC for normal backend, add to freeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; - ProcGlobal->freeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->freeProcs; + dlist_push_head(&ProcGlobal->freeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->freeProcs; } else if (i < MaxConnections + autovacuum_max_workers + 1) { /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; - ProcGlobal->autovacFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->autovacFreeProcs; + dlist_push_head(&ProcGlobal->autovacFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->autovacFreeProcs; } else if (i < MaxConnections + autovacuum_max_workers + 1 + max_worker_processes) { /* PGPROC for bgworker, add to bgworkerFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs; - ProcGlobal->bgworkerFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs; + dlist_push_head(&ProcGlobal->bgworkerFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->bgworkerFreeProcs; } else if (i < MaxBackends) { /* PGPROC for walsender, add to walsenderFreeProcs list */ - procs[i].links.next = (SHM_QUEUE *) ProcGlobal->walsenderFreeProcs; - ProcGlobal->walsenderFreeProcs = &procs[i]; - procs[i].procgloballist = &ProcGlobal->walsenderFreeProcs; + dlist_push_head(&ProcGlobal->walsenderFreeProcs, &proc->links); + proc->procgloballist = &ProcGlobal->walsenderFreeProcs; } /* Initialize myProcLocks[] shared memory queues. */ for (j = 0; j < NUM_LOCK_PARTITIONS; j++) - SHMQueueInit(&(procs[i].myProcLocks[j])); + dlist_init(&(proc->myProcLocks[j])); /* Initialize lockGroupMembers list. */ - dlist_init(&procs[i].lockGroupMembers); + dlist_init(&proc->lockGroupMembers); /* * Initialize the atomic variables, otherwise, it won't be safe to * access them for backends that aren't currently in use. */ - pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO); - pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO); - pg_atomic_init_u64(&(procs[i].waitStart), 0); + pg_atomic_init_u32(&(proc->procArrayGroupNext), INVALID_PGPROCNO); + pg_atomic_init_u32(&(proc->clogGroupNext), INVALID_PGPROCNO); + pg_atomic_init_u64(&(proc->waitStart), 0); } /* @@ -300,7 +296,7 @@ InitProcGlobal(void) void InitProcess(void) { - PGPROC *volatile *procgloballist; + dlist_head *procgloballist; /* * ProcGlobal should be set up already (if we are a backend, we inherit @@ -333,11 +329,9 @@ InitProcess(void) set_spins_per_delay(ProcGlobal->spins_per_delay); - MyProc = *procgloballist; - - if (MyProc != NULL) + if (!dlist_is_empty(procgloballist)) { - *procgloballist = (PGPROC *) MyProc->links.next; + MyProc = (PGPROC*) dlist_pop_head_node(procgloballist); SpinLockRelease(ProcStructLock); } else @@ -378,7 +372,7 @@ InitProcess(void) * Initialize all fields of MyProc, except for those previously * initialized by InitProcGlobal. */ - SHMQueueElemInit(&(MyProc->links)); + dlist_node_init(&MyProc->links); MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->lxid = InvalidLocalTransactionId; MyProc->fpVXIDLock = false; @@ -408,7 +402,7 @@ InitProcess(void) /* Last process should have released all locks. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif MyProc->recoveryConflictPending = false; @@ -565,7 +559,7 @@ InitAuxiliaryProcess(void) * Initialize all fields of MyProc, except for those previously * initialized by InitProcGlobal. */ - SHMQueueElemInit(&(MyProc->links)); + dlist_node_init(&MyProc->links); MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->lxid = InvalidLocalTransactionId; MyProc->fpVXIDLock = false; @@ -590,7 +584,7 @@ InitAuxiliaryProcess(void) /* Last process should have released all locks. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif @@ -658,16 +652,15 @@ GetStartupBufferPinWaitBufId(void) bool HaveNFreeProcs(int n) { - PGPROC *proc; + dlist_iter iter; SpinLockAcquire(ProcStructLock); - proc = ProcGlobal->freeProcs; - - while (n > 0 && proc != NULL) + dlist_foreach(iter, &ProcGlobal->freeProcs) { - proc = (PGPROC *) proc->links.next; n--; + if (n == 0) + break; } SpinLockRelease(ProcStructLock); @@ -730,7 +723,7 @@ LockErrorCleanup(void) partitionLock = LockHashPartitionLock(lockAwaited->hashcode); LWLockAcquire(partitionLock, LW_EXCLUSIVE); - if (MyProc->links.next != NULL) + if (!dlist_node_is_detached(&MyProc->links)) { /* We could not have been granted the lock yet */ RemoveFromWaitQueue(MyProc, lockAwaited->hashcode); @@ -803,7 +796,7 @@ static void ProcKill(int code, Datum arg) { PGPROC *proc; - PGPROC *volatile *procgloballist; + dlist_head *procgloballist; Assert(MyProc != NULL); @@ -816,7 +809,7 @@ ProcKill(int code, Datum arg) /* Last process should have released all locks. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); + Assert(dlist_is_empty(&(MyProc->myProcLocks[i]))); } #endif @@ -832,7 +825,7 @@ ProcKill(int code, Datum arg) /* * Detach from any lock group of which we are a member. If the leader - * exist before all other group members, its PGPROC will remain allocated + * exits before all other group members, its PGPROC will remain allocated * until the last group process exits; that process must return the * leader's PGPROC to the appropriate list. */ @@ -853,8 +846,7 @@ ProcKill(int code, Datum arg) /* Leader exited first; return its PGPROC. */ SpinLockAcquire(ProcStructLock); - leader->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = leader; + dlist_push_head(procgloballist, &leader->links); SpinLockRelease(ProcStructLock); } } @@ -893,8 +885,7 @@ ProcKill(int code, Datum arg) Assert(dlist_is_empty(&proc->lockGroupMembers)); /* Return PGPROC structure (and semaphore) to appropriate freelist */ - proc->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = proc; + dlist_push_tail(procgloballist, &proc->links); } /* Update shared estimate of spins_per_delay */ @@ -986,44 +977,6 @@ AuxiliaryPidGetProc(int pid) return result; } -/* - * ProcQueue package: routines for putting processes to sleep - * and waking them up - */ - -/* - * ProcQueueAlloc -- alloc/attach to a shared memory process queue - * - * Returns: a pointer to the queue - * Side Effects: Initializes the queue if it wasn't there before - */ -#ifdef NOT_USED -PROC_QUEUE * -ProcQueueAlloc(const char *name) -{ - PROC_QUEUE *queue; - bool found; - - queue = (PROC_QUEUE *) - ShmemInitStruct(name, sizeof(PROC_QUEUE), &found); - - if (!found) - ProcQueueInit(queue); - - return queue; -} -#endif - -/* - * ProcQueueInit -- initialize a shared memory process queue - */ -void -ProcQueueInit(PROC_QUEUE *queue) -{ - SHMQueueInit(&(queue->links)); - queue->size = 0; -} - /* * ProcSleep -- put a process to sleep on the specified lock @@ -1049,8 +1002,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) PROCLOCK *proclock = locallock->proclock; uint32 hashcode = locallock->hashcode; LWLock *partitionLock = LockHashPartitionLock(hashcode); - PROC_QUEUE *waitQueue = &(lock->waitProcs); - SHM_QUEUE *waitQueuePos; + dclist_head *waitQueue = &lock->waitProcs; + PGPROC *insert_before = NULL; LOCKMASK myHeldLocks = MyProc->heldLocks; TimestampTz standbyWaitStart = 0; bool early_deadlock = false; @@ -1058,7 +1011,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) bool logged_recovery_conflict = false; ProcWaitStatus myWaitStatus; PGPROC *leader = MyProc->lockGroupLeader; - int i; /* * If group locking is in use, locks held by members of my locking group @@ -1072,18 +1024,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) */ if (leader != NULL) { - SHM_QUEUE *procLocks = &(lock->procLocks); - PROCLOCK *otherproclock; + dlist_iter iter; - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); - while (otherproclock != NULL) + dlist_foreach(iter, &lock->procLocks) { + PROCLOCK *otherproclock; + + otherproclock = dlist_container(PROCLOCK, lockLink, iter.cur); + if (otherproclock->groupLeader == leader) myHeldLocks |= otherproclock->holdMask; - otherproclock = (PROCLOCK *) - SHMQueueNext(procLocks, &otherproclock->lockLink, - offsetof(PROCLOCK, lockLink)); } } @@ -1104,15 +1054,14 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * we are only considering the part of the wait queue before my insertion * point. */ - if (myHeldLocks != 0 && waitQueue->size > 0) + if (myHeldLocks != 0 && !dclist_is_empty(waitQueue)) { LOCKMASK aheadRequests = 0; - SHM_QUEUE *proc_node; + dlist_iter iter; - proc_node = waitQueue->links.next; - for (i = 0; i < waitQueue->size; i++) + dclist_foreach(iter, waitQueue) { - PGPROC *proc = (PGPROC *) proc_node; + PGPROC *proc = dlist_container(PGPROC, links, iter.cur); /* * If we're part of the same locking group as this waiter, its @@ -1120,10 +1069,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * aheadRequests. */ if (leader != NULL && leader == proc->lockGroupLeader) - { - proc_node = proc->links.next; continue; - } + /* Must he wait for me? */ if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks) { @@ -1151,31 +1098,23 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) GrantAwaitedLock(); return PROC_WAIT_STATUS_OK; } - /* Break out of loop to put myself before him */ + + /* Put myself into wait queue before conflicting process */ + insert_before = proc; break; } /* Nope, so advance to next waiter */ aheadRequests |= LOCKBIT_ON(proc->waitLockMode); - proc_node = proc->links.next; } - - /* - * If we iterated through the whole queue, cur points to the waitQueue - * head, so we will insert at tail of queue as desired. - */ - waitQueuePos = proc_node; - } - else - { - /* I hold no locks, so I can't push in front of anyone. */ - waitQueuePos = &waitQueue->links; } /* * Insert self into queue, at the position determined above. */ - SHMQueueInsertBefore(waitQueuePos, &MyProc->links); - waitQueue->size++; + if (insert_before) + dclist_insert_before(waitQueue, &insert_before->links, &MyProc->links); + else + dclist_push_tail(waitQueue, &MyProc->links); lock->waitMask |= LOCKBIT_ON(lockmode); @@ -1453,7 +1392,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) long secs; int usecs; long msecs; - SHM_QUEUE *procLocks; + dlist_iter proc_iter; PROCLOCK *curproclock; bool first_holder = true, first_waiter = true; @@ -1483,12 +1422,11 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) LWLockAcquire(partitionLock, LW_SHARED); - procLocks = &(lock->procLocks); - curproclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, lockLink)); - - while (curproclock) + dlist_foreach(proc_iter, &lock->procLocks) { + curproclock = + dlist_container(PROCLOCK, lockLink, proc_iter.cur); + /* * we are a waiter if myProc->waitProcLock == curproclock; we * are a holder if it is NULL or something different @@ -1519,10 +1457,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) lockHoldersNum++; } - - curproclock = (PROCLOCK *) SHMQueueNext(procLocks, - &curproclock->lockLink, - offsetof(PROCLOCK, lockLink)); } LWLockRelease(partitionLock); @@ -1657,7 +1591,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * ProcWakeup -- wake up a process by setting its latch. * * Also remove the process from the wait queue and set its links invalid. - * RETURN: the next process in the wait queue. * * The appropriate lock partition lock must be held by caller. * @@ -1666,23 +1599,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * to twiddle the lock's request counts too --- see RemoveFromWaitQueue. * Hence, in practice the waitStatus parameter must be PROC_WAIT_STATUS_OK. */ -PGPROC * +void ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) { - PGPROC *retProc; + if (dlist_node_is_detached(&proc->links)) + return; - /* Proc should be sleeping ... */ - if (proc->links.prev == NULL || - proc->links.next == NULL) - return NULL; Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING); - /* Save next process before we zap the list link */ - retProc = (PGPROC *) proc->links.next; - /* Remove process from wait queue */ - SHMQueueDelete(&(proc->links)); - (proc->waitLock->waitProcs.size)--; + dclist_delete_from_thoroughly(&proc->waitLock->waitProcs, &proc->links); /* Clean up process' state and pass it the ok/fail signal */ proc->waitLock = NULL; @@ -1692,8 +1618,6 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) /* And awaken it */ SetLatch(&proc->procLatch); - - return retProc; } /* @@ -1706,20 +1630,16 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) { - PROC_QUEUE *waitQueue = &(lock->waitProcs); - int queue_size = waitQueue->size; - PGPROC *proc; + dclist_head *waitQueue = &lock->waitProcs; LOCKMASK aheadRequests = 0; + dlist_mutable_iter miter; - Assert(queue_size >= 0); - - if (queue_size == 0) + if (dclist_is_empty(waitQueue)) return; - proc = (PGPROC *) waitQueue->links.next; - - while (queue_size-- > 0) + dclist_foreach_modify(miter, waitQueue) { + PGPROC *proc = dlist_container(PGPROC, links, miter.cur); LOCKMODE lockmode = proc->waitLockMode; /* @@ -1732,25 +1652,18 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) { /* OK to waken */ GrantLock(lock, proc->waitProcLock, lockmode); - proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK); - - /* - * ProcWakeup removes proc from the lock's waiting process queue - * and returns the next proc in chain; don't use proc's next-link, - * because it's been cleared. - */ + /* removes proc from the lock's waiting process queue */ + ProcWakeup(proc, PROC_WAIT_STATUS_OK); } else { /* - * Cannot wake this guy. Remember his request for later checks. + * Lock conflicts: Don't wake, but remember requested mode for + * later checks. */ aheadRequests |= LOCKBIT_ON(lockmode); - proc = (PGPROC *) proc->links.next; } } - - Assert(waitQueue->size >= 0); } /* |