summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2010-10-11 19:04:59 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2010-10-11 19:04:59 -0400
commit2afd044c9491c484d0b2b6910b17896a64a08c01 (patch)
treef5713f015b73f54eac56fc559acde6d26fcd669f
parent10a829dea87c0378b68833d016de639d6605db94 (diff)
Fix assorted bugs in GIN's WAL replay logic.
The original coding was quite sloppy about handling the case where XLogReadBuffer fails (because the page has since been deleted). This would result in either "bad buffer id: 0" or an Assert failure during replay, if indeed the page were no longer there. In a couple of places it also neglected to check whether the change had already been applied, which would probably result in corrupted index contents. I believe that bug #5703 is an instance of the first problem. These issues could show up without replication, but only if you were unfortunate enough to crash between modification of a GIN index and the next checkpoint. Back-patch to 8.2, which is as far back as GIN has WAL support.
-rw-r--r--src/backend/access/gin/ginxlog.c198
1 files changed, 118 insertions, 80 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index 37d04cfb7e9..b7b89e4e49d 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -121,22 +121,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
+ /* first, forget any incomplete split this insertion completes */
+ if (data->isData)
+ {
+ Assert(data->isDelete == FALSE);
+ if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+ {
+ PostingItem *pitem;
+
+ pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+ forgetIncompleteSplit(data->node,
+ PostingItemGetBlockNumber(pitem),
+ data->updateBlkno);
+ }
+ }
+ else
+ {
+ if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+ {
+ IndexTuple itup;
+
+ itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+ forgetIncompleteSplit(data->node,
+ GinItemPointerGetBlockNumber(&itup->t_tid),
+ data->updateBlkno);
+ }
+ }
+
/* nothing else to do if page was backed up */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
reln = XLogOpenRelation(data->node);
buffer = XLogReadBuffer(reln, data->blkno, false);
- Assert(BufferIsValid(buffer));
+ if (!BufferIsValid(buffer))
+ return;
page = (Page) BufferGetPage(buffer);
- if (data->isData)
+ if (!XLByteLE(lsn, PageGetLSN(page)))
{
- Assert(data->isDelete == FALSE);
- Assert(GinPageIsData(page));
-
- if (!XLByteLE(lsn, PageGetLSN(page)))
+ if (data->isData)
{
+ Assert(GinPageIsData(page));
+
if (data->isLeaf)
{
OffsetNumber i;
@@ -166,23 +193,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
GinDataPageAddItem(page, pitem, data->offset);
}
}
-
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+ else
{
- PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
- forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
- }
+ IndexTuple itup;
- }
- else
- {
- IndexTuple itup;
+ Assert(!GinPageIsData(page));
- Assert(!GinPageIsData(page));
-
- if (!XLByteLE(lsn, PageGetLSN(page)))
- {
if (data->updateBlkno != InvalidBlockNumber)
{
/* update link to right page after split */
@@ -203,23 +219,15 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
+ data->node.spcNode, data->node.dbNode, data->node.relNode);
}
- if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
- {
- itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
- forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
- }
- }
-
- if (!XLByteLE(lsn, PageGetLSN(page)))
- {
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
}
+
UnlockReleaseBuffer(buffer);
}
@@ -241,7 +249,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isData)
flags |= GIN_DATA;
- lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
+ lbuffer = XLogReadBuffer(reln, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
lpage = (Page) BufferGetPage(lbuffer);
GinInitBuffer(lbuffer, flags);
@@ -318,7 +326,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isRootSplit)
{
- Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
+ Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, true);
Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
@@ -355,46 +363,51 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
- /* nothing else to do if page was backed up (and no info to do it with) */
+ /* nothing to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
reln = XLogOpenRelation(data->node);
buffer = XLogReadBuffer(reln, data->blkno, false);
- Assert(BufferIsValid(buffer));
+ if (!BufferIsValid(buffer))
+ return;
page = (Page) BufferGetPage(buffer);
- if (GinPageIsData(page))
- {
- memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
- GinSizeOfItem(page) *data->nitem);
- GinPageGetOpaque(page)->maxoff = data->nitem;
- }
- else
+ if (!XLByteLE(lsn, PageGetLSN(page)))
{
- OffsetNumber i,
- *tod;
- IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+ if (GinPageIsData(page))
+ {
+ memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+ GinSizeOfItem(page) *data->nitem);
+ GinPageGetOpaque(page)->maxoff = data->nitem;
+ }
+ else
+ {
+ OffsetNumber i,
+ *tod;
+ IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
- tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
- for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
- tod[i - 1] = i;
+ tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
+ for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
+ tod[i - 1] = i;
- PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+ PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
- for (i = 0; i < data->nitem; i++)
- {
- if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
- elog(ERROR, "failed to add item to index page in %u/%u/%u",
- data->node.spcNode, data->node.dbNode, data->node.relNode);
- itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+ for (i = 0; i < data->nitem; i++)
+ {
+ if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to index page in %u/%u/%u",
+ data->node.spcNode, data->node.dbNode, data->node.relNode);
+ itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+ }
}
- }
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+
+ MarkBufferDirty(buffer);
+ }
- MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
@@ -411,38 +424,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
if (!(record->xl_info & XLR_BKP_BLOCK_1))
{
buffer = XLogReadBuffer(reln, data->blkno, false);
- page = BufferGetPage(buffer);
- Assert(GinPageIsData(page));
- GinPageGetOpaque(page)->flags = GIN_DELETED;
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ Assert(GinPageIsData(page));
+ GinPageGetOpaque(page)->flags = GIN_DELETED;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
}
if (!(record->xl_info & XLR_BKP_BLOCK_2))
{
buffer = XLogReadBuffer(reln, data->parentBlkno, false);
- page = BufferGetPage(buffer);
- Assert(GinPageIsData(page));
- Assert(!GinPageIsLeaf(page));
- PageDeletePostingItem(page, data->parentOffset);
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ Assert(GinPageIsData(page));
+ Assert(!GinPageIsLeaf(page));
+ PageDeletePostingItem(page, data->parentOffset);
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
}
if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
{
buffer = XLogReadBuffer(reln, data->leftBlkno, false);
- page = BufferGetPage(buffer);
- Assert(GinPageIsData(page));
- GinPageGetOpaque(page)->rightlink = data->rightLink;
- PageSetLSN(page, lsn);
- PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ Assert(GinPageIsData(page));
+ GinPageGetOpaque(page)->rightlink = data->rightLink;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
+ }
}
}
@@ -560,6 +591,13 @@ ginContinueSplit(ginIncompleteSplit *split)
buffer = XLogReadBuffer(reln, split->leftBlkno, false);
+ /*
+ * Failure should be impossible here, because we wrote the page earlier.
+ */
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "ginContinueSplit: left block %u not found",
+ split->leftBlkno);
+
if (split->rootBlkno == GIN_ROOT_BLKNO)
{
prepareEntryScan(&btree, reln, (Datum) 0, NULL);