diff options
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/access/gin/ginxlog.c | 308 | 
1 files changed, 181 insertions, 127 deletions
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index cff5bc8bd59..f9ed583ec0f 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -77,11 +77,13 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)  	MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);  	Assert(BufferIsValid(MetaBuffer)); +	page = (Page) BufferGetPage(MetaBuffer); +  	GinInitMetabuffer(MetaBuffer); -	page = (Page) BufferGetPage(MetaBuffer);  	PageSetLSN(page, lsn);  	PageSetTLI(page, ThisTimeLineID); +	MarkBufferDirty(MetaBuffer);  	RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);  	Assert(BufferIsValid(RootBuffer)); @@ -91,11 +93,10 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)  	PageSetLSN(page, lsn);  	PageSetTLI(page, ThisTimeLineID); - -	MarkBufferDirty(MetaBuffer); -	UnlockReleaseBuffer(MetaBuffer);  	MarkBufferDirty(RootBuffer); +  	UnlockReleaseBuffer(RootBuffer); +	UnlockReleaseBuffer(MetaBuffer);  }  static void @@ -128,21 +129,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)  	Buffer		buffer;  	Page		page; +	/* first, forget any incomplete split this insertion completes */ +	if (data->isData) +	{ +		Assert(data->isDelete == FALSE); +		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) +		{ +			PostingItem *pitem; + +			pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); +			forgetIncompleteSplit(data->node, +								  PostingItemGetBlockNumber(pitem), +								  data->updateBlkno); +		} + +	} +	else +	{ +		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) +		{ +			IndexTuple	itup; + +			itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); +			forgetIncompleteSplit(data->node, +								  GinItemPointerGetBlockNumber(&itup->t_tid), +								  data->updateBlkno); +		} +	} +  	/* nothing else to do if page was backed up */  	if (record->xl_info & XLR_BKP_BLOCK_1)  		return;  	buffer = XLogReadBuffer(data->node, data->blkno, false); -	Assert(BufferIsValid(buffer)); +	if (!BufferIsValid(buffer)) +		return;					/* page was deleted, nothing to do */  	page = (Page) BufferGetPage(buffer); -	if (data->isData) +	if (!XLByteLE(lsn, PageGetLSN(page)))  	{ -		Assert(data->isDelete == FALSE); -		Assert(GinPageIsData(page)); - -		if (!XLByteLE(lsn, PageGetLSN(page))) +		if (data->isData)  		{ +			Assert(GinPageIsData(page)); +  			if (data->isLeaf)  			{  				OffsetNumber i; @@ -172,23 +201,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)  				GinDataPageAddItem(page, pitem, data->offset);  			}  		} - -		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) +		else  		{ -			PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); +			IndexTuple	itup; -			forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno); -		} +			Assert(!GinPageIsData(page)); -	} -	else -	{ -		IndexTuple	itup; - -		Assert(!GinPageIsData(page)); - -		if (!XLByteLE(lsn, PageGetLSN(page))) -		{  			if (data->updateBlkno != InvalidBlockNumber)  			{  				/* update link to right page after split */ @@ -212,20 +230,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)  				  data->node.spcNode, data->node.dbNode, data->node.relNode);  		} -		if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) -		{ -			itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); -			forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno); -		} -	} - -	if (!XLByteLE(lsn, PageGetLSN(page))) -	{  		PageSetLSN(page, lsn);  		PageSetTLI(page, ThisTimeLineID);  		MarkBufferDirty(buffer);  	} +  	UnlockReleaseBuffer(buffer);  } @@ -244,7 +254,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)  	if (data->isData)  		flags |= GIN_DATA; -	lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit); +	lbuffer = XLogReadBuffer(data->node, data->lblkno, true);  	Assert(BufferIsValid(lbuffer));  	lpage = (Page) BufferGetPage(lbuffer);  	GinInitBuffer(lbuffer, flags); @@ -321,7 +331,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)  	if (data->isRootSplit)  	{ -		Buffer		rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false); +		Buffer		rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);  		Page		rootPage = BufferGetPage(rootBuf);  		GinInitBuffer(rootBuf, flags & ~GIN_LEAF); @@ -357,45 +367,49 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)  	Buffer		buffer;  	Page		page; -	/* nothing else to do if page was backed up (and no info to do it with) */ +	/* nothing to do if page was backed up (and no info to do it with) */  	if (record->xl_info & XLR_BKP_BLOCK_1)  		return;  	buffer = XLogReadBuffer(data->node, data->blkno, false); -	Assert(BufferIsValid(buffer)); +	if (!BufferIsValid(buffer)) +		return;  	page = (Page) BufferGetPage(buffer); -	if (GinPageIsData(page)) -	{ -		memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage), -			   GinSizeOfItem(page) *data->nitem); -		GinPageGetOpaque(page)->maxoff = data->nitem; -	} -	else +	if (!XLByteLE(lsn, PageGetLSN(page)))  	{ -		OffsetNumber i, -				   *tod; -		IndexTuple	itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage)); +		if (GinPageIsData(page)) +		{ +			memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage), +				   GinSizeOfItem(page) *data->nitem); +			GinPageGetOpaque(page)->maxoff = data->nitem; +		} +		else +		{ +			OffsetNumber i, +				*tod; +			IndexTuple	itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage)); -		tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page)); -		for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++) -			tod[i - 1] = i; +			tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page)); +			for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++) +				tod[i - 1] = i; -		PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page)); +			PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page)); -		for (i = 0; i < data->nitem; i++) -		{ -			if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) -				elog(ERROR, "failed to add item to index page in %u/%u/%u", -				  data->node.spcNode, data->node.dbNode, data->node.relNode); -			itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); +			for (i = 0; i < data->nitem; i++) +			{ +				if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber) +					elog(ERROR, "failed to add item to index page in %u/%u/%u", +						 data->node.spcNode, data->node.dbNode, data->node.relNode); +				itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); +			}  		} -	} -	PageSetLSN(page, lsn); -	PageSetTLI(page, ThisTimeLineID); +		PageSetLSN(page, lsn); +		PageSetTLI(page, ThisTimeLineID); +		MarkBufferDirty(buffer); +	} -	MarkBufferDirty(buffer);  	UnlockReleaseBuffer(buffer);  } @@ -409,38 +423,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)  	if (!(record->xl_info & XLR_BKP_BLOCK_1))  	{  		buffer = XLogReadBuffer(data->node, data->blkno, false); -		page = BufferGetPage(buffer); -		Assert(GinPageIsData(page)); -		GinPageGetOpaque(page)->flags = GIN_DELETED; -		PageSetLSN(page, lsn); -		PageSetTLI(page, ThisTimeLineID); -		MarkBufferDirty(buffer); -		UnlockReleaseBuffer(buffer); +		if (BufferIsValid(buffer)) +		{ +			page = BufferGetPage(buffer); +			if (!XLByteLE(lsn, PageGetLSN(page))) +			{ +				Assert(GinPageIsData(page)); +				GinPageGetOpaque(page)->flags = GIN_DELETED; +				PageSetLSN(page, lsn); +				PageSetTLI(page, ThisTimeLineID); +				MarkBufferDirty(buffer); +			} +			UnlockReleaseBuffer(buffer); +		}  	}  	if (!(record->xl_info & XLR_BKP_BLOCK_2))  	{  		buffer = XLogReadBuffer(data->node, data->parentBlkno, false); -		page = BufferGetPage(buffer); -		Assert(GinPageIsData(page)); -		Assert(!GinPageIsLeaf(page)); -		PageDeletePostingItem(page, data->parentOffset); -		PageSetLSN(page, lsn); -		PageSetTLI(page, ThisTimeLineID); -		MarkBufferDirty(buffer); -		UnlockReleaseBuffer(buffer); +		if (BufferIsValid(buffer)) +		{ +			page = BufferGetPage(buffer); +			if (!XLByteLE(lsn, PageGetLSN(page))) +			{ +				Assert(GinPageIsData(page)); +				Assert(!GinPageIsLeaf(page)); +				PageDeletePostingItem(page, data->parentOffset); +				PageSetLSN(page, lsn); +				PageSetTLI(page, ThisTimeLineID); +				MarkBufferDirty(buffer); +			} +			UnlockReleaseBuffer(buffer); +		}  	}  	if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)  	{  		buffer = XLogReadBuffer(data->node, data->leftBlkno, false); -		page = BufferGetPage(buffer); -		Assert(GinPageIsData(page)); -		GinPageGetOpaque(page)->rightlink = data->rightLink; -		PageSetLSN(page, lsn); -		PageSetTLI(page, ThisTimeLineID); -		MarkBufferDirty(buffer); -		UnlockReleaseBuffer(buffer); +		if (BufferIsValid(buffer)) +		{ +			page = BufferGetPage(buffer); +			if (!XLByteLE(lsn, PageGetLSN(page))) +			{ +				Assert(GinPageIsData(page)); +				GinPageGetOpaque(page)->rightlink = data->rightLink; +				PageSetLSN(page, lsn); +				PageSetTLI(page, ThisTimeLineID); +				MarkBufferDirty(buffer); +			} +			UnlockReleaseBuffer(buffer); +		}  	}  } @@ -450,8 +482,11 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)  	ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);  	Buffer		metabuffer;  	Page		metapage; +	Buffer		buffer;  	metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); +	if (!BufferIsValid(metabuffer)) +		elog(PANIC, "GIN metapage disappeared");  	metapage = BufferGetPage(metabuffer);  	if (!XLByteLE(lsn, PageGetLSN(metapage))) @@ -469,40 +504,43 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)  		 */  		if (!(record->xl_info & XLR_BKP_BLOCK_1))  		{ -			Buffer		buffer = XLogReadBuffer(data->node, data->metadata.tail, false); -			Page		page = BufferGetPage(buffer); - -			if (!XLByteLE(lsn, PageGetLSN(page))) +			buffer = XLogReadBuffer(data->node, data->metadata.tail, false); +			if (BufferIsValid(buffer))  			{ -				OffsetNumber l, -							off = (PageIsEmpty(page)) ? FirstOffsetNumber : -				OffsetNumberNext(PageGetMaxOffsetNumber(page)); -				int			i, -							tupsize; -				IndexTuple	tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta)); - -				for (i = 0; i < data->ntuples; i++) +				Page		page = BufferGetPage(buffer); + +				if (!XLByteLE(lsn, PageGetLSN(page)))  				{ -					tupsize = IndexTupleSize(tuples); +					OffsetNumber l, +						off = (PageIsEmpty(page)) ? FirstOffsetNumber : +						OffsetNumberNext(PageGetMaxOffsetNumber(page)); +					int			i, +						tupsize; +					IndexTuple	tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta)); -					l = PageAddItem(page, (Item) tuples, tupsize, off, false, false); +					for (i = 0; i < data->ntuples; i++) +					{ +						tupsize = IndexTupleSize(tuples); -					if (l == InvalidOffsetNumber) -						elog(ERROR, "failed to add item to index page"); +						l = PageAddItem(page, (Item) tuples, tupsize, off, false, false); -					tuples = (IndexTuple) (((char *) tuples) + tupsize); -				} +						if (l == InvalidOffsetNumber) +							elog(ERROR, "failed to add item to index page"); -				/* -				 * Increase counter of heap tuples -				 */ -				GinPageGetOpaque(page)->maxoff++; +						tuples = (IndexTuple) (((char *) tuples) + tupsize); +					} -				PageSetLSN(page, lsn); -				PageSetTLI(page, ThisTimeLineID); -				MarkBufferDirty(buffer); +					/* +					 * Increase counter of heap tuples +					 */ +					GinPageGetOpaque(page)->maxoff++; + +					PageSetLSN(page, lsn); +					PageSetTLI(page, ThisTimeLineID); +					MarkBufferDirty(buffer); +				} +				UnlockReleaseBuffer(buffer);  			} -			UnlockReleaseBuffer(buffer);  		}  	}  	else if (data->prevTail != InvalidBlockNumber) @@ -510,19 +548,21 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)  		/*  		 * New tail  		 */ - -		Buffer		buffer = XLogReadBuffer(data->node, data->prevTail, false); -		Page		page = BufferGetPage(buffer); - -		if (!XLByteLE(lsn, PageGetLSN(page))) +		buffer = XLogReadBuffer(data->node, data->prevTail, false); +		if (BufferIsValid(buffer))  		{ -			GinPageGetOpaque(page)->rightlink = data->newRightlink; +			Page		page = BufferGetPage(buffer); -			PageSetLSN(page, lsn); -			PageSetTLI(page, ThisTimeLineID); -			MarkBufferDirty(buffer); +			if (!XLByteLE(lsn, PageGetLSN(page))) +			{ +				GinPageGetOpaque(page)->rightlink = data->newRightlink; + +				PageSetLSN(page, lsn); +				PageSetTLI(page, ThisTimeLineID); +				MarkBufferDirty(buffer); +			} +			UnlockReleaseBuffer(buffer);  		} -		UnlockReleaseBuffer(buffer);  	}  	UnlockReleaseBuffer(metabuffer); @@ -544,6 +584,7 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)  		return;  	buffer = XLogReadBuffer(data->node, data->blkno, true); +	Assert(BufferIsValid(buffer));  	page = BufferGetPage(buffer);  	GinInitBuffer(buffer, GIN_LIST); @@ -587,6 +628,8 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)  	int			i;  	metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); +	if (!BufferIsValid(metabuffer)) +		elog(PANIC, "GIN metapage disappeared");  	metapage = BufferGetPage(metabuffer);  	if (!XLByteLE(lsn, PageGetLSN(metapage))) @@ -600,18 +643,22 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)  	for (i = 0; i < data->ndeleted; i++)  	{  		Buffer		buffer = XLogReadBuffer(data->node, data->toDelete[i], false); -		Page		page = BufferGetPage(buffer); -		if (!XLByteLE(lsn, PageGetLSN(page))) +		if (BufferIsValid(buffer))  		{ -			GinPageGetOpaque(page)->flags = GIN_DELETED; +			Page		page = BufferGetPage(buffer); -			PageSetLSN(page, lsn); -			PageSetTLI(page, ThisTimeLineID); -			MarkBufferDirty(buffer); -		} +			if (!XLByteLE(lsn, PageGetLSN(page))) +			{ +				GinPageGetOpaque(page)->flags = GIN_DELETED; -		UnlockReleaseBuffer(buffer); +				PageSetLSN(page, lsn); +				PageSetTLI(page, ThisTimeLineID); +				MarkBufferDirty(buffer); +			} + +			UnlockReleaseBuffer(buffer); +		}  	}  	UnlockReleaseBuffer(metabuffer);  } @@ -755,6 +802,13 @@ ginContinueSplit(ginIncompleteSplit *split)  	 */  	buffer = XLogReadBuffer(split->node, split->leftBlkno, false); +	/* +	 * Failure should be impossible here, because we wrote the page earlier. +	 */ +	if (!BufferIsValid(buffer)) +		elog(PANIC, "ginContinueSplit: left block %u not found", +			 split->leftBlkno); +  	reln = CreateFakeRelcacheEntry(split->node);  	if (split->rootBlkno == GIN_ROOT_BLKNO)  | 
