| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
 | /*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation segment definitions.
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.46 2002/05/05 00:03:28 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h"
/* Shared SI message segment; assigned by SIBufferInit() from ShmemAlloc(). */
SISeg	   *shmInvalBuffer;
/* Forward declarations for file-local routines defined further down. */
static void CleanupInvalidationState(int status, Datum arg);
static void SISetProcStateInvalid(SISeg *segP);
/*
 * SInvalShmemSize
 *		Compute the number of bytes of shared memory the SI segment needs
 *		for the given number of backends.
 *
 * maxBackends is the number of ProcState slots the segment must hold.
 */
int
SInvalShmemSize(int maxBackends)
{
	/*
	 * One ProcState is already counted inside sizeof(SISeg), so only the
	 * remaining slots have to be added on top of the base struct.
	 */
	int			extraSlots = maxBackends - 1;

	return sizeof(SISeg) + sizeof(ProcState) * extraSlots;
}
/*
 * SIBufferInit
 *		Create and initialize a new SI message buffer
 */
void
SIBufferInit(int maxBackends)
{
	int			segSize;
	SISeg	   *segP;
	int			i;
	/* Allocate space in shared memory */
	segSize = SInvalShmemSize(maxBackends);
	shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);
	/* Clear message counters, save size of procState array */
	segP->minMsgNum = 0;
	segP->maxMsgNum = 0;
	segP->lastBackend = 0;
	segP->maxBackends = maxBackends;
	/* The buffer[] array is initially all unused, so we need not fill it */
	/* Mark all backends inactive */
	for (i = 0; i < maxBackends; i++)
	{
		segP->procState[i].nextMsgNum = -1;		/* inactive */
		segP->procState[i].resetState = false;
		segP->procState[i].procStruct = INVALID_OFFSET;
	}
}
/*
 * SIBackendInit
 *		Attach the calling backend to the sinval buffer by claiming a
 *		procState slot, and set MyBackendId accordingly.
 *
 * Result:
 *	   >0	success
 *		0	no procState slot is available (MaxBackends exceeded);
 *			MyBackendId is set to InvalidBackendId
 *	   <0	reserved for other failures (not produced at present)
 *
 * NB: like the buffer routines that follow it in this file, this must be
 * called with SInvalLock held, because several backends may be working
 * on the buffer concurrently.
 */
int
SIBackendInit(SISeg *segP)
{
	ProcState  *slot = NULL;
	int			i = 0;

	/* First preference: recycle an inactive slot below lastBackend */
	while (slot == NULL && i < segP->lastBackend)
	{
		if (segP->procState[i].nextMsgNum < 0)
			slot = &segP->procState[i];
		i++;
	}

	/* Otherwise extend the in-use range by one slot, if there is room */
	if (slot == NULL)
	{
		if (segP->lastBackend >= segP->maxBackends)
		{
			/* every slot is taken */
			MyBackendId = InvalidBackendId;
			return 0;
		}

		slot = &segP->procState[segP->lastBackend];
		Assert(slot->nextMsgNum < 0);
		segP->lastBackend++;
	}

	/* Backend IDs are 1-based slot positions */
	MyBackendId = (slot - &segP->procState[0]) + 1;

#ifdef	INVALIDDEBUG
	elog(DEBUG1, "SIBackendInit: backend id %d", MyBackendId);
#endif   /* INVALIDDEBUG */

	/*
	 * Activate the slot.  Starting at maxMsgNum means everything already
	 * in the buffer counts as read by this backend.
	 */
	slot->nextMsgNum = segP->maxMsgNum;
	slot->resetState = false;
	slot->procStruct = MAKE_OFFSET(MyProc);

	/* Arrange for the slot to be released again when the backend exits */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

	return 1;
}
/*
 * CleanupInvalidationState
 *		Release the calling backend's procState slot.
 *
 * Registered with on_shmem_exit() and therefore run at backend shutdown.
 * Nobody has taken SInvalLock on our behalf at that point, so it is
 * acquired (exclusively) and released here.
 *
 * status is not used.  arg carries the "SISeg *" that was registered.
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
	SISeg	   *seg = (SISeg *) DatumGetPointer(arg);
	ProcState  *mine;
	int			last;

	Assert(PointerIsValid(seg));

	LWLockAcquire(SInvalLock, LW_EXCLUSIVE);

	/* Return my slot to the inactive state */
	mine = &seg->procState[MyBackendId - 1];
	mine->nextMsgNum = -1;
	mine->resetState = false;
	mine->procStruct = INVALID_OFFSET;

	/*
	 * Shrink lastBackend past any trailing inactive slots, so that it is
	 * again one more than the index of the highest active slot (or zero
	 * if none is active).
	 */
	last = seg->lastBackend;
	while (last > 0 && seg->procState[last - 1].nextMsgNum < 0)
		last--;
	seg->lastBackend = last;

	LWLockRelease(SInvalLock);
}
/*
 * SIInsertDataEntry
 *		Append one invalidation message to the circular buffer.
 *
 * When the buffer has no room even after expired entries are discarded,
 * the buffer is emptied and every active backend gets its "reset" flag
 * set, which makes those backends throw away *all* invalidatable state.
 *
 * Returns true if the message was stored, false if a reset was forced
 * instead (the message is not stored in that case).
 */
