summaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2024-03-03 19:37:28 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2024-03-03 19:37:28 +0200
commitab355e3a88de745607f6dd4c21f0119b5c68f2ad (patch)
tree7808e71e8fbee29095708f47f80bb2bea099e52e /src/backend/storage/ipc
parent30b8d6e4ce1112168ddfe8cdbba76fbefd304b34 (diff)
Redefine backend ID to be an index into the proc array
Previously, backend ID was an index into the ProcState array, in the shared cache invalidation manager (sinvaladt.c). The entry in the ProcState array was reserved at backend startup by scanning the array for a free entry, and that was also when the backend got its backend ID. Things become slightly simpler if we redefine backend ID to be the index into the PGPROC array, and directly use it also as an index to the ProcState array. This uses a little more memory, as we reserve a few extra slots in the ProcState array for aux processes that don't need them, but the simplicity is worth it. Aux processes now also have a backend ID. This simplifies the reservation of BackendStatusArray and ProcSignal slots. You can now convert a backend ID into an index into the PGPROC array simply by subtracting 1. We still use 0-based "pgprocnos" in various places, for indexes into the PGPROC array, but the only difference now is that backend IDs start at 1 while pgprocnos start at 0. (The next commmit will get rid of the term "backend ID" altogether and make everything 0-based.) There is still a 'backendId' field in PGPROC, now part of 'vxid' which encapsulates the backend ID and local transaction ID together. It's needed for prepared xacts. For regular backends, the backendId is always equal to pgprocno + 1, but for prepared xact PGPROC entries, it's the ID of the original backend that processed the transaction. Reviewed-by: Andres Freund, Reid Thompson Discussion: https://www.postgresql.org/message-id/8171f1aa-496f-46a6-afc3-c46fe7a9b407@iki.fi
Diffstat (limited to 'src/backend/storage/ipc')
-rw-r--r--src/backend/storage/ipc/procarray.c73
-rw-r--r--src/backend/storage/ipc/procsignal.c27
-rw-r--r--src/backend/storage/ipc/sinvaladt.c200
-rw-r--r--src/backend/storage/ipc/standby.c1
4 files changed, 142 insertions, 159 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index dd329a86ef4..d96606ebba5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -701,7 +701,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
Assert(proc->subxidStatus.count == 0);
Assert(!proc->subxidStatus.overflowed);
- proc->lxid = InvalidLocalTransactionId;
+ proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
/* be sure this is cleared in abort */
@@ -743,7 +743,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
ProcGlobal->xids[pgxactoff] = InvalidTransactionId;
proc->xid = InvalidTransactionId;
- proc->lxid = InvalidLocalTransactionId;
+ proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
/* be sure this is cleared in abort */
@@ -930,7 +930,7 @@ ProcArrayClearTransaction(PGPROC *proc)
ProcGlobal->xids[pgxactoff] = InvalidTransactionId;
proc->xid = InvalidTransactionId;
- proc->lxid = InvalidLocalTransactionId;
+ proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
@@ -2536,6 +2536,11 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
/* Get lock so source xact can't end while we're doing this */
LWLockAcquire(ProcArrayLock, LW_SHARED);
+ /*
+ * Find the PGPROC entry of the source transaction. (This could use
+ * GetPGProcByBackendId(), unless it's a prepared xact. But this isn't
+ * performance critical.)
+ */
for (index = 0; index < arrayP->numProcs; index++)
{
int pgprocno = arrayP->pgprocnos[index];
@@ -2548,9 +2553,9 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
continue;
/* We are only interested in the specific virtual transaction. */
- if (proc->backendId != sourcevxid->backendId)
+ if (proc->vxid.backendId != sourcevxid->backendId)
continue;
- if (proc->lxid != sourcevxid->localTransactionId)
+ if (proc->vxid.lxid != sourcevxid->localTransactionId)
continue;
/*
@@ -3100,6 +3105,64 @@ HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids, int type)
}
/*
+ * BackendIdGetProc -- get a backend's PGPROC given its backend ID
+ *
+ * The result may be out of date arbitrarily quickly, so the caller
+ * must be careful about how this information is used. NULL is
+ * returned if the backend is not active.
+ */
+PGPROC *
+BackendIdGetProc(int backendID)
+{
+ PGPROC *result;
+
+ if (backendID < 1 || backendID > ProcGlobal->allProcCount)
+ return NULL;
+ result = GetPGProcByBackendId(backendID);
+
+ if (result->pid == 0)
+ return NULL;
+
+ return result;
+}
+
+/*
+ * BackendIdGetTransactionIds -- get a backend's transaction status
+ *
+ * Get the xid, xmin, nsubxid and overflow status of the backend. The
+ * result may be out of date arbitrarily quickly, so the caller must be
+ * careful about how this information is used.
+ */
+void
+BackendIdGetTransactionIds(int backendID, TransactionId *xid,
+ TransactionId *xmin, int *nsubxid, bool *overflowed)
+{
+ PGPROC *proc;
+
+ *xid = InvalidTransactionId;
+ *xmin = InvalidTransactionId;
+ *nsubxid = 0;
+ *overflowed = false;
+
+ if (backendID < 1 || backendID > ProcGlobal->allProcCount)
+ return;
+ proc = GetPGProcByBackendId(backendID);
+
+ /* Need to lock out additions/removals of backends */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ if (proc->pid != 0)
+ {
+ *xid = proc->xid;
+ *xmin = proc->xmin;
+ *nsubxid = proc->subxidStatus.count;
+ *overflowed = proc->subxidStatus.overflowed;
+ }
+
+ LWLockRelease(ProcArrayLock);
+}
+
+/*
* BackendPidGetProc -- get a backend's PGPROC given its PID
*
* Returns NULL if not found. Note that it is up to the caller to be
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 0f9f90d2c7b..199dd182253 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -87,7 +87,7 @@ typedef struct
* possible auxiliary process type. (This scheme assumes there is not
* more than one of any auxiliary process type at a time.)
*/
-#define NumProcSignalSlots (MaxBackends + NUM_AUXPROCTYPES)
+#define NumProcSignalSlots (MaxBackends + NUM_AUXILIARY_PROCS)
/* Check whether the relevant type bit is set in the flags. */
#define BARRIER_SHOULD_CHECK(flags, type) \
@@ -154,24 +154,23 @@ ProcSignalShmemInit(void)
/*
* ProcSignalInit
* Register the current process in the ProcSignal array
- *
- * The passed index should be my BackendId if the process has one,
- * or MaxBackends + aux process type if not.
*/
void
-ProcSignalInit(int pss_idx)
+ProcSignalInit(void)
{
ProcSignalSlot *slot;
uint64 barrier_generation;
- Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);
-
- slot = &ProcSignal->psh_slot[pss_idx - 1];
+ if (MyBackendId <= 0)
+ elog(ERROR, "MyBackendId not set");
+ if (MyBackendId > NumProcSignalSlots)
+ elog(ERROR, "unexpected MyBackendId %d in ProcSignalInit (max %d)", MyBackendId, NumProcSignalSlots);
+ slot = &ProcSignal->psh_slot[MyBackendId - 1];
/* sanity check */
if (slot->pss_pid != 0)
elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty",
- MyProcPid, pss_idx);
+ MyProcPid, MyBackendId - 1);
/* Clear out any leftover signal reasons */
MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));
@@ -200,7 +199,7 @@ ProcSignalInit(int pss_idx)
MyProcSignalSlot = slot;
/* Set up to release the slot on process exit */
- on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx));
+ on_shmem_exit(CleanupProcSignalState, (Datum) 0);
}
/*
@@ -212,11 +211,7 @@ ProcSignalInit(int pss_idx)
static void
CleanupProcSignalState(int status, Datum arg)
{
- int pss_idx = DatumGetInt32(arg);
- ProcSignalSlot *slot;
-
- slot = &ProcSignal->psh_slot[pss_idx - 1];
- Assert(slot == MyProcSignalSlot);
+ ProcSignalSlot *slot = MyProcSignalSlot;
/*
* Clear MyProcSignalSlot, so that a SIGUSR1 received after this point
@@ -233,7 +228,7 @@ CleanupProcSignalState(int status, Datum arg)
* infinite loop trying to exit
*/
elog(LOG, "process %d releasing ProcSignal slot %d, but it contains %d",
- MyProcPid, pss_idx, (int) slot->pss_pid);
+ MyProcPid, (int) (slot - ProcSignal->psh_slot), (int) slot->pss_pid);
return; /* XXX better to zero the slot anyway? */
}
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 748a792a854..f624bfc7d78 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -139,7 +139,6 @@ typedef struct ProcState
{
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
- PGPROC *proc; /* PGPROC of backend */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
int nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
@@ -172,8 +171,6 @@ typedef struct SISeg
int minMsgNum; /* oldest message still needed */
int maxMsgNum; /* next message number to be assigned */
int nextThreshold; /* # of messages to call SICleanupQueue */
- int lastBackend; /* index of last active procState entry, +1 */
- int maxBackends; /* size of procState array */
slock_t msgnumLock; /* spinlock protecting maxMsgNum */
@@ -183,11 +180,29 @@ typedef struct SISeg
SharedInvalidationMessage buffer[MAXNUMMESSAGES];
/*
- * Per-backend invalidation state info (has MaxBackends entries).
+ * Per-backend invalidation state info.
+ *
+ * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
+ * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
+ * a dense array of their indexes, to speed up scanning all in-use slots.
+ *
+ * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
+ * having our separate copy avoids contention on ProcArrayLock, and allows
+ * us to track only the processes that participate in shared cache
+ * invalidations.
*/
+ int numProcs;
+ int *pgprocnos;
ProcState procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
+/*
+ * We reserve a slot for each possible BackendId, plus one for each
+ * possible auxiliary process type. (This scheme assumes there is not
+ * more than one of any auxiliary process type at a time.)
+ */
+#define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS)
+
static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
@@ -205,16 +220,8 @@ SInvalShmemSize(void)
Size size;
size = offsetof(SISeg, procState);
-
- /*
- * In Hot Standby mode, the startup process requests a procState array
- * slot using InitRecoveryTransactionEnvironment(). Even though
- * MaxBackends doesn't account for the startup process, it is guaranteed
- * to get a free slot. This is because the autovacuum launcher and worker
- * processes, which are included in MaxBackends, are not started in Hot
- * Standby mode.
- */
- size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
+ size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
+ size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
return size;
}
@@ -239,23 +246,22 @@ CreateSharedInvalidationState(void)
shmInvalBuffer->minMsgNum = 0;
shmInvalBuffer->maxMsgNum = 0;
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
- shmInvalBuffer->lastBackend = 0;
- shmInvalBuffer->maxBackends = MaxBackends;
SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */
/* Mark all backends inactive, and initialize nextLXID */
- for (i = 0; i < shmInvalBuffer->maxBackends; i++)
+ for (i = 0; i < NumProcStateSlots; i++)
{
shmInvalBuffer->procState[i].procPid = 0; /* inactive */
- shmInvalBuffer->procState[i].proc = NULL;
shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
shmInvalBuffer->procState[i].resetState = false;
shmInvalBuffer->procState[i].signaled = false;
shmInvalBuffer->procState[i].hasMessages = false;
shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
}
+ shmInvalBuffer->numProcs = 0;
+ shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
}
/*
@@ -265,59 +271,41 @@ CreateSharedInvalidationState(void)
void
SharedInvalBackendInit(bool sendOnly)
{
- int index;
- ProcState *stateP = NULL;
+ ProcState *stateP;
+ pid_t oldPid;
SISeg *segP = shmInvalBuffer;
+ int pgprocno;
+
+ if (MyBackendId <= 0)
+ elog(ERROR, "MyBackendId not set");
+ if (MyBackendId > NumProcStateSlots)
+ elog(PANIC, "unexpected MyBackendId %d in SharedInvalBackendInit (max %d)",
+ MyBackendId, NumProcStateSlots);
+ pgprocno = MyBackendId - 1;
+ stateP = &segP->procState[pgprocno];
/*
* This can run in parallel with read operations, but not with write
- * operations, since SIInsertDataEntries relies on lastBackend to set
- * hasMessages appropriately.
+ * operations, since SIInsertDataEntries relies on the pgprocnos array to
+ * set hasMessages appropriately.
*/
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
- /* Look for a free entry in the procState array */
- for (index = 0; index < segP->lastBackend; index++)
- {
- if (segP->procState[index].procPid == 0) /* inactive slot? */
- {
- stateP = &segP->procState[index];
- break;
- }
- }
-
- if (stateP == NULL)
+ oldPid = stateP->procPid;
+ if (oldPid != 0)
{
- if (segP->lastBackend < segP->maxBackends)
- {
- stateP = &segP->procState[segP->lastBackend];
- Assert(stateP->procPid == 0);
- segP->lastBackend++;
- }
- else
- {
- /*
- * out of procState slots: MaxBackends exceeded -- report normally
- */
- MyBackendId = InvalidBackendId;
- LWLockRelease(SInvalWriteLock);
- ereport(FATAL,
- (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
- errmsg("sorry, too many clients already")));
- }
+ LWLockRelease(SInvalWriteLock);
+ elog(ERROR, "sinval slot for backend %d is already in use by process %d",
+ MyBackendId, (int) oldPid);
}
- MyBackendId = (stateP - &segP->procState[0]) + 1;
-
- /* Advertise assigned backend ID in MyProc */
- MyProc->backendId = MyBackendId;
+ shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = pgprocno;
/* Fetch next local transaction ID into local memory */
nextLocalTransactionId = stateP->nextLXID;
/* mark myself active, with all extant messages already read */
stateP->procPid = MyProcPid;
- stateP->proc = MyProc;
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
stateP->signaled = false;
@@ -328,8 +316,6 @@ SharedInvalBackendInit(bool sendOnly)
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
-
- elog(DEBUG4, "my backend ID is %d", MyBackendId);
}
/*
@@ -345,96 +331,36 @@ CleanupInvalidationState(int status, Datum arg)
{
SISeg *segP = (SISeg *) DatumGetPointer(arg);
ProcState *stateP;
+ int pgprocno = MyBackendId - 1;
int i;
Assert(PointerIsValid(segP));
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
- stateP = &segP->procState[MyBackendId - 1];
+ stateP = &segP->procState[pgprocno];
/* Update next local transaction ID for next holder of this backendID */
stateP->nextLXID = nextLocalTransactionId;
/* Mark myself inactive */
stateP->procPid = 0;
- stateP->proc = NULL;
stateP->nextMsgNum = 0;
stateP->resetState = false;
stateP->signaled = false;
- /* Recompute index of last active backend */
- for (i = segP->lastBackend; i > 0; i--)
+ for (i = segP->numProcs - 1; i >= 0; i--)
{
- if (segP->procState[i - 1].procPid != 0)
- break;
- }
- segP->lastBackend = i;
-
- LWLockRelease(SInvalWriteLock);
-}
-
-/*
- * BackendIdGetProc
- * Get the PGPROC structure for a backend, given the backend ID.
- * The result may be out of date arbitrarily quickly, so the caller
- * must be careful about how this information is used. NULL is
- * returned if the backend is not active.
- */
-PGPROC *
-BackendIdGetProc(int backendID)
-{
- PGPROC *result = NULL;
- SISeg *segP = shmInvalBuffer;
-
- /* Need to lock out additions/removals of backends */
- LWLockAcquire(SInvalWriteLock, LW_SHARED);
-
- if (backendID > 0 && backendID <= segP->lastBackend)
- {
- ProcState *stateP = &segP->procState[backendID - 1];
-
- result = stateP->proc;
- }
-
- LWLockRelease(SInvalWriteLock);
-
- return result;
-}
-
-/*
- * BackendIdGetTransactionIds
- * Get the xid, xmin, nsubxid and overflow status of the backend. The
- * result may be out of date arbitrarily quickly, so the caller must be
- * careful about how this information is used.
- */
-void
-BackendIdGetTransactionIds(int backendID, TransactionId *xid,
- TransactionId *xmin, int *nsubxid, bool *overflowed)
-{
- SISeg *segP = shmInvalBuffer;
-
- *xid = InvalidTransactionId;
- *xmin = InvalidTransactionId;
- *nsubxid = 0;
- *overflowed = false;
-
- /* Need to lock out additions/removals of backends */
- LWLockAcquire(SInvalWriteLock, LW_SHARED);
-
- if (backendID > 0 && backendID <= segP->lastBackend)
- {
- ProcState *stateP = &segP->procState[backendID - 1];
- PGPROC *proc = stateP->proc;
-
- if (proc != NULL)
+ if (segP->pgprocnos[i] == pgprocno)
{
- *xid = proc->xid;
- *xmin = proc->xmin;
- *nsubxid = proc->subxidStatus.count;
- *overflowed = proc->subxidStatus.overflowed;
+ if (i != segP->numProcs - 1)
+ segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
+ break;
}
}
+ if (i < 0)
+ elog(PANIC, "could not find entry in sinval array");
+ segP->numProcs--;
LWLockRelease(SInvalWriteLock);
}
@@ -507,9 +433,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
* these (unlocked) changes will be committed to memory before we exit
* the function.
*/
- for (i = 0; i < segP->lastBackend; i++)
+ for (i = 0; i < segP->numProcs; i++)
{
- ProcState *stateP = &segP->procState[i];
+ ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
stateP->hasMessages = true;
}
@@ -677,13 +603,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
minsig = min - SIG_THRESHOLD;
lowbound = min - MAXNUMMESSAGES + minFree;
- for (i = 0; i < segP->lastBackend; i++)
+ for (i = 0; i < segP->numProcs; i++)
{
- ProcState *stateP = &segP->procState[i];
+ ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
int n = stateP->nextMsgNum;
- /* Ignore if inactive or already in reset state */
- if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
+ /* Ignore if already in reset state */
+ Assert(stateP->procPid != 0);
+ if (stateP->resetState || stateP->sendOnly)
continue;
/*
@@ -719,11 +646,8 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
{
segP->minMsgNum -= MSGNUMWRAPAROUND;
segP->maxMsgNum -= MSGNUMWRAPAROUND;
- for (i = 0; i < segP->lastBackend; i++)
- {
- /* we don't bother skipping inactive entries here */
- segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
- }
+ for (i = 0; i < segP->numProcs; i++)
+ segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
}
/*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index d8755a106d5..97d1ab65740 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -137,6 +137,7 @@ InitRecoveryTransactionEnvironment(void)
* are held by vxids and row level locks are held by xids. All queries
* hold AccessShareLocks so never block while we write or lock new rows.
*/
+ MyProc->vxid.backendId = MyBackendId;
vxid.backendId = MyBackendId;
vxid.localTransactionId = GetNextLocalTransactionId();
VirtualXactLockTableInsert(vxid);