diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2003-02-21 00:06:22 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2003-02-21 00:06:22 +0000 |
commit | 70508ba7aed76954b7e630a4952e1360c15db830 (patch) | |
tree | 5e688d748e5a183a1248153203ff8ededfd020a8 /src/backend/access/nbtree/nbtinsert.c | |
parent | 4df0f1d26f62e835bb357fa7c2e3d5de5fcbf802 (diff) |
Make btree index structure adjustments and WAL logging changes needed to
support btree compaction, as per proposal of a few days ago. btree index
pages no longer store parent links, instead they have a level indicator
(counting up from zero for leaf pages). The FixBTree recovery logic is
removed, and replaced by code that detects missing parent-level insertions
during WAL replay. Also, generate appropriate WAL entries when updating
btree metapage and when building a btree index from scratch. I believe
btree indexes are now completely WAL-legal for the first time.
initdb forced due to index and WAL changes.
Diffstat (limited to 'src/backend/access/nbtree/nbtinsert.c')
-rw-r--r-- | src/backend/access/nbtree/nbtinsert.c | 1181 |
1 files changed, 314 insertions, 867 deletions
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 99011a5c958..a93a9fed8c6 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.96 2002/09/04 20:31:09 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.97 2003/02/21 00:06:21 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -35,15 +35,6 @@ typedef struct int best_delta; /* best size delta so far */ } FindSplitData; -extern bool FixBTree; - -Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release); -static void _bt_fixtree(Relation rel, BlockNumber blkno); -static void _bt_fixbranch(Relation rel, BlockNumber lblkno, - BlockNumber rblkno, BTStack true_stack); -static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit); -static void _bt_fixup(Relation rel, Buffer buf); -static OffsetNumber _bt_getoff(Page page, BlockNumber blkno); static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); @@ -54,9 +45,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, - OffsetNumber afteritem); -static void _bt_insertuple(Relation rel, Buffer buf, - Size itemsz, BTItem btitem, OffsetNumber newitemoff); + OffsetNumber afteritem, + bool split_only_page); static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz, BTItem newitem, bool newitemonleft, @@ -149,7 +139,8 @@ top: } /* do the insertion */ - res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, 0); + res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem, + 0, false); /* be tidy */ _bt_freestack(stack); @@ -320,6 +311,7 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, * right using information stored in the parent stack). * + invokes itself with the appropriate tuple for the right * child page on the parent. + * + updates the metapage if a true root or fast root is split. * * On entry, we must have the right buffer on which to do the * insertion, and the buffer must be pinned and locked. On return, @@ -358,7 +350,8 @@ _bt_insertonpg(Relation rel, int keysz, ScanKey scankey, BTItem btitem, - OffsetNumber afteritem) + OffsetNumber afteritem, + bool split_only_page) { InsertIndexResult res; Page page; @@ -458,11 +451,10 @@ _bt_insertonpg(Relation rel, */ if (PageGetFreeSpace(page) < itemsz) { - Buffer rbuf; - BlockNumber bknum = BufferGetBlockNumber(buf); - BlockNumber rbknum; bool is_root = P_ISROOT(lpageop); + bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop); bool newitemonleft; + Buffer rbuf; /* Choose the split point */ firstright = _bt_findsplitloc(rel, page, @@ -488,128 +480,127 @@ _bt_insertonpg(Relation rel, * locks for the child pages until we locate the parent, but we can * release them before doing the actual insertion (see Lehman and Yao * for the reasoning). - * - * Here we have to do something Lehman and Yao don't talk about: - * deal with a root split and construction of a new root. If our - * stack is empty then we have just split a node on what had been - * the root level when we descended the tree. If it is still the - * root then we perform a new-root construction. If it *wasn't* - * the root anymore, use the parent pointer to get up to the root - * level that someone constructed meanwhile, and find the right - * place to insert as for the normal case. *---------- */ + _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only); + } + else + { + Buffer metabuf = InvalidBuffer; + Page metapg = NULL; + BTMetaPageData *metad = NULL; - if (is_root) - { - Buffer rootbuf; - - Assert(stack == (BTStack) NULL); - /* create a new root node and release the split buffers */ - rootbuf = _bt_newroot(rel, buf, rbuf); - _bt_wrtbuf(rel, rootbuf); - _bt_wrtbuf(rel, rbuf); - _bt_wrtbuf(rel, buf); - } - else + itup_off = newitemoff; + itup_blkno = BufferGetBlockNumber(buf); + + /* + * If we are doing this insert because we split a page that was + * the only one on its tree level, but was not the root, it may + * have been the "fast root". We need to ensure that the fast root + * link points at or above the current page. We can safely acquire + * a lock on the metapage here --- see comments for _bt_newroot(). + */ + if (split_only_page) { - InsertIndexResult newres; - BTItem new_item; - BTStackData fakestack; - BTItem ritem; - Buffer pbuf; - - /* If root page was splitted */ - if (stack == (BTStack) NULL) - { - elog(LOG, "btree: concurrent ROOT page split"); - - /* - * If root page splitter failed to create new root page - * then old root' btpo_parent still points to metapage. We - * have to fix root page in this case. - */ - if (BTreeInvalidParent(lpageop)) - { - if (!FixBTree) - elog(ERROR, "bt_insertonpg[%s]: no root page found", RelationGetRelationName(rel)); - _bt_wrtbuf(rel, rbuf); - _bt_wrtnorelbuf(rel, buf); - elog(WARNING, "bt_insertonpg[%s]: root page unfound - fixing upper levels", RelationGetRelationName(rel)); - _bt_fixup(rel, buf); - goto formres; - } + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); + metapg = BufferGetPage(metabuf); + metad = BTPageGetMeta(metapg); - /* - * Set up a phony stack entry if we haven't got a real one - */ - stack = &fakestack; - stack->bts_blkno = lpageop->btpo_parent; - stack->bts_offset = InvalidOffsetNumber; - /* bts_btitem will be initialized below */ - stack->bts_parent = NULL; + if (metad->btm_fastlevel >= lpageop->btpo.level) + { + /* no update wanted */ + _bt_relbuf(rel, metabuf); + metabuf = InvalidBuffer; } + } - /* get high key from left page == lowest key on new right page */ - ritem = (BTItem) PageGetItem(page, - PageGetItemId(page, P_HIKEY)); + /* Do the actual update. No elog(ERROR) until changes are logged */ + START_CRIT_SECTION(); - /* form an index tuple that points at the new right page */ - new_item = _bt_formitem(&(ritem->bti_itup)); - rbknum = BufferGetBlockNumber(rbuf); - ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY); + _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page"); - /* - * Find the parent buffer and get the parent page. - * - * Oops - if we were moved right then we need to change stack - * item! We want to find parent pointing to where we are, - * right ? - vadim 05/27/97 - * - * Interestingly, this means we didn't *really* need to stack the - * parent key at all; all we really care about is the saved - * block and offset as a starting point for our search... - */ - ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid), - bknum, P_HIKEY); + if (BufferIsValid(metabuf)) + { + metad->btm_fastroot = itup_blkno; + metad->btm_fastlevel = lpageop->btpo.level; + } - pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + /* XLOG stuff */ + if (!rel->rd_istemp) + { + xl_btree_insert xlrec; + xl_btree_metadata xlmeta; + uint8 xlinfo; + XLogRecPtr recptr; + XLogRecData rdata[3]; + XLogRecData *nextrdata; + BTItemData truncitem; + + xlrec.target.node = rel->rd_node; + ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = SizeOfBtreeInsert; + rdata[0].next = nextrdata = &(rdata[1]); + + if (BufferIsValid(metabuf)) + { + xlmeta.root = metad->btm_root; + xlmeta.level = metad->btm_level; + xlmeta.fastroot = metad->btm_fastroot; + xlmeta.fastlevel = metad->btm_fastlevel; + + nextrdata->buffer = InvalidBuffer; + nextrdata->data = (char *) &xlmeta; + nextrdata->len = sizeof(xl_btree_metadata); + nextrdata->next = nextrdata + 1; + nextrdata++; + xlinfo = XLOG_BTREE_INSERT_META; + } + else if (P_ISLEAF(lpageop)) + xlinfo = XLOG_BTREE_INSERT_LEAF; + else + xlinfo = XLOG_BTREE_INSERT_UPPER; - /* Now we can write and unlock the children */ - _bt_wrtbuf(rel, rbuf); - _bt_wrtbuf(rel, buf); + /* Read comments in _bt_pgaddtup */ + if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop)) + { + truncitem = *btitem; + truncitem.bti_itup.t_info = sizeof(BTItemData); + nextrdata->data = (char *) &truncitem; + nextrdata->len = sizeof(BTItemData); + } + else + { + nextrdata->data = (char *) btitem; + nextrdata->len = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + } + nextrdata->buffer = buf; + nextrdata->next = NULL; + + recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); - if (pbuf == InvalidBuffer) + if (BufferIsValid(metabuf)) { - if (!FixBTree) - elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!" - "\n\tRecreate index %s.", RelationGetRelationName(rel)); - pfree(new_item); - elog(WARNING, "bt_insertonpg[%s]: parent page unfound - fixing branch", RelationGetRelationName(rel)); - _bt_fixbranch(rel, bknum, rbknum, stack); - goto formres; + PageSetLSN(metapg, recptr); + PageSetSUI(metapg, ThisStartUpID); } - /* Recursively update the parent */ - newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, - 0, NULL, new_item, stack->bts_offset); - /* be tidy */ - pfree(newres); - pfree(new_item); + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); } - } - else - { - itup_off = newitemoff; - itup_blkno = BufferGetBlockNumber(buf); - _bt_insertuple(rel, buf, itemsz, btitem, newitemoff); + END_CRIT_SECTION(); /* Write out the updated page and release pin/lock */ + if (BufferIsValid(metabuf)) + _bt_wrtbuf(rel, metabuf); + _bt_wrtbuf(rel, buf); } -formres:; /* by here, the new tuple is inserted at itup_blkno/itup_off */ res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); ItemPointerSet(&(res->pointerData), itup_blkno, itup_off); @@ -617,61 +608,6 @@ formres:; return res; } -static void -_bt_insertuple(Relation rel, Buffer buf, - Size itemsz, BTItem btitem, OffsetNumber newitemoff) -{ - Page page = BufferGetPage(buf); - BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); - - START_CRIT_SECTION(); - - _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page"); - - /* XLOG stuff */ - if (!rel->rd_istemp) - { - xl_btree_insert xlrec; - uint8 flag = XLOG_BTREE_INSERT; - XLogRecPtr recptr; - XLogRecData rdata[2]; - BTItemData truncitem; - - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff); - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfBtreeInsert; - rdata[0].next = &(rdata[1]); - - /* Read comments in _bt_pgaddtup */ - if (!(P_ISLEAF(pageop)) && newitemoff == P_FIRSTDATAKEY(pageop)) - { - truncitem = *btitem; - truncitem.bti_itup.t_info = sizeof(BTItemData); - rdata[1].data = (char *) &truncitem; - rdata[1].len = sizeof(BTItemData); - } - else - { - rdata[1].data = (char *) btitem; - rdata[1].len = IndexTupleDSize(btitem->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - } - rdata[1].buffer = buf; - rdata[1].next = NULL; - if (P_ISLEAF(pageop)) - flag |= XLOG_BTREE_LEAF; - - recptr = XLogInsert(RM_BTREE_ID, flag, rdata); - - PageSetLSN(page, recptr); - PageSetSUI(page, ThisStartUpID); - } - - END_CRIT_SECTION(); -} - /* * _bt_split() -- split a page in the btree. * @@ -729,13 +665,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, lopaque->btpo_next = BufferGetBlockNumber(rbuf); ropaque->btpo_prev = BufferGetBlockNumber(buf); ropaque->btpo_next = oopaque->btpo_next; - - /* - * Must copy the original parent link into both new pages, even though - * it might be quite obsolete by now. We might need it if this level - * is or recently was the root (see README). - */ - lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent; + lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level; /* * If the page we're splitting is not the rightmost page at its level @@ -876,34 +806,29 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, if (!rel->rd_istemp) { xl_btree_split xlrec; - int flag = (newitemonleft) ? - XLOG_BTREE_SPLEFT : XLOG_BTREE_SPLIT; - BlockNumber blkno; + uint8 xlinfo; XLogRecPtr recptr; XLogRecData rdata[4]; xlrec.target.node = rel->rd_node; ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off); if (newitemonleft) - { - blkno = BufferGetBlockNumber(rbuf); - BlockIdSet(&(xlrec.otherblk), blkno); - } + xlrec.otherblk = BufferGetBlockNumber(rbuf); else - { - blkno = BufferGetBlockNumber(buf); - BlockIdSet(&(xlrec.otherblk), blkno); - } - BlockIdSet(&(xlrec.parentblk), lopaque->btpo_parent); - BlockIdSet(&(xlrec.leftblk), lopaque->btpo_prev); - BlockIdSet(&(xlrec.rightblk), ropaque->btpo_next); + xlrec.otherblk = BufferGetBlockNumber(buf); + xlrec.leftblk = lopaque->btpo_prev; + xlrec.rightblk = ropaque->btpo_next; + xlrec.level = lopaque->btpo.level; /* * Direct access to page is not good but faster - we should - * implement some new func in page API. + * implement some new func in page API. Note we only store the + * tuples themselves, knowing that the item pointers are in the + * same order and can be reconstructed by scanning the tuples. */ xlrec.leftlen = ((PageHeader) leftpage)->pd_special - ((PageHeader) leftpage)->pd_upper; + rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfBtreeSplit; @@ -933,10 +858,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, rdata[3].next = NULL; } - if (P_ISLEAF(lopaque)) - flag |= XLOG_BTREE_LEAF; + if (P_ISROOT(oopaque)) + xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT; + else + xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R; - recptr = XLogInsert(RM_BTREE_ID, flag, rdata); + recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata); PageSetLSN(leftpage, recptr); PageSetSUI(leftpage, ThisStartUpID); @@ -1176,47 +1103,178 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, } /* + * _bt_insert_parent() -- Insert downlink into parent after a page split. + * + * On entry, buf and rbuf are the left and right split pages, which we + * still hold write locks on per the L&Y algorithm. We release the + * write locks once we have write lock on the parent page. (Any sooner, + * and it'd be possible for some other process to try to split or delete + * one of these pages, and get confused because it cannot find the downlink.) + * + * stack - stack showing how we got here. May be NULL in cases that don't + * have to be efficient (concurrent ROOT split, WAL recovery) + * is_root - we split the true root + * is_only - we split a page alone on its level (might have been fast root) + * + * This is exported so it can be called by nbtxlog.c. + */ +void +_bt_insert_parent(Relation rel, + Buffer buf, + Buffer rbuf, + BTStack stack, + bool is_root, + bool is_only) +{ + /* + * Here we have to do something Lehman and Yao don't talk about: + * deal with a root split and construction of a new root. If our + * stack is empty then we have just split a node on what had been + * the root level when we descended the tree. If it was still the + * root then we perform a new-root construction. If it *wasn't* + * the root anymore, search to find the next higher level that + * someone constructed meanwhile, and find the right place to insert + * as for the normal case. + * + * If we have to search for the parent level, we do so by + * re-descending from the root. This is not super-efficient, + * but it's rare enough not to matter. (This path is also taken + * when called from WAL recovery --- we have no stack in that case.) + */ + if (is_root) + { + Buffer rootbuf; + + Assert(stack == (BTStack) NULL); + Assert(is_only); + /* create a new root node and update the metapage */ + rootbuf = _bt_newroot(rel, buf, rbuf); + /* release the split buffers */ + _bt_wrtbuf(rel, rootbuf); + _bt_wrtbuf(rel, rbuf); + _bt_wrtbuf(rel, buf); + } + else + { + BlockNumber bknum = BufferGetBlockNumber(buf); + BlockNumber rbknum = BufferGetBlockNumber(rbuf); + Page page = BufferGetPage(buf); + InsertIndexResult newres; + BTItem new_item; + BTStackData fakestack; + BTItem ritem; + Buffer pbuf; + + if (stack == (BTStack) NULL) + { + BTPageOpaque lpageop; + + if (!InRecovery) + elog(DEBUG1, "_bt_insert_parent: concurrent ROOT page split"); + lpageop = (BTPageOpaque) PageGetSpecialPointer(page); + /* Find the leftmost page at the next level up */ + pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false); + /* Set up a phony stack entry pointing there */ + stack = &fakestack; + stack->bts_blkno = BufferGetBlockNumber(pbuf); + stack->bts_offset = InvalidOffsetNumber; + /* bts_btitem will be initialized below */ + stack->bts_parent = NULL; + _bt_relbuf(rel, pbuf); + } + + /* get high key from left page == lowest key on new right page */ + ritem = (BTItem) PageGetItem(page, + PageGetItemId(page, P_HIKEY)); + + /* form an index tuple that points at the new right page */ + new_item = _bt_formitem(&(ritem->bti_itup)); + ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY); + + /* + * Find the parent buffer and get the parent page. + * + * Oops - if we were moved right then we need to change stack + * item! We want to find parent pointing to where we are, + * right ? - vadim 05/27/97 + */ + ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid), + bknum, P_HIKEY); + + pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); + + /* Now we can write and unlock the children */ + _bt_wrtbuf(rel, rbuf); + _bt_wrtbuf(rel, buf); + + /* Check for error only after writing children */ + if (pbuf == InvalidBuffer) + elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!" + "\n\tRecreate index %s.", RelationGetRelationName(rel)); + + /* Recursively update the parent */ + newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, + 0, NULL, new_item, stack->bts_offset, + is_only); + + /* be tidy */ + pfree(newres); + pfree(new_item); + } +} + +/* * _bt_getstackbuf() -- Walk back up the tree one step, and find the item * we last looked at in the parent. * - * This is possible because we save a bit image of the last item - * we looked at in the parent, and the update algorithm guarantees - * that if items above us in the tree move, they only move right. + * This is possible because we save the downlink from the parent item, + * which is enough to uniquely identify it. Insertions into the parent + * level could cause the item to move right; deletions could cause it + * to move left, but not left of the page we previously found it in. * - * Also, re-set bts_blkno & bts_offset if changed. + * Adjusts bts_blkno & bts_offset if changed. + * + * Returns InvalidBuffer if item not found (should not happen). */ static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access) { BlockNumber blkno; - Buffer buf; - OffsetNumber start, - offnum, - maxoff; - Page page; - ItemId itemid; - BTItem item; - BTPageOpaque opaque; + OffsetNumber start; blkno = stack->bts_blkno; - buf = _bt_getbuf(rel, blkno, access); - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - maxoff = PageGetMaxOffsetNumber(page); - start = stack->bts_offset; - /* - * _bt_insertonpg set bts_offset to InvalidOffsetNumber in the case of - * concurrent ROOT page split. Also, watch out for possibility that - * page has a high key now when it didn't before. - */ - if (start < P_FIRSTDATAKEY(opaque)) - start = P_FIRSTDATAKEY(opaque); - for (;;) { - /* see if it's on this page */ + Buffer buf; + Page page; + BTPageOpaque opaque; + OffsetNumber offnum, + minoff, + maxoff; + ItemId itemid; + BTItem item; + + buf = _bt_getbuf(rel, blkno, access); + page = BufferGetPage(buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + minoff = P_FIRSTDATAKEY(opaque); + maxoff = PageGetMaxOffsetNumber(page); + + /* + * start = InvalidOffsetNumber means "search the whole page". + * We need this test anyway due to possibility that + * page has a high key now when it didn't before. + */ + if (start < minoff) + start = minoff; + + /* + * These loops will check every item on the page --- but in an order + * that's attuned to the probability of where it actually is. Scan + * to the right first, then to the left. + */ for (offnum = start; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) @@ -1232,23 +1290,32 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access) } } + for (offnum = OffsetNumberPrev(start); + offnum >= minoff; + offnum = OffsetNumberPrev(offnum)) + { + itemid = PageGetItemId(page, offnum); + item = (BTItem) PageGetItem(page, itemid); + if (BTItemSame(item, &stack->bts_btitem)) + { + /* Return accurate pointer to where link is now */ + stack->bts_blkno = blkno; + stack->bts_offset = offnum; + return buf; + } + } + /* - * by here, the item we're looking for moved right at least one - * page + * The item we're looking for moved right at least one page. */ if (P_RIGHTMOST(opaque)) { _bt_relbuf(rel, buf); return (InvalidBuffer); } - blkno = opaque->btpo_next; + start = InvalidOffsetNumber; _bt_relbuf(rel, buf); - buf = _bt_getbuf(rel, blkno, access); - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - maxoff = PageGetMaxOffsetNumber(page); - start = P_FIRSTDATAKEY(opaque); } } @@ -1289,6 +1356,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) Page metapg; BTMetaPageData *metad; + lbkno = BufferGetBlockNumber(lbuf); + rbkno = BufferGetBlockNumber(rbuf); + lpage = BufferGetPage(lbuf); + rpage = BufferGetPage(rbuf); + /* get a new root page */ rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rootpage = BufferGetPage(rootbuf); @@ -1303,22 +1375,15 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) /* set btree special data */ rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE; - rootopaque->btpo_flags |= BTP_ROOT; - rootopaque->btpo_parent = BTREE_METAPAGE; + rootopaque->btpo_flags = BTP_ROOT; + rootopaque->btpo.level = + ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1; - lbkno = BufferGetBlockNumber(lbuf); - rbkno = BufferGetBlockNumber(rbuf); - lpage = BufferGetPage(lbuf); - rpage = BufferGetPage(rbuf); - - /* - * Make sure pages in old root level have valid parent links --- we - * will need this in _bt_insertonpg() if a concurrent root split - * happens (see README). - */ - ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent = - ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent = - rootblknum; + /* update metapage data */ + metad->btm_root = rootblknum; + metad->btm_level = rootopaque->btpo.level; + metad->btm_fastroot = rootblknum; + metad->btm_fastlevel = rootopaque->btpo.level; /* * Create downlink item for left page (old root). Since this will be @@ -1356,9 +1421,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) elog(PANIC, "btree: failed to add rightkey to new root page"); pfree(new_item); - metad->btm_root = rootblknum; - (metad->btm_level)++; - /* XLOG stuff */ if (!rel->rd_istemp) { @@ -1367,8 +1429,9 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) XLogRecData rdata[2]; xlrec.node = rel->rd_node; + xlrec.rootblk = rootblknum; xlrec.level = metad->btm_level; - BlockIdSet(&(xlrec.rootblk), rootblknum); + rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfBtreeNewroot; @@ -1390,8 +1453,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) PageSetSUI(rootpage, ThisStartUpID); PageSetLSN(metapg, recptr); PageSetSUI(metapg, ThisStartUpID); - - /* we changed their btpo_parent */ PageSetLSN(lpage, recptr); PageSetSUI(lpage, ThisStartUpID); PageSetLSN(rpage, recptr); @@ -1407,620 +1468,6 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) } /* - * In the event old root page was splitted but no new one was created we - * build required parent levels keeping write lock on old root page. - * Note: it's assumed that old root page' btpo_parent points to meta page, - * ie not to parent page. On exit, new root page buffer is write locked. - * If "release" is TRUE then oldrootbuf will be released immediately - * after upper level is builded. - */ -Buffer -_bt_fixroot(Relation rel, Buffer oldrootbuf, bool release) -{ - Buffer rootbuf; - BlockNumber rootblk; - Page rootpage; - XLogRecPtr rootLSN; - Page oldrootpage = BufferGetPage(oldrootbuf); - BTPageOpaque oldrootopaque = (BTPageOpaque) - PageGetSpecialPointer(oldrootpage); - Buffer buf, - leftbuf, - rightbuf; - Page page, - leftpage, - rightpage; - BTPageOpaque opaque, - leftopaque, - rightopaque; - OffsetNumber newitemoff; - BTItem btitem, - ritem; - Size itemsz; - - if (!P_LEFTMOST(oldrootopaque) || P_RIGHTMOST(oldrootopaque)) - elog(ERROR, "bt_fixroot: not valid old root page"); - - /* Read right neighbor and create new root page */ - leftbuf = _bt_getbuf(rel, oldrootopaque->btpo_next, BT_WRITE); - leftpage = BufferGetPage(leftbuf); - leftopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); - rootbuf = _bt_newroot(rel, oldrootbuf, leftbuf); - rootpage = BufferGetPage(rootbuf); - rootLSN = PageGetLSN(rootpage); - rootblk = BufferGetBlockNumber(rootbuf); - - /* parent page where to insert pointers */ - buf = rootbuf; - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - - /* - * Now read other pages (if any) on level and add them to new root. - * Here we break one of our locking rules - never hold lock on parent - * page when acquiring lock on its child, - but we free from deadlock: - * - * If concurrent process will split one of pages on this level then it - * will see either btpo_parent == metablock or btpo_parent == rootblk. - * In first case it will give up its locks and walk to the leftmost - * page (oldrootbuf) in _bt_fixup() - ie it will wait for us and let - * us continue. In second case it will try to lock rootbuf keeping its - * locks on buffers we already passed, also waiting for us. If we'll - * have to unlock rootbuf (split it) and that process will have to - * split page of new level we created (level of rootbuf) then it will - * wait while we create upper level. Etc. - */ - while (!P_RIGHTMOST(leftopaque)) - { - rightbuf = _bt_getbuf(rel, leftopaque->btpo_next, BT_WRITE); - rightpage = BufferGetPage(rightbuf); - rightopaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); - - /* - * Update LSN & StartUpID of child page buffer to ensure that it - * will be written on disk after flushing log record for new root - * creation. Unfortunately, for the moment (?) we do not log this - * operation and so possibly break our rule to log entire page - * content on first after checkpoint modification. - */ - HOLD_INTERRUPTS(); - rightopaque->btpo_parent = rootblk; - if (XLByteLT(PageGetLSN(rightpage), rootLSN)) - PageSetLSN(rightpage, rootLSN); - PageSetSUI(rightpage, ThisStartUpID); - RESUME_INTERRUPTS(); - - ritem = (BTItem) PageGetItem(leftpage, PageGetItemId(leftpage, P_HIKEY)); - btitem = _bt_formitem(&(ritem->bti_itup)); - ItemPointerSet(&(btitem->bti_itup.t_tid), leftopaque->btpo_next, P_HIKEY); - itemsz = IndexTupleDSize(btitem->bti_itup) - + (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - - newitemoff = OffsetNumberNext(PageGetMaxOffsetNumber(page)); - - if (PageGetFreeSpace(page) < itemsz) - { - Buffer newbuf; - OffsetNumber firstright; - OffsetNumber itup_off; - BlockNumber itup_blkno; - bool newitemonleft; - - firstright = _bt_findsplitloc(rel, page, - newitemoff, itemsz, &newitemonleft); - newbuf = _bt_split(rel, buf, firstright, - newitemoff, itemsz, btitem, newitemonleft, - &itup_off, &itup_blkno); - /* Keep lock on new "root" buffer ! */ - if (buf != rootbuf) - _bt_relbuf(rel, buf); - buf = newbuf; - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - } - else - _bt_insertuple(rel, buf, itemsz, btitem, newitemoff); - - /* give up left buffer */ - _bt_wrtbuf(rel, leftbuf); - pfree(btitem); - leftbuf = rightbuf; - leftpage = rightpage; - leftopaque = rightopaque; - } - - /* give up rightmost page buffer */ - _bt_wrtbuf(rel, leftbuf); - - /* - * Here we hold locks on old root buffer, new root buffer we've - * created with _bt_newroot() - rootbuf, - and buf we've used for last - * insert ops - buf. If rootbuf != buf then we have to create at least - * one more level. And if "release" is TRUE then we give up - * oldrootbuf. - */ - if (release) - _bt_wrtbuf(rel, oldrootbuf); - - if (rootbuf != buf) - { - _bt_wrtbuf(rel, buf); - return (_bt_fixroot(rel, rootbuf, true)); - } - - return (rootbuf); -} - -/* - * Using blkno of leftmost page on a level inside tree this func - * checks/fixes tree from this level up to the root page. - */ -static void -_bt_fixtree(Relation rel, BlockNumber blkno) -{ - Buffer buf; - Page page; - BTPageOpaque opaque; - BlockNumber pblkno; - - for (;;) - { - buf = _bt_getbuf(rel, blkno, BT_READ); - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - if (!P_LEFTMOST(opaque) || P_ISLEAF(opaque)) - elog(ERROR, "bt_fixtree[%s]: invalid start page (need to recreate index)", RelationGetRelationName(rel)); - pblkno = opaque->btpo_parent; - - /* check/fix entire level */ - _bt_fixlevel(rel, buf, InvalidBlockNumber); - - /* - * No pins/locks are held here. Re-read start page if its - * btpo_parent pointed to meta page else go up one level. - * - * XXX have to catch InvalidBlockNumber at the moment -:( - */ - if (pblkno == BTREE_METAPAGE || pblkno == InvalidBlockNumber) - { - buf = _bt_getbuf(rel, blkno, BT_WRITE); - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - if (P_ISROOT(opaque)) - { - /* Tree is Ok now */ - _bt_relbuf(rel, buf); - return; - } - /* Call _bt_fixroot() if there is no upper level */ - if (BTreeInvalidParent(opaque)) - { - elog(WARNING, "bt_fixtree[%s]: fixing root page", RelationGetRelationName(rel)); - buf = _bt_fixroot(rel, buf, true); - _bt_relbuf(rel, buf); - return; - } - /* Have to go up one level */ - pblkno = opaque->btpo_parent; - _bt_relbuf(rel, buf); - } - blkno = pblkno; - } - -} - -/* - * Check/fix level starting from page in buffer buf up to block - * limit on *child* level (or till rightmost child page if limit - * is InvalidBlockNumber). Start buffer must be read locked. - * No pins/locks are held on exit. - */ -static void -_bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) -{ - BlockNumber blkno = BufferGetBlockNumber(buf); - Page page; - BTPageOpaque opaque; - BlockNumber cblkno[3]; - OffsetNumber coff[3]; - Buffer cbuf[3]; - Page cpage[3]; - BTPageOpaque copaque[3]; - BTItem btitem; - int cidx, - i; - bool goodbye = false; - char tbuf[BLCKSZ]; - - page = BufferGetPage(buf); - /* copy page to temp storage */ - memmove(tbuf, page, PageGetPageSize(page)); - _bt_relbuf(rel, buf); - - page = (Page) tbuf; - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - - /* Initialize first child data */ - coff[0] = P_FIRSTDATAKEY(opaque); - if (coff[0] > PageGetMaxOffsetNumber(page)) - elog(ERROR, "bt_fixlevel[%s]: invalid maxoff on start page (need to recreate index)", RelationGetRelationName(rel)); - btitem = (BTItem) PageGetItem(page, PageGetItemId(page, coff[0])); - cblkno[0] = ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)); - cbuf[0] = _bt_getbuf(rel, cblkno[0], BT_READ); - cpage[0] = BufferGetPage(cbuf[0]); - copaque[0] = (BTPageOpaque) PageGetSpecialPointer(cpage[0]); - if (P_LEFTMOST(opaque) && !P_LEFTMOST(copaque[0])) - elog(ERROR, "bt_fixtlevel[%s]: non-leftmost child page of leftmost parent (need to recreate index)", RelationGetRelationName(rel)); - /* caller should take care and avoid this */ - if (P_RIGHTMOST(copaque[0])) - elog(ERROR, "bt_fixtlevel[%s]: invalid start child (need to recreate index)", RelationGetRelationName(rel)); - - for (;;) - { - /* - * Read up to 2 more child pages and look for pointers to them in - * *saved* parent page - */ - coff[1] = coff[2] = InvalidOffsetNumber; - for (cidx = 0; cidx < 2;) - { - cidx++; - cblkno[cidx] = (copaque[cidx - 1])->btpo_next; - cbuf[cidx] = _bt_getbuf(rel, cblkno[cidx], BT_READ); - cpage[cidx] = BufferGetPage(cbuf[cidx]); - copaque[cidx] = (BTPageOpaque) PageGetSpecialPointer(cpage[cidx]); - coff[cidx] = _bt_getoff(page, cblkno[cidx]); - - /* sanity check */ - if (coff[cidx] != InvalidOffsetNumber) - { - for (i = cidx - 1; i >= 0; i--) - { - if (coff[i] == InvalidOffsetNumber) - continue; - if (coff[cidx] != coff[i] + 1) - elog(ERROR, "bt_fixlevel[%s]: invalid item order(1) (need to recreate index)", RelationGetRelationName(rel)); - break; - } - } - - if (P_RIGHTMOST(copaque[cidx])) - break; - } - - /* - * Read parent page and insert missed pointers. - */ - if (coff[1] == InvalidOffsetNumber || - (cidx == 2 && coff[2] == InvalidOffsetNumber)) - { - Buffer newbuf; - Page newpage; - BTPageOpaque newopaque; - BTItem ritem; - Size itemsz; - OffsetNumber newitemoff; - BlockNumber parblk[3]; - BTStackData stack; - - stack.bts_parent = NULL; - stack.bts_blkno = blkno; - stack.bts_offset = InvalidOffsetNumber; - ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), - cblkno[0], P_HIKEY); - - buf = _bt_getstackbuf(rel, &stack, BT_WRITE); - if (buf == InvalidBuffer) - elog(ERROR, "bt_fixlevel[%s]: pointer disappeared (need to recreate index)", RelationGetRelationName(rel)); - - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - coff[0] = stack.bts_offset; - blkno = BufferGetBlockNumber(buf); - parblk[0] = blkno; - - /* Check/insert missed pointers */ - for (i = 1; i <= cidx; i++) - { - coff[i] = _bt_getoff(page, cblkno[i]); - - /* sanity check */ - parblk[i] = BufferGetBlockNumber(buf); - if (coff[i] != InvalidOffsetNumber) - { - if (parblk[i] == parblk[i - 1] && - coff[i] != coff[i - 1] + 1) - elog(ERROR, "bt_fixlevel[%s]: invalid item order(2) (need to recreate index)", RelationGetRelationName(rel)); - continue; - } - /* Have to check next page ? */ - if ((!P_RIGHTMOST(opaque)) && - coff[i - 1] == PageGetMaxOffsetNumber(page)) /* yes */ - { - newbuf = _bt_getbuf(rel, opaque->btpo_next, BT_WRITE); - newpage = BufferGetPage(newbuf); - newopaque = (BTPageOpaque) PageGetSpecialPointer(newpage); - coff[i] = _bt_getoff(newpage, cblkno[i]); - if (coff[i] != InvalidOffsetNumber) /* found ! */ - { - if (coff[i] != P_FIRSTDATAKEY(newopaque)) - elog(ERROR, "bt_fixlevel[%s]: invalid item order(3) (need to recreate index)", RelationGetRelationName(rel)); - _bt_relbuf(rel, buf); - buf = newbuf; - page = newpage; - opaque = newopaque; - blkno = BufferGetBlockNumber(buf); - parblk[i] = blkno; - continue; - } - /* unfound - need to insert on current page */ - _bt_relbuf(rel, newbuf); - } - /* insert pointer */ - ritem = (BTItem) PageGetItem(cpage[i - 1], - PageGetItemId(cpage[i - 1], P_HIKEY)); - btitem = _bt_formitem(&(ritem->bti_itup)); - ItemPointerSet(&(btitem->bti_itup.t_tid), cblkno[i], P_HIKEY); - itemsz = IndexTupleDSize(btitem->bti_itup) - + (sizeof(BTItemData) - sizeof(IndexTupleData)); - itemsz = MAXALIGN(itemsz); - - newitemoff = coff[i - 1] + 1; - - if (PageGetFreeSpace(page) < itemsz) - { - OffsetNumber firstright; - OffsetNumber itup_off; - BlockNumber itup_blkno; - bool newitemonleft; - - firstright = _bt_findsplitloc(rel, page, - newitemoff, itemsz, &newitemonleft); - newbuf = _bt_split(rel, buf, firstright, - newitemoff, itemsz, btitem, newitemonleft, - &itup_off, &itup_blkno); - /* what buffer we need in ? */ - if (newitemonleft) - _bt_relbuf(rel, newbuf); - else - { - _bt_relbuf(rel, buf); - buf = newbuf; - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - } - blkno = BufferGetBlockNumber(buf); - coff[i] = itup_off; - } - else - { - _bt_insertuple(rel, buf, itemsz, btitem, newitemoff); - coff[i] = newitemoff; - } - - pfree(btitem); - parblk[i] = blkno; - } - - /* copy page with pointer to cblkno[cidx] to temp storage */ - memmove(tbuf, page, PageGetPageSize(page)); - _bt_relbuf(rel, buf); - page = (Page) tbuf; - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - } - - /* Continue if current check/fix level page is rightmost */ - if (P_RIGHTMOST(opaque)) - goodbye = false; - - /* Pointers to child pages are Ok - right end of child level ? */ - _bt_relbuf(rel, cbuf[0]); - _bt_relbuf(rel, cbuf[1]); - if (cidx == 1 || - (cidx == 2 && (P_RIGHTMOST(copaque[2]) || goodbye))) - { - if (cidx == 2) - _bt_relbuf(rel, cbuf[2]); - return; - } - if (cblkno[0] == limit || cblkno[1] == limit) - goodbye = true; - cblkno[0] = cblkno[2]; - cbuf[0] = cbuf[2]; - cpage[0] = cpage[2]; - copaque[0] = copaque[2]; - coff[0] = coff[2]; - } -} - -/* - * Check/fix part of tree - branch - up from parent of level with blocks - * lblkno and rblknum. We first ensure that parent level has pointers - * to both lblkno & rblknum and if those pointers are on different - * parent pages then do the same for parent level, etc. No locks must - * be held on target level and upper on entry. No locks will be held - * on exit. Stack created when traversing tree down should be provided and - * it must points to parent level. rblkno must be on the right from lblkno. - * (This function is special edition of more expensive _bt_fixtree(), - * but it doesn't guarantee full consistency of tree.) - */ -static void -_bt_fixbranch(Relation rel, BlockNumber lblkno, - BlockNumber rblkno, BTStack true_stack) -{ - BlockNumber blkno = true_stack->bts_blkno; - BTStackData stack; - BTPageOpaque opaque; - Buffer buf, - rbuf; - Page page; - OffsetNumber offnum; - - true_stack = true_stack->bts_parent; - for (;;) - { - buf = _bt_getbuf(rel, blkno, BT_READ); - - /* Check/fix parent level pointed by blkno */ - _bt_fixlevel(rel, buf, rblkno); - - /* - * Here parent level should have pointers for both lblkno and - * rblkno and we have to find them. - */ - stack.bts_parent = NULL; - stack.bts_blkno = blkno; - stack.bts_offset = InvalidOffsetNumber; - ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY); - buf = _bt_getstackbuf(rel, &stack, BT_READ); - if (buf == InvalidBuffer) - elog(ERROR, "bt_fixbranch[%s]: left pointer unfound (need to recreate index)", RelationGetRelationName(rel)); - page = BufferGetPage(buf); - offnum = _bt_getoff(page, rblkno); - - if (offnum != InvalidOffsetNumber) /* right pointer found */ - { - if (offnum <= stack.bts_offset) - elog(ERROR, "bt_fixbranch[%s]: invalid item order (need to recreate index)", RelationGetRelationName(rel)); - _bt_relbuf(rel, buf); - return; - } - - /* Pointers are on different parent pages - find right one */ - lblkno = BufferGetBlockNumber(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - if (P_RIGHTMOST(opaque)) - elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(1) (need to recreate index)", RelationGetRelationName(rel)); - - stack.bts_parent = NULL; - stack.bts_blkno = opaque->btpo_next; - stack.bts_offset = InvalidOffsetNumber; - ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY); - rbuf = _bt_getstackbuf(rel, &stack, BT_READ); - if (rbuf == InvalidBuffer) - elog(ERROR, "bt_fixbranch[%s]: right pointer unfound(2) (need to recreate index)", RelationGetRelationName(rel)); - rblkno = BufferGetBlockNumber(rbuf); - _bt_relbuf(rel, rbuf); - - /* - * If we have parent item in true_stack then go up one level and - * ensure that it has pointers to new lblkno & rblkno. - */ - if (true_stack) - { - _bt_relbuf(rel, buf); - blkno = true_stack->bts_blkno; - true_stack = true_stack->bts_parent; - continue; - } - - /* - * Well, we are on the level that was root or unexistent when we - * started traversing tree down. If btpo_parent is updated then - * we'll use it to continue, else we'll fix/restore upper levels - * entirely. - */ - if (!BTreeInvalidParent(opaque)) - { - blkno = opaque->btpo_parent; - _bt_relbuf(rel, buf); - continue; - } - - /* Have to switch to excl buf lock and re-check btpo_parent */ - _bt_relbuf(rel, buf); - buf = _bt_getbuf(rel, blkno, BT_WRITE); - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - if (!BTreeInvalidParent(opaque)) - { - blkno = opaque->btpo_parent; - _bt_relbuf(rel, buf); - continue; - } - - /* - * We hold excl lock on some internal page with unupdated - * btpo_parent - time for _bt_fixup. - */ - break; - } - - elog(WARNING, "bt_fixbranch[%s]: fixing upper levels", RelationGetRelationName(rel)); - _bt_fixup(rel, buf); - - return; -} - -/* - * Having buf excl locked this routine walks to the left on level and - * uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper - * levels. No buffer pins/locks will be held on exit. - */ -static void -_bt_fixup(Relation rel, Buffer buf) -{ - Page page; - BTPageOpaque opaque; - BlockNumber blkno; - - for (;;) - { - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - - /* - * If someone else already created parent pages then it's time for - * _bt_fixtree() to check upper levels and fix them, if required. - */ - if (!BTreeInvalidParent(opaque)) - { - blkno = opaque->btpo_parent; - _bt_relbuf(rel, buf); - elog(WARNING, "bt_fixup[%s]: checking/fixing upper levels", RelationGetRelationName(rel)); - _bt_fixtree(rel, blkno); - return; - } - if (P_LEFTMOST(opaque)) - break; - blkno = opaque->btpo_prev; - _bt_relbuf(rel, buf); - buf = _bt_getbuf(rel, blkno, BT_WRITE); - } - - /* - * Ok, we are on the leftmost page, it's write locked by us and its - * btpo_parent points to meta page - time for _bt_fixroot(). - */ - elog(WARNING, "bt_fixup[%s]: fixing root page", RelationGetRelationName(rel)); - buf = _bt_fixroot(rel, buf, true); - _bt_relbuf(rel, buf); -} - -static OffsetNumber -_bt_getoff(Page page, BlockNumber blkno) -{ - BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); - OffsetNumber maxoff = PageGetMaxOffsetNumber(page); - OffsetNumber offnum = P_FIRSTDATAKEY(opaque); - BlockNumber curblkno; - ItemId itemid; - BTItem item; - - for (; offnum <= maxoff; offnum++) - { - itemid = PageGetItemId(page, offnum); - item = (BTItem) PageGetItem(page, itemid); - curblkno = ItemPointerGetBlockNumber(&(item->bti_itup.t_tid)); - if (curblkno == blkno) - return (offnum); - } - - return (InvalidOffsetNumber); -} - -/* * _bt_pgaddtup() -- add a tuple to a particular page in the index. * * This routine adds the tuple to the page as requested. It does |