| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
 | /*-------------------------------------------------------------------------
 *
 * hashinsert.c
 *	  Item insertion in hash tables for Postgres.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hashinsert.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/heapam.h"
#include "miscadmin.h"
#include "utils/rel.h"
#include "storage/lwlock.h"
#include "storage/buf_internals.h"
/*
 * Forward declaration of the file-local helper (defined below) that removes
 * LP_DEAD items from a single index page; _hash_doinsert() calls it before
 * the definition appears.
 */
static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
								  RelFileNode hnode);
/*
 *	_hash_doinsert() -- Handle insertion of a single index tuple.
 *
 *		This routine is called by the public interface routines, hashbuild
 *		and hashinsert.  By here, itup is completely filled in.
 *
 *		rel is the hash index to insert into, itup is the index tuple to
 *		add, and heapRel is the heap relation; within this function only
 *		heapRel->rd_node is used, and it is passed on to
 *		_hash_vacuum_one_page().
 *
 *		Outline of what the code below does:
 *		  1. Pin the metapage (without locking it) and reject tuples larger
 *			 than HashMaxItemSize().
 *		  2. Write-lock the primary bucket page for the tuple's hash key.
 *			 If that bucket is being split and a cleanup lock is available,
 *			 finish the split, drop both buffers, and start over.
 *		  3. Walk the bucket chain until a page with enough free space is
 *			 found, removing dead tuples from a page where possible, and
 *			 adding a new overflow page at the end of the chain if needed.
 *		  4. With the metapage exclusively locked and inside a critical
 *			 section, add the tuple, bump hashm_ntuples, and WAL-log the
 *			 change if the relation needs WAL.
 *		  5. Release the buffers and, if the tuple count now exceeds
 *			 hashm_ffactor * (hashm_maxbucket + 1), call
 *			 _hash_expandtable().
 *
 *		An oversized tuple is reported with ereport(ERROR).
 */
