summaryrefslogtreecommitdiff
path: root/src/backend/access/gin/ginbtree.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gin/ginbtree.c')
-rw-r--r--src/backend/access/gin/ginbtree.c73
1 files changed, 40 insertions, 33 deletions
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index 813f96a521b..c17c90fbc2e 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -325,9 +325,10 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
{
Page page = BufferGetPage(stack->buffer);
XLogRecData *payloadrdata;
- bool fit;
+ GinPlaceToPageRC rc;
uint16 xlflags = 0;
Page childpage = NULL;
+ Page newlpage = NULL, newrpage = NULL;
if (GinPageIsData(page))
xlflags |= GIN_INSERT_ISDATA;
@@ -345,16 +346,17 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
}
/*
- * Try to put the incoming tuple on the page. If it doesn't fit,
- * placeToPage method will return false and leave the page unmodified, and
- * we'll have to split the page.
+ * Try to put the incoming tuple on the page. placeToPage will decide
+ * if the page needs to be split.
*/
- START_CRIT_SECTION();
- fit = btree->placeToPage(btree, stack->buffer, stack->off,
- insertdata, updateblkno,
- &payloadrdata);
- if (fit)
+ rc = btree->placeToPage(btree, stack->buffer, stack,
+ insertdata, updateblkno,
+ &payloadrdata, &newlpage, &newrpage);
+ if (rc == UNMODIFIED)
+ return true;
+ else if (rc == INSERTED)
{
+ /* placeToPage did START_CRIT_SECTION() */
MarkBufferDirty(stack->buffer);
/* An insert to an internal page finishes the split of the child. */
@@ -373,7 +375,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
xlrec.node = btree->index->rd_node;
xlrec.blkno = BufferGetBlockNumber(stack->buffer);
- xlrec.offset = stack->off;
xlrec.flags = xlflags;
rdata[0].buffer = InvalidBuffer;
@@ -415,20 +416,16 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
return true;
}
- else
+ else if (rc == SPLIT)
{
/* Didn't fit, have to split */
Buffer rbuffer;
- Page newlpage;
BlockNumber savedRightLink;
- Page rpage;
XLogRecData rdata[2];
ginxlogSplit data;
Buffer lbuffer = InvalidBuffer;
Page newrootpg = NULL;
- END_CRIT_SECTION();
-
rbuffer = GinNewBuffer(btree->index);
/* During index build, count the new page */
@@ -443,12 +440,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
savedRightLink = GinPageGetOpaque(page)->rightlink;
/*
- * newlpage is a pointer to memory page, it is not associated with a
- * buffer. stack->buffer is not touched yet.
+ * newlpage and newrpage are pointers to memory pages, not associated
+ * with buffers. stack->buffer is not touched yet.
*/
- newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
- insertdata, updateblkno,
- &payloadrdata);
data.node = btree->index->rd_node;
data.rblkno = BufferGetBlockNumber(rbuffer);
@@ -481,8 +475,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
else
rdata[0].next = payloadrdata;
- rpage = BufferGetPage(rbuffer);
-
if (stack->parent == NULL)
{
/*
@@ -508,7 +500,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
data.lblkno = BufferGetBlockNumber(lbuffer);
data.flags |= GIN_SPLIT_ROOT;
- GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
+ GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
/*
@@ -517,12 +509,12 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
* than overwriting the original page directly, so that we can still
* abort gracefully if this fails.)
*/
- newrootpg = PageGetTempPage(rpage);
- GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF, BLCKSZ);
+ newrootpg = PageGetTempPage(newrpage);
+ GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
btree->fillRoot(btree, newrootpg,
BufferGetBlockNumber(lbuffer), newlpage,
- BufferGetBlockNumber(rbuffer), rpage);
+ BufferGetBlockNumber(rbuffer), newrpage);
}
else
{
@@ -530,7 +522,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
data.rrlink = savedRightLink;
data.lblkno = BufferGetBlockNumber(stack->buffer);
- GinPageGetOpaque(rpage)->rightlink = savedRightLink;
+ GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
}
@@ -550,16 +542,24 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
START_CRIT_SECTION();
MarkBufferDirty(rbuffer);
+ MarkBufferDirty(stack->buffer);
+ /*
+ * Restore the temporary copies over the real buffers. But don't free
+ * the temporary copies yet, WAL record data points to them.
+ */
if (stack->parent == NULL)
{
- PageRestoreTempPage(newlpage, BufferGetPage(lbuffer));
MarkBufferDirty(lbuffer);
- newlpage = newrootpg;
+ memcpy(BufferGetPage(stack->buffer), newrootpg, BLCKSZ);
+ memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
+ memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
+ }
+ else
+ {
+ memcpy(BufferGetPage(stack->buffer), newlpage, BLCKSZ);
+ memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
}
-
- PageRestoreTempPage(newlpage, BufferGetPage(stack->buffer));
- MarkBufferDirty(stack->buffer);
/* write WAL record */
if (RelationNeedsWAL(btree->index))
@@ -568,7 +568,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(BufferGetPage(stack->buffer), recptr);
- PageSetLSN(rpage, recptr);
+ PageSetLSN(BufferGetPage(rbuffer), recptr);
if (stack->parent == NULL)
PageSetLSN(BufferGetPage(lbuffer), recptr);
}
@@ -582,6 +582,11 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
if (stack->parent == NULL)
UnlockReleaseBuffer(lbuffer);
+ pfree(newlpage);
+ pfree(newrpage);
+ if (newrootpg)
+ pfree(newrootpg);
+
/*
* If we split the root, we're done. Otherwise the split is not
* complete until the downlink for the new page has been inserted to
@@ -592,6 +597,8 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
else
return false;
}
+ else
+ elog(ERROR, "unknown return code from GIN placeToPage method: %d", rc);
}
/*