| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
 | /*-------------------------------------------------------------------------
 *
 * hashinsert.c
 *	  Item insertion in hash tables for Postgres.
 *
 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.20 2000/03/17 02:36:02 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "access/hash.h"
/* Prototypes for the file-local helpers defined later in this file. */
static InsertIndexResult _hash_insertonpg(Relation rel,
				 Buffer buf,
				 int keysz,
				 ScanKey scankey,
				 HashItem hitem,
				 Buffer metabuf);
static OffsetNumber _hash_pgaddtup(Relation rel,
			   Buffer buf,
			   int keysz,
			   ScanKey itup_scankey,
			   Size itemsize,
			   HashItem hitem);
/*
 *	_hash_doinsert() -- Insert one HashItem into the hash index.
 *
 *		Entry point for the public interface routines, hashbuild and
 *		hashinsert.  The caller passes a hashitem that is already filled
 *		in and carries a unique (xid, seqno) pair; the datum that serves
 *		as the "key" is stored inside the hashitem.
 *
 *		Returns whatever _hash_insertonpg() returns for the insertion.
 *		Raises elog(ERROR) if the index does not have exactly one key.
 */
InsertIndexResult
_hash_doinsert(Relation rel, HashItem hitem)
{
	Buffer		meta_buffer;
	Buffer		target_buffer;
	HashMetaPage meta_page;
	IndexTuple	index_tuple;
	int			nkeys;
	ScanKey		search_key;
	Page		target_page;
	BlockNumber target_blkno;
	InsertIndexResult result;

	/* fetch the metapage with HASH_READ access and sanity-check it */
	meta_buffer = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
	meta_page = (HashMetaPage) BufferGetPage(meta_buffer);
	_hash_checkpage((Page) meta_page, LH_META_PAGE);

	/* the search below needs a scan key, so construct one from the tuple */
	index_tuple = &(hitem->hash_itup);
	nkeys = rel->rd_rel->relnatts;
	if (nkeys != 1)
		elog(ERROR, "Hash indices valid for only one index key.");
	search_key = _hash_mkscankey(rel, index_tuple, meta_page);

	/*
	 * Locate the first page of the bucket chain that holds this key;
	 * _hash_search hands it back in target_buffer and takes a read lock
	 * on our behalf.
	 */
	_hash_search(rel, nkeys, search_key, &target_buffer, meta_page);
	target_page = BufferGetPage(target_buffer);
	_hash_checkpage(target_page, LH_BUCKET_PAGE);

	/*
	 * Inserting requires a write lock: remember the block number, give
	 * the read-locked buffer back, and fetch the same block again with
	 * HASH_WRITE access.
	 */
	target_blkno = BufferGetBlockNumber(target_buffer);
	_hash_relbuf(rel, target_buffer, HASH_READ);
	target_buffer = _hash_getbuf(rel, target_blkno, HASH_WRITE);

	/*
	 * XXX comment inherited from btree (no decision made yet for hash):
	 * it is thought that the bucket cannot be split while we are reading
	 * the metapage.
	 *
	 * If the page did get split in the window between releasing the read
	 * lock and obtaining the write lock, it may no longer be the correct
	 * home for the key being inserted.  Nothing here re-checks for that.
	 */

	/* perform the actual insertion */
	result = _hash_insertonpg(rel, target_buffer, nkeys, search_key,
							  hitem, meta_buffer);

	/* release the scan key built above */
	_hash_freeskey(search_key);

	return result;
}
/*
 *	_hash_insertonpg() -- Insert a tuple somewhere in a bucket's page chain.
 *
 *		Starting with the page in 'buf', this routine walks the bucket's
 *		chain of overflow pages (iteratively, via hasho_nextblkno) until it
 *		reaches a page with enough free space for the item.  If the end of
 *		the chain is reached first, a new overflow page is obtained from
 *		_hash_addovflpage() and the item is placed there.  The routine does
 *		not split pages itself; after the insertion it calls
 *		_hash_expandtable() when an overflow page had to be added or when
 *		the keys-per-bucket ratio exceeds the metapage's fill factor.
 *
 *		rel			index relation
 *		buf			buffer of the first candidate page; on entry it must be
 *					pinned and write-locked by the caller
 *		keysz		number of scan keys, passed through to _hash_pgaddtup()
 *		scankey		scan key, passed through to _hash_pgaddtup()
 *		hitem		the item to insert
 *		metabuf		metapage buffer, held by the caller with HASH_READ
 *					access
 *
 *		Returns a palloc'd InsertIndexResult whose pointerData is set to
 *		the (block, offset) at which the tuple was stored.
 *
 *		On return the page buffer has been handed to _hash_wrtbuf() and the
 *		metapage buffer to _hash_relbuf(), so the caller must not touch
 *		either of them again.
 *
 *		Raises elog(ERROR, "hash item too large") if the item does not fit
 *		even on a freshly added overflow page.
 */
