summaryrefslogtreecommitdiff
path: root/src/backend/access/heap
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/heap')
-rw-r--r--src/backend/access/heap/Makefile4
-rw-r--r--src/backend/access/heap/heapam.c153
-rw-r--r--src/backend/access/heap/visibilitymap.c478
3 files changed, 622 insertions, 13 deletions
diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile
index 66175ae7da8..dc33054641e 100644
--- a/src/backend/access/heap/Makefile
+++ b/src/backend/access/heap/Makefile
@@ -4,7 +4,7 @@
# Makefile for access/heap
#
# IDENTIFICATION
-# $PostgreSQL: pgsql/src/backend/access/heap/Makefile,v 1.18 2008/02/19 10:30:06 petere Exp $
+# $PostgreSQL: pgsql/src/backend/access/heap/Makefile,v 1.19 2008/12/03 13:05:22 heikki Exp $
#
#-------------------------------------------------------------------------
@@ -12,6 +12,6 @@ subdir = src/backend/access/heap
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o
+OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f6d75c6e2b2..c561e8f960b 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.270 2008/11/19 10:34:50 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.271 2008/12/03 13:05:22 heikki Exp $
*
*
* INTERFACE ROUTINES
@@ -47,6 +47,7 @@
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
+#include "access/visibilitymap.h"
#include "access/xact.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
@@ -195,6 +196,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
int ntup;
OffsetNumber lineoff;
ItemId lpp;
+ bool all_visible;
Assert(page < scan->rs_nblocks);
@@ -233,20 +235,32 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
lines = PageGetMaxOffsetNumber(dp);
ntup = 0;
+ /*
+ * If the all-visible flag indicates that all tuples on the page are
+ * visible to everyone, we can skip the per-tuple visibility tests.
+ */
+ all_visible = PageIsAllVisible(dp);
+
for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
lineoff <= lines;
lineoff++, lpp++)
{
if (ItemIdIsNormal(lpp))
{
- HeapTupleData loctup;
bool valid;
- loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
- loctup.t_len = ItemIdGetLength(lpp);
- ItemPointerSet(&(loctup.t_self), page, lineoff);
+ if (all_visible)
+ valid = true;
+ else
+ {
+ HeapTupleData loctup;
+
+ loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+ loctup.t_len = ItemIdGetLength(lpp);
+ ItemPointerSet(&(loctup.t_self), page, lineoff);
- valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
+ valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
+ }
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
}
@@ -1860,6 +1874,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
TransactionId xid = GetCurrentTransactionId();
HeapTuple heaptup;
Buffer buffer;
+ bool all_visible_cleared = false;
if (relation->rd_rel->relhasoids)
{
@@ -1920,6 +1935,12 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
RelationPutHeapTuple(relation, buffer, heaptup);
+ if (PageIsAllVisible(BufferGetPage(buffer)))
+ {
+ all_visible_cleared = true;
+ PageClearAllVisible(BufferGetPage(buffer));
+ }
+
/*
* XXX Should we set PageSetPrunable on this page ?
*
@@ -1943,6 +1964,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
+ xlrec.all_visible_cleared = all_visible_cleared;
xlrec.target.node = relation->rd_node;
xlrec.target.tid = heaptup->t_self;
rdata[0].data = (char *) &xlrec;
@@ -1994,6 +2016,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
UnlockReleaseBuffer(buffer);
+ /* Clear the bit in the visibility map if necessary */
+ if (all_visible_cleared)
+ visibilitymap_clear(relation,
+ ItemPointerGetBlockNumber(&(heaptup->t_self)));
+
/*
* If tuple is cachable, mark it for invalidation from the caches in case
* we abort. Note it is OK to do this after releasing the buffer, because
@@ -2070,6 +2097,7 @@ heap_delete(Relation relation, ItemPointer tid,
Buffer buffer;
bool have_tuple_lock = false;
bool iscombo;
+ bool all_visible_cleared = false;
Assert(ItemPointerIsValid(tid));
@@ -2216,6 +2244,12 @@ l1:
*/
PageSetPrunable(page, xid);
+ if (PageIsAllVisible(page))
+ {
+ all_visible_cleared = true;
+ PageClearAllVisible(page);
+ }
+
/* store transaction information of xact deleting the tuple */
tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
@@ -2237,6 +2271,7 @@ l1:
XLogRecPtr recptr;
XLogRecData rdata[2];
+ xlrec.all_visible_cleared = all_visible_cleared;
xlrec.target.node = relation->rd_node;
xlrec.target.tid = tp.t_self;
rdata[0].data = (char *) &xlrec;
@@ -2281,6 +2316,10 @@ l1:
*/
CacheInvalidateHeapTuple(relation, &tp);
+ /* Clear the bit in the visibility map if necessary */
+ if (all_visible_cleared)
+ visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+
/* Now we can release the buffer */
ReleaseBuffer(buffer);
@@ -2388,6 +2427,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
bool have_tuple_lock = false;
bool iscombo;
bool use_hot_update = false;
+ bool all_visible_cleared = false;
+ bool all_visible_cleared_new = false;
Assert(ItemPointerIsValid(otid));
@@ -2763,6 +2804,12 @@ l2:
MarkBufferDirty(newbuf);
MarkBufferDirty(buffer);
+ /*
+ * Note: we mustn't clear PD_ALL_VISIBLE flags before writing the WAL
+ * record, because log_heap_update looks at those flags to set the
+ * corresponding flags in the WAL record.
+ */
+
/* XLOG stuff */
if (!relation->rd_istemp)
{
@@ -2778,6 +2825,18 @@ l2:
PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
}
+ /* Clear PD_ALL_VISIBLE flags */
+ if (PageIsAllVisible(BufferGetPage(buffer)))
+ {
+ all_visible_cleared = true;
+ PageClearAllVisible(BufferGetPage(buffer));
+ }
+ if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
+ {
+ all_visible_cleared_new = true;
+ PageClearAllVisible(BufferGetPage(newbuf));
+ }
+
END_CRIT_SECTION();
if (newbuf != buffer)
@@ -2791,6 +2850,12 @@ l2:
*/
CacheInvalidateHeapTuple(relation, &oldtup);
+ /* Clear bits in visibility map */
+ if (all_visible_cleared)
+ visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ if (all_visible_cleared_new)
+ visibilitymap_clear(relation, BufferGetBlockNumber(newbuf));
+
/* Now we can release the buffer(s) */
if (newbuf != buffer)
ReleaseBuffer(newbuf);
@@ -3412,6 +3477,11 @@ l3:
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
/*
+ * Don't update the visibility map here. Locking a tuple doesn't
+ * change visibility info.
+ */
+
+ /*
* Now that we have successfully marked the tuple as locked, we can
* release the lmgr tuple lock, if we had it.
*/
@@ -3916,7 +3986,9 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
xlrec.target.node = reln->rd_node;
xlrec.target.tid = from;
+ xlrec.all_visible_cleared = PageIsAllVisible(BufferGetPage(oldbuf));
xlrec.newtid = newtup->t_self;
+ xlrec.new_all_visible_cleared = PageIsAllVisible(BufferGetPage(newbuf));
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapUpdate;
@@ -4185,13 +4257,25 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
OffsetNumber offnum;
ItemId lp = NULL;
HeapTupleHeader htup;
+ BlockNumber blkno;
+
+ blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+ /*
+ * The visibility map always needs to be updated, even if the heap page
+ * is already up-to-date.
+ */
+ if (xlrec->all_visible_cleared)
+ {
+ Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ visibilitymap_clear(reln, blkno);
+ FreeFakeRelcacheEntry(reln);
+ }
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- false);
+ buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
@@ -4223,6 +4307,9 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, record->xl_xid);
+ if (xlrec->all_visible_cleared)
+ PageClearAllVisible(page);
+
/* Make sure there is no forward chain link in t_ctid */
htup->t_ctid = xlrec->target.tid;
PageSetLSN(page, lsn);
@@ -4249,11 +4336,22 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
Size freespace;
BlockNumber blkno;
+ blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+
+ /*
+ * The visibility map always needs to be updated, even if the heap page
+ * is already up-to-date.
+ */
+ if (xlrec->all_visible_cleared)
+ {
+ Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ visibilitymap_clear(reln, blkno);
+ FreeFakeRelcacheEntry(reln);
+ }
+
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
- blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
-
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
@@ -4307,6 +4405,10 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
+
+ if (xlrec->all_visible_cleared)
+ PageClearAllVisible(page);
+
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
@@ -4347,6 +4449,18 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
uint32 newlen;
Size freespace;
+ /*
+ * The visibility map always needs to be updated, even if the heap page
+ * is already up-to-date.
+ */
+ if (xlrec->all_visible_cleared)
+ {
+ Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ visibilitymap_clear(reln,
+ ItemPointerGetBlockNumber(&xlrec->target.tid));
+ FreeFakeRelcacheEntry(reln);
+ }
+
if (record->xl_info & XLR_BKP_BLOCK_1)
{
if (samepage)
@@ -4411,6 +4525,9 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, record->xl_xid);
+ if (xlrec->all_visible_cleared)
+ PageClearAllVisible(page);
+
/*
* this test is ugly, but necessary to avoid thinking that insert change
* is already applied
@@ -4426,6 +4543,17 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
newt:;
+ /*
+ * The visibility map always needs to be updated, even if the heap page
+ * is already up-to-date.
+ */
+ if (xlrec->new_all_visible_cleared)
+ {
+ Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+ FreeFakeRelcacheEntry(reln);
+ }
+
if (record->xl_info & XLR_BKP_BLOCK_2)
return;
@@ -4504,6 +4632,9 @@ newsame:;
if (offnum == InvalidOffsetNumber)
elog(PANIC, "heap_update_redo: failed to add tuple");
+ if (xlrec->new_all_visible_cleared)
+ PageClearAllVisible(page);
+
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
PageSetLSN(page, lsn);
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
new file mode 100644
index 00000000000..e3cbb4e3dd8
--- /dev/null
+++ b/src/backend/access/heap/visibilitymap.c
@@ -0,0 +1,478 @@
+/*-------------------------------------------------------------------------
+ *
+ * visibilitymap.c
+ * bitmap for tracking visibility of heap tuples
+ *
+ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/heap/visibilitymap.c,v 1.1 2008/12/03 13:05:22 heikki Exp $
+ *
+ * INTERFACE ROUTINES
+ * visibilitymap_clear - clear a bit in the visibility map
+ * visibilitymap_pin - pin a map page for setting a bit
+ * visibilitymap_set - set a bit in a previously pinned page
+ * visibilitymap_test - test if a bit is set
+ *
+ * NOTES
+ *
+ * The visibility map is a bitmap with one bit per heap page. A set bit means
+ * that all tuples on the page are visible to all transactions, and doesn't
+ * therefore need to be vacuumed. The map is conservative in the sense that we
+ * make sure that whenever a bit is set, we know the condition is true, but if
+ * a bit is not set, it might or might not be.
+ *
+ * There's no explicit WAL logging in the functions in this file. The callers
+ * must make sure that whenever a bit is cleared, the bit is cleared on WAL
+ * replay of the updating operation as well. Setting bits during recovery
+ * isn't necessary for correctness.
+ *
+ * Currently, the visibility map is only used as a hint, to speed up VACUUM.
+ * A corrupted visibility map won't cause data corruption, although it can
+ * make VACUUM skip pages that need vacuuming, until the next anti-wraparound
+ * vacuum. The visibility map is not used for anti-wraparound vacuums, because
+ * an anti-wraparound vacuum needs to freeze tuples and observe the latest xid
+ * present in the table, also on pages that don't have any dead tuples.
+ *
+ * Although the visibility map is just a hint at the moment, the PD_ALL_VISIBLE
+ * flag on heap pages *must* be correct.
+ *
+ * LOCKING
+ *
+ * In heapam.c, whenever a page is modified so that not all tuples on the
+ * page are visible to everyone anymore, the corresponding bit in the
+ * visibility map is cleared. The bit in the visibility map is cleared
+ * after releasing the lock on the heap page, to avoid holding the lock
+ * over possible I/O to read in the visibility map page.
+ *
+ * To set a bit, you need to hold a lock on the heap page. That prevents
+ * the race condition where VACUUM sees that all tuples on the page are
+ * visible to everyone, but another backend modifies the page before VACUUM
+ * sets the bit in the visibility map.
+ *
+ * When a bit is set, the LSN of the visibility map page is updated to make
+ * sure that the visibility map update doesn't get written to disk before the
+ * WAL record of the changes that made it possible to set the bit is flushed.
+ * But when a bit is cleared, we don't have to do that because it's always OK
+ * to clear a bit in the map from correctness point of view.
+ *
+ * TODO
+ *
+ * It would be nice to use the visibility map to skip visibility checkes in
+ * index scans.
+ *
+ * Currently, the visibility map is not 100% correct all the time.
+ * During updates, the bit in the visibility map is cleared after releasing
+ * the lock on the heap page. During the window after releasing the lock
+ * and clearing the bit in the visibility map, the bit in the visibility map
+ * is set, but the new insertion or deletion is not yet visible to other
+ * backends.
+ *
+ * That might actually be OK for the index scans, though. The newly inserted
+ * tuple wouldn't have an index pointer yet, so all tuples reachable from an
+ * index would still be visible to all other backends, and deletions wouldn't
+ * be visible to other backends yet.
+ *
+ * There's another hole in the way the PD_ALL_VISIBLE flag is set. When
+ * vacuum observes that all tuples are visible to all, it sets the flag on
+ * the heap page, and also sets the bit in the visibility map. If we then
+ * crash, and only the visibility map page was flushed to disk, we'll have
+ * a bit set in the visibility map, but the corresponding flag on the heap
+ * page is not set. If the heap page is then updated, the updater won't
+ * know to clear the bit in the visibility map.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/visibilitymap.h"
+#include "storage/bufmgr.h"
+#include "storage/bufpage.h"
+#include "storage/lmgr.h"
+#include "storage/smgr.h"
+#include "utils/inval.h"
+
+/*#define TRACE_VISIBILITYMAP */
+
+/*
+ * Size of the bitmap on each visibility map page, in bytes. There's no
+ * extra headers, so the whole page minus except for the standard page header
+ * is used for the bitmap.
+ */
+#define MAPSIZE (BLCKSZ - SizeOfPageHeaderData)
+
+/* Number of bits allocated for each heap block. */
+#define BITS_PER_HEAPBLOCK 1
+
+/* Number of heap blocks we can represent in one byte. */
+#define HEAPBLOCKS_PER_BYTE 8
+
+/* Number of heap blocks we can represent in one visibility map page. */
+#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)
+
+/* Mapping from heap block number to the right bit in the visibility map */
+#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
+#define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
+#define HEAPBLK_TO_MAPBIT(x) ((x) % HEAPBLOCKS_PER_BYTE)
+
+/* prototypes for internal routines */
+static Buffer vm_readbuf(Relation rel, BlockNumber blkno, bool extend);
+static void vm_extend(Relation rel, BlockNumber nvmblocks);
+
+
+/*
+ * visibilitymap_clear - clear a bit in visibility map
+ *
+ * Clear a bit in the visibility map, marking that not all tuples are
+ * visible to all transactions anymore.
+ */
+void
+visibilitymap_clear(Relation rel, BlockNumber heapBlk)
+{
+ BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ int mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ int mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ uint8 mask = 1 << mapBit;
+ Buffer mapBuffer;
+ char *map;
+
+#ifdef TRACE_VISIBILITYMAP
+ elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
+#endif
+
+ mapBuffer = vm_readbuf(rel, mapBlock, false);
+ if (!BufferIsValid(mapBuffer))
+ return; /* nothing to do */
+
+ LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ map = PageGetContents(BufferGetPage(mapBuffer));
+
+ if (map[mapByte] & mask)
+ {
+ map[mapByte] &= ~mask;
+
+ MarkBufferDirty(mapBuffer);
+ }
+
+ UnlockReleaseBuffer(mapBuffer);
+}
+
+/*
+ * visibilitymap_pin - pin a map page for setting a bit
+ *
+ * Setting a bit in the visibility map is a two-phase operation. First, call
+ * visibilitymap_pin, to pin the visibility map page containing the bit for
+ * the heap page. Because that can require I/O to read the map page, you
+ * shouldn't hold a lock on the heap page while doing that. Then, call
+ * visibilitymap_set to actually set the bit.
+ *
+ * On entry, *buf should be InvalidBuffer or a valid buffer returned by
+ * an earlier call to visibilitymap_pin or visibilitymap_test on the same
+ * relation. On return, *buf is a valid buffer with the map page containing
+ * the the bit for heapBlk.
+ *
+ * If the page doesn't exist in the map file yet, it is extended.
+ */
+void
+visibilitymap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf)
+{
+ BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+
+ /* Reuse the old pinned buffer if possible */
+ if (BufferIsValid(*buf))
+ {
+ if (BufferGetBlockNumber(*buf) == mapBlock)
+ return;
+
+ ReleaseBuffer(*buf);
+ }
+ *buf = vm_readbuf(rel, mapBlock, true);
+}
+
+/*
+ * visibilitymap_set - set a bit on a previously pinned page
+ *
+ * recptr is the LSN of the heap page. The LSN of the visibility map page is
+ * advanced to that, to make sure that the visibility map doesn't get flushed
+ * to disk before the update to the heap page that made all tuples visible.
+ *
+ * This is an opportunistic function. It does nothing, unless *buf
+ * contains the bit for heapBlk. Call visibilitymap_pin first to pin
+ * the right map page. This function doesn't do any I/O.
+ */
+void
+visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
+ Buffer *buf)
+{
+ BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ uint32 mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ uint8 mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ Page page;
+ char *map;
+
+#ifdef TRACE_VISIBILITYMAP
+ elog(DEBUG1, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
+#endif
+
+ /* Check that we have the right page pinned */
+ if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != mapBlock)
+ return;
+
+ page = BufferGetPage(*buf);
+ map = PageGetContents(page);
+ LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+
+ if (!(map[mapByte] & (1 << mapBit)))
+ {
+ map[mapByte] |= (1 << mapBit);
+
+ if (XLByteLT(PageGetLSN(page), recptr))
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(*buf);
+ }
+
+ LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+}
+
+/*
+ * visibilitymap_test - test if a bit is set
+ *
+ * Are all tuples on heapBlk visible to all, according to the visibility map?
+ *
+ * On entry, *buf should be InvalidBuffer or a valid buffer returned by an
+ * earlier call to visibilitymap_pin or visibilitymap_test on the same
+ * relation. On return, *buf is a valid buffer with the map page containing
+ * the the bit for heapBlk, or InvalidBuffer. The caller is responsible for
+ * releasing *buf after it's done testing and setting bits.
+ */
+bool
+visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
+{
+ BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ uint32 mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ uint8 mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ bool result;
+ char *map;
+
+#ifdef TRACE_VISIBILITYMAP
+ elog(DEBUG1, "vm_test %s %d", RelationGetRelationName(rel), heapBlk);
+#endif
+
+ /* Reuse the old pinned buffer if possible */
+ if (BufferIsValid(*buf))
+ {
+ if (BufferGetBlockNumber(*buf) != mapBlock)
+ {
+ ReleaseBuffer(*buf);
+ *buf = InvalidBuffer;
+ }
+ }
+
+ if (!BufferIsValid(*buf))
+ {
+ *buf = vm_readbuf(rel, mapBlock, false);
+ if (!BufferIsValid(*buf))
+ return false;
+ }
+
+ map = PageGetContents(BufferGetPage(*buf));
+
+ /*
+ * We don't need to lock the page, as we're only looking at a single bit.
+ */
+ result = (map[mapByte] & (1 << mapBit)) ? true : false;
+
+ return result;
+}
+
+/*
+ * visibilitymap_test - truncate the visibility map
+ */
+void
+visibilitymap_truncate(Relation rel, BlockNumber nheapblocks)
+{
+ BlockNumber newnblocks;
+ /* last remaining block, byte, and bit */
+ BlockNumber truncBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks);
+ uint32 truncByte = HEAPBLK_TO_MAPBYTE(nheapblocks);
+ uint8 truncBit = HEAPBLK_TO_MAPBIT(nheapblocks);
+
+#ifdef TRACE_VISIBILITYMAP
+ elog(DEBUG1, "vm_truncate %s %d", RelationGetRelationName(rel), nheapblocks);
+#endif
+
+ /*
+ * If no visibility map has been created yet for this relation, there's
+ * nothing to truncate.
+ */
+ if (!smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ return;
+
+ /*
+ * Unless the new size is exactly at a visibility map page boundary, the
+ * tail bits in the last remaining map page, representing truncated heap
+ * blocks, need to be cleared. This is not only tidy, but also necessary
+ * because we don't get a chance to clear the bits if the heap is
+ * extended again.
+ */
+ if (truncByte != 0 || truncBit != 0)
+ {
+ Buffer mapBuffer;
+ Page page;
+ char *map;
+
+ newnblocks = truncBlock + 1;
+
+ mapBuffer = vm_readbuf(rel, truncBlock, false);
+ if (!BufferIsValid(mapBuffer))
+ {
+ /* nothing to do, the file was already smaller */
+ return;
+ }
+
+ page = BufferGetPage(mapBuffer);
+ map = PageGetContents(page);
+
+ LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /* Clear out the unwanted bytes. */
+ MemSet(&map[truncByte + 1], 0, MAPSIZE - (truncByte + 1));
+
+ /*
+ * Mask out the unwanted bits of the last remaining byte.
+ *
+ * ((1 << 0) - 1) = 00000000
+ * ((1 << 1) - 1) = 00000001
+ * ...
+ * ((1 << 6) - 1) = 00111111
+ * ((1 << 7) - 1) = 01111111
+ */
+ map[truncByte] &= (1 << truncBit) - 1;
+
+ MarkBufferDirty(mapBuffer);
+ UnlockReleaseBuffer(mapBuffer);
+ }
+ else
+ newnblocks = truncBlock;
+
+ if (smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM) < newnblocks)
+ {
+ /* nothing to do, the file was already smaller than requested size */
+ return;
+ }
+
+ smgrtruncate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, newnblocks,
+ rel->rd_istemp);
+
+ /*
+ * Need to invalidate the relcache entry, because rd_vm_nblocks
+ * seen by other backends is no longer valid.
+ */
+ if (!InRecovery)
+ CacheInvalidateRelcache(rel);
+
+ rel->rd_vm_nblocks = newnblocks;
+}
+
+/*
+ * Read a visibility map page.
+ *
+ * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is
+ * true, the visibility map file is extended.
+ */
+static Buffer
+vm_readbuf(Relation rel, BlockNumber blkno, bool extend)
+{
+ Buffer buf;
+
+ RelationOpenSmgr(rel);
+
+ /*
+ * The current size of the visibility map fork is kept in relcache, to
+ * avoid reading beyond EOF. If we haven't cached the size of the map yet,
+ * do that first.
+ */
+ if (rel->rd_vm_nblocks == InvalidBlockNumber)
+ {
+ if (smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ rel->rd_vm_nblocks = smgrnblocks(rel->rd_smgr,
+ VISIBILITYMAP_FORKNUM);
+ else
+ rel->rd_vm_nblocks = 0;
+ }
+
+ /* Handle requests beyond EOF */
+ if (blkno >= rel->rd_vm_nblocks)
+ {
+ if (extend)
+ vm_extend(rel, blkno + 1);
+ else
+ return InvalidBuffer;
+ }
+
+ /*
+ * Use ZERO_ON_ERROR mode, and initialize the page if necessary. It's
+ * always safe to clear bits, so it's better to clear corrupt pages than
+ * error out.
+ */
+ buf = ReadBufferExtended(rel, VISIBILITYMAP_FORKNUM, blkno,
+ RBM_ZERO_ON_ERROR, NULL);
+ if (PageIsNew(BufferGetPage(buf)))
+ PageInit(BufferGetPage(buf), BLCKSZ, 0);
+ return buf;
+}
+
+/*
+ * Ensure that the visibility map fork is at least vm_nblocks long, extending
+ * it if necessary with zeroed pages.
+ */
+static void
+vm_extend(Relation rel, BlockNumber vm_nblocks)
+{
+ BlockNumber vm_nblocks_now;
+ Page pg;
+
+ pg = (Page) palloc(BLCKSZ);
+ PageInit(pg, BLCKSZ, 0);
+
+ /*
+ * We use the relation extension lock to lock out other backends trying
+ * to extend the visibility map at the same time. It also locks out
+ * extension of the main fork, unnecessarily, but extending the
+ * visibility map happens seldom enough that it doesn't seem worthwhile to
+ * have a separate lock tag type for it.
+ *
+ * Note that another backend might have extended or created the
+ * relation before we get the lock.
+ */
+ LockRelationForExtension(rel, ExclusiveLock);
+
+ /* Create the file first if it doesn't exist */
+ if ((rel->rd_vm_nblocks == 0 || rel->rd_vm_nblocks == InvalidBlockNumber)
+ && !smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ {
+ smgrcreate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, false);
+ vm_nblocks_now = 0;
+ }
+ else
+ vm_nblocks_now = smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+
+ while (vm_nblocks_now < vm_nblocks)
+ {
+ smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
+ (char *) pg, rel->rd_istemp);
+ vm_nblocks_now++;
+ }
+
+ UnlockRelationForExtension(rel, ExclusiveLock);
+
+ pfree(pg);
+
+ /* Update the relcache with the up-to-date size */
+ if (!InRecovery)
+ CacheInvalidateRelcache(rel);
+ rel->rd_vm_nblocks = vm_nblocks_now;
+}