Diffstat (limited to 'src/backend/access/nbtree/nbtree.c')
-rw-r--r--  src/backend/access/nbtree/nbtree.c  1189
1 file changed, 0 insertions, 1189 deletions
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
deleted file mode 100644
index 46787e577a1..00000000000
--- a/src/backend/access/nbtree/nbtree.c
+++ /dev/null
@@ -1,1189 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * nbtree.c
- * Implementation of Lehman and Yao's btree management algorithm for
- * Postgres.
- *
- * NOTES
- * This file contains only the public interface routines.
- *
- *
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- * IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.91 2002/06/20 20:29:25 momjian Exp $
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/genam.h"
-#include "access/heapam.h"
-#include "access/nbtree.h"
-#include "catalog/index.h"
-#include "executor/executor.h"
-#include "miscadmin.h"
-#include "storage/sinval.h"
-#include "access/xlogutils.h"
-
-
-/* Working state for btbuild and its callback */
-typedef struct
-{
- bool usefast;
- bool isUnique;
- bool haveDead;
- Relation heapRel;
- BTSpool *spool;
-
- /*
-	 * spool2 is needed only when the index is a unique index. Dead
-	 * tuples are put into spool2 instead of spool in order to avoid
-	 * the uniqueness check.
- */
- BTSpool *spool2;
- double indtuples;
-} BTBuildState;
-
-
-bool BuildingBtree = false; /* see comment in btbuild() */
-bool FastBuild = true; /* use SORT instead of insertion build */
-
-/*
- * TEMPORARY FLAG FOR TESTING NEW FIX TREE
- * CODE WITHOUT AFFECTING ANYONE ELSE
- */
-bool FixBTree = true;
-
-static void _bt_restscan(IndexScanDesc scan);
-static void btbuildCallback(Relation index,
- HeapTuple htup,
- Datum *attdata,
- char *nulls,
- bool tupleIsAlive,
- void *state);
-
-
-/*
- * AtEOXact_nbtree() --- clean up nbtree subsystem at xact abort or commit.
- */
-void
-AtEOXact_nbtree(void)
-{
- /*
- * Note: these actions should only be necessary during xact abort; but
- * they can't hurt during a commit.
- */
-
- /* If we were building a btree, we ain't anymore. */
- BuildingBtree = false;
-}
-
-
-/*
- * btbuild() -- build a new btree index.
- *
- * We use a global variable to record the fact that we're creating
- * a new index. This is used to avoid high-concurrency locking,
- * since the index won't be visible until this transaction commits
- * and since building is guaranteed to be single-threaded.
- */
-Datum
-btbuild(PG_FUNCTION_ARGS)
-{
- Relation heap = (Relation) PG_GETARG_POINTER(0);
- Relation index = (Relation) PG_GETARG_POINTER(1);
- IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
- double reltuples;
- BTBuildState buildstate;
-
- /* set flag to disable locking */
- BuildingBtree = true;
-
- /*
- * bootstrap processing does something strange, so don't use
- * sort/build for initial catalog indices. at some point i need to
- * look harder at this. (there is some kind of incremental processing
- * going on there.) -- pma 08/29/95
- */
- buildstate.usefast = (FastBuild && IsNormalProcessingMode());
- buildstate.isUnique = indexInfo->ii_Unique;
- buildstate.haveDead = false;
- buildstate.heapRel = heap;
- buildstate.spool = NULL;
- buildstate.spool2 = NULL;
- buildstate.indtuples = 0;
-
-#ifdef BTREE_BUILD_STATS
- if (Show_btree_build_stats)
- ResetUsage();
-#endif /* BTREE_BUILD_STATS */
-
- /*
- * We expect to be called exactly once for any index relation. If
- * that's not the case, big trouble's what we have.
- */
- if (RelationGetNumberOfBlocks(index) != 0)
- elog(ERROR, "%s already contains data",
- RelationGetRelationName(index));
-
- /* initialize the btree index metadata page */
- _bt_metapinit(index);
-
- if (buildstate.usefast)
- {
- buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique);
-
- /*
-		 * Unlike spool, uniqueness is not checked for spool2.
- */
- if (indexInfo->ii_Unique)
- buildstate.spool2 = _bt_spoolinit(index, false);
- }
-
- /* do the heap scan */
- reltuples = IndexBuildHeapScan(heap, index, indexInfo,
- btbuildCallback, (void *) &buildstate);
-
- /* okay, all heap tuples are indexed */
- if (buildstate.spool2 && !buildstate.haveDead)
- {
- /* spool2 turns out to be unnecessary */
- _bt_spooldestroy(buildstate.spool2);
- buildstate.spool2 = NULL;
- }
-
- /*
- * if we are doing bottom-up btree build, finish the build by (1)
- * completing the sort of the spool file, (2) inserting the sorted
- * tuples into btree pages and (3) building the upper levels.
- */
- if (buildstate.usefast)
- {
- _bt_leafbuild(buildstate.spool, buildstate.spool2);
- _bt_spooldestroy(buildstate.spool);
- if (buildstate.spool2)
- _bt_spooldestroy(buildstate.spool2);
- }
-
-#ifdef BTREE_BUILD_STATS
- if (Show_btree_build_stats)
- {
- ShowUsage("BTREE BUILD STATS");
- ResetUsage();
- }
-#endif /* BTREE_BUILD_STATS */
-
- /* all done */
- BuildingBtree = false;
-
- /*
- * Since we just counted the tuples in the heap, we update its stats
- * in pg_class to guarantee that the planner takes advantage of the
- * index we just created. But, only update statistics during normal
- * index definitions, not for indices on system catalogs created
- * during bootstrap processing. We must close the relations before
- * updating statistics to guarantee that the relcache entries are
- * flushed when we increment the command counter in UpdateStats(). But
- * we do not release any locks on the relations; those will be held
- * until end of transaction.
- */
- if (IsNormalProcessingMode())
- {
- Oid hrelid = RelationGetRelid(heap);
- Oid irelid = RelationGetRelid(index);
-
- heap_close(heap, NoLock);
- index_close(index);
- UpdateStats(hrelid, reltuples);
- UpdateStats(irelid, buildstate.indtuples);
- }
-
- PG_RETURN_VOID();
-}
-
-/*
- * Per-tuple callback from IndexBuildHeapScan
- */
-static void
-btbuildCallback(Relation index,
- HeapTuple htup,
- Datum *attdata,
- char *nulls,
- bool tupleIsAlive,
- void *state)
-{
- BTBuildState *buildstate = (BTBuildState *) state;
- IndexTuple itup;
- BTItem btitem;
- InsertIndexResult res;
-
- /* form an index tuple and point it at the heap tuple */
- itup = index_formtuple(RelationGetDescr(index), attdata, nulls);
- itup->t_tid = htup->t_self;
-
- btitem = _bt_formitem(itup);
-
- /*
-	 * If we are doing a bottom-up btree build, we insert the index tuple
-	 * into a spool file for subsequent processing.  Otherwise, we insert
-	 * it into the btree directly.
- */
- if (buildstate->usefast)
- {
- if (tupleIsAlive || buildstate->spool2 == NULL)
- _bt_spool(btitem, buildstate->spool);
- else
- {
- /* dead tuples are put into spool2 */
- buildstate->haveDead = true;
- _bt_spool(btitem, buildstate->spool2);
- }
- }
- else
- {
- res = _bt_doinsert(index, btitem,
- buildstate->isUnique, buildstate->heapRel);
- if (res)
- pfree(res);
- }
-
- buildstate->indtuples += 1;
-
- pfree(btitem);
- pfree(itup);
-}
-
-/*
- * btinsert() -- insert an index tuple into a btree.
- *
- * Descend the tree recursively, find the appropriate location for our
- * new tuple, put it there, set its unique OID as appropriate, and
- * return an InsertIndexResult to the caller.
- */
-Datum
-btinsert(PG_FUNCTION_ARGS)
-{
- Relation rel = (Relation) PG_GETARG_POINTER(0);
- Datum *datum = (Datum *) PG_GETARG_POINTER(1);
- char *nulls = (char *) PG_GETARG_POINTER(2);
- ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
- Relation heapRel = (Relation) PG_GETARG_POINTER(4);
- bool checkUnique = PG_GETARG_BOOL(5);
- InsertIndexResult res;
- BTItem btitem;
- IndexTuple itup;
-
- /* generate an index tuple */
- itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
- itup->t_tid = *ht_ctid;
- btitem = _bt_formitem(itup);
-
- res = _bt_doinsert(rel, btitem, checkUnique, heapRel);
-
- pfree(btitem);
- pfree(itup);
-
- PG_RETURN_POINTER(res);
-}
-
-/*
- * btgettuple() -- Get the next tuple in the scan.
- */
-Datum
-btgettuple(PG_FUNCTION_ARGS)
-{
- IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
- ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
- BTScanOpaque so = (BTScanOpaque) scan->opaque;
- Page page;
- OffsetNumber offnum;
- bool res;
-
- /*
- * If we've already initialized this scan, we can just advance it in
- * the appropriate direction. If we haven't done so yet, we call a
- * routine to get the first item in the scan.
- */
- if (ItemPointerIsValid(&(scan->currentItemData)))
- {
- /*
- * Restore scan position using heap TID returned by previous call
- * to btgettuple(). _bt_restscan() re-grabs the read lock on the
- * buffer, too.
- */
- _bt_restscan(scan);
- /*
- * Check to see if we should kill the previously-fetched tuple.
- */
- if (scan->kill_prior_tuple)
- {
- /*
- * Yes, so mark it by setting the LP_DELETE bit in the item flags.
- */
- offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- page = BufferGetPage(so->btso_curbuf);
- PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
- /*
- * Since this can be redone later if needed, it's treated the
- * same as a commit-hint-bit status update for heap tuples:
- * we mark the buffer dirty but don't make a WAL log entry.
- */
- SetBufferCommitInfoNeedsSave(so->btso_curbuf);
- }
- /*
- * Now continue the scan.
- */
- res = _bt_next(scan, dir);
- }
- else
- res = _bt_first(scan, dir);
-
- /*
- * Skip killed tuples if asked to.
- */
- if (scan->ignore_killed_tuples)
- {
- while (res)
- {
- offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- page = BufferGetPage(so->btso_curbuf);
- if (!ItemIdDeleted(PageGetItemId(page, offnum)))
- break;
- res = _bt_next(scan, dir);
- }
- }
-
- /*
- * Save heap TID to use it in _bt_restscan. Then release the read
- * lock on the buffer so that we aren't blocking other backends.
- *
- * NOTE: we do keep the pin on the buffer! This is essential to ensure
- * that someone else doesn't delete the index entry we are stopped on.
- */
- if (res)
- {
- ((BTScanOpaque) scan->opaque)->curHeapIptr = scan->xs_ctup.t_self;
- LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
- BUFFER_LOCK_UNLOCK);
- }
-
- PG_RETURN_BOOL(res);
-}
-
-/*
- * btbeginscan() -- start a scan on a btree index
- */
-Datum
-btbeginscan(PG_FUNCTION_ARGS)
-{
- Relation rel = (Relation) PG_GETARG_POINTER(0);
- int keysz = PG_GETARG_INT32(1);
- ScanKey scankey = (ScanKey) PG_GETARG_POINTER(2);
- IndexScanDesc scan;
-
- /* get the scan */
- scan = RelationGetIndexScan(rel, keysz, scankey);
-
- PG_RETURN_POINTER(scan);
-}
-
-/*
- * btrescan() -- rescan an index relation
- */
-Datum
-btrescan(PG_FUNCTION_ARGS)
-{
- IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
- ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
- ItemPointer iptr;
- BTScanOpaque so;
-
- so = (BTScanOpaque) scan->opaque;
-
- if (so == NULL) /* if called from btbeginscan */
- {
- so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
- so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
- ItemPointerSetInvalid(&(so->curHeapIptr));
- ItemPointerSetInvalid(&(so->mrkHeapIptr));
- if (scan->numberOfKeys > 0)
- so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
- else
- so->keyData = (ScanKey) NULL;
- scan->opaque = so;
- }
-
- /* we aren't holding any read locks, but gotta drop the pins */
- if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
- {
- ReleaseBuffer(so->btso_curbuf);
- so->btso_curbuf = InvalidBuffer;
- ItemPointerSetInvalid(&(so->curHeapIptr));
- ItemPointerSetInvalid(iptr);
- }
-
- if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
- {
- ReleaseBuffer(so->btso_mrkbuf);
- so->btso_mrkbuf = InvalidBuffer;
- ItemPointerSetInvalid(&(so->mrkHeapIptr));
- ItemPointerSetInvalid(iptr);
- }
-
- /*
- * Reset the scan keys. Note that keys ordering stuff moved to
- * _bt_first. - vadim 05/05/97
- */
- so->numberOfKeys = scan->numberOfKeys;
- if (scan->numberOfKeys > 0)
- {
- memmove(scan->keyData,
- scankey,
- scan->numberOfKeys * sizeof(ScanKeyData));
- memmove(so->keyData,
- scankey,
- so->numberOfKeys * sizeof(ScanKeyData));
- }
-
- PG_RETURN_VOID();
-}
-
-void
-btmovescan(IndexScanDesc scan, Datum v)
-{
- ItemPointer iptr;
- BTScanOpaque so;
-
- so = (BTScanOpaque) scan->opaque;
-
- /* we aren't holding any read locks, but gotta drop the pin */
- if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
- {
- ReleaseBuffer(so->btso_curbuf);
- so->btso_curbuf = InvalidBuffer;
- ItemPointerSetInvalid(iptr);
- }
-
- so->keyData[0].sk_argument = v;
-}
-
-/*
- * btendscan() -- close down a scan
- */
-Datum
-btendscan(PG_FUNCTION_ARGS)
-{
- IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
- ItemPointer iptr;
- BTScanOpaque so;
-
- so = (BTScanOpaque) scan->opaque;
-
- /* we aren't holding any read locks, but gotta drop the pins */
- if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
- {
- if (BufferIsValid(so->btso_curbuf))
- ReleaseBuffer(so->btso_curbuf);
- so->btso_curbuf = InvalidBuffer;
- ItemPointerSetInvalid(iptr);
- }
-
- if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
- {
- if (BufferIsValid(so->btso_mrkbuf))
- ReleaseBuffer(so->btso_mrkbuf);
- so->btso_mrkbuf = InvalidBuffer;
- ItemPointerSetInvalid(iptr);
- }
-
- if (so->keyData != (ScanKey) NULL)
- pfree(so->keyData);
- pfree(so);
-
- PG_RETURN_VOID();
-}
-
-/*
- * btmarkpos() -- save current scan position
- */
-Datum
-btmarkpos(PG_FUNCTION_ARGS)
-{
- IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
- ItemPointer iptr;
- BTScanOpaque so;
-
- so = (BTScanOpaque) scan->opaque;
-
- /* we aren't holding any read locks, but gotta drop the pin */
- if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
- {
- ReleaseBuffer(so->btso_mrkbuf);
- so->btso_mrkbuf = InvalidBuffer;
- ItemPointerSetInvalid(iptr);
- }
-
- /* bump pin on current buffer for assignment to mark buffer */
- if (ItemPointerIsValid(&(scan->currentItemData)))
- {
- so->btso_mrkbuf = ReadBuffer(scan->indexRelation,
- BufferGetBlockNumber(so->btso_curbuf));
- scan->currentMarkData = scan->currentItemData;
- so->mrkHeapIptr = so->curHeapIptr;
- }
-
- PG_RETURN_VOID();
-}
-
-/*
- * btrestrpos() -- restore scan to last saved position
- */
-Datum
-btrestrpos(PG_FUNCTION_ARGS)
-{
- IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
- ItemPointer iptr;
- BTScanOpaque so;
-
- so = (BTScanOpaque) scan->opaque;
-
- /* we aren't holding any read locks, but gotta drop the pin */
- if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
- {
- ReleaseBuffer(so->btso_curbuf);
- so->btso_curbuf = InvalidBuffer;
- ItemPointerSetInvalid(iptr);
- }
-
- /* bump pin on marked buffer */
- if (ItemPointerIsValid(&(scan->currentMarkData)))
- {
- so->btso_curbuf = ReadBuffer(scan->indexRelation,
- BufferGetBlockNumber(so->btso_mrkbuf));
- scan->currentItemData = scan->currentMarkData;
- so->curHeapIptr = so->mrkHeapIptr;
- }
-
- PG_RETURN_VOID();
-}
-
-/*
- * Bulk deletion of all index entries pointing to a set of heap tuples.
- * The set of target tuples is specified via a callback routine that tells
- * whether any given heap tuple (identified by ItemPointer) is being deleted.
- *
- * Result: a palloc'd struct containing statistical info for VACUUM displays.
- */
-Datum
-btbulkdelete(PG_FUNCTION_ARGS)
-{
- Relation rel = (Relation) PG_GETARG_POINTER(0);
- IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
- void *callback_state = (void *) PG_GETARG_POINTER(2);
- IndexBulkDeleteResult *result;
- BlockNumber num_pages;
- double tuples_removed;
- double num_index_tuples;
- IndexScanDesc scan;
- BTScanOpaque so;
- ItemPointer current;
-
- tuples_removed = 0;
- num_index_tuples = 0;
-
- /*
- * We use a standard IndexScanDesc scan object, but to speed up the
- * loop, we skip most of the wrapper layers of index_getnext and
- * instead call _bt_step directly. This implies holding buffer lock
- * on a target page throughout the loop over the page's tuples.
- * Initially, we have a read lock acquired by _bt_step when we stepped
- * onto the page. If we find a tuple we need to delete, we trade in
- * the read lock for an exclusive write lock; after that, we hold the
- * write lock until we step off the page (fortunately, _bt_relbuf
- * doesn't care which kind of lock it's releasing). This should
- * minimize the amount of work needed per page.
- */
- scan = index_beginscan(NULL, rel, SnapshotAny, 0, (ScanKey) NULL);
- so = (BTScanOpaque) scan->opaque;
- current = &(scan->currentItemData);
-
- /* Use _bt_first to get started, then _bt_step to remaining tuples */
- if (_bt_first(scan, ForwardScanDirection))
- {
- Buffer buf;
- BlockNumber lockedBlock = InvalidBlockNumber;
-
- /* we have the buffer pinned and locked */
- buf = so->btso_curbuf;
- Assert(BufferIsValid(buf));
-
- do
- {
- Page page;
- BlockNumber blkno;
- OffsetNumber offnum;
- BTItem btitem;
- BTPageOpaque opaque;
- IndexTuple itup;
- ItemPointer htup;
-
- CHECK_FOR_INTERRUPTS();
-
- /* current is the next index tuple */
- blkno = ItemPointerGetBlockNumber(current);
- offnum = ItemPointerGetOffsetNumber(current);
- page = BufferGetPage(buf);
- btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
- itup = &btitem->bti_itup;
- htup = &(itup->t_tid);
-
- if (callback(htup, callback_state))
- {
- /*
-				 * If this is the first deletion on this page, trade in the
-				 * read lock for a really-exclusive write lock. Then, step
- * back one and re-examine the item, because other backends
- * might have inserted item(s) while we weren't holding
- * the lock!
- *
- * We assume that only concurrent insertions, not deletions,
- * can occur while we're not holding the page lock (the caller
- * should hold a suitable relation lock to ensure this).
- * Therefore, the item we want to delete is either in the
- * same slot as before, or some slot to its right.
- * Rechecking the same slot is necessary and sufficient to
- * get back in sync after any insertions.
- */
- if (blkno != lockedBlock)
- {
- LockBuffer(buf, BUFFER_LOCK_UNLOCK);
- LockBufferForCleanup(buf);
- lockedBlock = blkno;
- }
- else
- {
- /* Okay to delete the item from the page */
- _bt_itemdel(rel, buf, current);
-
- /* Mark buffer dirty, but keep the lock and pin */
- WriteNoReleaseBuffer(buf);
-
- tuples_removed += 1;
- }
-
- /*
- * In either case, we now need to back up the scan one item,
- * so that the next cycle will re-examine the same offnum on
- * this page.
- *
- * For now, just hack the current-item index. Will need to
- * be smarter when deletion includes removal of empty
- * index pages.
- *
- * We must decrement ip_posid in all cases but one: if the
- * page was formerly rightmost but was split while we didn't
- * hold the lock, and ip_posid is pointing to item 1, then
-				 * ip_posid now points at the high key, not a valid data item.
- * In this case we do want to step forward.
- */
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- if (current->ip_posid >= P_FIRSTDATAKEY(opaque))
- current->ip_posid--;
- }
- else
- num_index_tuples += 1;
- } while (_bt_step(scan, &buf, ForwardScanDirection));
- }
-
- index_endscan(scan);
-
- /* return statistics */
- num_pages = RelationGetNumberOfBlocks(rel);
-
- result = (IndexBulkDeleteResult *) palloc(sizeof(IndexBulkDeleteResult));
- result->num_pages = num_pages;
- result->tuples_removed = tuples_removed;
- result->num_index_tuples = num_index_tuples;
-
- PG_RETURN_POINTER(result);
-}
-
-/*
- * Restore scan position when btgettuple is called to continue a scan.
- */
-static void
-_bt_restscan(IndexScanDesc scan)
-{
- Relation rel = scan->indexRelation;
- BTScanOpaque so = (BTScanOpaque) scan->opaque;
- Buffer buf = so->btso_curbuf;
- Page page;
- ItemPointer current = &(scan->currentItemData);
- OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
- maxoff;
- BTPageOpaque opaque;
- ItemPointerData target = so->curHeapIptr;
- BTItem item;
- BlockNumber blkno;
-
- /*
- * Get back the read lock we were holding on the buffer. (We still
- * have a reference-count pin on it, so need not get that.)
- */
- LockBuffer(buf, BT_READ);
-
- page = BufferGetPage(buf);
- maxoff = PageGetMaxOffsetNumber(page);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
-
- /*
-	 * An invalid heap TID is used as a flag: the first index tuple on the
-	 * page was deleted, but we did not move left (that would slow down
-	 * vacuum), so we set current->ip_posid to just before the first index
-	 * tuple on the current page (_bt_step will move it right)...
- */
- if (!ItemPointerIsValid(&target))
- {
- ItemPointerSetOffsetNumber(current,
- OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
- return;
- }
-
- /*
- * The item we were on may have moved right due to insertions. Find it
- * again.
- */
- for (;;)
- {
- /* Check for item on this page */
- for (;
- offnum <= maxoff;
- offnum = OffsetNumberNext(offnum))
- {
- item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
- if (item->bti_itup.t_tid.ip_blkid.bi_hi ==
- target.ip_blkid.bi_hi &&
- item->bti_itup.t_tid.ip_blkid.bi_lo ==
- target.ip_blkid.bi_lo &&
- item->bti_itup.t_tid.ip_posid == target.ip_posid)
- {
- current->ip_posid = offnum;
- return;
- }
- }
-
- /*
-		 * If we get here, the item we're looking for has moved right by
-		 * at least one page.
- */
- if (P_RIGHTMOST(opaque))
- elog(FATAL, "_bt_restscan: my bits moved right off the end of the world!"
- "\n\tRecreate index %s.", RelationGetRelationName(rel));
-
- blkno = opaque->btpo_next;
- _bt_relbuf(rel, buf);
- buf = _bt_getbuf(rel, blkno, BT_READ);
- page = BufferGetPage(buf);
- maxoff = PageGetMaxOffsetNumber(page);
- opaque = (BTPageOpaque) PageGetSpecialPointer(page);
- offnum = P_FIRSTDATAKEY(opaque);
- ItemPointerSet(current, blkno, offnum);
- so->btso_curbuf = buf;
- }
-}
-
-static void
-_bt_restore_page(Page page, char *from, int len)
-{
- BTItemData btdata;
- Size itemsz;
- char *end = from + len;
-
- for (; from < end;)
- {
- memcpy(&btdata, from, sizeof(BTItemData));
- itemsz = IndexTupleDSize(btdata.bti_itup) +
- (sizeof(BTItemData) - sizeof(IndexTupleData));
- itemsz = MAXALIGN(itemsz);
- if (PageAddItem(page, (Item) from, itemsz,
- FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "_bt_restore_page: can't add item to page");
- from += itemsz;
- }
-}
-
-static void
-btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
-{
- xl_btree_delete *xlrec;
- Relation reln;
- Buffer buffer;
- Page page;
-
- if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
- return;
-
- xlrec = (xl_btree_delete *) XLogRecGetData(record);
- reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
- if (!RelationIsValid(reln))
- return;
- buffer = XLogReadBuffer(false, reln,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)));
- if (!BufferIsValid(buffer))
- elog(PANIC, "btree_delete_redo: block unfound");
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "btree_delete_redo: uninitialized page");
-
- if (XLByteLE(lsn, PageGetLSN(page)))
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
-
- PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
-
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
-
- return;
-}
-
-static void
-btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
-{
- xl_btree_insert *xlrec;
- Relation reln;
- Buffer buffer;
- Page page;
- BTPageOpaque pageop;
-
- if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
- return;
-
- xlrec = (xl_btree_insert *) XLogRecGetData(record);
- reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
- if (!RelationIsValid(reln))
- return;
- buffer = XLogReadBuffer(false, reln,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)));
- if (!BufferIsValid(buffer))
- elog(PANIC, "btree_insert_%sdo: block unfound", (redo) ? "re" : "un");
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "btree_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
-
- if (redo)
- {
- if (XLByteLE(lsn, PageGetLSN(page)))
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
- if (PageAddItem(page, (Item) ((char *) xlrec + SizeOfBtreeInsert),
- record->xl_len - SizeOfBtreeInsert,
- ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
- LP_USED) == InvalidOffsetNumber)
- elog(PANIC, "btree_insert_redo: failed to add item");
-
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
- }
- else
- {
- if (XLByteLT(PageGetLSN(page), lsn))
- elog(PANIC, "btree_insert_undo: bad page LSN");
-
- if (!P_ISLEAF(pageop))
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
-
- elog(PANIC, "btree_insert_undo: unimplemented");
- }
-
- return;
-}
-
-static void
-btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
-{
- xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
- Relation reln;
- BlockNumber blkno;
- Buffer buffer;
- Page page;
- BTPageOpaque pageop;
- char *op = (redo) ? "redo" : "undo";
- bool isleaf = (record->xl_info & XLOG_BTREE_LEAF);
-
- reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
- if (!RelationIsValid(reln))
- return;
-
- /* Left (original) sibling */
- blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
- BlockIdGetBlockNumber(&(xlrec->otherblk));
- buffer = XLogReadBuffer(false, reln, blkno);
- if (!BufferIsValid(buffer))
- elog(PANIC, "btree_split_%s: lost left sibling", op);
-
- page = (Page) BufferGetPage(buffer);
- if (redo)
- _bt_pageinit(page, BufferGetPageSize(buffer));
- else if (PageIsNew((PageHeader) page))
- elog(PANIC, "btree_split_undo: uninitialized left sibling");
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
-
- if (redo)
- {
- pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
- pageop->btpo_prev = BlockIdGetBlockNumber(&(xlrec->leftblk));
- if (onleft)
- pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk));
- else
- pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
- pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
-
- _bt_restore_page(page, (char *) xlrec + SizeOfBtreeSplit, xlrec->leftlen);
-
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
- }
- else
-/* undo */
- {
- if (XLByteLT(PageGetLSN(page), lsn))
- elog(PANIC, "btree_split_undo: bad left sibling LSN");
- elog(PANIC, "btree_split_undo: unimplemented");
- }
-
- /* Right (new) sibling */
- blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) :
- ItemPointerGetBlockNumber(&(xlrec->target.tid));
- buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
- if (!BufferIsValid(buffer))
- elog(PANIC, "btree_split_%s: lost right sibling", op);
-
- page = (Page) BufferGetPage(buffer);
- if (redo)
- _bt_pageinit(page, BufferGetPageSize(buffer));
- else if (PageIsNew((PageHeader) page))
- elog(PANIC, "btree_split_undo: uninitialized right sibling");
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
-
- if (redo)
- {
- pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
- pageop->btpo_prev = (onleft) ?
- ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
- BlockIdGetBlockNumber(&(xlrec->otherblk));
- pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk));
- pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
-
- _bt_restore_page(page,
- (char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
- record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
-
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
- }
- else
-/* undo */
- {
- if (XLByteLT(PageGetLSN(page), lsn))
- elog(PANIC, "btree_split_undo: bad right sibling LSN");
- elog(PANIC, "btree_split_undo: unimplemented");
- }
-
- if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
- return;
-
- /* Right (next) page */
- blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
- if (blkno == P_NONE)
- return;
-
- buffer = XLogReadBuffer(false, reln, blkno);
- if (!BufferIsValid(buffer))
- elog(PANIC, "btree_split_redo: lost next right page");
-
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "btree_split_redo: uninitialized next right page");
-
- if (XLByteLE(lsn, PageGetLSN(page)))
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
- pageop->btpo_prev = (onleft) ?
- BlockIdGetBlockNumber(&(xlrec->otherblk)) :
- ItemPointerGetBlockNumber(&(xlrec->target.tid));
-
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
-}
-
-static void
-btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
-{
- xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
- Relation reln;
- Buffer buffer;
- Page page;
- BTPageOpaque pageop;
- Buffer metabuf;
- Page metapg;
- BTMetaPageData md;
-
- if (!redo)
- return;
-
- reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
- if (!RelationIsValid(reln))
- return;
- buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
- if (!BufferIsValid(buffer))
- elog(PANIC, "btree_newroot_redo: no root page");
- metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
-	if (!BufferIsValid(metabuf))
- elog(PANIC, "btree_newroot_redo: no metapage");
- page = (Page) BufferGetPage(buffer);
- _bt_pageinit(page, BufferGetPageSize(buffer));
- pageop = (BTPageOpaque) PageGetSpecialPointer(page);
-
- pageop->btpo_flags |= BTP_ROOT;
- pageop->btpo_prev = pageop->btpo_next = P_NONE;
- pageop->btpo_parent = BTREE_METAPAGE;
-
- if (record->xl_info & XLOG_BTREE_LEAF)
- pageop->btpo_flags |= BTP_LEAF;
-
- if (record->xl_len > SizeOfBtreeNewroot)
- _bt_restore_page(page,
- (char *) xlrec + SizeOfBtreeNewroot,
- record->xl_len - SizeOfBtreeNewroot);
-
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
-
- metapg = BufferGetPage(metabuf);
- _bt_pageinit(metapg, BufferGetPageSize(metabuf));
- md.btm_magic = BTREE_MAGIC;
- md.btm_version = BTREE_VERSION;
- md.btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
- md.btm_level = xlrec->level;
- memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
-
- pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
- pageop->btpo_flags = BTP_META;
-
- PageSetLSN(metapg, lsn);
- PageSetSUI(metapg, ThisStartUpID);
- UnlockAndWriteBuffer(metabuf);
-}
-
-void
-btree_redo(XLogRecPtr lsn, XLogRecord *record)
-{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
-
- info &= ~XLOG_BTREE_LEAF;
- if (info == XLOG_BTREE_DELETE)
- btree_xlog_delete(true, lsn, record);
- else if (info == XLOG_BTREE_INSERT)
- btree_xlog_insert(true, lsn, record);
- else if (info == XLOG_BTREE_SPLIT)
- btree_xlog_split(true, false, lsn, record); /* new item on the right */
- else if (info == XLOG_BTREE_SPLEFT)
- btree_xlog_split(true, true, lsn, record); /* new item on the left */
- else if (info == XLOG_BTREE_NEWROOT)
- btree_xlog_newroot(true, lsn, record);
- else
- elog(PANIC, "btree_redo: unknown op code %u", info);
-}
-
-void
-btree_undo(XLogRecPtr lsn, XLogRecord *record)
-{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
-
- info &= ~XLOG_BTREE_LEAF;
- if (info == XLOG_BTREE_DELETE)
- btree_xlog_delete(false, lsn, record);
- else if (info == XLOG_BTREE_INSERT)
- btree_xlog_insert(false, lsn, record);
- else if (info == XLOG_BTREE_SPLIT)
- btree_xlog_split(false, false, lsn, record); /* new item on the right */
- else if (info == XLOG_BTREE_SPLEFT)
- btree_xlog_split(false, true, lsn, record); /* new item on the left */
- else if (info == XLOG_BTREE_NEWROOT)
- btree_xlog_newroot(false, lsn, record);
- else
- elog(PANIC, "btree_undo: unknown op code %u", info);
-}
-
-static void
-out_target(char *buf, xl_btreetid *target)
-{
- sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
- target->node.tblNode, target->node.relNode,
- ItemPointerGetBlockNumber(&(target->tid)),
- ItemPointerGetOffsetNumber(&(target->tid)));
-}
-
-void
-btree_desc(char *buf, uint8 xl_info, char *rec)
-{
- uint8 info = xl_info & ~XLR_INFO_MASK;
-
- info &= ~XLOG_BTREE_LEAF;
- if (info == XLOG_BTREE_INSERT)
- {
- xl_btree_insert *xlrec = (xl_btree_insert *) rec;
-
- strcat(buf, "insert: ");
- out_target(buf, &(xlrec->target));
- }
- else if (info == XLOG_BTREE_DELETE)
- {
- xl_btree_delete *xlrec = (xl_btree_delete *) rec;
-
- strcat(buf, "delete: ");
- out_target(buf, &(xlrec->target));
- }
- else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT)
- {
- xl_btree_split *xlrec = (xl_btree_split *) rec;
-
- sprintf(buf + strlen(buf), "split(%s): ",
- (info == XLOG_BTREE_SPLIT) ? "right" : "left");
- out_target(buf, &(xlrec->target));
- sprintf(buf + strlen(buf), "; oth %u; rgh %u",
- BlockIdGetBlockNumber(&xlrec->otherblk),
- BlockIdGetBlockNumber(&xlrec->rightblk));
- }
- else if (info == XLOG_BTREE_NEWROOT)
- {
- xl_btree_newroot *xlrec = (xl_btree_newroot *) rec;
-
- sprintf(buf + strlen(buf), "root: node %u/%u; blk %u",
- xlrec->node.tblNode, xlrec->node.relNode,
- BlockIdGetBlockNumber(&xlrec->rootblk));
- }
- else
- strcat(buf, "UNKNOWN");
-}