static InsertIndexResult
_hash_insertonpg(Relation rel,
				 Buffer buf,
				 int keysz,
				 ScanKey scankey,
				 HashItem hitem,
				 Buffer metabuf)
{
	InsertIndexResult res;
	Page		page;
	BlockNumber itup_blkno;
	OffsetNumber itup_off;
	Size		itemsz;
	HashPageOpaque pageopaque;
	bool		do_expand = false;
	Buffer		ovflbuf;
	HashMetaPage metap;
	Bucket		bucket;

	metap = (HashMetaPage) BufferGetPage(metabuf);
	_hash_checkpage((Page) metap, LH_META_PAGE);

	page = BufferGetPage(buf);
	_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
	pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);

	/* remember the bucket so each page visited can be cross-checked */
	bucket = pageopaque->hasho_bucket;

	/*
	 * On-page size of the item: the index tuple's data size plus the
	 * part of HashItemData that is not the embedded IndexTupleData,
	 * rounded up with MAXALIGN.
	 */
	itemsz = IndexTupleDSize(hitem->hash_itup)
		+ (sizeof(HashItemData) - sizeof(IndexTupleData));
	itemsz = MAXALIGN(itemsz);

	while (PageGetFreeSpace(page) < itemsz)
	{
		/*
		 * no space on this page; check for an overflow page
		 */
		if (BlockNumberIsValid(pageopaque->hasho_nextblkno))
		{
			/*
			 * ovfl page exists; go get it.  if it doesn't have room,
			 * we'll find out next pass through the loop test above.
			 * The next page is fetched before the current one is
			 * released.
			 */
			ovflbuf = _hash_getbuf(rel, pageopaque->hasho_nextblkno,
								   HASH_WRITE);
			_hash_relbuf(rel, buf, HASH_WRITE);
			buf = ovflbuf;
			page = BufferGetPage(buf);
		}
		else
		{
			/*
			 * we're at the end of the bucket chain and we haven't found a
			 * page with enough room.  allocate a new overflow page.
			 * metabuf is passed by reference to _hash_addovflpage().
			 */
			do_expand = true;
			ovflbuf = _hash_addovflpage(rel, &metabuf, buf);
			_hash_relbuf(rel, buf, HASH_WRITE);
			buf = ovflbuf;
			page = BufferGetPage(buf);

			if (PageGetFreeSpace(page) < itemsz)
			{
				/* it doesn't fit on an empty page -- give up */
				elog(ERROR, "hash item too large");
			}
		}
		_hash_checkpage(page, LH_OVERFLOW_PAGE);
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
		Assert(pageopaque->hasho_bucket == bucket);
	}

	itup_off = _hash_pgaddtup(rel, buf, keysz, scankey, itemsz, hitem);
	itup_blkno = BufferGetBlockNumber(buf);

	/*
	 * by here, the new tuple is inserted.  palloc() reports allocation
	 * failure through elog() instead of returning NULL, so the result can
	 * be filled in directly; the former "res != NULL" test came after res
	 * had already been dereferenced and could never be false.
	 */
	res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
	ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);

	/*
	 * Increment the number of keys in the table. We switch lock access
	 * type just for a moment to allow greater accessibility to the
	 * metapage.  metap is re-read from the value _hash_chgbufaccess()
	 * returns each time, so the pointer used below is always the
	 * current one.
	 */
	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf,
											  HASH_READ, HASH_WRITE);
	metap->hashm_nkeys += 1;
	metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf,
											  HASH_WRITE, HASH_READ);

	_hash_wrtbuf(rel, buf);

	/*
	 * Grow the table if an overflow page had to be added, or if the
	 * average number of keys per bucket now exceeds the fill factor.
	 */
	if (do_expand ||
		(metap->hashm_nkeys / (metap->hashm_maxbucket + 1))
		> metap->hashm_ffactor)
		_hash_expandtable(rel, metabuf);

	_hash_relbuf(rel, metabuf, HASH_READ);
	return res;
}
/*
 *	_hash_pgaddtup() -- place a tuple on one specific page of the index.
 *
 *		The item is added at the offset immediately following the page's
 *		current maximum offset number, and that offset is returned.  The
 *		caller must already hold a write lock and a reference on the
 *		page's buffer; calling this routine without them is an error.
 *		Both are still held when this routine returns.
 *
 *		keysz and itup_scankey are accepted but not used here.
 */
static OffsetNumber
_hash_pgaddtup(Relation rel,
			   Buffer buf,
			   int keysz,
			   ScanKey itup_scankey,
			   Size itemsize,
			   HashItem hitem)
{
	Page		target = BufferGetPage(buf);
	OffsetNumber new_off;

	_hash_checkpage(target, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);

	/* the new item takes the slot right after the current last one */
	new_off = PageGetMaxOffsetNumber(target);
	new_off = OffsetNumberNext(new_off);
	PageAddItem(target, (Item) hitem, itemsize, new_off, LP_USED);

	/* write the buffer out while keeping our lock on it */
	_hash_wrtnorelbuf(rel, buf);

	return new_off;
}
 |