void
_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
{
	Buffer		buf = InvalidBuffer;
	Buffer		bucket_buf;
	Buffer		metabuf;
	HashMetaPage metap;
	HashMetaPage usedmetap = NULL;
	Page		metapage;
	Page		page;
	HashPageOpaque pageopaque;
	Size		itemsz;
	bool		do_expand;
	uint32		hashkey;
	Bucket		bucket;
	OffsetNumber itup_off;
	/*
	 * Get the hash key for the item (it's stored in the index tuple itself).
	 */
	hashkey = _hash_get_indextuple_hashkey(itup);
	/* compute item size too */
	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */
	/*
	 * We come back here after finishing an in-progress split of the target
	 * bucket; at that point the pins on both the bucket page and the
	 * metapage have been dropped, so everything below is redone from
	 * scratch.
	 */
restart_insert:
	/*
	 * Read the metapage.  We don't lock it yet; HashMaxItemSize() will
	 * examine pd_pagesize_version, but that can't change so we can examine
	 * it without a lock.
	 */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
	metapage = BufferGetPage(metabuf);
	/*
	 * Check whether the item can fit on a hash page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.)  Note that at this point,
	 * itemsz doesn't include the ItemId.
	 *
	 * XXX this is useless code if we are only storing hash keys.
	 */
	if (itemsz > HashMaxItemSize(metapage))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds hash maximum %zu",
						itemsz, HashMaxItemSize(metapage)),
			errhint("Values larger than a buffer page cannot be indexed.")));
	/* Lock the primary bucket page for the target bucket. */
	buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE,
										  &usedmetap);
	/* The call above is expected to hand back the metapage data it used. */
	Assert(usedmetap != NULL);
	/* remember the primary bucket buffer to release the pin on it at end. */
	bucket_buf = buf;
	page = BufferGetPage(buf);
	pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
	bucket = pageopaque->hasho_bucket;
	/*
	 * If this bucket is in the process of being split, try to finish the
	 * split before inserting, because that might create room for the
	 * insertion to proceed without allocating an additional overflow page.
	 * It's only interesting to finish the split if we're trying to insert
	 * into the bucket from which we're removing tuples (the "old" bucket),
	 * not if we're trying to insert into the bucket into which tuples are
	 * being moved (the "new" bucket).
	 */
	if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf))
	{
		/* release the lock on bucket buffer, before completing the split. */
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		/* the masks passed here come from usedmetap, not from metapage */
		_hash_finish_split(rel, metabuf, buf, bucket,
						   usedmetap->hashm_maxbucket,
						   usedmetap->hashm_highmask,
						   usedmetap->hashm_lowmask);
		/* release the pin on old and meta buffer.  retry for insert. */
		_hash_dropbuf(rel, buf);
		_hash_dropbuf(rel, metabuf);
		goto restart_insert;
	}
	/*
	 * Do the insertion.  Each pass of this loop either frees space on the
	 * current page or advances buf/page/pageopaque to the next page of the
	 * bucket chain; the loop ends once the current page can hold itemsz.
	 */
	while (PageGetFreeSpace(page) < itemsz)
	{
		BlockNumber nextblkno;
		/*
		 * Check if current page has any DEAD tuples. If yes,
		 * delete these tuples and see if we can get a space for
		 * the new item to be inserted before moving to the next
		 * page in the bucket chain.
		 */
		if (H_HAS_DEAD_TUPLES(pageopaque))
		{
			/* the cleanup is skipped when a cleanup lock is not available */
			if (IsBufferCleanupOK(buf))
			{
				_hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);
				if (PageGetFreeSpace(page) >= itemsz)
					break;				/* OK, now we have enough space */
			}
		}
		/*
		 * no space on this page; check for an overflow page
		 */
		nextblkno = pageopaque->hasho_nextblkno;
		if (BlockNumberIsValid(nextblkno))
		{
			/*
			 * ovfl page exists; go get it.  if it doesn't have room, we'll
			 * find out next pass through the loop test above.  we always
			 * release both the lock and pin if this is an overflow page, but
			 * only the lock if this is the primary bucket page, since the pin
			 * on the primary bucket must be retained throughout the scan.
			 */
			if (buf != bucket_buf)
				_hash_relbuf(rel, buf);
			else
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
			page = BufferGetPage(buf);
		}
		else
		{
			/*
			 * we're at the end of the bucket chain and we haven't found a
			 * page with enough room.  allocate a new overflow page.
			 */
			/* release our write lock without modifying buffer */
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			/*
			 * chain to a new overflow page.  The last argument is true
			 * exactly when buf is still the primary bucket page.
			 * NOTE(review): presumably this tells _hash_addovflpage() to
			 * keep the pin on that page -- confirm against its definition.
			 */
			buf = _hash_addovflpage(rel, metabuf, buf, (buf == bucket_buf) ? true : false);
			page = BufferGetPage(buf);
			/* should fit now, given test above */
			Assert(PageGetFreeSpace(page) >= itemsz);
		}
		/*
		 * Both branches above moved us off the previous page, so refresh
		 * the opaque pointer and sanity-check that we landed on an overflow
		 * page belonging to the same bucket.
		 */
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
		Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
		Assert(pageopaque->hasho_bucket == bucket);
	}
	/*
	 * Write-lock the metapage so we can increment the tuple count. After
	 * incrementing it, check to see if it's time for a split.
	 */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
	/* Do the update.  No ereport(ERROR) until changes are logged */
	START_CRIT_SECTION();
	/* found page with enough space, so add the item here */
	itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
	MarkBufferDirty(buf);
	/* metapage operations */
	metap = HashPageGetMeta(metapage);
	metap->hashm_ntuples += 1;
	/*
	 * Make sure this stays in sync with _hash_expandtable().  The decision
	 * is only recorded here; the split itself is attempted after the
	 * critical section, once the page locks have been released.
	 */
	do_expand = metap->hashm_ntuples >
		(double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
	MarkBufferDirty(metabuf);
	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_insert xlrec;
		XLogRecPtr	recptr;
		xlrec.offnum = itup_off;
		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
		/*
		 * The metapage is registered as block 1 and the page receiving the
		 * tuple as block 0; the tuple bytes are attached to block 0.
		 */
		XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
		XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
		/* stamp both modified pages with the LSN of this record */
		PageSetLSN(BufferGetPage(buf), recptr);
		PageSetLSN(BufferGetPage(metabuf), recptr);
	}
	END_CRIT_SECTION();
	/* drop lock on metapage, but keep pin */
	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
	/*
	 * Release the modified page and ensure to release the pin on primary
	 * page.  When the tuple went onto an overflow page, buf differs from
	 * bucket_buf and the primary page's pin has to be dropped separately.
	 */
	_hash_relbuf(rel, buf);
	if (buf != bucket_buf)
		_hash_dropbuf(rel, bucket_buf);
	/* Attempt to split if a split is needed */
	if (do_expand)
		_hash_expandtable(rel, metabuf);
	/* Finally drop our pin on the metapage */
	_hash_dropbuf(rel, metabuf);
}
/*
 *	_hash_pgaddtup() -- add a tuple to a particular page in the index.
 *
 * The tuple is placed on the page held in buf at the position chosen by
 * _hash_binsearch() for its hash key, so that the tuples on a hash index
 * page stay sorted by hashkey value.  The page is not written out here.
 * It is an error to call this routine without a pin and write lock on the
 * target buffer.
 *
 * itemsize is the size passed through to PageAddItem() for the tuple.
 *
 * Returns the offset number at which the tuple was inserted.  If the page
 * refuses the item, an ERROR naming the index relation is raised.
 */
