summaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtinsert.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2003-02-21 00:06:22 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2003-02-21 00:06:22 +0000
commit70508ba7aed76954b7e630a4952e1360c15db830 (patch)
tree5e688d748e5a183a1248153203ff8ededfd020a8 /src/backend/access/nbtree/nbtinsert.c
parent4df0f1d26f62e835bb357fa7c2e3d5de5fcbf802 (diff)
Make btree index structure adjustments and WAL logging changes needed to
support btree compaction, as per proposal of a few days ago. btree index pages no longer store parent links, instead they have a level indicator (counting up from zero for leaf pages). The FixBTree recovery logic is removed, and replaced by code that detects missing parent-level insertions during WAL replay. Also, generate appropriate WAL entries when updating btree metapage and when building a btree index from scratch. I believe btree indexes are now completely WAL-legal for the first time. initdb forced due to index and WAL changes.
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r--src/backend/access/nbtree/nbtinsert.c1181
1 files changed, 314 insertions, 867 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 99011a5c958..a93a9fed8c6 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.96 2002/09/04 20:31:09 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.97 2003/02/21 00:06:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -35,15 +35,6 @@ typedef struct
int best_delta; /* best size delta so far */
} FindSplitData;
-extern bool FixBTree;
-
-Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
-static void _bt_fixtree(Relation rel, BlockNumber blkno);
-static void _bt_fixbranch(Relation rel, BlockNumber lblkno,
- BlockNumber rblkno, BTStack true_stack);
-static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
-static void _bt_fixup(Relation rel, Buffer buf);
-static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
@@ -54,9 +45,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
BTStack stack,
int keysz, ScanKey scankey,
BTItem btitem,
- OffsetNumber afteritem);
-static void _bt_insertuple(Relation rel, Buffer buf,
- Size itemsz, BTItem btitem, OffsetNumber newitemoff);
+ OffsetNumber afteritem,
+ bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
OffsetNumber newitemoff, Size newitemsz,
BTItem newitem, bool newitemonleft,
@@ -149,7 +139,8 @@ top:
}
/* do the insertion */
- res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0);
+ res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem,
+ 0, false);
/* be tidy */
_bt_freestack(stack);
@@ -320,6 +311,7 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
* right using information stored in the parent stack).
* + invokes itself with the appropriate tuple for the right
* child page on the parent.
+ * + updates the metapage if a true root or fast root is split.
*
* On entry, we must have the right buffer on which to do the
* insertion, and the buffer must be pinned and locked. On return,
@@ -358,7 +350,8 @@ _bt_insertonpg(Relation rel,
int keysz,
ScanKey scankey,
BTItem btitem,
- OffsetNumber afteritem)
+ OffsetNumber afteritem,
+ bool split_only_page)
{
InsertIndexResult res;
Page page;
@@ -458,11 +451,10 @@ _bt_insertonpg(Relation rel,
*/
if (PageGetFreeSpace(page) < itemsz)
{
- Buffer rbuf;
- BlockNumber bknum = BufferGetBlockNumber(buf);
- BlockNumber rbknum;
bool is_root = P_ISROOT(lpageop);
+ bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
bool newitemonleft;
+ Buffer rbuf;
/* Choose the split point */
firstright = _bt_findsplitloc(rel, page,
@@ -488,128 +480,127 @@ _bt_insertonpg(Relation rel,
* locks for the child pages until we locate the parent, but we can
* release them before doing the actual insertion (see Lehman and Yao
* for the reasoning).
- *
- * Here we have to do something Lehman and Yao don't talk about:
- * deal with a root split and construction of a new root. If our
- * stack is empty then we have just split a node on what had been
- * the root level when we descended the tree. If it is still the
- * root then we perform a new-root construction. If it *wasn't*
- * the root anymore, use the parent pointer to get up to the root
- * level that someone constructed meanwhile, and find the right
- * place to insert as for the normal case.
*----------
*/
+ _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
+ }
+ else
+ {
+ Buffer metabuf = InvalidBuffer;
+ Page metapg = NULL;
+ BTMetaPageData *metad = NULL;
- if (is_root)
- {
- Buffer rootbuf;
-
- Assert(stack == (BTStack) NULL);
- /* create a new root node and release the split buffers */
- rootbuf = _bt_newroot(rel, buf, rbuf);
- _bt_wrtbuf(rel, rootbuf);
- _bt_wrtbuf(rel, rbuf);
- _bt_wrtbuf(rel, buf);
- }
- else
+ itup_off = newitemoff;
+ itup_blkno = BufferGetBlockNumber(buf);
+
+ /*
+ * If we are doing this insert because we split a page that was
+ * the only one on its tree level, but was not the root, it may
+ * have been the "fast root". We need to ensure that the fast root
+ * link points at or above the current page. We can safely acquire
+ * a lock on the metapage here --- see comments for _bt_newroot().
+ */
+ if (split_only_page)
{
- InsertIndexResult newres;
- BTItem new_item;
- BTStackData fakestack;
- BTItem ritem;
- Buffer pbuf;
-
- /* If root page was splitted */
- if (stack == (BTStack) NULL)
- {
- elog(LOG, "btree: concurrent ROOT page split");
-
- /*
- * If root page splitter failed to create new root page
- * then old root' btpo_parent still points to metapage. We
- * have to fix root page in this case.
- */
- if (BTreeInvalidParent(lpageop))
- {
- if (!FixBTree)
- elog(ERROR, "bt_insertonpg[%s]: no root page found", RelationGetRelationName(rel));
- _bt_wrtbuf(rel, rbuf);
- _bt_wrtnorelbuf(rel, buf);
- elog(WARNING, "bt_insertonpg[%s]: root page unfound - fixing upper levels", RelationGetRelationName(rel));
- _bt_fixup(rel, buf);
- goto formres;
- }
+ metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+ metapg = BufferGetPage(metabuf);
+ metad = BTPageGetMeta(metapg);
- /*
- * Set up a phony stack entry if we haven't got a real one
- */
- stack = &fakestack;
- stack->bts_blkno = lpageop->btpo_parent;
- stack->bts_offset = InvalidOffsetNumber;
- /* bts_btitem will be initialized below */
- stack->bts_parent = NULL;
+ if (metad->btm_fastlevel >= lpageop->btpo.level)
+ {
+ /* no update wanted */
+ _bt_relbuf(rel, metabuf);
+ metabuf = InvalidBuffer;
}
+ }
- /* get high key from left page == lowest key on new right page */
- ritem = (BTItem) PageGetItem(page,
- PageGetItemId(page, P_HIKEY));
+ /* Do the actual update. No elog(ERROR) until changes are logged */
+ START_CRIT_SECTION();
- /* form an index tuple that points at the new right page */
- new_item = _bt_formitem(&(ritem->bti_itup));
- rbknum = BufferGetBlockNumber(rbuf);
- ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
+ _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
- /*
- * Find the parent buffer and get the parent page.
- *
- * Oops - if we were moved right then we need to change stack
- * item! We want to find parent pointing to where we are,
- * right ? - vadim 05/27/97
- *
- * Interestingly, this means we didn't *really* need to stack the
- * parent key at all; all we really care about is the saved
- * block and offset as a starting point for our search...
- */
- ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
- bknum, P_HIKEY);
+ if (BufferIsValid(metabuf))
+ {
+ metad->btm_fastroot = itup_blkno;
+ metad->btm_fastlevel = lpageop->btpo.level;
+ }
- pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+ /* XLOG stuff */
+ if (!rel->rd_istemp)
+ {
+ xl_btree_insert xlrec;
+ xl_btree_metadata xlmeta;
+ uint8 xlinfo;
+ XLogRecPtr recptr;
+ XLogRecData rdata[3];
+ XLogRecData *nextrdata;
+ BTItemData truncitem;
+
+ xlrec.target.node = rel->rd_node;
+ ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfBtreeInsert;
+ rdata[0].next = nextrdata = &(rdata[1]);
+
+ if (BufferIsValid(metabuf))
+ {
+ xlmeta.root = metad->btm_root;
+ xlmeta.level = metad->btm_level;
+ xlmeta.fastroot = metad->btm_fastroot;
+ xlmeta.fastlevel = metad->btm_fastlevel;
+
+ nextrdata->buffer = InvalidBuffer;
+ nextrdata->data = (char *) &xlmeta;
+ nextrdata->len = sizeof(xl_btree_metadata);
+ nextrdata->next = nextrdata + 1;
+ nextrdata++;
+ xlinfo = XLOG_BTREE_INSERT_META;
+ }
+ else if (P_ISLEAF(lpageop))
+ xlinfo = XLOG_BTREE_INSERT_LEAF;
+ else
+ xlinfo = XLOG_BTREE_INSERT_UPPER;
- /* Now we can write and unlock the children */
- _bt_wrtbuf(rel, rbuf);
- _bt_wrtbuf(rel, buf);
+ /* Read comments in _bt_pgaddtup */
+ if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
+ {
+ truncitem = *btitem;
+ truncitem.bti_itup.t_info = sizeof(BTItemData);
+ nextrdata->data = (char *) &truncitem;
+ nextrdata->len = sizeof(BTItemData);
+ }
+ else
+ {
+ nextrdata->data = (char *) btitem;
+ nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
+ }
+ nextrdata->buffer = buf;
+ nextrdata->next = NULL;
+
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
- if (pbuf == InvalidBuffer)
+ if (BufferIsValid(metabuf))
{
- if (!FixBTree)
- elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
- "\n\tRecreate index %s.", RelationGetRelationName(rel));
- pfree(new_item);
- elog(WARNING, "bt_insertonpg[%s]: parent page unfound - fixing branch", RelationGetRelationName(rel));
- _bt_fixbranch(rel, bknum, rbknum, stack);
- goto formres;
+ PageSetLSN(metapg, recptr);
+ PageSetSUI(metapg, ThisStartUpID);
}
- /* Recursively update the parent */
- newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
- 0, NULL, new_item, stack->bts_offset);
- /* be tidy */
- pfree(newres);
- pfree(new_item);
+ PageSetLSN(page, recptr);
+ PageSetSUI(page, ThisStartUpID);
}
- }
- else
- {
- itup_off = newitemoff;
- itup_blkno = BufferGetBlockNumber(buf);
- _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
+ END_CRIT_SECTION();
/* Write out the updated page and release pin/lock */
+ if (BufferIsValid(metabuf))
+ _bt_wrtbuf(rel, metabuf);
+
_bt_wrtbuf(rel, buf);
}
-formres:;
/* by here, the new tuple is inserted at itup_blkno/itup_off */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
@@ -617,61 +608,6 @@ formres:;
return res;
}
-static void
-_bt_insertuple(Relation rel, Buffer buf,
- Size itemsz, BTItem btitem, OffsetNumber newitemoff)
-{
- Page page = BufferGetPage(buf);
- BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
-
- START_CRIT_SECTION();
-
- _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
-
- /* XLOG stuff */
- if (!rel->rd_istemp)
- {
- xl_btree_insert xlrec;
- uint8 flag = XLOG_BTREE_INSERT;
- XLogRecPtr recptr;
- XLogRecData rdata[2];
- BTItemData truncitem;
-
- xlrec.target.node = rel->rd_node;
- ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfBtreeInsert;
- rdata[0].next = &(rdata[1]);
-
- /* Read comments in _bt_pgaddtup */
- if (!(P_ISLEAF(pageop)) && newitemoff == P_FIRSTDATAKEY(pageop))
- {
- truncitem = *btitem;
- truncitem.bti_itup.t_info = sizeof(BTItemData);
- rdata[1].data = (char *) &truncitem;
- rdata[1].len = sizeof(BTItemData);
- }
- else
- {
- rdata[1].data = (char *) btitem;
- rdata[1].len = IndexTupleDSize(btitem->bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
- }
- rdata[1].buffer = buf;
- rdata[1].next = NULL;
- if (P_ISLEAF(pageop))
- flag |= XLOG_BTREE_LEAF;
-
- recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
-
- PageSetLSN(page, recptr);
- PageSetSUI(page, ThisStartUpID);
- }
-
- END_CRIT_SECTION();
-}
-
/*
* _bt_split() -- split a page in the btree.
*
@@ -729,13 +665,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
lopaque->btpo_next = BufferGetBlockNumber(rbuf);
ropaque->btpo_prev = BufferGetBlockNumber(buf);
ropaque->btpo_next = oopaque->btpo_next;
-
- /*
- * Must copy the original parent link into both new pages, even though
- * it might be quite obsolete by now. We might need it if this level
- * is or recently was the root (see README).
- */
- lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent;
+ lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
/*
* If the page we're splitting is not the rightmost page at its level
@@ -876,34 +806,29 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
if (!rel->rd_istemp)
{
xl_btree_split xlrec;
- int flag = (newitemonleft) ?
- XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT;
- BlockNumber blkno;
+ uint8 xlinfo;
XLogRecPtr recptr;
XLogRecData rdata[4];
xlrec.target.node = rel->rd_node;
ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
if (newitemonleft)
- {
- blkno = BufferGetBlockNumber(rbuf);
- BlockIdSet(&(xlrec.otherblk), blkno);
- }
+ xlrec.otherblk = BufferGetBlockNumber(rbuf);
else
- {
- blkno = BufferGetBlockNumber(buf);
- BlockIdSet(&(xlrec.otherblk), blkno);
- }
- BlockIdSet(&(xlrec.parentblk), lopaque->btpo_parent);
- BlockIdSet(&(xlrec.leftblk), lopaque->btpo_prev);
- BlockIdSet(&(xlrec.rightblk), ropaque->btpo_next);
+ xlrec.otherblk = BufferGetBlockNumber(buf);
+ xlrec.leftblk = lopaque->btpo_prev;
+ xlrec.rightblk = ropaque->btpo_next;
+ xlrec.level = lopaque->btpo.level;
/*
* Direct access to page is not good but faster - we should
- * implement some new func in page API.
+ * implement some new func in page API. Note we only store the
+ * tuples themselves, knowing that the item pointers are in the
+ * same order and can be reconstructed by scanning the tuples.
*/
xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
((PageHeader) leftpage)->pd_upper;
+
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfBtreeSplit;
@@ -933,10 +858,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
rdata[3].next = NULL;
}
- if (P_ISLEAF(lopaque))
- flag |= XLOG_BTREE_LEAF;
+ if (P_ISROOT(oopaque))
+ xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
+ else
+ xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
- recptr = XLogInsert(RM_BTREE_ID, flag, rdata);
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
PageSetLSN(leftpage, recptr);
PageSetSUI(leftpage, ThisStartUpID);
@@ -1176,47 +1103,178 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
}
/*
+ * _bt_insert_parent() -- Insert downlink into parent after a page split.
+ *
+ * On entry, buf and rbuf are the left and right split pages, which we
+ * still hold write locks on per the L&Y algorithm. We release the
+ * write locks once we have write lock on the parent page. (Any sooner,
+ * and it'd be possible for some other process to try to split or delete
+ * one of these pages, and get confused because it cannot find the downlink.)
+ *
+ * stack - stack showing how we got here. May be NULL in cases that don't
+ * have to be efficient (concurrent ROOT split, WAL recovery)
+ * is_root - we split the true root
+ * is_only - we split a page alone on its level (might have been fast root)
+ *
+ * This is exported so it can be called by nbtxlog.c.
+ */
+void
+_bt_insert_parent(Relation rel,
+ Buffer buf,
+ Buffer rbuf,
+ BTStack stack,
+ bool is_root,
+ bool is_only)
+{
+ /*
+ * Here we have to do something Lehman and Yao don't talk about:
+ * deal with a root split and construction of a new root. If our
+ * stack is empty then we have just split a node on what had been
+ * the root level when we descended the tree. If it was still the
+ * root then we perform a new-root construction. If it *wasn't*
+ * the root anymore, search to find the next higher level that
+ * someone constructed meanwhile, and find the right place to insert
+ * as for the normal case.
+ *
+ * If we have to search for the parent level, we do so by
+ * re-descending from the root. This is not super-efficient,
+ * but it's rare enough not to matter. (This path is also taken
+ * when called from WAL recovery --- we have no stack in that case.)
+ */
+ if (is_root)
+ {
+ Buffer rootbuf;
+
+ Assert(stack == (BTStack) NULL);
+ Assert(is_only);
+ /* create a new root node and update the metapage */
+ rootbuf = _bt_newroot(rel, buf, rbuf);
+ /* release the split buffers */
+ _bt_wrtbuf(rel, rootbuf);
+ _bt_wrtbuf(rel, rbuf);
+ _bt_wrtbuf(rel, buf);
+ }
+ else
+ {
+ BlockNumber bknum = BufferGetBlockNumber(buf);
+ BlockNumber rbknum = BufferGetBlockNumber(rbuf);
+ Page page = BufferGetPage(buf);
+ InsertIndexResult newres;
+ BTItem new_item;
+ BTStackData fakestack;
+ BTItem ritem;
+ Buffer pbuf;
+
+ if (stack == (BTStack) NULL)
+ {
+ BTPageOpaque lpageop;
+
+ if (!InRecovery)
+ elog(DEBUG1, "_bt_insert_parent: concurrent ROOT page split");
+ lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ /* Find the leftmost page at the next level up */
+ pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
+ /* Set up a phony stack entry pointing there */
+ stack = &fakestack;
+ stack->bts_blkno = BufferGetBlockNumber(pbuf);
+ stack->bts_offset = InvalidOffsetNumber;
+ /* bts_btitem will be initialized below */
+ stack->bts_parent = NULL;
+ _bt_relbuf(rel, pbuf);
+ }
+
+ /* get high key from left page == lowest key on new right page */
+ ritem = (BTItem) PageGetItem(page,
+ PageGetItemId(page, P_HIKEY));
+
+ /* form an index tuple that points at the new right page */
+ new_item = _bt_formitem(&(ritem->bti_itup));
+ ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
+
+ /*
+ * Find the parent buffer and get the parent page.
+ *
+ * Oops - if we were moved right then we need to change stack
+ * item! We want to find parent pointing to where we are,
+ * right ? - vadim 05/27/97
+ */
+ ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
+ bknum, P_HIKEY);
+
+ pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
+
+ /* Now we can write and unlock the children */
+ _bt_wrtbuf(rel, rbuf);
+ _bt_wrtbuf(rel, buf);
+
+ /* Check for error only after writing children */
+ if (pbuf == InvalidBuffer)
+ elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
+ "\n\tRecreate index %s.", RelationGetRelationName(rel));
+
+ /* Recursively update the parent */
+ newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
+ 0, NULL, new_item, stack->bts_offset,
+ is_only);
+
+ /* be tidy */
+ pfree(newres);
+ pfree(new_item);
+ }
+}
+
+/*
* _bt_getstackbuf() -- Walk back up the tree one step, and find the item
* we last looked at in the parent.
*
- * This is possible because we save a bit image of the last item
- * we looked at in the parent, and the update algorithm guarantees
- * that if items above us in the tree move, they only move right.
+ * This is possible because we save the downlink from the parent item,
+ * which is enough to uniquely identify it. Insertions into the parent
+ * level could cause the item to move right; deletions could cause it
+ * to move left, but not left of the page we previously found it in.
*
- * Also, re-set bts_blkno & bts_offset if changed.
+ * Adjusts bts_blkno & bts_offset if changed.
+ *
+ * Returns InvalidBuffer if item not found (should not happen).
*/
static Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
BlockNumber blkno;
- Buffer buf;
- OffsetNumber start,
- offnum,
- maxoff;
- Page page;
- ItemId itemid;
- BTItem item;
- BTPageOpaque opaque;
+ OffsetNumber start;
blkno = stack->bts_blkno;
- buf = _bt_getbuf(rel, blkno, access);
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- maxoff = PageGetMaxOffsetNumber(page);
-
start = stack->bts_offset;
- /*
- * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the case of
- * concurrent ROOT page split. Also, watch out for possibility that
- * page has a high key now when it didn't before.
- */
- if (start < P_FIRSTDATAKEY(opaque))
- start = P_FIRSTDATAKEY(opaque);
-
for (;;)
{
- /* see if it's on this page */
+ Buffer buf;
+ Page page;
+ BTPageOpaque opaque;
+ OffsetNumber offnum,
+ minoff,
+ maxoff;
+ ItemId itemid;
+ BTItem item;
+
+ buf = _bt_getbuf(rel, blkno, access);
+ page = BufferGetPage(buf);
+ opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ minoff = P_FIRSTDATAKEY(opaque);
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ /*
+ * start = InvalidOffsetNumber means "search the whole page".
+ * We need this test anyway due to possibility that
+ * page has a high key now when it didn't before.
+ */
+ if (start < minoff)
+ start = minoff;
+
+ /*
+ * These loops will check every item on the page --- but in an order
+ * that's attuned to the probability of where it actually is. Scan
+ * to the right first, then to the left.
+ */
for (offnum = start;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
@@ -1232,23 +1290,32 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
}
}
+ for (offnum = OffsetNumberPrev(start);
+ offnum >= minoff;
+ offnum = OffsetNumberPrev(offnum))
+ {
+ itemid = PageGetItemId(page, offnum);
+ item = (BTItem) PageGetItem(page, itemid);
+ if (BTItemSame(item, &stack->bts_btitem))
+ {
+ /* Return accurate pointer to where link is now */
+ stack->bts_blkno = blkno;
+ stack->bts_offset = offnum;
+ return buf;
+ }
+ }
+
/*
- * by here, the item we're looking for moved right at least one
- * page
+ * The item we're looking for moved right at least one page.
*/
if (P_RIGHTMOST(opaque))
{
_bt_relbuf(rel, buf);
return (InvalidBuffer);
}
-
blkno = opaque->btpo_next;
+ start = InvalidOffsetNumber;
_bt_relbuf(rel, buf);
- buf = _bt_getbuf(rel, blkno, access);
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- maxoff = PageGetMaxOffsetNumber(page);
- start = P_FIRSTDATAKEY(opaque);
}
}
@@ -1289,6 +1356,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
Page metapg;
BTMetaPageData *metad;
+ lbkno = BufferGetBlockNumber(lbuf);
+ rbkno = BufferGetBlockNumber(rbuf);
+ lpage = BufferGetPage(lbuf);
+ rpage = BufferGetPage(rbuf);
+
/* get a new root page */
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
rootpage = BufferGetPage(rootbuf);
@@ -1303,22 +1375,15 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
/* set btree special data */
rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
- rootopaque->btpo_flags |= BTP_ROOT;
- rootopaque->btpo_parent = BTREE_METAPAGE;
+ rootopaque->btpo_flags = BTP_ROOT;
+ rootopaque->btpo.level =
+ ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
- lbkno = BufferGetBlockNumber(lbuf);
- rbkno = BufferGetBlockNumber(rbuf);
- lpage = BufferGetPage(lbuf);
- rpage = BufferGetPage(rbuf);
-
- /*
- * Make sure pages in old root level have valid parent links --- we
- * will need this in _bt_insertonpg() if a concurrent root split
- * happens (see README).
- */
- ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
- ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =
- rootblknum;
+ /* update metapage data */
+ metad->btm_root = rootblknum;
+ metad->btm_level = rootopaque->btpo.level;
+ metad->btm_fastroot = rootblknum;
+ metad->btm_fastlevel = rootopaque->btpo.level;
/*
* Create downlink item for left page (old root). Since this will be
@@ -1356,9 +1421,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
elog(PANIC, "btree: failed to add rightkey to new root page");
pfree(new_item);
- metad->btm_root = rootblknum;
- (metad->btm_level)++;
-
/* XLOG stuff */
if (!rel->rd_istemp)
{
@@ -1367,8 +1429,9 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
XLogRecData rdata[2];
xlrec.node = rel->rd_node;
+ xlrec.rootblk = rootblknum;
xlrec.level = metad->btm_level;
- BlockIdSet(&(xlrec.rootblk), rootblknum);
+
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfBtreeNewroot;
@@ -1390,8 +1453,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
PageSetSUI(rootpage, ThisStartUpID);
PageSetLSN(metapg, recptr);
PageSetSUI(metapg, ThisStartUpID);
-
- /* we changed their btpo_parent */
PageSetLSN(lpage, recptr);
PageSetSUI(lpage, ThisStartUpID);
PageSetLSN(rpage, recptr);
@@ -1407,620 +1468,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
}
/*
- * In the event old root page was splitted but no new one was created we
- * build required parent levels keeping write lock on old root page.
- * Note: it's assumed that old root page' btpo_parent points to meta page,
- * ie not to parent page. On exit, new root page buffer is write locked.
- * If "release" is TRUE then oldrootbuf will be released immediately
- * after upper level is builded.
- */
-Buffer
-_bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
-{
- Buffer rootbuf;
- BlockNumber rootblk;
- Page rootpage;
- XLogRecPtr rootLSN;
- Page oldrootpage = BufferGetPage(oldrootbuf);
- BTPageOpaque oldrootopaque = (BTPageOpaque)
- PageGetSpecialPointer(oldrootpage);
- Buffer buf,
- leftbuf,
- rightbuf;
- Page page,
- leftpage,
- rightpage;
- BTPageOpaque opaque,
- leftopaque,
- rightopaque;
- OffsetNumber newitemoff;
- BTItem btitem,
- ritem;
- Size itemsz;
-
- if (!P_LEFTMOST(oldrootopaque) || P_RIGHTMOST(oldrootopaque))
- elog(ERROR, "bt_fixroot: not valid old root page");
-
- /* Read right neighbor and create new root page */
- leftbuf = _bt_getbuf(rel, oldrootopaque->btpo_next, BT_WRITE);
- leftpage = BufferGetPage(leftbuf);
- leftopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
- rootbuf = _bt_newroot(rel, oldrootbuf, leftbuf);
- rootpage = BufferGetPage(rootbuf);
- rootLSN = PageGetLSN(rootpage);
- rootblk = BufferGetBlockNumber(rootbuf);
-
- /* parent page where to insert pointers */
- buf = rootbuf;
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
-
- /*
- * Now read other pages (if any) on level and add them to new root.
- * Here we break one of our locking rules - never hold lock on parent
- * page when acquiring lock on its child, - but we free from deadlock:
- *
- * If concurrent process will split one of pages on this level then it
- * will see either btpo_parent == metablock or btpo_parent == rootblk.
- * In first case it will give up its locks and walk to the leftmost
- * page (oldrootbuf) in _bt_fixup() - ie it will wait for us and let
- * us continue. In second case it will try to lock rootbuf keeping its
- * locks on buffers we already passed, also waiting for us. If we'll
- * have to unlock rootbuf (split it) and that process will have to
- * split page of new level we created (level of rootbuf) then it will
- * wait while we create upper level. Etc.
- */
- while (!P_RIGHTMOST(leftopaque))
- {
- rightbuf = _bt_getbuf(rel, leftopaque->btpo_next, BT_WRITE);
- rightpage = BufferGetPage(rightbuf);
- rightopaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
-
- /*
- * Update LSN & StartUpID of child page buffer to ensure that it
- * will be written on disk after flushing log record for new root
- * creation. Unfortunately, for the moment (?) we do not log this
- * operation and so possibly break our rule to log entire page
- * content on first after checkpoint modification.
- */
- HOLD_INTERRUPTS();
- rightopaque->btpo_parent = rootblk;
- if (XLByteLT(PageGetLSN(rightpage), rootLSN))
- PageSetLSN(rightpage, rootLSN);
- PageSetSUI(rightpage, ThisStartUpID);
- RESUME_INTERRUPTS();
-
- ritem = (BTItem) PageGetItem(leftpage, PageGetItemId(leftpage, P_HIKEY));
- btitem = _bt_formitem(&(ritem->bti_itup));
- ItemPointerSet(&(btitem->bti_itup.t_tid), leftopaque->btpo_next, P_HIKEY);
- itemsz = IndexTupleDSize(btitem->bti_itup)
- + (sizeof(BTItemData) - sizeof(IndexTupleData));
- itemsz = MAXALIGN(itemsz);
-
- newitemoff = OffsetNumberNext(PageGetMaxOffsetNumber(page));
-
- if (PageGetFreeSpace(page) < itemsz)
- {
- Buffer newbuf;
- OffsetNumber firstright;
- OffsetNumber itup_off;
- BlockNumber itup_blkno;
- bool newitemonleft;
-
- firstright = _bt_findsplitloc(rel, page,
- newitemoff, itemsz, &newitemonleft);
- newbuf = _bt_split(rel, buf, firstright,
- newitemoff, itemsz, btitem, newitemonleft,
- &itup_off, &itup_blkno);
- /* Keep lock on new "root" buffer ! */
- if (buf != rootbuf)
- _bt_relbuf(rel, buf);
- buf = newbuf;
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- }
- else
- _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
-
- /* give up left buffer */
- _bt_wrtbuf(rel, leftbuf);
- pfree(btitem);
- leftbuf = rightbuf;
- leftpage = rightpage;
- leftopaque = rightopaque;
- }
-
- /* give up rightmost page buffer */
- _bt_wrtbuf(rel, leftbuf);
-
- /*
- * Here we hold locks on old root buffer, new root buffer we've
- * created with _bt_newroot() - rootbuf, - and buf we've used for last
- * insert ops - buf. If rootbuf != buf then we have to create at least
- * one more level. And if "release" is TRUE then we give up
- * oldrootbuf.
- */
- if (release)
- _bt_wrtbuf(rel, oldrootbuf);
-
- if (rootbuf != buf)
- {
- _bt_wrtbuf(rel, buf);
- return (_bt_fixroot(rel, rootbuf, true));
- }
-
- return (rootbuf);
-}
-
-/*
- * Using blkno of leftmost page on a level inside tree this func
- * checks/fixes tree from this level up to the root page.
- */
-static void
-_bt_fixtree(Relation rel, BlockNumber blkno)
-{
- Buffer buf;
- Page page;
- BTPageOpaque opaque;
- BlockNumber pblkno;
-
- for (;;)
- {
- buf = _bt_getbuf(rel, blkno, BT_READ);
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- if (!P_LEFTMOST(opaque) || P_ISLEAF(opaque))
- elog(ERROR, "bt_fixtree[%s]: invalid start page (need to recreate index)", RelationGetRelationName(rel));
- pblkno = opaque->btpo_parent;
-
- /* check/fix entire level */
- _bt_fixlevel(rel, buf, InvalidBlockNumber);
-
- /*
- * No pins/locks are held here. Re-read start page if its
- * btpo_parent pointed to meta page else go up one level.
- *
- * XXX have to catch InvalidBlockNumber at the moment -:(
- */
- if (pblkno == BTREE_METAPAGE || pblkno == InvalidBlockNumber)
- {
- buf = _bt_getbuf(rel, blkno, BT_WRITE);
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- if (P_ISROOT(opaque))
- {
- /* Tree is Ok now */
- _bt_relbuf(rel, buf);
- return;
- }
- /* Call _bt_fixroot() if there is no upper level */
- if (BTreeInvalidParent(opaque))
- {
- elog(WARNING, "bt_fixtree[%s]: fixing root page", RelationGetRelationName(rel));
- buf = _bt_fixroot(rel, buf, true);
- _bt_relbuf(rel, buf);
- return;
- }
- /* Have to go up one level */
- pblkno = opaque->btpo_parent;
- _bt_relbuf(rel, buf);
- }
- blkno = pblkno;
- }
-
-}
-
-/*
- * Check/fix level starting from page in buffer buf up to block
- * limit on *child* level (or till rightmost child page if limit
- * is InvalidBlockNumber). Start buffer must be read locked.
- * No pins/locks are held on exit.
- */
-static void
-_bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
-{
- BlockNumber blkno = BufferGetBlockNumber(buf);
- Page page;
- BTPageOpaque opaque;
- BlockNumber cblkno[3];
- OffsetNumber coff[3];
- Buffer cbuf[3];
- Page cpage[3];
- BTPageOpaque copaque[3];
- BTItem btitem;
- int cidx,
- i;
- bool goodbye = false;
- char tbuf[BLCKSZ];
-
- page = BufferGetPage(buf);
- /* copy page to temp storage */
- memmove(tbuf, page, PageGetPageSize(page));
- _bt_relbuf(rel, buf);
-
- page = (Page) tbuf;
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
-
- /* Initialize first child data */
- coff[0] = P_FIRSTDATAKEY(opaque);
- if (coff[0] > PageGetMaxOffsetNumber(page))
- elog(ERROR, "bt_fixlevel[%s]: invalid maxoff on start page (need to recreate index)", RelationGetRelationName(rel));
- btitem = (BTItem) PageGetItem(page, PageGetItemId(page, coff[0]));
- cblkno[0] = ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid));
- cbuf[0] = _bt_getbuf(rel, cblkno[0], BT_READ);
- cpage[0] = BufferGetPage(cbuf[0]);
- copaque[0] = (BTPageOpaque) PageGetSpecialPointer(cpage[0]);
- if (P_LEFTMOST(opaque) && !P_LEFTMOST(copaque[0]))
- elog(ERROR, "bt_fixtlevel[%s]: non-leftmost child page of leftmost parent (need to recreate index)", RelationGetRelationName(rel));
- /* caller should take care and avoid this */
- if (P_RIGHTMOST(copaque[0]))
- elog(ERROR, "bt_fixtlevel[%s]: invalid start child (need to recreate index)", RelationGetRelationName(rel));
-
- for (;;)
- {
- /*
- * Read up to 2 more child pages and look for pointers to them in
- * *saved* parent page
- */
- coff[1] = coff[2] = InvalidOffsetNumber;
- for (cidx = 0; cidx < 2;)
- {
- cidx++;
- cblkno[cidx] = (copaque[cidx - 1])->btpo_next;
- cbuf[cidx] = _bt_getbuf(rel, cblkno[cidx], BT_READ);
- cpage[cidx] = BufferGetPage(cbuf[cidx]);
- copaque[cidx] = (BTPageOpaque) PageGetSpecialPointer(cpage[cidx]);
- coff[cidx] = _bt_getoff(page, cblkno[cidx]);
-
- /* sanity check */
- if (coff[cidx] != InvalidOffsetNumber)
- {
- for (i = cidx - 1; i >= 0; i--)
- {
- if (coff[i] == InvalidOffsetNumber)
- continue;
- if (coff[cidx] != coff[i] + 1)
- elog(ERROR, "bt_fixlevel[%s]: invalid item order(1) (need to recreate index)", RelationGetRelationName(rel));
- break;
- }
- }
-
- if (P_RIGHTMOST(copaque[cidx]))
- break;
- }
-
- /*
- * Read parent page and insert missed pointers.
- */
- if (coff[1] == InvalidOffsetNumber ||
- (cidx == 2 && coff[2] == InvalidOffsetNumber))
- {
- Buffer newbuf;
- Page newpage;
- BTPageOpaque newopaque;
- BTItem ritem;
- Size itemsz;
- OffsetNumber newitemoff;
- BlockNumber parblk[3];
- BTStackData stack;
-
- stack.bts_parent = NULL;
- stack.bts_blkno = blkno;
- stack.bts_offset = InvalidOffsetNumber;
- ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
- cblkno[0], P_HIKEY);
-
- buf = _bt_getstackbuf(rel, &stack, BT_WRITE);
- if (buf == InvalidBuffer)
- elog(ERROR, "bt_fixlevel[%s]: pointer disappeared (need to recreate index)", RelationGetRelationName(rel));
-
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- coff[0] = stack.bts_offset;
- blkno = BufferGetBlockNumber(buf);
- parblk[0] = blkno;
-
- /* Check/insert missed pointers */
- for (i = 1; i <= cidx; i++)
- {
- coff[i] = _bt_getoff(page, cblkno[i]);
-
- /* sanity check */
- parblk[i] = BufferGetBlockNumber(buf);
- if (coff[i] != InvalidOffsetNumber)
- {
- if (parblk[i] == parblk[i - 1] &&
- coff[i] != coff[i - 1] + 1)
- elog(ERROR, "bt_fixlevel[%s]: invalid item order(2) (need to recreate index)", RelationGetRelationName(rel));
- continue;
- }
- /* Have to check next page ? */
- if ((!P_RIGHTMOST(opaque)) &&
- coff[i - 1] == PageGetMaxOffsetNumber(page)) /* yes */
- {
- newbuf = _bt_getbuf(rel, opaque->btpo_next, BT_WRITE);
- newpage = BufferGetPage(newbuf);
- newopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);
- coff[i] = _bt_getoff(newpage, cblkno[i]);
- if (coff[i] != InvalidOffsetNumber) /* found ! */
- {
- if (coff[i] != P_FIRSTDATAKEY(newopaque))
- elog(ERROR, "bt_fixlevel[%s]: invalid item order(3) (need to recreate index)", RelationGetRelationName(rel));
- _bt_relbuf(rel, buf);
- buf = newbuf;
- page = newpage;
- opaque = newopaque;
- blkno = BufferGetBlockNumber(buf);
- parblk[i] = blkno;
- continue;
- }
- /* unfound - need to insert on current page */
- _bt_relbuf(rel, newbuf);
- }
- /* insert pointer */
- ritem = (BTItem) PageGetItem(cpage[i - 1],
- PageGetItemId(cpage[i - 1], P_HIKEY));
- btitem = _bt_formitem(&(ritem->bti_itup));
- ItemPointerSet(&(btitem->bti_itup.t_tid), cblkno[i], P_HIKEY);
- itemsz = IndexTupleDSize(btitem->bti_itup)
- + (sizeof(BTItemData) - sizeof(IndexTupleData));
- itemsz = MAXALIGN(itemsz);
-
- newitemoff = coff[i - 1] + 1;
-
- if (PageGetFreeSpace(page) < itemsz)
- {
- OffsetNumber firstright;
- OffsetNumber itup_off;
- BlockNumber itup_blkno;
- bool newitemonleft;
-
- firstright = _bt_findsplitloc(rel, page,
- newitemoff, itemsz, &newitemonleft);
- newbuf = _bt_split(rel, buf, firstright,
- newitemoff, itemsz, btitem, newitemonleft,
- &itup_off, &itup_blkno);
- /* what buffer we need in ? */
- if (newitemonleft)
- _bt_relbuf(rel, newbuf);
- else
- {
- _bt_relbuf(rel, buf);
- buf = newbuf;
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- }
- blkno = BufferGetBlockNumber(buf);
- coff[i] = itup_off;
- }
- else
- {
- _bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
- coff[i] = newitemoff;
- }
-
- pfree(btitem);
- parblk[i] = blkno;
- }
-
- /* copy page with pointer to cblkno[cidx] to temp storage */
- memmove(tbuf, page, PageGetPageSize(page));
- _bt_relbuf(rel, buf);
- page = (Page) tbuf;
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- }
-
- /* Continue if current check/fix level page is rightmost */
- if (P_RIGHTMOST(opaque))
- goodbye = false;
-
- /* Pointers to child pages are Ok - right end of child level ? */
- _bt_relbuf(rel, cbuf[0]);
- _bt_relbuf(rel, cbuf[1]);
- if (cidx == 1 ||
- (cidx == 2 && (P_RIGHTMOST(copaque[2]) || goodbye)))
- {
- if (cidx == 2)
- _bt_relbuf(rel, cbuf[2]);
- return;
- }
- if (cblkno[0] == limit || cblkno[1] == limit)
- goodbye = true;
- cblkno[0] = cblkno[2];
- cbuf[0] = cbuf[2];
- cpage[0] = cpage[2];
- copaque[0] = copaque[2];
- coff[0] = coff[2];
- }
-}
-
-/*
- * Check/fix part of tree - branch - up from parent of level with blocks
- * lblkno and rblknum. We first ensure that parent level has pointers
- * to both lblkno & rblknum and if those pointers are on different
- * parent pages then do the same for parent level, etc. No locks must
- * be held on target level and upper on entry. No locks will be held
- * on exit. Stack created when traversing tree down should be provided and
- * it must points to parent level. rblkno must be on the right from lblkno.
- * (This function is special edition of more expensive _bt_fixtree(),
- * but it doesn't guarantee full consistency of tree.)
- */
-static void
-_bt_fixbranch(Relation rel, BlockNumber lblkno,
- BlockNumber rblkno, BTStack true_stack)
-{
- BlockNumber blkno = true_stack->bts_blkno;
- BTStackData stack;
- BTPageOpaque opaque;
- Buffer buf,
- rbuf;
- Page page;
- OffsetNumber offnum;
-
- true_stack = true_stack->bts_parent;
- for (;;)
- {
- buf = _bt_getbuf(rel, blkno, BT_READ);
-
- /* Check/fix parent level pointed by blkno */
- _bt_fixlevel(rel, buf, rblkno);
-
- /*
- * Here parent level should have pointers for both lblkno and
- * rblkno and we have to find them.
- */
- stack.bts_parent = NULL;
- stack.bts_blkno = blkno;
- stack.bts_offset = InvalidOffsetNumber;
- ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY);
- buf = _bt_getstackbuf(rel, &stack, BT_READ);
- if (buf == InvalidBuffer)
- elog(ERROR, "bt_fixbranch[%s]: left pointer unfound (need to recreate index)", RelationGetRelationName(rel));
- page = BufferGetPage(buf);
- offnum = _bt_getoff(page, rblkno);
-
- if (offnum != InvalidOffsetNumber) /* right pointer found */
- {
- if (offnum <= stack.bts_offset)
- elog(ERROR, "bt_fixbranch[%s]: invalid item order (need to recreate index)", RelationGetRelationName(rel));
- _bt_relbuf(rel, buf);
- return;
- }
-
- /* Pointers are on different parent pages - find right one */
- lblkno = BufferGetBlockNumber(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- if (P_RIGHTMOST(opaque))
- elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(1) (need to recreate index)", RelationGetRelationName(rel));
-
- stack.bts_parent = NULL;
- stack.bts_blkno = opaque->btpo_next;
- stack.bts_offset = InvalidOffsetNumber;
- ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY);
- rbuf = _bt_getstackbuf(rel, &stack, BT_READ);
- if (rbuf == InvalidBuffer)
- elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(2) (need to recreate index)", RelationGetRelationName(rel));
- rblkno = BufferGetBlockNumber(rbuf);
- _bt_relbuf(rel, rbuf);
-
- /*
- * If we have parent item in true_stack then go up one level and
- * ensure that it has pointers to new lblkno & rblkno.
- */
- if (true_stack)
- {
- _bt_relbuf(rel, buf);
- blkno = true_stack->bts_blkno;
- true_stack = true_stack->bts_parent;
- continue;
- }
-
- /*
- * Well, we are on the level that was root or unexistent when we
- * started traversing tree down. If btpo_parent is updated then
- * we'll use it to continue, else we'll fix/restore upper levels
- * entirely.
- */
- if (!BTreeInvalidParent(opaque))
- {
- blkno = opaque->btpo_parent;
- _bt_relbuf(rel, buf);
- continue;
- }
-
- /* Have to switch to excl buf lock and re-check btpo_parent */
- _bt_relbuf(rel, buf);
- buf = _bt_getbuf(rel, blkno, BT_WRITE);
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- if (!BTreeInvalidParent(opaque))
- {
- blkno = opaque->btpo_parent;
- _bt_relbuf(rel, buf);
- continue;
- }
-
- /*
- * We hold excl lock on some internal page with unupdated
- * btpo_parent - time for _bt_fixup.
- */
- break;
- }
-
- elog(WARNING, "bt_fixbranch[%s]: fixing upper levels", RelationGetRelationName(rel));
- _bt_fixup(rel, buf);
-
- return;
-}
-
-/*
- * Having buf excl locked this routine walks to the left on level and
- * uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper
- * levels. No buffer pins/locks will be held on exit.
- */
-static void
-_bt_fixup(Relation rel, Buffer buf)
-{
- Page page;
- BTPageOpaque opaque;
- BlockNumber blkno;
-
- for (;;)
- {
- page = BufferGetPage(buf);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
-
- /*
- * If someone else already created parent pages then it's time for
- * _bt_fixtree() to check upper levels and fix them, if required.
- */
- if (!BTreeInvalidParent(opaque))
- {
- blkno = opaque->btpo_parent;
- _bt_relbuf(rel, buf);
- elog(WARNING, "bt_fixup[%s]: checking/fixing upper levels", RelationGetRelationName(rel));
- _bt_fixtree(rel, blkno);
- return;
- }
- if (P_LEFTMOST(opaque))
- break;
- blkno = opaque->btpo_prev;
- _bt_relbuf(rel, buf);
- buf = _bt_getbuf(rel, blkno, BT_WRITE);
- }
-
- /*
- * Ok, we are on the leftmost page, it's write locked by us and its
- * btpo_parent points to meta page - time for _bt_fixroot().
- */
- elog(WARNING, "bt_fixup[%s]: fixing root page", RelationGetRelationName(rel));
- buf = _bt_fixroot(rel, buf, true);
- _bt_relbuf(rel, buf);
-}
-
-static OffsetNumber
-_bt_getoff(Page page, BlockNumber blkno)
-{
- BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
- OffsetNumber offnum = P_FIRSTDATAKEY(opaque);
- BlockNumber curblkno;
- ItemId itemid;
- BTItem item;
-
- for (; offnum <= maxoff; offnum++)
- {
- itemid = PageGetItemId(page, offnum);
- item = (BTItem) PageGetItem(page, itemid);
- curblkno = ItemPointerGetBlockNumber(&(item->bti_itup.t_tid));
- if (curblkno == blkno)
- return (offnum);
- }
-
- return (InvalidOffsetNumber);
-}
-
-/*
* _bt_pgaddtup() -- add a tuple to a particular page in the index.
*
* This routine adds the tuple to the page as requested. It does