| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
 | /*-------------------------------------------------------------------------
 *
 * hio.c
 *	  POSTGRES heap access method input/output code.
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Id: hio.c,v 1.47 2003/02/13 05:35:11 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "access/heapam.h"
#include "access/hio.h"
#include "storage/freespace.h"
/*
 * RelationPutHeapTuple - place tuple at specified page
 *
 * !!! ELOG(ERROR) IS DISALLOWED HERE !!!
 *
 * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
 */
void
RelationPutHeapTuple(Relation relation,
					 Buffer buffer,
					 HeapTuple tuple)
{
	Page		pageHeader;
	OffsetNumber offnum;
	ItemId		itemId;
	Item		item;
	/* Add the tuple to the page */
	pageHeader = BufferGetPage(buffer);
	offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
						 tuple->t_len, InvalidOffsetNumber, LP_USED);
	if (offnum == InvalidOffsetNumber)
		elog(PANIC, "RelationPutHeapTuple: failed to add tuple");
	/* Update tuple->t_self to the actual position where it was stored */
	ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
	/* Insert the correct position into CTID of the stored tuple, too */
	itemId = PageGetItemId(pageHeader, offnum);
	item = PageGetItem(pageHeader, itemId);
	((HeapTupleHeader) item)->t_ctid = tuple->t_self;
}
/*
 * RelationGetBufferForTuple
 *
 *	Returns pinned and exclusive-locked buffer of a page in given relation
 *	with free space >= given len.
 *
 *	If otherBuffer is not InvalidBuffer, then it references a previously
 *	pinned buffer of another page in the same relation; on return, this
 *	buffer will also be exclusive-locked.  (This case is used by heap_update;
 *	the otherBuffer contains the tuple being updated.)
 *
 *	The reason for passing otherBuffer is that if two backends are doing
 *	concurrent heap_update operations, a deadlock could occur if they try
 *	to lock the same two buffers in opposite orders.  To ensure that this
 *	can't happen, we impose the rule that buffers of a relation must be
 *	locked in increasing page number order.  This is most conveniently done
 *	by having RelationGetBufferForTuple lock them both, with suitable care
 *	for ordering.
 *
 *	NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
 *	same buffer we select for insertion of the new tuple (this could only
 *	happen if space is freed in that page after heap_update finds there's not
 *	enough there).	In that case, the page will be pinned and locked only once.
 *
 *	Note that we use LockPage(rel, 0) to lock relation for extension.
 *	We can do this as long as in all other places we use page-level locking
 *	for indices only. Alternatively, we could define pseudo-table as
 *	we do for transactions with XactLockTable.
 *
 *	ELOG(ERROR) is allowed here, so this routine *must* be called
 *	before any (unlogged) changes are made in buffer pool.
 */
