summaryrefslogtreecommitdiff
path: root/src/backend/access/spgist/spgvacuum.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2011-12-17 16:41:16 -0500
committerTom Lane <tgl@sss.pgh.pa.us>2011-12-17 16:42:30 -0500
commit8daeb5ddd698f661eb118f8e874e7c68cfd6ae09 (patch)
tree765599b73e45a6ca5529398489f31a534ab1924e /src/backend/access/spgist/spgvacuum.c
parent19fc0fe3ae7861a8b0d3ab8b67bd01fde33bf2da (diff)
Add SP-GiST (space-partitioned GiST) index access method.
SP-GiST is comparable to GiST in flexibility, but supports non-balanced partitioned search structures rather than balanced trees. As described at PGCon 2011, this new indexing structure can beat GiST in both index build time and query speed for search problems that it is well matched to. There are a number of areas that could still use improvement, but at this point the code seems committable. Teodor Sigaev and Oleg Bartunov, with considerable revisions by Tom Lane
Diffstat (limited to 'src/backend/access/spgist/spgvacuum.c')
-rw-r--r--src/backend/access/spgist/spgvacuum.c755
1 files changed, 755 insertions, 0 deletions
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
new file mode 100644
index 00000000000..90d59920eb6
--- /dev/null
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -0,0 +1,755 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgvacuum.c
+ * vacuum for SP-GiST
+ *
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/spgist/spgvacuum.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/spgist_private.h"
+#include "access/transam.h"
+#include "catalog/storage.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/indexfsm.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+
+
+/* local state for vacuum operations */
+typedef struct spgBulkDeleteState
+{
+ /* Parameters passed in to spgvacuumscan */
+ IndexVacuumInfo *info;
+ IndexBulkDeleteResult *stats;
+ IndexBulkDeleteCallback callback;
+ void *callback_state;
+ /* Additional working state */
+ SpGistState spgstate;
+ TransactionId OldestXmin;
+ BlockNumber lastFilledBlock;
+} spgBulkDeleteState;
+
+
+/*
+ * Vacuum a regular (non-root) leaf page
+ *
+ * We must delete tuples that are targeted for deletion by the VACUUM,
+ * but not move any tuples that are referenced by outside links; we assume
+ * those are the ones that are heads of chains.
+ */
+static void
+vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer)
+{
+ Page page = BufferGetPage(buffer);
+ spgxlogVacuumLeaf xlrec;
+ XLogRecData rdata[8];
+ OffsetNumber toDead[MaxIndexTuplesPerPage];
+ OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
+ OffsetNumber moveSrc[MaxIndexTuplesPerPage];
+ OffsetNumber moveDest[MaxIndexTuplesPerPage];
+ OffsetNumber chainSrc[MaxIndexTuplesPerPage];
+ OffsetNumber chainDest[MaxIndexTuplesPerPage];
+ OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
+ bool deletable[MaxIndexTuplesPerPage + 1];
+ int nDeletable;
+ OffsetNumber i,
+ max = PageGetMaxOffsetNumber(page);
+
+ memset(predecessor, 0, sizeof(predecessor));
+ memset(deletable, 0, sizeof(deletable));
+ nDeletable = 0;
+
+ /* Scan page, identify tuples to delete, accumulate stats */
+ for (i = FirstOffsetNumber; i <= max; i++)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, i));
+ if (lt->tupstate == SPGIST_LIVE)
+ {
+ Assert(ItemPointerIsValid(&lt->heapPtr));
+
+ if (bds->callback(&lt->heapPtr, bds->callback_state))
+ {
+ bds->stats->tuples_removed += 1;
+ deletable[i] = true;
+ nDeletable++;
+ }
+ else
+ {
+ bds->stats->num_index_tuples += 1;
+ }
+
+ /* Form predecessor map, too */
+ if (lt->nextOffset != InvalidOffsetNumber)
+ {
+ /* paranoia about corrupted chain links */
+ if (lt->nextOffset < FirstOffsetNumber ||
+ lt->nextOffset > max ||
+ predecessor[lt->nextOffset] != InvalidOffsetNumber)
+ elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
+ BufferGetBlockNumber(buffer),
+ RelationGetRelationName(index));
+ predecessor[lt->nextOffset] = i;
+ }
+ }
+ else
+ {
+ Assert(lt->nextOffset == InvalidOffsetNumber);
+ }
+ }
+
+ if (nDeletable == 0)
+ return; /* nothing more to do */
+
+ /*----------
+ * Figure out exactly what we have to do. We do this separately from
+ * actually modifying the page, mainly so that we have a representation
+ * that can be dumped into WAL and then the replay code can do exactly
+ * the same thing. The output of this step consists of six arrays
+ * describing four kinds of operations, to be performed in this order:
+ *
+ * toDead[]: tuple numbers to be replaced with DEAD tuples
+ * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
+ * moveSrc[]: tuple numbers that need to be relocated to another offset
+ * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
+ * moveDest[]: new locations for moveSrc tuples
+ * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
+ * chainDest[]: new values of nextOffset for chainSrc members
+ *
+ * It's easiest to figure out what we have to do by processing tuple
+ * chains, so we iterate over all the tuples (not just the deletable
+ * ones!) to identify chain heads, then chase down each chain and make
+ * work item entries for deletable tuples within the chain.
+ *----------
+ */
+ xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;
+
+ for (i = FirstOffsetNumber; i <= max; i++)
+ {
+ SpGistLeafTuple head;
+ bool interveningDeletable;
+ OffsetNumber prevLive;
+ OffsetNumber j;
+
+ head = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, i));
+ if (head->tupstate != SPGIST_LIVE)
+ continue; /* can't be a chain member */
+ if (predecessor[i] != 0)
+ continue; /* not a chain head */
+
+ /* initialize ... */
+ interveningDeletable = false;
+ prevLive = deletable[i] ? InvalidOffsetNumber : i;
+
+ /* scan down the chain ... */
+ j = head->nextOffset;
+ while (j != InvalidOffsetNumber)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, j));
+ if (lt->tupstate != SPGIST_LIVE)
+ {
+ /* all tuples in chain should be live */
+ elog(ERROR, "unexpected SPGiST tuple state: %d",
+ lt->tupstate);
+ }
+
+ if (deletable[j])
+ {
+ /* This tuple should be replaced by a placeholder */
+ toPlaceholder[xlrec.nPlaceholder] = j;
+ xlrec.nPlaceholder++;
+ /* previous live tuple's chain link will need an update */
+ interveningDeletable = true;
+ }
+ else if (prevLive == InvalidOffsetNumber)
+ {
+ /*
+ * This is the first live tuple in the chain. It has
+ * to move to the head position.
+ */
+ moveSrc[xlrec.nMove] = j;
+ moveDest[xlrec.nMove] = i;
+ xlrec.nMove++;
+ /* Chain updates will be applied after the move */
+ prevLive = i;
+ interveningDeletable = false;
+ }
+ else
+ {
+ /*
+ * Second or later live tuple. Arrange to re-chain it to the
+ * previous live one, if there was a gap.
+ */
+ if (interveningDeletable)
+ {
+ chainSrc[xlrec.nChain] = prevLive;
+ chainDest[xlrec.nChain] = j;
+ xlrec.nChain++;
+ }
+ prevLive = j;
+ interveningDeletable = false;
+ }
+
+ j = lt->nextOffset;
+ }
+
+ if (prevLive == InvalidOffsetNumber)
+ {
+ /* The chain is entirely removable, so we need a DEAD tuple */
+ toDead[xlrec.nDead] = i;
+ xlrec.nDead++;
+ }
+ else if (interveningDeletable)
+ {
+ /* One or more deletions at end of chain, so close it off */
+ chainSrc[xlrec.nChain] = prevLive;
+ chainDest[xlrec.nChain] = InvalidOffsetNumber;
+ xlrec.nChain++;
+ }
+ }
+
+ /* sanity check ... */
+ if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
+ elog(ERROR, "inconsistent counts of deletable tuples");
+
+ /* Prepare WAL record */
+ xlrec.node = index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(buffer);
+ STORE_STATE(&bds->spgstate, xlrec.stateSrc);
+
+ ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
+ /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
+ ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1);
+ ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2);
+ ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3);
+ ACCEPT_RDATA_DATA(moveDest, sizeof(OffsetNumber) * xlrec.nMove, 4);
+ ACCEPT_RDATA_DATA(chainSrc, sizeof(OffsetNumber) * xlrec.nChain, 5);
+ ACCEPT_RDATA_DATA(chainDest, sizeof(OffsetNumber) * xlrec.nChain, 6);
+ ACCEPT_RDATA_BUFFER(buffer, 7);
+
+ /* Do the updates */
+ START_CRIT_SECTION();
+
+ spgPageIndexMultiDelete(&bds->spgstate, page,
+ toDead, xlrec.nDead,
+ SPGIST_DEAD, SPGIST_DEAD,
+ InvalidBlockNumber, InvalidOffsetNumber);
+
+ spgPageIndexMultiDelete(&bds->spgstate, page,
+ toPlaceholder, xlrec.nPlaceholder,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber, InvalidOffsetNumber);
+
+ /*
+ * We implement the move step by swapping the item pointers of the
+ * source and target tuples, then replacing the newly-source tuples
+ * with placeholders. This is perhaps unduly friendly with the page
+ * data representation, but it's fast and doesn't risk page overflow
+ * when a tuple to be relocated is large.
+ */
+ for (i = 0; i < xlrec.nMove; i++)
+ {
+ ItemId idSrc = PageGetItemId(page, moveSrc[i]);
+ ItemId idDest = PageGetItemId(page, moveDest[i]);
+ ItemIdData tmp;
+
+ tmp = *idSrc;
+ *idSrc = *idDest;
+ *idDest = tmp;
+ }
+
+ spgPageIndexMultiDelete(&bds->spgstate, page,
+ moveSrc, xlrec.nMove,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber, InvalidOffsetNumber);
+
+ for (i = 0; i < xlrec.nChain; i++)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, chainSrc[i]));
+ Assert(lt->tupstate == SPGIST_LIVE);
+ lt->nextOffset = chainDest[i];
+ }
+
+ MarkBufferDirty(buffer);
+
+ if (RelationNeedsWAL(index))
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Vacuum the root page when it is a leaf
+ *
+ * On the root, we just delete any dead leaf tuples; no fancy business
+ */
+static void
+vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
+{
+ Page page = BufferGetPage(buffer);
+ spgxlogVacuumRoot xlrec;
+ XLogRecData rdata[3];
+ OffsetNumber toDelete[MaxIndexTuplesPerPage];
+ OffsetNumber i,
+ max = PageGetMaxOffsetNumber(page);
+
+ xlrec.nDelete = 0;
+
+ /* Scan page, identify tuples to delete, accumulate stats */
+ for (i = FirstOffsetNumber; i <= max; i++)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, i));
+ if (lt->tupstate == SPGIST_LIVE)
+ {
+ Assert(ItemPointerIsValid(&lt->heapPtr));
+
+ if (bds->callback(&lt->heapPtr, bds->callback_state))
+ {
+ bds->stats->tuples_removed += 1;
+ toDelete[xlrec.nDelete] = i;
+ xlrec.nDelete++;
+ }
+ else
+ {
+ bds->stats->num_index_tuples += 1;
+ }
+ }
+ else
+ {
+ /* all tuples on root should be live */
+ elog(ERROR, "unexpected SPGiST tuple state: %d",
+ lt->tupstate);
+ }
+ }
+
+ if (xlrec.nDelete == 0)
+ return; /* nothing more to do */
+
+ /* Prepare WAL record */
+ xlrec.node = index->rd_node;
+ STORE_STATE(&bds->spgstate, xlrec.stateSrc);
+
+ ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
+ /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
+ ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * xlrec.nDelete, 1);
+ ACCEPT_RDATA_BUFFER(buffer, 2);
+
+ /* Do the update */
+ START_CRIT_SECTION();
+
+ /* The tuple numbers are in order, so we can use PageIndexMultiDelete */
+ PageIndexMultiDelete(page, toDelete, xlrec.nDelete);
+
+ MarkBufferDirty(buffer);
+
+ if (RelationNeedsWAL(index))
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Clean up redirect and placeholder tuples on the given page
+ *
+ * Redirect tuples can be marked placeholder once they're old enough.
+ * Placeholder tuples can be removed if it won't change the offsets of
+ * non-placeholder ones.
+ *
+ * Unlike the routines above, this works on both leaf and inner pages.
+ */
+static void
+vacuumRedirectAndPlaceholder(Relation index, Buffer buffer,
+ TransactionId OldestXmin)
+{
+ Page page = BufferGetPage(buffer);
+ SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
+ OffsetNumber i,
+ max = PageGetMaxOffsetNumber(page),
+ firstPlaceholder = InvalidOffsetNumber;
+ bool hasNonPlaceholder = false;
+ bool hasUpdate = false;
+ OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
+ OffsetNumber itemnos[MaxIndexTuplesPerPage];
+ spgxlogVacuumRedirect xlrec;
+ XLogRecData rdata[3];
+
+ xlrec.node = index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(buffer);
+ xlrec.nToPlaceholder = 0;
+
+ START_CRIT_SECTION();
+
+ /*
+ * Scan backwards to convert old redirection tuples to placeholder tuples,
+ * and identify location of last non-placeholder tuple while at it.
+ */
+ for (i = max;
+ i >= FirstOffsetNumber &&
+ (opaque->nRedirection > 0 || !hasNonPlaceholder);
+ i--)
+ {
+ SpGistDeadTuple dt;
+
+ dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));
+
+ if (dt->tupstate == SPGIST_REDIRECT &&
+ TransactionIdPrecedes(dt->xid, OldestXmin))
+ {
+ dt->tupstate = SPGIST_PLACEHOLDER;
+ Assert(opaque->nRedirection > 0);
+ opaque->nRedirection--;
+ opaque->nPlaceholder++;
+
+ ItemPointerSetInvalid(&dt->pointer);
+
+ itemToPlaceholder[xlrec.nToPlaceholder] = i;
+ xlrec.nToPlaceholder++;
+
+ hasUpdate = true;
+ }
+
+ if (dt->tupstate == SPGIST_PLACEHOLDER)
+ {
+ if (!hasNonPlaceholder)
+ firstPlaceholder = i;
+ }
+ else
+ {
+ hasNonPlaceholder = true;
+ }
+ }
+
+ /*
+ * Any placeholder tuples at the end of page can safely be removed. We
+ * can't remove ones before the last non-placeholder, though, because we
+ * can't alter the offset numbers of non-placeholder tuples.
+ */
+ if (firstPlaceholder != InvalidOffsetNumber)
+ {
+ /*
+ * We do not store this array to rdata because it's easy to recreate.
+ */
+ for (i = firstPlaceholder; i <= max; i++)
+ itemnos[i - firstPlaceholder] = i;
+
+ i = max - firstPlaceholder + 1;
+ Assert(opaque->nPlaceholder >= i);
+ opaque->nPlaceholder -= i;
+
+ /* The array is surely sorted, so can use PageIndexMultiDelete */
+ PageIndexMultiDelete(page, itemnos, i);
+
+ hasUpdate = true;
+ }
+
+ xlrec.firstPlaceholder = firstPlaceholder;
+
+ if (hasUpdate)
+ MarkBufferDirty(buffer);
+
+ if (hasUpdate && RelationNeedsWAL(index))
+ {
+ XLogRecPtr recptr;
+
+ ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
+ ACCEPT_RDATA_DATA(itemToPlaceholder, sizeof(OffsetNumber) * xlrec.nToPlaceholder, 1);
+ ACCEPT_RDATA_BUFFER(buffer, 2);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Process one page during a bulkdelete scan
+ */
+static void
+spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
+{
+ Relation index = bds->info->index;
+ Buffer buffer;
+ Page page;
+
+ /* call vacuum_delay_point while not holding any buffer lock */
+ vacuum_delay_point();
+
+ buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
+ RBM_NORMAL, bds->info->strategy);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ page = (Page) BufferGetPage(buffer);
+
+ if (PageIsNew(page))
+ {
+ /*
+ * We found an all-zero page, which could happen if the database
+ * crashed just after extending the file. Initialize and recycle it.
+ */
+ SpGistInitBuffer(buffer, 0);
+ SpGistPageSetDeleted(page);
+ /* We don't bother to WAL-log this action; easy to redo */
+ MarkBufferDirty(buffer);
+ }
+ else if (SpGistPageIsDeleted(page))
+ {
+ /* nothing to do */
+ }
+ else if (SpGistPageIsLeaf(page))
+ {
+ if (blkno == SPGIST_HEAD_BLKNO)
+ {
+ vacuumLeafRoot(bds, index, buffer);
+ /* no need for vacuumRedirectAndPlaceholder */
+ }
+ else
+ {
+ vacuumLeafPage(bds, index, buffer);
+ vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
+ }
+ }
+ else
+ {
+ /* inner page */
+ vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
+ }
+
+ /*
+ * The root page must never be deleted, nor marked as available in FSM,
+ * because we don't want it ever returned by a search for a place to
+ * put a new tuple. Otherwise, check for empty/deletable page, and
+ * make sure FSM knows about it.
+ */
+ if (blkno != SPGIST_HEAD_BLKNO)
+ {
+ /* If page is now empty, mark it deleted */
+ if (PageIsEmpty(page) && !SpGistPageIsDeleted(page))
+ {
+ SpGistPageSetDeleted(page);
+ /* We don't bother to WAL-log this action; easy to redo */
+ MarkBufferDirty(buffer);
+ }
+
+ if (SpGistPageIsDeleted(page))
+ {
+ RecordFreeIndexPage(index, blkno);
+ bds->stats->pages_deleted++;
+ }
+ else
+ bds->lastFilledBlock = blkno;
+ }
+
+ SpGistSetLastUsedPage(index, buffer);
+
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Perform a bulkdelete scan
+ */
+static void
+spgvacuumscan(spgBulkDeleteState *bds)
+{
+ Relation index = bds->info->index;
+ bool needLock;
+ BlockNumber num_pages,
+ blkno;
+
+ /* Finish setting up spgBulkDeleteState */
+ initSpGistState(&bds->spgstate, index);
+ bds->OldestXmin = GetOldestXmin(true, false);
+ bds->lastFilledBlock = SPGIST_HEAD_BLKNO;
+
+ /*
+ * Reset counts that will be incremented during the scan; needed in case
+ * of multiple scans during a single VACUUM command
+ */
+ bds->stats->estimated_count = false;
+ bds->stats->num_index_tuples = 0;
+ bds->stats->pages_deleted = 0;
+
+ /* We can skip locking for new or temp relations */
+ needLock = !RELATION_IS_LOCAL(index);
+
+ /*
+ * The outer loop iterates over all index pages except the metapage, in
+ * physical order (we hope the kernel will cooperate in providing
+ * read-ahead for speed). It is critical that we visit all leaf pages,
+ * including ones added after we start the scan, else we might fail to
+ * delete some deletable tuples. See more extensive comments about
+ * this in btvacuumscan().
+ */
+ blkno = SPGIST_HEAD_BLKNO;
+ for (;;)
+ {
+ /* Get the current relation length */
+ if (needLock)
+ LockRelationForExtension(index, ExclusiveLock);
+ num_pages = RelationGetNumberOfBlocks(index);
+ if (needLock)
+ UnlockRelationForExtension(index, ExclusiveLock);
+
+ /* Quit if we've scanned the whole relation */
+ if (blkno >= num_pages)
+ break;
+ /* Iterate over pages, then loop back to recheck length */
+ for (; blkno < num_pages; blkno++)
+ {
+ spgvacuumpage(bds, blkno);
+ }
+ }
+
+ /* Propagate local lastUsedPage cache to metablock */
+ SpGistUpdateMetaPage(index);
+
+ /*
+ * Truncate index if possible
+ *
+ * XXX disabled because it's unsafe due to possible concurrent inserts.
+ * We'd have to rescan the pages to make sure they're still empty, and it
+ * doesn't seem worth it. Note that btree doesn't do this either.
+ */
+#ifdef NOT_USED
+ if (num_pages > bds->lastFilledBlock + 1)
+ {
+ BlockNumber lastBlock = num_pages - 1;
+
+ num_pages = bds->lastFilledBlock + 1;
+ RelationTruncate(index, num_pages);
+ bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
+ bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
+ }
+#endif
+
+ /* Report final stats */
+ bds->stats->num_pages = num_pages;
+ bds->stats->pages_free = bds->stats->pages_deleted;
+}
+
+/*
+ * Bulk deletion of all index entries pointing to a set of heap tuples.
+ * The set of target tuples is specified via a callback routine that tells
+ * whether any given heap tuple (identified by ItemPointer) is being deleted.
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+Datum
+spgbulkdelete(PG_FUNCTION_ARGS)
+{
+ IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
+ IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
+ void *callback_state = (void *) PG_GETARG_POINTER(3);
+ spgBulkDeleteState bds;
+
+ /* allocate stats if first time through, else re-use existing struct */
+ if (stats == NULL)
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ bds.info = info;
+ bds.stats = stats;
+ bds.callback = callback;
+ bds.callback_state = callback_state;
+
+ spgvacuumscan(&bds);
+
+ PG_RETURN_POINTER(stats);
+}
+
+/* Dummy callback to delete no tuples during spgvacuumcleanup */
+static bool
+dummy_callback(ItemPointer itemptr, void *state)
+{
+ return false;
+}
+
+/*
+ * Post-VACUUM cleanup.
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+Datum
+spgvacuumcleanup(PG_FUNCTION_ARGS)
+{
+ IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
+ Relation index = info->index;
+ spgBulkDeleteState bds;
+
+ /* No-op in ANALYZE ONLY mode */
+ if (info->analyze_only)
+ PG_RETURN_POINTER(stats);
+
+ /*
+ * We don't need to scan the index if there was a preceding bulkdelete
+ * pass. Otherwise, make a pass that won't delete any live tuples, but
+ * might still accomplish useful stuff with redirect/placeholder cleanup,
+ * and in any case will provide stats.
+ */
+ if (stats == NULL)
+ {
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ bds.info = info;
+ bds.stats = stats;
+ bds.callback = dummy_callback;
+ bds.callback_state = NULL;
+
+ spgvacuumscan(&bds);
+ }
+
+ /* Finally, vacuum the FSM */
+ IndexFreeSpaceMapVacuum(index);
+
+ /*
+ * It's quite possible for us to be fooled by concurrent page splits into
+ * double-counting some index tuples, so disbelieve any total that exceeds
+ * the underlying heap's count ... if we know that accurately. Otherwise
+ * this might just make matters worse.
+ */
+ if (!info->estimated_count)
+ {
+ if (stats->num_index_tuples > info->num_heap_tuples)
+ stats->num_index_tuples = info->num_heap_tuples;
+ }
+
+ PG_RETURN_POINTER(stats);
+}