OffsetNumber
_hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
{
	Page		target;
	OffsetNumber insert_at;
	OffsetNumber placed_at;

	/* The buffer must hold a primary bucket page or an overflow page. */
	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
	target = BufferGetPage(buf);

	/* Pick the slot that keeps the page ordered by hash key. */
	insert_at = _hash_binsearch(target, _hash_get_indextuple_hashkey(itup));

	placed_at = PageAddItem(target, (Item) itup, itemsize, insert_at,
							false, false);
	if (placed_at == InvalidOffsetNumber)
		elog(ERROR, "failed to add index item to \"%s\"",
			 RelationGetRelationName(rel));

	return insert_at;
}
/*
 *	_hash_pgaddmultitup() -- add a tuple vector to a particular page in the
 *							 index.
 *
 * This routine has same requirements for locking and tuple ordering as
 * _hash_pgaddtup().
 *
 * The nitups tuples in itups[] are added one after another, each at the
 * position _hash_binsearch() picks for its hash key at that moment.  The
 * function itself returns nothing; the offset chosen for itups[i] is stored
 * in itup_offsets[i], so that array must have room for nitups entries.  If
 * the page refuses any item, an ERROR naming the index relation is raised.
 */
void
_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
					OffsetNumber *itup_offsets, uint16 nitups)
{
	Page		target;
	int			idx;

	/* The buffer must hold a primary bucket page or an overflow page. */
	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
	target = BufferGetPage(buf);

	for (idx = 0; idx < nitups; idx++)
	{
		IndexTuple	tuple = itups[idx];
		Size		tuple_size = MAXALIGN(IndexTupleDSize(*tuple));
		OffsetNumber slot;

		/* Pick the slot that keeps the page ordered by hash key. */
		slot = _hash_binsearch(target, _hash_get_indextuple_hashkey(tuple));

		/* Report the position back to the caller before adding the item. */
		itup_offsets[idx] = slot;

		if (PageAddItem(target, (Item) tuple, tuple_size, slot, false, false)
			== InvalidOffsetNumber)
			elog(ERROR, "failed to add index item to \"%s\"",
				 RelationGetRelationName(rel));
	}
}
/*
 * _hash_vacuum_one_page - vacuum just one index page.
 *
 * Collect the offsets of all items on the page in buf whose line pointer is
 * marked LP_DEAD.  If there are any, delete them in one step, clear the
 * page's LH_PAGE_HAS_DEAD_TUPLES flag, and subtract the number removed from
 * the metapage's tuple count; the change is WAL-logged when the relation
 * needs WAL.  If no dead item is found, neither page is touched.
 *
 * We must acquire cleanup lock on the page being modified before calling
 * this function.  The metapage is exclusive-locked here only for the
 * duration of the update and is unlocked again before returning.  hnode is
 * simply copied into the WAL record.
 */
static void
_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
					  RelFileNode hnode)
{
	OffsetNumber dead_items[MaxOffsetNumber];
	int			ndead = 0;
	OffsetNumber cur;
	OffsetNumber last;
	Page		page = BufferGetPage(buf);
	HashPageOpaque opaque;
	HashMetaPage meta;

	/* Walk every line pointer and remember the ones flagged LP_DEAD. */
	last = PageGetMaxOffsetNumber(page);
	cur = FirstOffsetNumber;
	while (cur <= last)
	{
		if (ItemIdIsDead(PageGetItemId(page, cur)))
			dead_items[ndead++] = cur;
		cur = OffsetNumberNext(cur);
	}

	/* Nothing to reclaim, so leave both pages alone. */
	if (ndead <= 0)
		return;

	/* Write-lock the meta page so that we can decrement tuple count. */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* No ereport(ERROR) until changes are logged */
	START_CRIT_SECTION();

	PageIndexMultiDelete(page, dead_items, ndead);

	/*
	 * Flag the page as free of LP_DEAD items.  That may not be strictly
	 * true -- some items could have been marked after we built our list --
	 * but it nearly always is, and a second scan of the page just to be
	 * sure isn't worth it.  LH_PAGE_HAS_DEAD_TUPLES is only a hint anyway.
	 */
	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
	opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

	meta = HashPageGetMeta(BufferGetPage(metabuf));
	meta->hashm_ntuples -= ndead;

	MarkBufferDirty(buf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_vacuum_one_page wal;
		XLogRecPtr	lsn;

		wal.hnode = hnode;
		wal.ntuples = ndead;

		XLogBeginInsert();
		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
		XLogRegisterData((char *) &wal, SizeOfHashVacuumOnePage);

		/*
		 * The list of removed offsets is logged whether or not the whole
		 * buffer is stored, to allow us to find the latestRemovedXid on a
		 * standby server.
		 */
		XLogRegisterData((char *) dead_items,
						 ndead * sizeof(OffsetNumber));

		XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

		lsn = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);

		/* stamp both modified pages with the LSN of this record */
		PageSetLSN(page, lsn);
		PageSetLSN(BufferGetPage(metabuf), lsn);
	}

	END_CRIT_SECTION();

	/* The tuple count is updated; give up the meta page write lock. */
	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
}
 |