Buffer
RelationGetBufferForTuple(Relation relation, Size len,
						  Buffer otherBuffer)
{
	Buffer		buffer = InvalidBuffer;
	Page		pageHeader;
	Size		pageFreeSpace;
	BlockNumber targetBlock,
				otherBlock;
	bool		needLock;
	len = MAXALIGN(len);		/* be conservative */
	/*
	 * If we're gonna fail for oversize tuple, do it right away
	 */
	if (len > MaxTupleSize)
		elog(ERROR, "Tuple is too big: size %lu, max size %ld",
			 (unsigned long) len, MaxTupleSize);
	if (otherBuffer != InvalidBuffer)
		otherBlock = BufferGetBlockNumber(otherBuffer);
	else
		otherBlock = InvalidBlockNumber;		/* just to keep compiler
												 * quiet */
	/*
	 * We first try to put the tuple on the same page we last inserted a
	 * tuple on, as cached in the relcache entry.  If that doesn't work,
	 * we ask the shared Free Space Map to locate a suitable page.	Since
	 * the FSM's info might be out of date, we have to be prepared to loop
	 * around and retry multiple times.  (To insure this isn't an infinite
	 * loop, we must update the FSM with the correct amount of free space
	 * on each page that proves not to be suitable.)  If the FSM has no
	 * record of a page with enough free space, we give up and extend the
	 * relation.
	 */
	targetBlock = relation->rd_targblock;
	if (targetBlock == InvalidBlockNumber)
	{
		/*
		 * We have no cached target page, so ask the FSM for an initial
		 * target.
		 */
		targetBlock = GetPageWithFreeSpace(&relation->rd_node, len);
		/*
		 * If the FSM knows nothing of the rel, try the last page before
		 * we give up and extend.  This avoids one-tuple-per-page syndrome
		 * during bootstrapping or in a recently-started system.
		 */
		if (targetBlock == InvalidBlockNumber)
		{
			BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
			if (nblocks > 0)
				targetBlock = nblocks - 1;
		}
	}
	while (targetBlock != InvalidBlockNumber)
	{
		/*
		 * Read and exclusive-lock the target block, as well as the other
		 * block if one was given, taking suitable care with lock ordering
		 * and the possibility they are the same block.
		 */
		if (otherBuffer == InvalidBuffer)
		{
			/* easy case */
			buffer = ReadBuffer(relation, targetBlock);
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		}
		else if (otherBlock == targetBlock)
		{
			/* also easy case */
			buffer = otherBuffer;
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		}
		else if (otherBlock < targetBlock)
		{
			/* lock other buffer first */
			buffer = ReadBuffer(relation, targetBlock);
			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		}
		else
		{
			/* lock target buffer first */
			buffer = ReadBuffer(relation, targetBlock);
			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
		}
		/*
		 * Now we can check to see if there's enough free space here. If
		 * so, we're done.
		 */
		pageHeader = (Page) BufferGetPage(buffer);
		pageFreeSpace = PageGetFreeSpace(pageHeader);
		if (len <= pageFreeSpace)
		{
			/* use this page as future insert target, too */
			relation->rd_targblock = targetBlock;
			return buffer;
		}
		/*
		 * Not enough space, so we must give up our page locks and pin (if
		 * any) and prepare to look elsewhere.	We don't care which order
		 * we unlock the two buffers in, so this can be slightly simpler
		 * than the code above.
		 */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		if (otherBuffer == InvalidBuffer)
			ReleaseBuffer(buffer);
		else if (otherBlock != targetBlock)
		{
			LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buffer);
		}
		/*
		 * Update FSM as to condition of this page, and ask for another
		 * page to try.
		 */
		targetBlock = RecordAndGetPageWithFreeSpace(&relation->rd_node,
													targetBlock,
													pageFreeSpace,
													len);
	}
	/*
	 * Have to extend the relation.
	 *
	 * We have to use a lock to ensure no one else is extending the rel at
	 * the same time, else we will both try to initialize the same new
	 * page.  We can skip locking for new or temp relations, however,
	 * since no one else could be accessing them.
	 */
	needLock = !(relation->rd_isnew || relation->rd_istemp);
	if (needLock)
		LockPage(relation, 0, ExclusiveLock);
	/*
	 * XXX This does an lseek - rather expensive - but at the moment it is
	 * the only way to accurately determine how many blocks are in a
	 * relation.  Is it worth keeping an accurate file length in shared
	 * memory someplace, rather than relying on the kernel to do it for
	 * us?
	 */
	buffer = ReadBuffer(relation, P_NEW);
	/*
	 * Release the file-extension lock; it's now OK for someone else to
	 * extend the relation some more.
	 */
	if (needLock)
		UnlockPage(relation, 0, ExclusiveLock);
	/*
	 * We can be certain that locking the otherBuffer first is OK, since
	 * it must have a lower page number.
	 */
	if (otherBuffer != InvalidBuffer)
		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
	/*
	 * We need to initialize the empty new page.
	 */
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	pageHeader = (Page) BufferGetPage(buffer);
	Assert(PageIsNew((PageHeader) pageHeader));
	PageInit(pageHeader, BufferGetPageSize(buffer), 0);
	if (len > PageGetFreeSpace(pageHeader))
	{
		/* We should not get here given the test at the top */
		elog(PANIC, "Tuple is too big: size %lu", (unsigned long) len);
	}
	/*
	 * Remember the new page as our target for future insertions.
	 *
	 * XXX should we enter the new page into the free space map immediately,
	 * or just keep it for this backend's exclusive use in the short run
	 * (until VACUUM sees it)?	Seems to depend on whether you expect the
	 * current backend to make more insertions or not, which is probably a
	 * good bet most of the time.  So for now, don't add it to FSM yet.
	 */
	relation->rd_targblock = BufferGetBlockNumber(buffer);
	return buffer;
}
 |