Diffstat (limited to 'src/backend/access/spgist/spgvacuum.c')
-rw-r--r--  src/backend/access/spgist/spgvacuum.c  | 755
1 file changed, 755 insertions(+), 0 deletions(-)
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
new file mode 100644
index 00000000000..90d59920eb6
--- /dev/null
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -0,0 +1,755 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgvacuum.c
+ * vacuum for SP-GiST
+ *
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/spgist/spgvacuum.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/spgist_private.h"
+#include "access/transam.h"
+#include "catalog/storage.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/indexfsm.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+
+
+/* local state for vacuum operations */
+typedef struct spgBulkDeleteState
+{
+ /* Parameters passed in to spgvacuumscan */
+ IndexVacuumInfo *info;
+ IndexBulkDeleteResult *stats;
+ IndexBulkDeleteCallback callback;
+ void *callback_state;
+ /* Additional working state */
+ SpGistState spgstate;
+ TransactionId OldestXmin;
+ BlockNumber lastFilledBlock;
+} spgBulkDeleteState;
+
+
+/*
+ * Vacuum a regular (non-root) leaf page
+ *
+ * We must delete tuples that are targeted for deletion by the VACUUM,
+ * but not move any tuples that are referenced by outside links; we assume
+ * those are the ones that are heads of chains.
+ */
+static void
+vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer)
+{
+ Page page = BufferGetPage(buffer);
+ spgxlogVacuumLeaf xlrec;
+ XLogRecData rdata[8];
+ OffsetNumber toDead[MaxIndexTuplesPerPage];
+ OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
+ OffsetNumber moveSrc[MaxIndexTuplesPerPage];
+ OffsetNumber moveDest[MaxIndexTuplesPerPage];
+ OffsetNumber chainSrc[MaxIndexTuplesPerPage];
+ OffsetNumber chainDest[MaxIndexTuplesPerPage];
+ OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
+ bool deletable[MaxIndexTuplesPerPage + 1];
+ int nDeletable;
+ OffsetNumber i,
+ max = PageGetMaxOffsetNumber(page);
+
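+ /*
+ * predecessor[] and deletable[] are indexed by offset number (1-based),
+ * hence the extra array slot. predecessor[j] records the offset of the
+ * tuple whose chain link points at offset j, or 0 if there is none.
+ */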
+ memset(predecessor, 0, sizeof(predecessor));
+ memset(deletable, 0, sizeof(deletable));
+ nDeletable = 0;
+
+ /* Scan page, identify tuples to delete, accumulate stats */
+ for (i = FirstOffsetNumber; i <= max; i++)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, i));
+ if (lt->tupstate == SPGIST_LIVE)
+ {
+ Assert(ItemPointerIsValid(&lt->heapPtr));
+
+ if (bds->callback(&lt->heapPtr, bds->callback_state))
+ {
+ bds->stats->tuples_removed += 1;
+ deletable[i] = true;
+ nDeletable++;
+ }
+ else
+ {
+ bds->stats->num_index_tuples += 1;
+ }
+
+ /* Form predecessor map, too */
+ if (lt->nextOffset != InvalidOffsetNumber)
+ {
+ /* paranoia about corrupted chain links */
+ if (lt->nextOffset < FirstOffsetNumber ||
+ lt->nextOffset > max ||
+ predecessor[lt->nextOffset] != InvalidOffsetNumber)
+ elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
+ BufferGetBlockNumber(buffer),
+ RelationGetRelationName(index));
+ predecessor[lt->nextOffset] = i;
+ }
+ }
+ else
+ {
+ Assert(lt->nextOffset == InvalidOffsetNumber);
+ }
+ }
+
+ if (nDeletable == 0)
+ return; /* nothing more to do */
+
+ /*----------
+ * Figure out exactly what we have to do. We do this separately from
+ * actually modifying the page, mainly so that we have a representation
+ * that can be dumped into WAL and then the replay code can do exactly
+ * the same thing. The output of this step consists of six arrays
+ * describing four kinds of operations, to be performed in this order:
+ *
+ * toDead[]: tuple numbers to be replaced with DEAD tuples
+ * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
+ * moveSrc[]: tuple numbers that need to be relocated to another offset
+ * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
+ * moveDest[]: new locations for moveSrc tuples
+ * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
+ * chainDest[]: new values of nextOffset for chainSrc members
+ *
+ * It's easiest to figure out what we have to do by processing tuple
+ * chains, so we iterate over all the tuples (not just the deletable
+ * ones!) to identify chain heads, then chase down each chain and make
+ * work item entries for deletable tuples within the chain.
+ *----------
+ */
+ xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;
+
+ for (i = FirstOffsetNumber; i <= max; i++)
+ {
+ SpGistLeafTuple head;
+ bool interveningDeletable;
+ OffsetNumber prevLive;
+ OffsetNumber j;
+
+ head = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, i));
+ if (head->tupstate != SPGIST_LIVE)
+ continue; /* can't be a chain member */
+ if (predecessor[i] != 0)
+ continue; /* not a chain head */
+
+ /* initialize ... */
+ interveningDeletable = false;
+ prevLive = deletable[i] ? InvalidOffsetNumber : i;
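+
+ /*
+ * During the chain walk below, prevLive is the last live tuple that
+ * will remain in the chain, and interveningDeletable records whether
+ * any deletable tuples have been passed over since prevLive, in which
+ * case prevLive's chain link must be updated.
+ */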
+
+ /* scan down the chain ... */
+ j = head->nextOffset;
+ while (j != InvalidOffsetNumber)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, j));
+ if (lt->tupstate != SPGIST_LIVE)
+ {
+ /* all tuples in chain should be live */
+ elog(ERROR, "unexpected SPGiST tuple state: %d",
+ lt->tupstate);
+ }
+
+ if (deletable[j])
+ {
+ /* This tuple should be replaced by a placeholder */
+ toPlaceholder[xlrec.nPlaceholder] = j;
+ xlrec.nPlaceholder++;
+ /* previous live tuple's chain link will need an update */
+ interveningDeletable = true;
+ }
+ else if (prevLive == InvalidOffsetNumber)
+ {
+ /*
+ * This is the first live tuple in the chain. It has
+ * to move to the head position.
+ */
+ moveSrc[xlrec.nMove] = j;
+ moveDest[xlrec.nMove] = i;
+ xlrec.nMove++;
+ /* Chain updates will be applied after the move */
+ prevLive = i;
+ interveningDeletable = false;
+ }
+ else
+ {
+ /*
+ * Second or later live tuple. Arrange to re-chain it to the
+ * previous live one, if there was a gap.
+ */
+ if (interveningDeletable)
+ {
+ chainSrc[xlrec.nChain] = prevLive;
+ chainDest[xlrec.nChain] = j;
+ xlrec.nChain++;
+ }
+ prevLive = j;
+ interveningDeletable = false;
+ }
+
+ j = lt->nextOffset;
+ }
+
+ if (prevLive == InvalidOffsetNumber)
+ {
+ /* The chain is entirely removable, so we need a DEAD tuple */
+ toDead[xlrec.nDead] = i;
+ xlrec.nDead++;
+ }
+ else if (interveningDeletable)
+ {
+ /* One or more deletions at end of chain, so close it off */
+ chainSrc[xlrec.nChain] = prevLive;
+ chainDest[xlrec.nChain] = InvalidOffsetNumber;
+ xlrec.nChain++;
+ }
+ }
+
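+ /*
+ * Each deletable tuple was recorded exactly once above: a deletable
+ * chain head either went into toDead[] (whole chain removable) or is
+ * represented by a moveSrc[]/moveDest[] pair (its slot taken over by
+ * the first surviving tuple), while deletable non-head tuples went
+ * into toPlaceholder[]. Hence the identity checked here.
+ */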
+ /* sanity check ... */
+ if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
+ elog(ERROR, "inconsistent counts of deletable tuples");
+
+ /* Prepare WAL record */
+ xlrec.node = index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(buffer);
+ STORE_STATE(&bds->spgstate, xlrec.stateSrc);
+
+ ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
+ /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
+ ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1);
+ ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2);
+ ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3);
+ ACCEPT_RDATA_DATA(moveDest, sizeof(OffsetNumber) * xlrec.nMove, 4);
+ ACCEPT_RDATA_DATA(chainSrc, sizeof(OffsetNumber) * xlrec.nChain, 5);
+ ACCEPT_RDATA_DATA(chainDest, sizeof(OffsetNumber) * xlrec.nChain, 6);
+ ACCEPT_RDATA_BUFFER(buffer, 7);
+
+ /* Do the updates */
+ START_CRIT_SECTION();
+
+ spgPageIndexMultiDelete(&bds->spgstate, page,
+ toDead, xlrec.nDead,
+ SPGIST_DEAD, SPGIST_DEAD,
+ InvalidBlockNumber, InvalidOffsetNumber);
+
+ spgPageIndexMultiDelete(&bds->spgstate, page,
+ toPlaceholder, xlrec.nPlaceholder,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber, InvalidOffsetNumber);
+
+ /*
+ * We implement the move step by swapping the item pointers of the
+ * source and target tuples, then replacing the newly-source tuples
+ * with placeholders. This is perhaps unduly friendly with the page
+ * data representation, but it's fast and doesn't risk page overflow
+ * when a tuple to be relocated is large.
+ */
+ for (i = 0; i < xlrec.nMove; i++)
+ {
+ ItemId idSrc = PageGetItemId(page, moveSrc[i]);
+ ItemId idDest = PageGetItemId(page, moveDest[i]);
+ ItemIdData tmp;
+
+ tmp = *idSrc;
+ *idSrc = *idDest;
+ *idDest = tmp;
+ }
+
+ spgPageIndexMultiDelete(&bds->spgstate, page,
+ moveSrc, xlrec.nMove,
+ SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+ InvalidBlockNumber, InvalidOffsetNumber);
+
+ for (i = 0; i < xlrec.nChain; i++)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, chainSrc[i]));
+ Assert(lt->tupstate == SPGIST_LIVE);
+ lt->nextOffset = chainDest[i];
+ }
+
+ MarkBufferDirty(buffer);
+
+ if (RelationNeedsWAL(index))
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Vacuum the root page when it is a leaf
+ *
+ * On the root, we just delete any dead leaf tuples; no fancy business
+ */
+static void
+vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
+{
+ Page page = BufferGetPage(buffer);
+ spgxlogVacuumRoot xlrec;
+ XLogRecData rdata[3];
+ OffsetNumber toDelete[MaxIndexTuplesPerPage];
+ OffsetNumber i,
+ max = PageGetMaxOffsetNumber(page);
+
+ xlrec.nDelete = 0;
+
+ /* Scan page, identify tuples to delete, accumulate stats */
+ for (i = FirstOffsetNumber; i <= max; i++)
+ {
+ SpGistLeafTuple lt;
+
+ lt = (SpGistLeafTuple) PageGetItem(page,
+ PageGetItemId(page, i));
+ if (lt->tupstate == SPGIST_LIVE)
+ {
+ Assert(ItemPointerIsValid(&lt->heapPtr));
+
+ if (bds->callback(&lt->heapPtr, bds->callback_state))
+ {
+ bds->stats->tuples_removed += 1;
+ toDelete[xlrec.nDelete] = i;
+ xlrec.nDelete++;
+ }
+ else
+ {
+ bds->stats->num_index_tuples += 1;
+ }
+ }
+ else
+ {
+ /* all tuples on root should be live */
+ elog(ERROR, "unexpected SPGiST tuple state: %d",
+ lt->tupstate);
+ }
+ }
+
+ if (xlrec.nDelete == 0)
+ return; /* nothing more to do */
+
+ /* Prepare WAL record */
+ xlrec.node = index->rd_node;
+ STORE_STATE(&bds->spgstate, xlrec.stateSrc);
+
+ ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
+ /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
+ ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * xlrec.nDelete, 1);
+ ACCEPT_RDATA_BUFFER(buffer, 2);
+
+ /* Do the update */
+ START_CRIT_SECTION();
+
+ /* The tuple numbers are in order, so we can use PageIndexMultiDelete */
+ PageIndexMultiDelete(page, toDelete, xlrec.nDelete);
+
+ MarkBufferDirty(buffer);
+
+ if (RelationNeedsWAL(index))
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Clean up redirect and placeholder tuples on the given page
+ *
+ * Redirect tuples can be marked placeholder once they're old enough.
+ * Placeholder tuples can be removed if it won't change the offsets of
+ * non-placeholder ones.
+ *
+ * Unlike the routines above, this works on both leaf and inner pages.
+ */
+static void
+vacuumRedirectAndPlaceholder(Relation index, Buffer buffer,
+ TransactionId OldestXmin)
+{
+ Page page = BufferGetPage(buffer);
+ SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
+ OffsetNumber i,
+ max = PageGetMaxOffsetNumber(page),
+ firstPlaceholder = InvalidOffsetNumber;
+ bool hasNonPlaceholder = false;
+ bool hasUpdate = false;
+ OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
+ OffsetNumber itemnos[MaxIndexTuplesPerPage];
+ spgxlogVacuumRedirect xlrec;
+ XLogRecData rdata[3];
+
+ xlrec.node = index->rd_node;
+ xlrec.blkno = BufferGetBlockNumber(buffer);
+ xlrec.nToPlaceholder = 0;
+
+ START_CRIT_SECTION();
+
+ /*
+ * Scan backwards to convert old redirection tuples to placeholder tuples,
+ * and identify location of last non-placeholder tuple while at it.
+ */
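+ /*
+ * The loop can stop early once all redirection tuples have been
+ * converted (nRedirection reaches zero) and the last non-placeholder
+ * tuple has been located, since nothing nearer the start of the page
+ * would need to change.
+ */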
+ for (i = max;
+ i >= FirstOffsetNumber &&
+ (opaque->nRedirection > 0 || !hasNonPlaceholder);
+ i--)
+ {
+ SpGistDeadTuple dt;
+
+ dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));
+
+ if (dt->tupstate == SPGIST_REDIRECT &&
+ TransactionIdPrecedes(dt->xid, OldestXmin))
+ {
+ dt->tupstate = SPGIST_PLACEHOLDER;
+ Assert(opaque->nRedirection > 0);
+ opaque->nRedirection--;
+ opaque->nPlaceholder++;
+
+ ItemPointerSetInvalid(&dt->pointer);
+
+ itemToPlaceholder[xlrec.nToPlaceholder] = i;
+ xlrec.nToPlaceholder++;
+
+ hasUpdate = true;
+ }
+
+ if (dt->tupstate == SPGIST_PLACEHOLDER)
+ {
+ if (!hasNonPlaceholder)
+ firstPlaceholder = i;
+ }
+ else
+ {
+ hasNonPlaceholder = true;
+ }
+ }
+
+ /*
+ * Any placeholder tuples at the end of the page can safely be removed. We
+ * can't remove ones before the last non-placeholder, though, because we
+ * can't alter the offset numbers of non-placeholder tuples.
+ */
+ if (firstPlaceholder != InvalidOffsetNumber)
+ {
+ /*
+ * We do not store this array to rdata because it's easy to recreate.
+ */
+ for (i = firstPlaceholder; i <= max; i++)
+ itemnos[i - firstPlaceholder] = i;
+
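+ /* i now becomes the count of trailing placeholder tuples to remove */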
+ i = max - firstPlaceholder + 1;
+ Assert(opaque->nPlaceholder >= i);
+ opaque->nPlaceholder -= i;
+
+ /* The array is surely sorted, so can use PageIndexMultiDelete */
+ PageIndexMultiDelete(page, itemnos, i);
+
+ hasUpdate = true;
+ }
+
+ xlrec.firstPlaceholder = firstPlaceholder;
+
+ if (hasUpdate)
+ MarkBufferDirty(buffer);
+
+ if (hasUpdate && RelationNeedsWAL(index))
+ {
+ XLogRecPtr recptr;
+
+ ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
+ ACCEPT_RDATA_DATA(itemToPlaceholder, sizeof(OffsetNumber) * xlrec.nToPlaceholder, 1);
+ ACCEPT_RDATA_BUFFER(buffer, 2);
+
+ recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT, rdata);
+
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Process one page during a bulkdelete scan
+ */
+static void
+spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
+{
+ Relation index = bds->info->index;
+ Buffer buffer;
+ Page page;
+
+ /* call vacuum_delay_point while not holding any buffer lock */
+ vacuum_delay_point();
+
+ buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
+ RBM_NORMAL, bds->info->strategy);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ page = (Page) BufferGetPage(buffer);
+
+ if (PageIsNew(page))
+ {
+ /*
+ * We found an all-zero page, which could happen if the database
+ * crashed just after extending the file. Initialize and recycle it.
+ */
+ SpGistInitBuffer(buffer, 0);
+ SpGistPageSetDeleted(page);
+ /* We don't bother to WAL-log this action; easy to redo */
+ MarkBufferDirty(buffer);
+ }
+ else if (SpGistPageIsDeleted(page))
+ {
+ /* nothing to do */
+ }
+ else if (SpGistPageIsLeaf(page))
+ {
+ if (blkno == SPGIST_HEAD_BLKNO)
+ {
+ vacuumLeafRoot(bds, index, buffer);
+ /* no need for vacuumRedirectAndPlaceholder */
+ }
+ else
+ {
+ vacuumLeafPage(bds, index, buffer);
+ vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
+ }
+ }
+ else
+ {
+ /* inner page */
+ vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin);
+ }
+
+ /*
+ * The root page must never be deleted, nor marked as available in FSM,
+ * because we don't want it ever returned by a search for a place to
+ * put a new tuple. Otherwise, check for empty/deletable page, and
+ * make sure FSM knows about it.
+ */
+ if (blkno != SPGIST_HEAD_BLKNO)
+ {
+ /* If page is now empty, mark it deleted */
+ if (PageIsEmpty(page) && !SpGistPageIsDeleted(page))
+ {
+ SpGistPageSetDeleted(page);
+ /* We don't bother to WAL-log this action; easy to redo */
+ MarkBufferDirty(buffer);
+ }
+
+ if (SpGistPageIsDeleted(page))
+ {
+ RecordFreeIndexPage(index, blkno);
+ bds->stats->pages_deleted++;
+ }
+ else
+ bds->lastFilledBlock = blkno;
+ }
+
+ SpGistSetLastUsedPage(index, buffer);
+
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Perform a bulkdelete scan
+ */
+static void
+spgvacuumscan(spgBulkDeleteState *bds)
+{
+ Relation index = bds->info->index;
+ bool needLock;
+ BlockNumber num_pages,
+ blkno;
+
+ /* Finish setting up spgBulkDeleteState */
+ initSpGistState(&bds->spgstate, index);
+ bds->OldestXmin = GetOldestXmin(true, false);
+ bds->lastFilledBlock = SPGIST_HEAD_BLKNO;
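+ /* the root block is never deleted, so it is a safe initial value */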
+
+ /*
+ * Reset counts that will be incremented during the scan; needed in case
+ * of multiple scans during a single VACUUM command
+ */
+ bds->stats->estimated_count = false;
+ bds->stats->num_index_tuples = 0;
+ bds->stats->pages_deleted = 0;
+
+ /* We can skip locking for new or temp relations */
+ needLock = !RELATION_IS_LOCAL(index);
+
+ /*
+ * The outer loop iterates over all index pages except the metapage, in
+ * physical order (we hope the kernel will cooperate in providing
+ * read-ahead for speed). It is critical that we visit all leaf pages,
+ * including ones added after we start the scan, else we might fail to
+ * delete some deletable tuples. See more extensive comments about
+ * this in btvacuumscan().
+ */
+ blkno = SPGIST_HEAD_BLKNO;
+ for (;;)
+ {
+ /* Get the current relation length */
+ if (needLock)
+ LockRelationForExtension(index, ExclusiveLock);
+ num_pages = RelationGetNumberOfBlocks(index);
+ if (needLock)
+ UnlockRelationForExtension(index, ExclusiveLock);
+
+ /* Quit if we've scanned the whole relation */
+ if (blkno >= num_pages)
+ break;
+ /* Iterate over pages, then loop back to recheck length */
+ for (; blkno < num_pages; blkno++)
+ {
+ spgvacuumpage(bds, blkno);
+ }
+ }
+
+ /* Propagate local lastUsedPage cache to metablock */
+ SpGistUpdateMetaPage(index);
+
+ /*
+ * Truncate index if possible
+ *
+ * XXX disabled because it's unsafe due to possible concurrent inserts.
+ * We'd have to rescan the pages to make sure they're still empty, and it
+ * doesn't seem worth it. Note that btree doesn't do this either.
+ */
+#ifdef NOT_USED
+ if (num_pages > bds->lastFilledBlock + 1)
+ {
+ BlockNumber lastBlock = num_pages - 1;
+
+ num_pages = bds->lastFilledBlock + 1;
+ RelationTruncate(index, num_pages);
+ bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
+ bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
+ }
+#endif
+
+ /* Report final stats */
+ bds->stats->num_pages = num_pages;
+ bds->stats->pages_free = bds->stats->pages_deleted;
+}
+
+/*
+ * Bulk deletion of all index entries pointing to a set of heap tuples.
+ * The set of target tuples is specified via a callback routine that tells
+ * whether any given heap tuple (identified by ItemPointer) is being deleted.
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+Datum
+spgbulkdelete(PG_FUNCTION_ARGS)
+{
+ IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
+ IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
+ void *callback_state = (void *) PG_GETARG_POINTER(3);
+ spgBulkDeleteState bds;
+
+ /* allocate stats if first time through, else re-use existing struct */
+ if (stats == NULL)
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ bds.info = info;
+ bds.stats = stats;
+ bds.callback = callback;
+ bds.callback_state = callback_state;
+
+ spgvacuumscan(&bds);
+
+ PG_RETURN_POINTER(stats);
+}
+
+/* Dummy callback to delete no tuples during spgvacuumcleanup */
+static bool
+dummy_callback(ItemPointer itemptr, void *state)
+{
+ return false;
+}
+
+/*
+ * Post-VACUUM cleanup.
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+Datum
+spgvacuumcleanup(PG_FUNCTION_ARGS)
+{
+ IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
+ IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
+ Relation index = info->index;
+ spgBulkDeleteState bds;
+
+ /* No-op in ANALYZE ONLY mode */
+ if (info->analyze_only)
+ PG_RETURN_POINTER(stats);
+
+ /*
+ * We don't need to scan the index if there was a preceding bulkdelete
+ * pass. Otherwise, make a pass that won't delete any live tuples, but
+ * might still accomplish useful stuff with redirect/placeholder cleanup,
+ * and in any case will provide stats.
+ */
+ if (stats == NULL)
+ {
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ bds.info = info;
+ bds.stats = stats;
+ bds.callback = dummy_callback;
+ bds.callback_state = NULL;
+
+ spgvacuumscan(&bds);
+ }
+
+ /* Finally, vacuum the FSM */
+ IndexFreeSpaceMapVacuum(index);
+
+ /*
+ * It's quite possible for us to be fooled by concurrent page splits into
+ * double-counting some index tuples, so disbelieve any total that exceeds
+ * the underlying heap's count ... if we know that accurately. Otherwise
+ * this might just make matters worse.
+ */
+ if (!info->estimated_count)
+ {
+ if (stats->num_index_tuples > info->num_heap_tuples)
+ stats->num_index_tuples = info->num_heap_tuples;
+ }
+
+ PG_RETURN_POINTER(stats);
+}