diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
| -rw-r--r-- | src/backend/access/nbtree/nbtxlog.c | 106 |
1 files changed, 63 insertions, 43 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index c10936a8e27..7aae27dc853 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.31 2006/04/01 03:03:37 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.32 2006/04/13 03:53:05 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -51,32 +51,16 @@ log_incomplete_split(RelFileNode node, BlockNumber leftblk, } static void -forget_matching_split(Relation reln, RelFileNode node, - BlockNumber insertblk, OffsetNumber offnum, - bool is_root) +forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root) { - Buffer buffer; - Page page; - IndexTuple itup; - BlockNumber rightblk; ListCell *l; - /* Get downlink TID from page */ - buffer = XLogReadBuffer(reln, insertblk, false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); - rightblk = ItemPointerGetBlockNumber(&(itup->t_tid)); - Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); - UnlockReleaseBuffer(buffer); - foreach(l, incomplete_splits) { bt_incomplete_split *split = (bt_incomplete_split *) lfirst(l); if (RelFileNodeEquals(node, split->node) && - rightblk == split->rightblk) + downlink == split->rightblk) { if (is_root != split->is_root) elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)", @@ -87,6 +71,20 @@ forget_matching_split(Relation reln, RelFileNode node, } } +/* + * _bt_restore_page -- re-enter all the index tuples on a page + * + * The page is freshly init'd, and *from (length len) is a copy of what + * had been its upper part (pd_upper to pd_special). We assume that the + * tuples had been added to the page in item-number order, and therefore + * the one with highest item number appears first (lowest on the page). + * + * NOTE: the way this routine is coded, the rebuilt page will have the items + * in correct itemno sequence, but physically the opposite order from the + * original, because we insert them in the opposite of itemno order. This + * does not matter in any current btree code, but it's something to keep an + * eye on. Is it worth changing just on general principles? + */ static void _bt_restore_page(Page page, char *from, int len) { @@ -158,9 +156,16 @@ btree_xlog_insert(bool isleaf, bool ismeta, char *datapos; int datalen; xl_btree_metadata md; + BlockNumber downlink = 0; datapos = (char *) xlrec + SizeOfBtreeInsert; datalen = record->xl_len - SizeOfBtreeInsert; + if (!isleaf) + { + memcpy(&downlink, datapos, sizeof(BlockNumber)); + datapos += sizeof(BlockNumber); + datalen -= sizeof(BlockNumber); + } if (ismeta) { memcpy(&md, datapos, sizeof(xl_btree_metadata)); @@ -168,8 +173,7 @@ btree_xlog_insert(bool isleaf, bool ismeta, datalen -= sizeof(xl_btree_metadata); } - if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && - incomplete_splits == NIL) + if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf) return; /* nothing to do */ reln = XLogOpenRelation(xlrec->target.node); @@ -208,13 +212,8 @@ btree_xlog_insert(bool isleaf, bool ismeta, md.fastroot, md.fastlevel); /* Forget any split this insertion completes */ - if (!isleaf && incomplete_splits != NIL) - { - forget_matching_split(reln, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - false); - } + if (!isleaf) + forget_matching_split(xlrec->target.node, downlink, false); } static void @@ -224,14 +223,17 @@ btree_xlog_split(bool onleft, bool isroot, xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record); Relation reln; BlockNumber targetblk; + OffsetNumber targetoff; BlockNumber leftsib; BlockNumber rightsib; + BlockNumber downlink = 0; Buffer buffer; Page page; BTPageOpaque pageop; reln = XLogOpenRelation(xlrec->target.node); targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + targetoff = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); leftsib = (onleft) ? targetblk : xlrec->otherblk; rightsib = (onleft) ? xlrec->otherblk : targetblk; @@ -252,6 +254,16 @@ btree_xlog_split(bool onleft, bool isroot, (char *) xlrec + SizeOfBtreeSplit, xlrec->leftlen); + if (onleft && xlrec->level > 0) + { + IndexTuple itup; + + /* extract downlink in the target tuple */ + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff)); + downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); + Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); @@ -274,6 +286,16 @@ btree_xlog_split(bool onleft, bool isroot, (char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen, record->xl_len - SizeOfBtreeSplit - xlrec->leftlen); + if (!onleft && xlrec->level > 0) + { + IndexTuple itup; + + /* extract downlink in the target tuple */ + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff)); + downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); + Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); @@ -308,13 +330,8 @@ btree_xlog_split(bool onleft, bool isroot, } /* Forget any split this insertion completes */ - if (xlrec->level > 0 && incomplete_splits != NIL) - { - forget_matching_split(reln, xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - ItemPointerGetOffsetNumber(&(xlrec->target.tid)), - false); - } + if (xlrec->level > 0) + forget_matching_split(xlrec->target.node, downlink, false); /* The job ain't done till the parent link is inserted... */ log_incomplete_split(xlrec->target.node, @@ -516,6 +533,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; BTPageOpaque pageop; + BlockNumber downlink = 0; reln = XLogOpenRelation(xlrec->node); buffer = XLogReadBuffer(reln, xlrec->rootblk, true); @@ -532,9 +550,17 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) pageop->btpo_flags |= BTP_LEAF; if (record->xl_len > SizeOfBtreeNewroot) + { + IndexTuple itup; + _bt_restore_page(page, (char *) xlrec + SizeOfBtreeNewroot, record->xl_len - SizeOfBtreeNewroot); + /* extract downlink to the right-hand split page */ + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)); + downlink = ItemPointerGetBlockNumber(&(itup->t_tid)); + Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY); + } PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); @@ -546,14 +572,8 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record) xlrec->rootblk, xlrec->level); /* Check to see if this satisfies any incomplete insertions */ - if (record->xl_len > SizeOfBtreeNewroot && - incomplete_splits != NIL) - { - forget_matching_split(reln, xlrec->node, - xlrec->rootblk, - P_FIRSTKEY, - true); - } + if (record->xl_len > SizeOfBtreeNewroot) + forget_matching_split(xlrec->node, downlink, true); } |
