diff options
Diffstat (limited to 'src/backend/access/gin/ginbtree.c')
-rw-r--r-- | src/backend/access/gin/ginbtree.c | 73 |
1 files changed, 40 insertions, 33 deletions
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c index 813f96a521b..c17c90fbc2e 100644 --- a/src/backend/access/gin/ginbtree.c +++ b/src/backend/access/gin/ginbtree.c @@ -325,9 +325,10 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, { Page page = BufferGetPage(stack->buffer); XLogRecData *payloadrdata; - bool fit; + GinPlaceToPageRC rc; uint16 xlflags = 0; Page childpage = NULL; + Page newlpage = NULL, newrpage = NULL; if (GinPageIsData(page)) xlflags |= GIN_INSERT_ISDATA; @@ -345,16 +346,17 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, } /* - * Try to put the incoming tuple on the page. If it doesn't fit, - * placeToPage method will return false and leave the page unmodified, and - * we'll have to split the page. + * Try to put the incoming tuple on the page. placeToPage will decide + * if the page needs to be split. */ - START_CRIT_SECTION(); - fit = btree->placeToPage(btree, stack->buffer, stack->off, - insertdata, updateblkno, - &payloadrdata); - if (fit) + rc = btree->placeToPage(btree, stack->buffer, stack, + insertdata, updateblkno, + &payloadrdata, &newlpage, &newrpage); + if (rc == UNMODIFIED) + return true; + else if (rc == INSERTED) { + /* placeToPage did START_CRIT_SECTION() */ MarkBufferDirty(stack->buffer); /* An insert to an internal page finishes the split of the child. */ @@ -373,7 +375,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, xlrec.node = btree->index->rd_node; xlrec.blkno = BufferGetBlockNumber(stack->buffer); - xlrec.offset = stack->off; xlrec.flags = xlflags; rdata[0].buffer = InvalidBuffer; @@ -415,20 +416,16 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, return true; } - else + else if (rc == SPLIT) { /* Didn't fit, have to split */ Buffer rbuffer; - Page newlpage; BlockNumber savedRightLink; - Page rpage; XLogRecData rdata[2]; ginxlogSplit data; Buffer lbuffer = InvalidBuffer; Page newrootpg = NULL; - END_CRIT_SECTION(); - rbuffer = GinNewBuffer(btree->index); /* During index build, count the new page */ @@ -443,12 +440,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, savedRightLink = GinPageGetOpaque(page)->rightlink; /* - * newlpage is a pointer to memory page, it is not associated with a - * buffer. stack->buffer is not touched yet. + * newlpage and newrpage are pointers to memory pages, not associated + * with buffers. stack->buffer is not touched yet. */ - newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, - insertdata, updateblkno, - &payloadrdata); data.node = btree->index->rd_node; data.rblkno = BufferGetBlockNumber(rbuffer); @@ -481,8 +475,6 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, else rdata[0].next = payloadrdata; - rpage = BufferGetPage(rbuffer); - if (stack->parent == NULL) { /* @@ -508,7 +500,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, data.lblkno = BufferGetBlockNumber(lbuffer); data.flags |= GIN_SPLIT_ROOT; - GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber; + GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber; GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); /* @@ -517,12 +509,12 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, * than overwriting the original page directly, so that we can still * abort gracefully if this fails.) */ - newrootpg = PageGetTempPage(rpage); - GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF, BLCKSZ); + newrootpg = PageGetTempPage(newrpage); + GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ); btree->fillRoot(btree, newrootpg, BufferGetBlockNumber(lbuffer), newlpage, - BufferGetBlockNumber(rbuffer), rpage); + BufferGetBlockNumber(rbuffer), newrpage); } else { @@ -530,7 +522,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, data.rrlink = savedRightLink; data.lblkno = BufferGetBlockNumber(stack->buffer); - GinPageGetOpaque(rpage)->rightlink = savedRightLink; + GinPageGetOpaque(newrpage)->rightlink = savedRightLink; GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT; GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); } @@ -550,16 +542,24 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, START_CRIT_SECTION(); MarkBufferDirty(rbuffer); + MarkBufferDirty(stack->buffer); + /* + * Restore the temporary copies over the real buffers. But don't free + * the temporary copies yet, WAL record data points to them. + */ if (stack->parent == NULL) { - PageRestoreTempPage(newlpage, BufferGetPage(lbuffer)); MarkBufferDirty(lbuffer); - newlpage = newrootpg; + memcpy(BufferGetPage(stack->buffer), newrootpg, BLCKSZ); + memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ); + memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ); + } + else + { + memcpy(BufferGetPage(stack->buffer), newlpage, BLCKSZ); + memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ); } - - PageRestoreTempPage(newlpage, BufferGetPage(stack->buffer)); - MarkBufferDirty(stack->buffer); /* write WAL record */ if (RelationNeedsWAL(btree->index)) @@ -568,7 +568,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); PageSetLSN(BufferGetPage(stack->buffer), recptr); - PageSetLSN(rpage, recptr); + PageSetLSN(BufferGetPage(rbuffer), recptr); if (stack->parent == NULL) PageSetLSN(BufferGetPage(lbuffer), recptr); } @@ -582,6 +582,11 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, if (stack->parent == NULL) UnlockReleaseBuffer(lbuffer); + pfree(newlpage); + pfree(newrpage); + if (newrootpg) + pfree(newrootpg); + /* * If we split the root, we're done. Otherwise the split is not * complete until the downlink for the new page has been inserted to @@ -592,6 +597,8 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, else return false; } + else + elog(ERROR, "unknown return code from GIN placeToPage method: %d", rc); } /* |