diff options
Diffstat (limited to 'src/backend/access/spgist/spgvacuum.c')
-rw-r--r-- | src/backend/access/spgist/spgvacuum.c | 755 |
1 files changed, 755 insertions, 0 deletions
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c new file mode 100644 index 00000000000..90d59920eb6 --- /dev/null +++ b/src/backend/access/spgist/spgvacuum.c @@ -0,0 +1,755 @@ +/*------------------------------------------------------------------------- + * + * spgvacuum.c + * vacuum for SP-GiST + * + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgvacuum.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/spgist_private.h" +#include "access/transam.h" +#include "catalog/storage.h" +#include "commands/vacuum.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/indexfsm.h" +#include "storage/lmgr.h" +#include "storage/procarray.h" + + +/* local state for vacuum operations */ +typedef struct spgBulkDeleteState +{ + /* Parameters passed in to spgvacuumscan */ + IndexVacuumInfo *info; + IndexBulkDeleteResult *stats; + IndexBulkDeleteCallback callback; + void *callback_state; + /* Additional working state */ + SpGistState spgstate; + TransactionId OldestXmin; + BlockNumber lastFilledBlock; +} spgBulkDeleteState; + + +/* + * Vacuum a regular (non-root) leaf page + * + * We must delete tuples that are targeted for deletion by the VACUUM, + * but not move any tuples that are referenced by outside links; we assume + * those are the ones that are heads of chains. + */ +static void +vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer) +{ + Page page = BufferGetPage(buffer); + spgxlogVacuumLeaf xlrec; + XLogRecData rdata[8]; + OffsetNumber toDead[MaxIndexTuplesPerPage]; + OffsetNumber toPlaceholder[MaxIndexTuplesPerPage]; + OffsetNumber moveSrc[MaxIndexTuplesPerPage]; + OffsetNumber moveDest[MaxIndexTuplesPerPage]; + OffsetNumber chainSrc[MaxIndexTuplesPerPage]; + OffsetNumber chainDest[MaxIndexTuplesPerPage]; + OffsetNumber predecessor[MaxIndexTuplesPerPage + 1]; + bool deletable[MaxIndexTuplesPerPage + 1]; + int nDeletable; + OffsetNumber i, + max = PageGetMaxOffsetNumber(page); + + memset(predecessor, 0, sizeof(predecessor)); + memset(deletable, 0, sizeof(deletable)); + nDeletable = 0; + + /* Scan page, identify tuples to delete, accumulate stats */ + for (i = FirstOffsetNumber; i <= max; i++) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, i)); + if (lt->tupstate == SPGIST_LIVE) + { + Assert(ItemPointerIsValid(<->heapPtr)); + + if (bds->callback(<->heapPtr, bds->callback_state)) + { + bds->stats->tuples_removed += 1; + deletable[i] = true; + nDeletable++; + } + else + { + bds->stats->num_index_tuples += 1; + } + + /* Form predecessor map, too */ + if (lt->nextOffset != InvalidOffsetNumber) + { + /* paranoia about corrupted chain links */ + if (lt->nextOffset < FirstOffsetNumber || + lt->nextOffset > max || + predecessor[lt->nextOffset] != InvalidOffsetNumber) + elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"", + BufferGetBlockNumber(buffer), + RelationGetRelationName(index)); + predecessor[lt->nextOffset] = i; + } + } + else + { + Assert(lt->nextOffset == InvalidOffsetNumber); + } + } + + if (nDeletable == 0) + return; /* nothing more to do */ + + /*---------- + * Figure out exactly what we have to do. We do this separately from + * actually modifying the page, mainly so that we have a representation + * that can be dumped into WAL and then the replay code can do exactly + * the same thing. The output of this step consists of six arrays + * describing four kinds of operations, to be performed in this order: + * + * toDead[]: tuple numbers to be replaced with DEAD tuples + * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples + * moveSrc[]: tuple numbers that need to be relocated to another offset + * (replacing the tuple there) and then replaced with PLACEHOLDER tuples + * moveDest[]: new locations for moveSrc tuples + * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates + * chainDest[]: new values of nextOffset for chainSrc members + * + * It's easiest to figure out what we have to do by processing tuple + * chains, so we iterate over all the tuples (not just the deletable + * ones!) to identify chain heads, then chase down each chain and make + * work item entries for deletable tuples within the chain. + *---------- + */ + xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0; + + for (i = FirstOffsetNumber; i <= max; i++) + { + SpGistLeafTuple head; + bool interveningDeletable; + OffsetNumber prevLive; + OffsetNumber j; + + head = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, i)); + if (head->tupstate != SPGIST_LIVE) + continue; /* can't be a chain member */ + if (predecessor[i] != 0) + continue; /* not a chain head */ + + /* initialize ... */ + interveningDeletable = false; + prevLive = deletable[i] ? InvalidOffsetNumber : i; + + /* scan down the chain ... */ + j = head->nextOffset; + while (j != InvalidOffsetNumber) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, j)); + if (lt->tupstate != SPGIST_LIVE) + { + /* all tuples in chain should be live */ + elog(ERROR, "unexpected SPGiST tuple state: %d", + lt->tupstate); + } + + if (deletable[j]) + { + /* This tuple should be replaced by a placeholder */ + toPlaceholder[xlrec.nPlaceholder] = j; + xlrec.nPlaceholder++; + /* previous live tuple's chain link will need an update */ + interveningDeletable = true; + } + else if (prevLive == InvalidOffsetNumber) + { + /* + * This is the first live tuple in the chain. It has + * to move to the head position. + */ + moveSrc[xlrec.nMove] = j; + moveDest[xlrec.nMove] = i; + xlrec.nMove++; + /* Chain updates will be applied after the move */ + prevLive = i; + interveningDeletable = false; + } + else + { + /* + * Second or later live tuple. Arrange to re-chain it to the + * previous live one, if there was a gap. + */ + if (interveningDeletable) + { + chainSrc[xlrec.nChain] = prevLive; + chainDest[xlrec.nChain] = j; + xlrec.nChain++; + } + prevLive = j; + interveningDeletable = false; + } + + j = lt->nextOffset; + } + + if (prevLive == InvalidOffsetNumber) + { + /* The chain is entirely removable, so we need a DEAD tuple */ + toDead[xlrec.nDead] = i; + xlrec.nDead++; + } + else if (interveningDeletable) + { + /* One or more deletions at end of chain, so close it off */ + chainSrc[xlrec.nChain] = prevLive; + chainDest[xlrec.nChain] = InvalidOffsetNumber; + xlrec.nChain++; + } + } + + /* sanity check ... */ + if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove) + elog(ERROR, "inconsistent counts of deletable tuples"); + + /* Prepare WAL record */ + xlrec.node = index->rd_node; + xlrec.blkno = BufferGetBlockNumber(buffer); + STORE_STATE(&bds->spgstate, xlrec.stateSrc); + + ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); + /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */ + ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1); + ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2); + ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3); + ACCEPT_RDATA_DATA(moveDest, sizeof(OffsetNumber) * xlrec.nMove, 4); + ACCEPT_RDATA_DATA(chainSrc, sizeof(OffsetNumber) * xlrec.nChain, 5); + ACCEPT_RDATA_DATA(chainDest, sizeof(OffsetNumber) * xlrec.nChain, 6); + ACCEPT_RDATA_BUFFER(buffer, 7); + + /* Do the updates */ + START_CRIT_SECTION(); + + spgPageIndexMultiDelete(&bds->spgstate, page, + toDead, xlrec.nDead, + SPGIST_DEAD, SPGIST_DEAD, + InvalidBlockNumber, InvalidOffsetNumber); + + spgPageIndexMultiDelete(&bds->spgstate, page, + toPlaceholder, xlrec.nPlaceholder, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, InvalidOffsetNumber); + + /* + * We implement the move step by swapping the item pointers of the + * source and target tuples, then replacing the newly-source tuples + * with placeholders. This is perhaps unduly friendly with the page + * data representation, but it's fast and doesn't risk page overflow + * when a tuple to be relocated is large. + */ + for (i = 0; i < xlrec.nMove; i++) + { + ItemId idSrc = PageGetItemId(page, moveSrc[i]); + ItemId idDest = PageGetItemId(page, moveDest[i]); + ItemIdData tmp; + + tmp = *idSrc; + *idSrc = *idDest; + *idDest = tmp; + } + + spgPageIndexMultiDelete(&bds->spgstate, page, + moveSrc, xlrec.nMove, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, InvalidOffsetNumber); + + for (i = 0; i < xlrec.nChain; i++) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, chainSrc[i])); + Assert(lt->tupstate == SPGIST_LIVE); + lt->nextOffset = chainDest[i]; + } + + MarkBufferDirty(buffer); + + if (RelationNeedsWAL(index)) + { + XLogRecPtr recptr; + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF, rdata); + + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + } + + END_CRIT_SECTION(); +} + +/* + * Vacuum the root page when it is a leaf + * + * On the root, we just delete any dead leaf tuples; no fancy business + */ +static void +vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer) +{ + Page page = BufferGetPage(buffer); + spgxlogVacuumRoot xlrec; + XLogRecData rdata[3]; + OffsetNumber toDelete[MaxIndexTuplesPerPage]; + OffsetNumber i, + max = PageGetMaxOffsetNumber(page); + + xlrec.nDelete = 0; + + /* Scan page, identify tuples to delete, accumulate stats */ + for (i = FirstOffsetNumber; i <= max; i++) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, i)); + if (lt->tupstate == SPGIST_LIVE) + { + Assert(ItemPointerIsValid(<->heapPtr)); + + if (bds->callback(<->heapPtr, bds->callback_state)) + { + bds->stats->tuples_removed += 1; + toDelete[xlrec.nDelete] = i; + xlrec.nDelete++; + } + else + { + bds->stats->num_index_tuples += 1; + } + } + else + { + /* all tuples on root should be live */ + elog(ERROR, "unexpected SPGiST tuple state: %d", + lt->tupstate); + } + } + + if (xlrec.nDelete == 0) + return; /* nothing more to do */ + + /* Prepare WAL record */ + xlrec.node = index->rd_node; + STORE_STATE(&bds->spgstate, xlrec.stateSrc); + + ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); + /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */ + ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * xlrec.nDelete, 1); + ACCEPT_RDATA_BUFFER(buffer, 2); + + /* Do the update */ + START_CRIT_SECTION(); + + /* The tuple numbers are in order, so we can use PageIndexMultiDelete */ + PageIndexMultiDelete(page, toDelete, xlrec.nDelete); + + MarkBufferDirty(buffer); + + if (RelationNeedsWAL(index)) + { + XLogRecPtr recptr; + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT, rdata); + + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + } + + END_CRIT_SECTION(); +} + +/* + * Clean up redirect and placeholder tuples on the given page + * + * Redirect tuples can be marked placeholder once they're old enough. + * Placeholder tuples can be removed if it won't change the offsets of + * non-placeholder ones. + * + * Unlike the routines above, this works on both leaf and inner pages. + */ +static void +vacuumRedirectAndPlaceholder(Relation index, Buffer buffer, + TransactionId OldestXmin) +{ + Page page = BufferGetPage(buffer); + SpGistPageOpaque opaque = SpGistPageGetOpaque(page); + OffsetNumber i, + max = PageGetMaxOffsetNumber(page), + firstPlaceholder = InvalidOffsetNumber; + bool hasNonPlaceholder = false; + bool hasUpdate = false; + OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage]; + OffsetNumber itemnos[MaxIndexTuplesPerPage]; + spgxlogVacuumRedirect xlrec; + XLogRecData rdata[3]; + + xlrec.node = index->rd_node; + xlrec.blkno = BufferGetBlockNumber(buffer); + xlrec.nToPlaceholder = 0; + + START_CRIT_SECTION(); + + /* + * Scan backwards to convert old redirection tuples to placeholder tuples, + * and identify location of last non-placeholder tuple while at it. + */ + for (i = max; + i >= FirstOffsetNumber && + (opaque->nRedirection > 0 || !hasNonPlaceholder); + i--) + { + SpGistDeadTuple dt; + + dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i)); + + if (dt->tupstate == SPGIST_REDIRECT && + TransactionIdPrecedes(dt->xid, OldestXmin)) + { + dt->tupstate = SPGIST_PLACEHOLDER; + Assert(opaque->nRedirection > 0); + opaque->nRedirection--; + opaque->nPlaceholder++; + + ItemPointerSetInvalid(&dt->pointer); + + itemToPlaceholder[xlrec.nToPlaceholder] = i; + xlrec.nToPlaceholder++; + + hasUpdate = true; + } + + if (dt->tupstate == SPGIST_PLACEHOLDER) + { + if (!hasNonPlaceholder) + firstPlaceholder = i; + } + else + { + hasNonPlaceholder = true; + } + } + + /* + * Any placeholder tuples at the end of page can safely be removed. We + * can't remove ones before the last non-placeholder, though, because we + * can't alter the offset numbers of non-placeholder tuples. + */ + if (firstPlaceholder != InvalidOffsetNumber) + { + /* + * We do not store this array to rdata because it's easy to recreate. + */ + for (i = firstPlaceholder; i <= max; i++) + itemnos[i - firstPlaceholder] = i; + + i = max - firstPlaceholder + 1; + Assert(opaque->nPlaceholder >= i); + opaque->nPlaceholder -= i; + + /* The array is surely sorted, so can use PageIndexMultiDelete */ + PageIndexMultiDelete(page, itemnos, i); + + hasUpdate = true; + } + + xlrec.firstPlaceholder = firstPlaceholder; + + if (hasUpdate) + MarkBufferDirty(buffer); + + if (hasUpdate && RelationNeedsWAL(index)) + { + XLogRecPtr recptr; + + ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); + ACCEPT_RDATA_DATA(itemToPlaceholder, sizeof(OffsetNumber) * xlrec.nToPlaceholder, 1); + ACCEPT_RDATA_BUFFER(buffer, 2); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT, rdata); + + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + } + + END_CRIT_SECTION(); +} + +/* + * Process one page during a bulkdelete scan + */ +static void +spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno) +{ + Relation index = bds->info->index; + Buffer buffer; + Page page; + + /* call vacuum_delay_point while not holding any buffer lock */ + vacuum_delay_point(); + + buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, + RBM_NORMAL, bds->info->strategy); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + if (PageIsNew(page)) + { + /* + * We found an all-zero page, which could happen if the database + * crashed just after extending the file. Initialize and recycle it. + */ + SpGistInitBuffer(buffer, 0); + SpGistPageSetDeleted(page); + /* We don't bother to WAL-log this action; easy to redo */ + MarkBufferDirty(buffer); + } + else if (SpGistPageIsDeleted(page)) + { + /* nothing to do */ + } + else if (SpGistPageIsLeaf(page)) + { + if (blkno == SPGIST_HEAD_BLKNO) + { + vacuumLeafRoot(bds, index, buffer); + /* no need for vacuumRedirectAndPlaceholder */ + } + else + { + vacuumLeafPage(bds, index, buffer); + vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin); + } + } + else + { + /* inner page */ + vacuumRedirectAndPlaceholder(index, buffer, bds->OldestXmin); + } + + /* + * The root page must never be deleted, nor marked as available in FSM, + * because we don't want it ever returned by a search for a place to + * put a new tuple. Otherwise, check for empty/deletable page, and + * make sure FSM knows about it. + */ + if (blkno != SPGIST_HEAD_BLKNO) + { + /* If page is now empty, mark it deleted */ + if (PageIsEmpty(page) && !SpGistPageIsDeleted(page)) + { + SpGistPageSetDeleted(page); + /* We don't bother to WAL-log this action; easy to redo */ + MarkBufferDirty(buffer); + } + + if (SpGistPageIsDeleted(page)) + { + RecordFreeIndexPage(index, blkno); + bds->stats->pages_deleted++; + } + else + bds->lastFilledBlock = blkno; + } + + SpGistSetLastUsedPage(index, buffer); + + UnlockReleaseBuffer(buffer); +} + +/* + * Perform a bulkdelete scan + */ +static void +spgvacuumscan(spgBulkDeleteState *bds) +{ + Relation index = bds->info->index; + bool needLock; + BlockNumber num_pages, + blkno; + + /* Finish setting up spgBulkDeleteState */ + initSpGistState(&bds->spgstate, index); + bds->OldestXmin = GetOldestXmin(true, false); + bds->lastFilledBlock = SPGIST_HEAD_BLKNO; + + /* + * Reset counts that will be incremented during the scan; needed in case + * of multiple scans during a single VACUUM command + */ + bds->stats->estimated_count = false; + bds->stats->num_index_tuples = 0; + bds->stats->pages_deleted = 0; + + /* We can skip locking for new or temp relations */ + needLock = !RELATION_IS_LOCAL(index); + + /* + * The outer loop iterates over all index pages except the metapage, in + * physical order (we hope the kernel will cooperate in providing + * read-ahead for speed). It is critical that we visit all leaf pages, + * including ones added after we start the scan, else we might fail to + * delete some deletable tuples. See more extensive comments about + * this in btvacuumscan(). + */ + blkno = SPGIST_HEAD_BLKNO; + for (;;) + { + /* Get the current relation length */ + if (needLock) + LockRelationForExtension(index, ExclusiveLock); + num_pages = RelationGetNumberOfBlocks(index); + if (needLock) + UnlockRelationForExtension(index, ExclusiveLock); + + /* Quit if we've scanned the whole relation */ + if (blkno >= num_pages) + break; + /* Iterate over pages, then loop back to recheck length */ + for (; blkno < num_pages; blkno++) + { + spgvacuumpage(bds, blkno); + } + } + + /* Propagate local lastUsedPage cache to metablock */ + SpGistUpdateMetaPage(index); + + /* + * Truncate index if possible + * + * XXX disabled because it's unsafe due to possible concurrent inserts. + * We'd have to rescan the pages to make sure they're still empty, and it + * doesn't seem worth it. Note that btree doesn't do this either. + */ +#ifdef NOT_USED + if (num_pages > bds->lastFilledBlock + 1) + { + BlockNumber lastBlock = num_pages - 1; + + num_pages = bds->lastFilledBlock + 1; + RelationTruncate(index, num_pages); + bds->stats->pages_removed += lastBlock - bds->lastFilledBlock; + bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock; + } +#endif + + /* Report final stats */ + bds->stats->num_pages = num_pages; + bds->stats->pages_free = bds->stats->pages_deleted; +} + +/* + * Bulk deletion of all index entries pointing to a set of heap tuples. + * The set of target tuples is specified via a callback routine that tells + * whether any given heap tuple (identified by ItemPointer) is being deleted. + * + * Result: a palloc'd struct containing statistical info for VACUUM displays. + */ +Datum +spgbulkdelete(PG_FUNCTION_ARGS) +{ + IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); + IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); + IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); + void *callback_state = (void *) PG_GETARG_POINTER(3); + spgBulkDeleteState bds; + + /* allocate stats if first time through, else re-use existing struct */ + if (stats == NULL) + stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); + bds.info = info; + bds.stats = stats; + bds.callback = callback; + bds.callback_state = callback_state; + + spgvacuumscan(&bds); + + PG_RETURN_POINTER(stats); +} + +/* Dummy callback to delete no tuples during spgvacuumcleanup */ +static bool +dummy_callback(ItemPointer itemptr, void *state) +{ + return false; +} + +/* + * Post-VACUUM cleanup. + * + * Result: a palloc'd struct containing statistical info for VACUUM displays. + */ +Datum +spgvacuumcleanup(PG_FUNCTION_ARGS) +{ + IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); + IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); + Relation index = info->index; + spgBulkDeleteState bds; + + /* No-op in ANALYZE ONLY mode */ + if (info->analyze_only) + PG_RETURN_POINTER(stats); + + /* + * We don't need to scan the index if there was a preceding bulkdelete + * pass. Otherwise, make a pass that won't delete any live tuples, but + * might still accomplish useful stuff with redirect/placeholder cleanup, + * and in any case will provide stats. + */ + if (stats == NULL) + { + stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); + bds.info = info; + bds.stats = stats; + bds.callback = dummy_callback; + bds.callback_state = NULL; + + spgvacuumscan(&bds); + } + + /* Finally, vacuum the FSM */ + IndexFreeSpaceMapVacuum(index); + + /* + * It's quite possible for us to be fooled by concurrent page splits into + * double-counting some index tuples, so disbelieve any total that exceeds + * the underlying heap's count ... if we know that accurately. Otherwise + * this might just make matters worse. + */ + if (!info->estimated_count) + { + if (stats->num_index_tuples > info->num_heap_tuples) + stats->num_index_tuples = info->num_heap_tuples; + } + + PG_RETURN_POINTER(stats); +} |