summaryrefslogtreecommitdiff
path: root/src/backend/access/gin/ginbtree.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gin/ginbtree.c')
-rw-r--r--src/backend/access/gin/ginbtree.c447
1 files changed, 315 insertions, 132 deletions
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index f3c0dfd128f..7b9b85dc04c 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -18,6 +18,13 @@
#include "miscadmin.h"
#include "utils/rel.h"
+static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
+static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
+ void *insertdata, BlockNumber updateblkno,
+ Buffer childbuf, GinStatsData *buildStats);
+static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
+ bool freestack, GinStatsData *buildStats);
+
/*
* Lock buffer by needed method for search.
*/
@@ -86,6 +93,13 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
access = ginTraverseLock(stack->buffer, searchMode);
/*
+ * If we're going to modify the tree, finish any incomplete splits we
+ * encounter on the way.
+ */
+ if (!searchMode && GinPageIsIncompleteSplit(page))
+ ginFinishSplit(btree, stack, false, NULL);
+
+ /*
* ok, page is correctly locked, we should check to move right ..,
* root never has a right link, so small optimization
*/
@@ -101,6 +115,9 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
stack->buffer = ginStepRight(stack->buffer, btree->index, access);
stack->blkno = rightlink;
page = BufferGetPage(stack->buffer);
+
+ if (!searchMode && GinPageIsIncompleteSplit(page))
+ ginFinishSplit(btree, stack, false, NULL);
}
if (GinPageIsLeaf(page)) /* we found, return locked page */
@@ -187,7 +204,7 @@ freeGinBtreeStack(GinBtreeStack *stack)
* child's offset in stack->parent. The root page is never released, to
* to prevent conflict with vacuum process.
*/
-void
+static void
ginFindParents(GinBtree btree, GinBtreeStack *stack)
{
Page page;
@@ -195,58 +212,55 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
BlockNumber blkno,
leftmostBlkno;
OffsetNumber offset;
- GinBtreeStack *root = stack->parent;
+ GinBtreeStack *root;
GinBtreeStack *ptr;
- if (!root)
+ /*
+ * Unwind the stack all the way up to the root, leaving only the root
+ * item.
+ *
+ * Be careful not to release the pin on the root page! The pin on root
+ * page is required to lock out concurrent vacuums on the tree.
+ */
+ root = stack->parent;
+ while (root->parent)
{
- /* XLog mode... */
- root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
- root->blkno = btree->rootBlkno;
- root->buffer = ReadBuffer(btree->index, btree->rootBlkno);
- LockBuffer(root->buffer, GIN_EXCLUSIVE);
- root->parent = NULL;
+ ReleaseBuffer(root->buffer);
+ root = root->parent;
}
- else
- {
- /*
- * find root, we should not release root page until update is
- * finished!!
- */
- while (root->parent)
- {
- ReleaseBuffer(root->buffer);
- root = root->parent;
- }
- Assert(root->blkno == btree->rootBlkno);
- Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
- LockBuffer(root->buffer, GIN_EXCLUSIVE);
- }
+ Assert(root->blkno == btree->rootBlkno);
+ Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
root->off = InvalidOffsetNumber;
- page = BufferGetPage(root->buffer);
- Assert(!GinPageIsLeaf(page));
-
- /* check trivial case */
- if ((root->off = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) != InvalidOffsetNumber)
- {
- stack->parent = root;
- return;
- }
+ blkno = root->blkno;
+ buffer = root->buffer;
+ offset = InvalidOffsetNumber;
- leftmostBlkno = blkno = btree->getLeftMostChild(btree, page);
- LockBuffer(root->buffer, GIN_UNLOCK);
- Assert(blkno != InvalidBlockNumber);
+ ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
for (;;)
{
- buffer = ReadBuffer(btree->index, blkno);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
if (GinPageIsLeaf(page))
elog(ERROR, "Lost path");
+ if (GinPageIsIncompleteSplit(page))
+ {
+ Assert(blkno != btree->rootBlkno);
+ ptr->blkno = blkno;
+ ptr->buffer = buffer;
+ /*
+ * parent may be wrong, but if so, the ginFinishSplit call will
+ * recurse to call ginFindParents again to fix it.
+ */
+ ptr->parent = root;
+ ptr->off = InvalidOffsetNumber;
+
+ ginFinishSplit(btree, ptr, false, NULL);
+ }
+
leftmostBlkno = btree->getLeftMostChild(btree, page);
while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
@@ -259,11 +273,22 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
}
buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
+
+ /* finish any incomplete splits, as above */
+ if (GinPageIsIncompleteSplit(page))
+ {
+ Assert(blkno != btree->rootBlkno);
+ ptr->blkno = blkno;
+ ptr->buffer = buffer;
+ ptr->parent = root;
+ ptr->off = InvalidOffsetNumber;
+
+ ginFinishSplit(btree, ptr, false, NULL);
+ }
}
if (blkno != InvalidBlockNumber)
{
- ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
ptr->blkno = blkno;
ptr->buffer = buffer;
ptr->parent = root; /* it may be wrong, but in next call we will
@@ -273,7 +298,9 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
return;
}
+ /* Descend down to next level */
blkno = leftmostBlkno;
+ buffer = ReadBuffer(btree->index, blkno);
}
}
@@ -284,19 +311,38 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
* the parent needs to be updated. (a root split returns true as it doesn't
* need any further action by the caller to complete)
*
- * When inserting a downlink to a internal page, the existing item at the
- * given location is updated to point to 'updateblkno'.
+ * When inserting a downlink to a internal page, 'childbuf' contains the
+ * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
+ * atomically with the insert. Also, the existing item at the given location
+ * is updated to point to 'updateblkno'.
*
* stack->buffer is locked on entry, and is kept locked.
*/
static bool
ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
- GinStatsData *buildStats)
+ Buffer childbuf, GinStatsData *buildStats)
{
Page page = BufferGetPage(stack->buffer);
- XLogRecData *rdata;
+ XLogRecData *payloadrdata;
bool fit;
+ uint16 xlflags = 0;
+ Page childpage = NULL;
+
+ if (GinPageIsData(page))
+ xlflags |= GIN_INSERT_ISDATA;
+ if (GinPageIsLeaf(page))
+ {
+ xlflags |= GIN_INSERT_ISLEAF;
+ Assert(!BufferIsValid(childbuf));
+ Assert(updateblkno == InvalidBlockNumber);
+ }
+ else
+ {
+ Assert(BufferIsValid(childbuf));
+ Assert(updateblkno != InvalidBlockNumber);
+ childpage = BufferGetPage(childbuf);
+ }
/*
* Try to put the incoming tuple on the page. If it doesn't fit,
@@ -306,17 +352,63 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
START_CRIT_SECTION();
fit = btree->placeToPage(btree, stack->buffer, stack->off,
insertdata, updateblkno,
- &rdata);
+ &payloadrdata);
if (fit)
{
MarkBufferDirty(stack->buffer);
+ /* An insert to an internal page finishes the split of the child. */
+ if (childbuf != InvalidBuffer)
+ {
+ GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
+ MarkBufferDirty(childbuf);
+ }
+
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
+ XLogRecData rdata[3];
+ ginxlogInsert xlrec;
+ BlockIdData childblknos[2];
+
+ xlrec.node = btree->index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(stack->buffer);
+ xlrec.offset = stack->off;
+ xlrec.flags = xlflags;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(ginxlogInsert);
+
+ /*
+ * Log information about child if this was an insertion of a
+ * downlink.
+ */
+ if (childbuf != InvalidBuffer)
+ {
+ rdata[0].next = &rdata[1];
+
+ BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
+ BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) childblknos;
+ rdata[1].len = sizeof(BlockIdData) * 2;
+ rdata[1].next = &rdata[2];
+
+ rdata[2].buffer = childbuf;
+ rdata[2].buffer_std = false;
+ rdata[2].data = NULL;
+ rdata[2].len = 0;
+ rdata[2].next = payloadrdata;
+ }
+ else
+ rdata[0].next = payloadrdata;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
PageSetLSN(page, recptr);
+ if (childbuf != InvalidBuffer)
+ PageSetLSN(childpage, recptr);
}
END_CRIT_SECTION();
@@ -329,13 +421,16 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
Buffer rbuffer;
Page newlpage;
BlockNumber savedRightLink;
- GinBtreeStack *parent;
- Page lpage,
- rpage;
+ Page rpage;
+ XLogRecData rdata[2];
+ ginxlogSplit data;
+ Buffer lbuffer = InvalidBuffer;
+ Page newrootpg = NULL;
END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index);
+
/* During index build, count the new page */
if (buildStats)
{
@@ -353,21 +448,50 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
insertdata, updateblkno,
- &rdata);
+ &payloadrdata);
- ((ginxlogSplit *) (rdata->data))->rootBlkno = btree->rootBlkno;
+ data.node = btree->index->rd_node;
+ data.rblkno = BufferGetBlockNumber(rbuffer);
+ data.flags = xlflags;
+ if (childbuf != InvalidBuffer)
+ {
+ Page childpage = BufferGetPage(childbuf);
+ GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
- parent = stack->parent;
+ data.leftChildBlkno = BufferGetBlockNumber(childbuf);
+ data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
+ }
+ else
+ data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = sizeof(ginxlogSplit);
+
+ if (childbuf != InvalidBuffer)
+ {
+ rdata[0].next = &rdata[1];
+
+ rdata[1].buffer = childbuf;
+ rdata[1].buffer_std = false;
+ rdata[1].data = NULL;
+ rdata[1].len = 0;
+ rdata[1].next = payloadrdata;
+ }
+ else
+ rdata[0].next = payloadrdata;
+
+ rpage = BufferGetPage(rbuffer);
- if (parent == NULL)
+ if (stack->parent == NULL)
{
/*
* split root, so we need to allocate new left page and place
* pointer on root to left and right page
*/
- Buffer lbuffer = GinNewBuffer(btree->index);
+ lbuffer = GinNewBuffer(btree->index);
- /* During index build, count the new page */
+ /* During index build, count the newly-added root page */
if (buildStats)
{
if (btree->isData)
@@ -376,82 +500,97 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
buildStats->nEntryPages++;
}
- ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
- ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
-
- lpage = BufferGetPage(lbuffer);
- rpage = BufferGetPage(rbuffer);
+ /*
+ * root never has a right-link, so we borrow the rrlink field to
+ * store the root block number.
+ */
+ data.rrlink = BufferGetBlockNumber(stack->buffer);
+ data.lblkno = BufferGetBlockNumber(lbuffer);
+ data.flags |= GIN_SPLIT_ROOT;
GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
- ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
-
- START_CRIT_SECTION();
-
- GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
- PageRestoreTempPage(newlpage, lpage);
- btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
-
- MarkBufferDirty(rbuffer);
- MarkBufferDirty(lbuffer);
- MarkBufferDirty(stack->buffer);
-
- if (RelationNeedsWAL(btree->index))
- {
- XLogRecPtr recptr;
-
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
- PageSetLSN(page, recptr);
- PageSetLSN(lpage, recptr);
- PageSetLSN(rpage, recptr);
- }
-
- UnlockReleaseBuffer(rbuffer);
- UnlockReleaseBuffer(lbuffer);
- END_CRIT_SECTION();
- /* During index build, count the newly-added root page */
- if (buildStats)
- {
- if (btree->isData)
- buildStats->nDataPages++;
- else
- buildStats->nEntryPages++;
- }
+ /*
+ * Construct a new root page containing downlinks to the new left
+ * and right pages. (do this in a temporary copy first rather
+ * than overwriting the original page directly, so that we can still
+ * abort gracefully if this fails.)
+ */
+ newrootpg = PageGetTempPage(rpage);
+ GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF, BLCKSZ);
- return true;
+ btree->fillRoot(btree, newrootpg,
+ BufferGetBlockNumber(lbuffer), newlpage,
+ BufferGetBlockNumber(rbuffer), rpage);
}
else
{
/* split non-root page */
- ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
- ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
-
- lpage = BufferGetPage(stack->buffer);
- rpage = BufferGetPage(rbuffer);
+ data.rrlink = savedRightLink;
+ data.lblkno = BufferGetBlockNumber(stack->buffer);
GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+ GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
+ }
+
+ /*
+ * Ok, we have the new contents of the left page in a temporary copy
+ * now (newlpage), and the newly-allocated right block has been filled
+ * in. The original page is still unchanged.
+ *
+ * If this is a root split, we also have a temporary page containing
+ * the new contents of the root. Copy the new left page to a
+ * newly-allocated block, and initialize the (original) root page the
+ * new copy. Otherwise, copy over the temporary copy of the new left
+ * page over the old left page.
+ */
- START_CRIT_SECTION();
- PageRestoreTempPage(newlpage, lpage);
+ START_CRIT_SECTION();
- MarkBufferDirty(rbuffer);
- MarkBufferDirty(stack->buffer);
+ MarkBufferDirty(rbuffer);
- if (RelationNeedsWAL(btree->index))
- {
- XLogRecPtr recptr;
+ if (stack->parent == NULL)
+ {
+ PageRestoreTempPage(newlpage, BufferGetPage(lbuffer));
+ MarkBufferDirty(lbuffer);
+ newlpage = newrootpg;
+ }
- recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
- PageSetLSN(lpage, recptr);
- PageSetLSN(rpage, recptr);
- }
- UnlockReleaseBuffer(rbuffer);
- END_CRIT_SECTION();
+ PageRestoreTempPage(newlpage, BufferGetPage(stack->buffer));
+ MarkBufferDirty(stack->buffer);
- return false;
+ /* write WAL record */
+ if (RelationNeedsWAL(btree->index))
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
+ PageSetLSN(BufferGetPage(stack->buffer), recptr);
+ PageSetLSN(rpage, recptr);
+ if (stack->parent == NULL)
+ PageSetLSN(BufferGetPage(lbuffer), recptr);
}
+ END_CRIT_SECTION();
+
+ /*
+ * We can release the lock on the right page now, but keep the
+ * original buffer locked.
+ */
+ UnlockReleaseBuffer(rbuffer);
+ if (stack->parent == NULL)
+ UnlockReleaseBuffer(lbuffer);
+
+ /*
+ * If we split the root, we're done. Otherwise the split is not
+ * complete until the downlink for the new page has been inserted to
+ * the parent.
+ */
+ if (stack->parent == NULL)
+ return true;
+ else
+ return false;
}
}
@@ -460,13 +599,27 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*
* On entry, stack->buffer is exclusively locked.
*
- * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ * If freestack is true, all the buffers are released and unlocked as we
+ * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
+ * locked, and stack is unmodified, except for possibly moving right to find
+ * the correct parent of page.
*/
-void
-ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+static void
+ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
+ GinStatsData *buildStats)
{
Page page;
bool done;
+ bool first = true;
+
+ /*
+ * freestack == false when we encounter an incompletely split page during a
+ * scan, while freestack == true is used in the normal scenario that a
+ * split is finished right after the initial insert.
+ */
+ if (!freestack)
+ elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
+ stack->blkno, RelationGetRelationName(btree->index));
/* this loop crawls up the stack until the insertion is complete */
do
@@ -475,19 +628,26 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
void *insertdata;
BlockNumber updateblkno;
- insertdata = btree->prepareDownlink(btree, stack->buffer);
- updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
-
/* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE);
+ /*
+ * If the parent page was incompletely split, finish that split first,
+ * then continue with the current one.
+ *
+ * Note: we have to finish *all* incomplete splits we encounter, even
+ * if we have to move right. Otherwise we might choose as the target
+ * a page that has no downlink in the parent, and splitting it further
+ * would fail.
+ */
+ if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
+ ginFinishSplit(btree, parent, false, buildStats);
+
/* move right if it's needed */
page = BufferGetPage(parent->buffer);
while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
{
- BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
-
- if (rightlink == InvalidBlockNumber)
+ if (GinPageRightMost(page))
{
/*
* rightmost page, but we don't find parent, we should use
@@ -501,25 +661,44 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
}
parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
- parent->blkno = rightlink;
+ parent->blkno = BufferGetBlockNumber(parent->buffer);
page = BufferGetPage(parent->buffer);
- }
- /* release the child */
- UnlockReleaseBuffer(stack->buffer);
- pfree(stack);
- stack = parent;
+ if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
+ ginFinishSplit(btree, parent, false, buildStats);
+ }
- /* insert the downlink to parent */
- done = ginPlaceToPage(btree, stack,
+ /* insert the downlink */
+ insertdata = btree->prepareDownlink(btree, stack->buffer);
+ updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
+ done = ginPlaceToPage(btree, parent,
insertdata, updateblkno,
- buildStats);
+ stack->buffer, buildStats);
pfree(insertdata);
+
+ /*
+ * If the caller requested to free the stack, unlock and release the
+ * child buffer now. Otherwise keep it pinned and locked, but if we
+ * have to recurse up the tree, we can unlock the upper pages, only
+ * keeping the page at the bottom of the stack locked.
+ */
+ if (!first || freestack)
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ if (freestack)
+ {
+ ReleaseBuffer(stack->buffer);
+ pfree(stack);
+ }
+ stack = parent;
+
+ first = false;
} while (!done);
+
+ /* unlock the parent */
LockBuffer(stack->buffer, GIN_UNLOCK);
- /* free the rest of the stack */
- freeGinBtreeStack(stack);
+ if (freestack)
+ freeGinBtreeStack(stack);
}
/*
@@ -540,14 +719,18 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
{
bool done;
+ /* If the leaf page was incompletely split, finish the split first */
+ if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
+ ginFinishSplit(btree, stack, false, buildStats);
+
done = ginPlaceToPage(btree, stack,
insertdata, InvalidBlockNumber,
- buildStats);
+ InvalidBuffer, buildStats);
if (done)
{
LockBuffer(stack->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack);
}
else
- ginFinishSplit(btree, stack, buildStats);
+ ginFinishSplit(btree, stack, true, buildStats);
}