summaryrefslogtreecommitdiff
path: root/src/backend/access/nbtree/nbtxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
-rw-r--r--src/backend/access/nbtree/nbtxlog.c106
1 files changed, 63 insertions, 43 deletions
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index c10936a8e27..7aae27dc853 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.31 2006/04/01 03:03:37 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.32 2006/04/13 03:53:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -51,32 +51,16 @@ log_incomplete_split(RelFileNode node, BlockNumber leftblk,
}
static void
-forget_matching_split(Relation reln, RelFileNode node,
- BlockNumber insertblk, OffsetNumber offnum,
- bool is_root)
+forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
{
- Buffer buffer;
- Page page;
- IndexTuple itup;
- BlockNumber rightblk;
ListCell *l;
- /* Get downlink TID from page */
- buffer = XLogReadBuffer(reln, insertblk, false);
- if (!BufferIsValid(buffer))
- return;
- page = (Page) BufferGetPage(buffer);
- itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
- rightblk = ItemPointerGetBlockNumber(&(itup->t_tid));
- Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
- UnlockReleaseBuffer(buffer);
-
foreach(l, incomplete_splits)
{
bt_incomplete_split *split = (bt_incomplete_split *) lfirst(l);
if (RelFileNodeEquals(node, split->node) &&
- rightblk == split->rightblk)
+ downlink == split->rightblk)
{
if (is_root != split->is_root)
elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
@@ -87,6 +71,20 @@ forget_matching_split(Relation reln, RelFileNode node,
}
}
+/*
+ * _bt_restore_page -- re-enter all the index tuples on a page
+ *
+ * The page is freshly init'd, and *from (length len) is a copy of what
+ * had been its upper part (pd_upper to pd_special). We assume that the
+ * tuples had been added to the page in item-number order, and therefore
+ * the one with highest item number appears first (lowest on the page).
+ *
+ * NOTE: the way this routine is coded, the rebuilt page will have the items
+ * in correct itemno sequence, but physically the opposite order from the
+ * original, because we insert them in the opposite of itemno order. This
+ * does not matter in any current btree code, but it's something to keep an
+ * eye on. Is it worth changing just on general principles?
+ */
static void
_bt_restore_page(Page page, char *from, int len)
{
@@ -158,9 +156,16 @@ btree_xlog_insert(bool isleaf, bool ismeta,
char *datapos;
int datalen;
xl_btree_metadata md;
+ BlockNumber downlink = 0;
datapos = (char *) xlrec + SizeOfBtreeInsert;
datalen = record->xl_len - SizeOfBtreeInsert;
+ if (!isleaf)
+ {
+ memcpy(&downlink, datapos, sizeof(BlockNumber));
+ datapos += sizeof(BlockNumber);
+ datalen -= sizeof(BlockNumber);
+ }
if (ismeta)
{
memcpy(&md, datapos, sizeof(xl_btree_metadata));
@@ -168,8 +173,7 @@ btree_xlog_insert(bool isleaf, bool ismeta,
datalen -= sizeof(xl_btree_metadata);
}
- if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta &&
- incomplete_splits == NIL)
+ if ((record->xl_info & XLR_BKP_BLOCK_1) && !ismeta && isleaf)
return; /* nothing to do */
reln = XLogOpenRelation(xlrec->target.node);
@@ -208,13 +212,8 @@ btree_xlog_insert(bool isleaf, bool ismeta,
md.fastroot, md.fastlevel);
/* Forget any split this insertion completes */
- if (!isleaf && incomplete_splits != NIL)
- {
- forget_matching_split(reln, xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
- false);
- }
+ if (!isleaf)
+ forget_matching_split(xlrec->target.node, downlink, false);
}
static void
@@ -224,14 +223,17 @@ btree_xlog_split(bool onleft, bool isroot,
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
Relation reln;
BlockNumber targetblk;
+ OffsetNumber targetoff;
BlockNumber leftsib;
BlockNumber rightsib;
+ BlockNumber downlink = 0;
Buffer buffer;
Page page;
BTPageOpaque pageop;
reln = XLogOpenRelation(xlrec->target.node);
targetblk = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ targetoff = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
leftsib = (onleft) ? targetblk : xlrec->otherblk;
rightsib = (onleft) ? xlrec->otherblk : targetblk;
@@ -252,6 +254,16 @@ btree_xlog_split(bool onleft, bool isroot,
(char *) xlrec + SizeOfBtreeSplit,
xlrec->leftlen);
+ if (onleft && xlrec->level > 0)
+ {
+ IndexTuple itup;
+
+ /* extract downlink in the target tuple */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
+ downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
@@ -274,6 +286,16 @@ btree_xlog_split(bool onleft, bool isroot,
(char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
+ if (!onleft && xlrec->level > 0)
+ {
+ IndexTuple itup;
+
+ /* extract downlink in the target tuple */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, targetoff));
+ downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
@@ -308,13 +330,8 @@ btree_xlog_split(bool onleft, bool isroot,
}
/* Forget any split this insertion completes */
- if (xlrec->level > 0 && incomplete_splits != NIL)
- {
- forget_matching_split(reln, xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
- false);
- }
+ if (xlrec->level > 0)
+ forget_matching_split(xlrec->target.node, downlink, false);
/* The job ain't done till the parent link is inserted... */
log_incomplete_split(xlrec->target.node,
@@ -516,6 +533,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
BTPageOpaque pageop;
+ BlockNumber downlink = 0;
reln = XLogOpenRelation(xlrec->node);
buffer = XLogReadBuffer(reln, xlrec->rootblk, true);
@@ -532,9 +550,17 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
pageop->btpo_flags |= BTP_LEAF;
if (record->xl_len > SizeOfBtreeNewroot)
+ {
+ IndexTuple itup;
+
_bt_restore_page(page,
(char *) xlrec + SizeOfBtreeNewroot,
record->xl_len - SizeOfBtreeNewroot);
+ /* extract downlink to the right-hand split page */
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
+ downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
+ Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ }
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
@@ -546,14 +572,8 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
xlrec->rootblk, xlrec->level);
/* Check to see if this satisfies any incomplete insertions */
- if (record->xl_len > SizeOfBtreeNewroot &&
- incomplete_splits != NIL)
- {
- forget_matching_split(reln, xlrec->node,
- xlrec->rootblk,
- P_FIRSTKEY,
- true);
- }
+ if (record->xl_len > SizeOfBtreeNewroot)
+ forget_matching_split(xlrec->node, downlink, true);
}