bool
SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
{
	int			pending = segP->maxMsgNum - segP->minMsgNum;

	/*
	 * An apparently full buffer may just mean minMsgNum is stale: the
	 * slowest backend could have read messages without anyone having run
	 * SIDelExpiredDataEntries() since.  Refresh it before deciding.
	 */
	if (pending >= MAXNUMMESSAGES)
	{
		SIDelExpiredDataEntries(segP);
		pending = segP->maxMsgNum - segP->minMsgNum;
	}

	/* Still full after the refresh?  Then a reset is the only way out. */
	if (pending >= MAXNUMMESSAGES)
	{
		SISetProcStateInvalid(segP);
		return false;
	}

	/*
	 * Overflow avoidance.  At exactly 70% occupancy, ask the postmaster
	 * (PMSIGNAL_WAKEN_CHILDREN) to wake up its children.  The postmaster
	 * then sends SIGUSR2 (normally the NOTIFY signal) to all backends;
	 * idle ones run a transaction to check pg_listener for NOTIFY
	 * messages, and reading SI entries is a side effect of starting that
	 * transaction.
	 *
	 * Busy backends start transactions anyway and so keep up by themselves;
	 * only a backend that sits idle fails to drain its SI entries.
	 */
	if (IsUnderPostmaster &&
		pending == (MAXNUMMESSAGES * 70 / 100))
	{
		elog(DEBUG3, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
		SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
	}

	/* Store the message in its ring position and advance the write counter */
	segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
	segP->maxMsgNum++;

	return true;
}
/*
 * SISetProcStateInvalid
 *		Empty the message buffer and raise the reset flag of every active
 *		backend.
 *
 * Only used as the recovery action for SI buffer overflow.
 */
static void
SISetProcStateInvalid(SISeg *segP)
{
	int			slotno;

	/* Discard everything that is queued */
	segP->minMsgNum = 0;
	segP->maxMsgNum = 0;

	for (slotno = 0; slotno < segP->lastBackend; slotno++)
	{
		ProcState  *ps = &segP->procState[slotno];

		/* negative nextMsgNum marks an inactive slot; leave it alone */
		if (ps->nextMsgNum < 0)
			continue;

		ps->resetState = true;
		ps->nextMsgNum = 0;
	}
}
/*
 * SIGetDataEntry
 *		Fetch the next unread SI message, if any, for the given backend.
 *
 * Result:
 *	0: nothing to read
 *	1: one message was copied into *data (more may be waiting behind it)
 * -1: a reset is pending for this backend; *data is not touched
 *
 * NB: several backends may be inside this routine at the same time, each
 * working on its own procState entry.  See the comments in sinval.c in
 * ReceiveSharedInvalidMessages().
 */
int
SIGetDataEntry(SISeg *segP, int backendId,
			   SharedInvalidationMessage *data)
{
	ProcState  *me = &segP->procState[backendId - 1];
	int			result;

	if (me->resetState)
	{
		/*
		 * Report the reset.  Since the caller discards everything anyway,
		 * any messages queued after the reset can be marked as consumed
		 * too.
		 */
		me->resetState = false;
		me->nextMsgNum = segP->maxMsgNum;
		result = -1;
	}
	else if (me->nextMsgNum < segP->maxMsgNum)
	{
		/*
		 * Hand out one message and step my read counter.  The entry stays
		 * in the buffer because other backends may not have seen it yet;
		 * SIDelExpiredDataEntries() is what reclaims dead entries.
		 */
		*data = segP->buffer[me->nextMsgNum % MAXNUMMESSAGES];
		me->nextMsgNum++;
		result = 1;
	}
	else
	{
		/* already caught up with maxMsgNum */
		result = 0;
	}

	return result;
}
/*
 * SIDelExpiredDataEntries
 *		Advance minMsgNum past every message that all active backends have
 *		already read.
 */
void
SIDelExpiredDataEntries(SISeg *segP)
{
	int			lowest = segP->maxMsgNum;
	int			n;

	/* Quick exit: the buffer holds no messages at all */
	if (lowest == segP->minMsgNum)
		return;

	/* The new minMsgNum is the smallest nextMsgNum among active backends */
	for (n = 0; n < segP->lastBackend; n++)
	{
		int			next = segP->procState[n].nextMsgNum;

		/* next < 0 means the slot is inactive and must be ignored */
		if (next >= 0 && next < lowest)
			lowest = next;
	}
	segP->minMsgNum = lowest;

	/* Counters still comfortably small?  Then we are done. */
	if (lowest < MSGNUMWRAPAROUND)
		return;

	/*
	 * The counters have grown large.  Shift all of them down by the same
	 * amount so they cannot overflow; their relative positions, which are
	 * all that matters, are unchanged.
	 */
	segP->minMsgNum -= MSGNUMWRAPAROUND;
	segP->maxMsgNum -= MSGNUMWRAPAROUND;
	for (n = 0; n < segP->lastBackend; n++)
	{
		if (segP->procState[n].nextMsgNum >= 0)
			segP->procState[n].nextMsgNum -= MSGNUMWRAPAROUND;
	}
}
 |