summaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/ipc')
-rw-r--r--src/backend/storage/ipc/procarray.c73
-rw-r--r--src/backend/storage/ipc/procsignal.c27
-rw-r--r--src/backend/storage/ipc/sinvaladt.c200
-rw-r--r--src/backend/storage/ipc/standby.c1
4 files changed, 142 insertions, 159 deletions
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index dd329a86ef4..d96606ebba5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -701,7 +701,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
Assert(proc->subxidStatus.count == 0);
Assert(!proc->subxidStatus.overflowed);
- proc->lxid = InvalidLocalTransactionId;
+ proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
/* be sure this is cleared in abort */
@@ -743,7 +743,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
ProcGlobal->xids[pgxactoff] = InvalidTransactionId;
proc->xid = InvalidTransactionId;
- proc->lxid = InvalidLocalTransactionId;
+ proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
/* be sure this is cleared in abort */
@@ -930,7 +930,7 @@ ProcArrayClearTransaction(PGPROC *proc)
ProcGlobal->xids[pgxactoff] = InvalidTransactionId;
proc->xid = InvalidTransactionId;
- proc->lxid = InvalidLocalTransactionId;
+ proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
@@ -2536,6 +2536,11 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
/* Get lock so source xact can't end while we're doing this */
LWLockAcquire(ProcArrayLock, LW_SHARED);
+ /*
+ * Find the PGPROC entry of the source transaction. (This could use
+ * GetPGProcByBackendId(), unless it's a prepared xact. But this isn't
+ * performance critical.)
+ */
for (index = 0; index < arrayP->numProcs; index++)
{
int pgprocno = arrayP->pgprocnos[index];
@@ -2548,9 +2553,9 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
continue;
/* We are only interested in the specific virtual transaction. */
- if (proc->backendId != sourcevxid->backendId)
+ if (proc->vxid.backendId != sourcevxid->backendId)
continue;
- if (proc->lxid != sourcevxid->localTransactionId)
+ if (proc->vxid.lxid != sourcevxid->localTransactionId)
continue;
/*
@@ -3100,6 +3105,64 @@ HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids, int type)
}
/*
+ * BackendIdGetProc -- get a backend's PGPROC given its backend ID
+ *
+ * The result may be out of date arbitrarily quickly, so the caller
+ * must be careful about how this information is used. NULL is
+ * returned if the backend is not active.
+ */
+PGPROC *
+BackendIdGetProc(int backendID)
+{
+ PGPROC *result;
+
+ if (backendID < 1 || backendID > ProcGlobal->allProcCount)
+ return NULL;
+ result = GetPGProcByBackendId(backendID);
+
+ if (result->pid == 0)
+ return NULL;
+
+ return result;
+}
+
+/*
+ * BackendIdGetTransactionIds -- get a backend's transaction status
+ *
+ * Get the xid, xmin, nsubxid and overflow status of the backend. The
+ * result may be out of date arbitrarily quickly, so the caller must be
+ * careful about how this information is used.
+ */
+void
+BackendIdGetTransactionIds(int backendID, TransactionId *xid,
+ TransactionId *xmin, int *nsubxid, bool *overflowed)
+{
+ PGPROC *proc;
+
+ *xid = InvalidTransactionId;
+ *xmin = InvalidTransactionId;
+ *nsubxid = 0;
+ *overflowed = false;
+
+ if (backendID < 1 || backendID > ProcGlobal->allProcCount)
+ return;
+ proc = GetPGProcByBackendId(backendID);
+
+ /* Need to lock out additions/removals of backends */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ if (proc->pid != 0)
+ {
+ *xid = proc->xid;
+ *xmin = proc->xmin;
+ *nsubxid = proc->subxidStatus.count;
+ *overflowed = proc->subxidStatus.overflowed;
+ }
+
+ LWLockRelease(ProcArrayLock);
+}
+
+/*
* BackendPidGetProc -- get a backend's PGPROC given its PID
*
* Returns NULL if not found. Note that it is up to the caller to be
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 0f9f90d2c7b..199dd182253 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -87,7 +87,7 @@ typedef struct
* possible auxiliary process type. (This scheme assumes there is not
* more than one of any auxiliary process type at a time.)
*/
-#define NumProcSignalSlots (MaxBackends + NUM_AUXPROCTYPES)
+#define NumProcSignalSlots (MaxBackends + NUM_AUXILIARY_PROCS)
/* Check whether the relevant type bit is set in the flags. */
#define BARRIER_SHOULD_CHECK(flags, type) \
@@ -154,24 +154,23 @@ ProcSignalShmemInit(void)
/*
* ProcSignalInit
* Register the current process in the ProcSignal array
- *
- * The passed index should be my BackendId if the process has one,
- * or MaxBackends + aux process type if not.
*/
void
-ProcSignalInit(int pss_idx)
+ProcSignalInit(void)
{
ProcSignalSlot *slot;
uint64 barrier_generation;
- Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);
-
- slot = &ProcSignal->psh_slot[pss_idx - 1];
+ if (MyBackendId <= 0)
+ elog(ERROR, "MyBackendId not set");
+ if (MyBackendId > NumProcSignalSlots)
+ elog(ERROR, "unexpected MyBackendId %d in ProcSignalInit (max %d)", MyBackendId, NumProcSignalSlots);
+ slot = &ProcSignal->psh_slot[MyBackendId - 1];
/* sanity check */
if (slot->pss_pid != 0)
elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty",
- MyProcPid, pss_idx);
+ MyProcPid, MyBackendId - 1);
/* Clear out any leftover signal reasons */
MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));
@@ -200,7 +199,7 @@ ProcSignalInit(int pss_idx)
MyProcSignalSlot = slot;
/* Set up to release the slot on process exit */
- on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx));
+ on_shmem_exit(CleanupProcSignalState, (Datum) 0);
}
/*
@@ -212,11 +211,7 @@ ProcSignalInit(int pss_idx)
static void
CleanupProcSignalState(int status, Datum arg)
{
- int pss_idx = DatumGetInt32(arg);
- ProcSignalSlot *slot;
-
- slot = &ProcSignal->psh_slot[pss_idx - 1];
- Assert(slot == MyProcSignalSlot);
+ ProcSignalSlot *slot = MyProcSignalSlot;
/*
* Clear MyProcSignalSlot, so that a SIGUSR1 received after this point
@@ -233,7 +228,7 @@ CleanupProcSignalState(int status, Datum arg)
* infinite loop trying to exit
*/
elog(LOG, "process %d releasing ProcSignal slot %d, but it contains %d",
- MyProcPid, pss_idx, (int) slot->pss_pid);
+ MyProcPid, (int) (slot - ProcSignal->psh_slot), (int) slot->pss_pid);
return; /* XXX better to zero the slot anyway? */
}
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 748a792a854..f624bfc7d78 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -139,7 +139,6 @@ typedef struct ProcState
{
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
- PGPROC *proc; /* PGPROC of backend */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
int nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
@@ -172,8 +171,6 @@ typedef struct SISeg
int minMsgNum; /* oldest message still needed */
int maxMsgNum; /* next message number to be assigned */
int nextThreshold; /* # of messages to call SICleanupQueue */
- int lastBackend; /* index of last active procState entry, +1 */
- int maxBackends; /* size of procState array */
slock_t msgnumLock; /* spinlock protecting maxMsgNum */
@@ -183,11 +180,29 @@ typedef struct SISeg
SharedInvalidationMessage buffer[MAXNUMMESSAGES];
/*
- * Per-backend invalidation state info (has MaxBackends entries).
+ * Per-backend invalidation state info.
+ *
+ * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
+ * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
+ * a dense array of their indexes, to speed up scanning all in-use slots.
+ *
+ * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
+ * having our separate copy avoids contention on ProcArrayLock, and allows
+ * us to track only the processes that participate in shared cache
+ * invalidations.
*/
+ int numProcs;
+ int *pgprocnos;
ProcState procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
+/*
+ * We reserve a slot for each possible BackendId, plus one for each
+ * possible auxiliary process type. (This scheme assumes there is not
+ * more than one of any auxiliary process type at a time.)
+ */
+#define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS)
+
static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
@@ -205,16 +220,8 @@ SInvalShmemSize(void)
Size size;
size = offsetof(SISeg, procState);
-
- /*
- * In Hot Standby mode, the startup process requests a procState array
- * slot using InitRecoveryTransactionEnvironment(). Even though
- * MaxBackends doesn't account for the startup process, it is guaranteed
- * to get a free slot. This is because the autovacuum launcher and worker
- * processes, which are included in MaxBackends, are not started in Hot
- * Standby mode.
- */
- size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
+ size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
+ size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
return size;
}
@@ -239,23 +246,22 @@ CreateSharedInvalidationState(void)
shmInvalBuffer->minMsgNum = 0;
shmInvalBuffer->maxMsgNum = 0;
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
- shmInvalBuffer->lastBackend = 0;
- shmInvalBuffer->maxBackends = MaxBackends;
SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */
/* Mark all backends inactive, and initialize nextLXID */
- for (i = 0; i < shmInvalBuffer->maxBackends; i++)
+ for (i = 0; i < NumProcStateSlots; i++)
{
shmInvalBuffer->procState[i].procPid = 0; /* inactive */
- shmInvalBuffer->procState[i].proc = NULL;
shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
shmInvalBuffer->procState[i].resetState = false;
shmInvalBuffer->procState[i].signaled = false;
shmInvalBuffer->procState[i].hasMessages = false;
shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
}
+ shmInvalBuffer->numProcs = 0;
+ shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
}
/*
@@ -265,59 +271,41 @@ CreateSharedInvalidationState(void)
void
SharedInvalBackendInit(bool sendOnly)
{
- int index;
- ProcState *stateP = NULL;
+ ProcState *stateP;
+ pid_t oldPid;
SISeg *segP = shmInvalBuffer;
+ int pgprocno;
+
+ if (MyBackendId <= 0)
+ elog(ERROR, "MyBackendId not set");
+ if (MyBackendId > NumProcStateSlots)
+ elog(PANIC, "unexpected MyBackendId %d in SharedInvalBackendInit (max %d)",
+ MyBackendId, NumProcStateSlots);
+ pgprocno = MyBackendId - 1;
+ stateP = &segP->procState[pgprocno];
/*
* This can run in parallel with read operations, but not with write
- * operations, since SIInsertDataEntries relies on lastBackend to set
- * hasMessages appropriately.
+ * operations, since SIInsertDataEntries relies on the pgprocnos array to
+ * set hasMessages appropriately.
*/
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
- /* Look for a free entry in the procState array */
- for (index = 0; index < segP->lastBackend; index++)
- {
- if (segP->procState[index].procPid == 0) /* inactive slot? */
- {
- stateP = &segP->procState[index];
- break;
- }
- }
-
- if (stateP == NULL)
+ oldPid = stateP->procPid;
+ if (oldPid != 0)
{
- if (segP->lastBackend < segP->maxBackends)
- {
- stateP = &segP->procState[segP->lastBackend];
- Assert(stateP->procPid == 0);
- segP->lastBackend++;
- }
- else
- {
- /*
- * out of procState slots: MaxBackends exceeded -- report normally
- */
- MyBackendId = InvalidBackendId;
- LWLockRelease(SInvalWriteLock);
- ereport(FATAL,
- (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
- errmsg("sorry, too many clients already")));
- }
+ LWLockRelease(SInvalWriteLock);
+ elog(ERROR, "sinval slot for backend %d is already in use by process %d",
+ MyBackendId, (int) oldPid);
}
- MyBackendId = (stateP - &segP->procState[0]) + 1;
-
- /* Advertise assigned backend ID in MyProc */
- MyProc->backendId = MyBackendId;
+ shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = pgprocno;
/* Fetch next local transaction ID into local memory */
nextLocalTransactionId = stateP->nextLXID;
/* mark myself active, with all extant messages already read */
stateP->procPid = MyProcPid;
- stateP->proc = MyProc;
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
stateP->signaled = false;
@@ -328,8 +316,6 @@ SharedInvalBackendInit(bool sendOnly)
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
-
- elog(DEBUG4, "my backend ID is %d", MyBackendId);
}
/*
@@ -345,96 +331,36 @@ CleanupInvalidationState(int status, Datum arg)
{
SISeg *segP = (SISeg *) DatumGetPointer(arg);
ProcState *stateP;
+ int pgprocno = MyBackendId - 1;
int i;
Assert(PointerIsValid(segP));
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
- stateP = &segP->procState[MyBackendId - 1];
+ stateP = &segP->procState[pgprocno];
/* Update next local transaction ID for next holder of this backendID */
stateP->nextLXID = nextLocalTransactionId;
/* Mark myself inactive */
stateP->procPid = 0;
- stateP->proc = NULL;
stateP->nextMsgNum = 0;
stateP->resetState = false;
stateP->signaled = false;
- /* Recompute index of last active backend */
- for (i = segP->lastBackend; i > 0; i--)
+ for (i = segP->numProcs - 1; i >= 0; i--)
{
- if (segP->procState[i - 1].procPid != 0)
- break;
- }
- segP->lastBackend = i;
-
- LWLockRelease(SInvalWriteLock);
-}
-
-/*
- * BackendIdGetProc
- * Get the PGPROC structure for a backend, given the backend ID.
- * The result may be out of date arbitrarily quickly, so the caller
- * must be careful about how this information is used. NULL is
- * returned if the backend is not active.
- */
-PGPROC *
-BackendIdGetProc(int backendID)
-{
- PGPROC *result = NULL;
- SISeg *segP = shmInvalBuffer;
-
- /* Need to lock out additions/removals of backends */
- LWLockAcquire(SInvalWriteLock, LW_SHARED);
-
- if (backendID > 0 && backendID <= segP->lastBackend)
- {
- ProcState *stateP = &segP->procState[backendID - 1];
-
- result = stateP->proc;
- }
-
- LWLockRelease(SInvalWriteLock);
-
- return result;
-}
-
-/*
- * BackendIdGetTransactionIds
- * Get the xid, xmin, nsubxid and overflow status of the backend. The
- * result may be out of date arbitrarily quickly, so the caller must be
- * careful about how this information is used.
- */
-void
-BackendIdGetTransactionIds(int backendID, TransactionId *xid,
- TransactionId *xmin, int *nsubxid, bool *overflowed)
-{
- SISeg *segP = shmInvalBuffer;
-
- *xid = InvalidTransactionId;
- *xmin = InvalidTransactionId;
- *nsubxid = 0;
- *overflowed = false;
-
- /* Need to lock out additions/removals of backends */
- LWLockAcquire(SInvalWriteLock, LW_SHARED);
-
- if (backendID > 0 && backendID <= segP->lastBackend)
- {
- ProcState *stateP = &segP->procState[backendID - 1];
- PGPROC *proc = stateP->proc;
-
- if (proc != NULL)
+ if (segP->pgprocnos[i] == pgprocno)
{
- *xid = proc->xid;
- *xmin = proc->xmin;
- *nsubxid = proc->subxidStatus.count;
- *overflowed = proc->subxidStatus.overflowed;
+ if (i != segP->numProcs - 1)
+ segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
+ break;
}
}
+ if (i < 0)
+ elog(PANIC, "could not find entry in sinval array");
+ segP->numProcs--;
LWLockRelease(SInvalWriteLock);
}
@@ -507,9 +433,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
* these (unlocked) changes will be committed to memory before we exit
* the function.
*/
- for (i = 0; i < segP->lastBackend; i++)
+ for (i = 0; i < segP->numProcs; i++)
{
- ProcState *stateP = &segP->procState[i];
+ ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
stateP->hasMessages = true;
}
@@ -677,13 +603,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
minsig = min - SIG_THRESHOLD;
lowbound = min - MAXNUMMESSAGES + minFree;
- for (i = 0; i < segP->lastBackend; i++)
+ for (i = 0; i < segP->numProcs; i++)
{
- ProcState *stateP = &segP->procState[i];
+ ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
int n = stateP->nextMsgNum;
- /* Ignore if inactive or already in reset state */
- if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
+ /* Ignore if already in reset state */
+ Assert(stateP->procPid != 0);
+ if (stateP->resetState || stateP->sendOnly)
continue;
/*
@@ -719,11 +646,8 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
{
segP->minMsgNum -= MSGNUMWRAPAROUND;
segP->maxMsgNum -= MSGNUMWRAPAROUND;
- for (i = 0; i < segP->lastBackend; i++)
- {
- /* we don't bother skipping inactive entries here */
- segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
- }
+ for (i = 0; i < segP->numProcs; i++)
+ segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
}
/*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index d8755a106d5..97d1ab65740 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -137,6 +137,7 @@ InitRecoveryTransactionEnvironment(void)
* are held by vxids and row level locks are held by xids. All queries
* hold AccessShareLocks so never block while we write or lock new rows.
*/
+ MyProc->vxid.backendId = MyBackendId;
vxid.backendId = MyBackendId;
vxid.localTransactionId = GetNextLocalTransactionId();
VirtualXactLockTableInsert(vxid);