Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c  228
1 file changed, 142 insertions(+), 86 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 570cf95eaf7..64aecf251f2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -4620,6 +4620,9 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
* conflict processing to occur before we begin index vacuum actions. see
* vacuumlazy.c and also comments in btvacuumpage()
*/
+
+ /* Backup blocks are not used in cleanup_info records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
}
/*
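The mechanical change repeated throughout this patch is the move from the fixed flag names (XLR_BKP_BLOCK_1 through XLR_BKP_BLOCK_4) to the indexed XLR_BKP_BLOCK(n) macro, together with the per-block RestoreBackupBlock() replacing the all-at-once RestoreBkpBlocks(). A minimal sketch of the flag arithmetic, assuming the 9.2-era definitions from access/xlog.h:

    /* Four bits of xl_info are reserved for backup-block flags. */
    #define XLR_BKP_BLOCK_MASK   0x0F
    /* Indexed form: XLR_BKP_BLOCK(0) == 0x08 == the old XLR_BKP_BLOCK_1. */
    #define XLR_BKP_BLOCK(iblk)  (0x08 >> (iblk))

Record types that never carry backup blocks, such as cleanup_info above, can therefore assert that the whole mask is clear.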
@@ -4652,10 +4655,15 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
xlrec->node);
- RestoreBkpBlocks(lsn, record, true);
-
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /*
+ * If we have a full-page image, restore it (using a cleanup lock) and
+ * we're done.
+ */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, true, false);
return;
+ }
buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
if (!BufferIsValid(buffer))
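heap_xlog_clean is the one handler here that passes get_cleanup_lock = true, because page pruning must hold the same super-exclusive lock during replay that VACUUM holds in normal running. A sketch of the restore-or-replay shape that every handler below follows, assuming 9.2's RestoreBackupBlock(lsn, record, block_index, get_cleanup_lock, keep_buffer) signature from access/xlogutils.h:

    if (record->xl_info & XLR_BKP_BLOCK(0))
    {
        /* A full-page image covers the whole change: restore and stop. */
        (void) RestoreBackupBlock(lsn, record, 0,
                                  true,     /* take a cleanup lock */
                                  false);   /* unlock and release for us */
        return;
    }
    /* No image: read the page and re-apply the logged change by hand. */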
@@ -4721,15 +4729,16 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
- RestoreBkpBlocks(lsn, record, false);
-
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
- buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
+ buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
- LockBufferForCleanup(buffer);
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page)))
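Note the lock downgrade in this hunk: the old code took a cleanup lock via XLogReadBufferExtended() plus LockBufferForCleanup(), while XLogReadBuffer() hands back the page with an ordinary exclusive lock (its 9.2 behavior). That suffices because freezing only rewrites xmin and infomask bits of existing tuples in place; it never removes tuples, which is what the stronger lock guards against. The two levels, as alternatives rather than a sequence:

    /* Plain exclusive lock: enough to rewrite tuple headers in place. */
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    /* Cleanup lock: additionally waits for every other pin to drop;
     * needed only when tuples may be physically pruned away. */
    LockBufferForCleanup(buffer);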
@@ -4779,18 +4788,6 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
Page page;
/*
- * Read the heap page, if it still exists. If the heap file has been
- * dropped or truncated later in recovery, this might fail. In that case,
- * there's no point in doing anything further, since the visibility map
- * will have to be cleared out at the same time.
- */
- buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
- RBM_NORMAL);
- if (!BufferIsValid(buffer))
- return;
- page = (Page) BufferGetPage(buffer);
-
- /*
* If there are any Hot Standby transactions running that have an xmin
* horizon old enough that this page isn't all-visible for them, they
* might incorrectly decide that an index-only scan can skip a heap fetch.
@@ -4802,37 +4799,50 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
if (InHotStandby)
ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
/*
- * We don't bump the LSN of the heap page when setting the visibility map
- * bit, because that would generate an unworkable volume of full-page
- * writes. This exposes us to torn page hazards, but since we're not
- * inspecting the existing page contents in any way, we don't care.
- *
- * However, all operations that clear the visibility map bit *do* bump the
- * LSN, and those operations will only be replayed if the XLOG LSN follows
- * the page LSN. Thus, if the page LSN has advanced past our XLOG
- * record's LSN, we mustn't mark the page all-visible, because the
- * subsequent update won't be replayed to clear the flag.
+ * Read the heap page, if it still exists. If the heap file has been
+ * dropped or truncated later in recovery, we don't need to update the
+ * page, but we'd better still update the visibility map.
*/
- if (!XLByteLE(lsn, PageGetLSN(page)))
+ buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
+ RBM_NORMAL);
+ if (BufferIsValid(buffer))
{
- PageSetAllVisible(page);
- MarkBufferDirty(buffer);
- }
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- /* Done with heap page. */
- UnlockReleaseBuffer(buffer);
+ page = (Page) BufferGetPage(buffer);
+
+ /*
+ * We don't bump the LSN of the heap page when setting the visibility
+ * map bit, because that would generate an unworkable volume of
+ * full-page writes. This exposes us to torn page hazards, but since
+ * we're not inspecting the existing page contents in any way, we
+ * don't care.
+ *
+ * However, all operations that clear the visibility map bit *do* bump
+ * the LSN, and those operations will only be replayed if the XLOG LSN
+ * follows the page LSN. Thus, if the page LSN has advanced past our
+ * XLOG record's LSN, we mustn't mark the page all-visible, because
+ * the subsequent update won't be replayed to clear the flag.
+ */
+ if (!XLByteLE(lsn, PageGetLSN(page)))
+ {
+ PageSetAllVisible(page);
+ MarkBufferDirty(buffer);
+ }
+
+ /* Done with heap page. */
+ UnlockReleaseBuffer(buffer);
+ }
/*
- * Even we skipped the heap page update due to the LSN interlock, it's
+ * Even if we skipped the heap page update due to the LSN interlock, it's
* still safe to update the visibility map. Any WAL record that clears
* the visibility map bit does so before checking the page LSN, so any
* bits that need to be cleared will still be cleared.
*/
- if (record->xl_info & XLR_BKP_BLOCK_1)
- RestoreBkpBlocks(lsn, record, false);
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
else
{
Relation reln;
@@ -4844,13 +4854,13 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
/*
* Don't set the bit if replay has already passed this point.
*
- * It might be safe to do this unconditionally; if replay has past
+ * It might be safe to do this unconditionally; if replay has passed
* this point, we'll replay at least as far this time as we did
* before, and if this bit needs to be cleared, the record responsible
* for doing so should be again replayed, and clear it. For right
* now, out of an abundance of conservatism, we use the same test here
- * we did for the heap page; if this results in a dropped bit, no real
- * harm is done; and the next VACUUM will fix it.
+ * we did for the heap page. If this results in a dropped bit, no
+ * real harm is done; and the next VACUUM will fix it.
*/
if (!XLByteLE(lsn, PageGetLSN(BufferGetPage(vmbuffer))))
visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
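The interlock both branches rely on is asymmetric: operations that set the visibility bit never bump the heap page's LSN, while every operation that clears it does. In miniature (XLByteLE(a, b) is "a <= b" on the two-part LSNs of this era):

    /* Setter: skip if the page has advanced past this record; whatever
     * advanced it will also replay any bit-clearing that follows. */
    if (!XLByteLE(lsn, PageGetLSN(page)))
    {
        PageSetAllVisible(page);
        MarkBufferDirty(buffer);
    }

A dropped set is harmless, since the next VACUUM redoes it; a dropped clear could let an index-only scan skip a heap fetch it needed, which is why clears are never skipped this way.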
@@ -4868,6 +4878,9 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer;
Page page;
+ /* Backup blocks are not used in newpage records */
+ Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
/*
* Note: the NEWPAGE log record is used for both heaps and indexes, so do
* not do anything that assumes we are touching a heap.
@@ -4923,8 +4936,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
if (!BufferIsValid(buffer))
@@ -5004,8 +5021,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
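XLOG_HEAP_INIT_PAGE marks records whose payload rebuilds the target page from scratch (for example, an insert into a brand-new page), so replay creates the page instead of reading it. The corresponding pattern, with blkno standing in for the block number pulled from the record, as it appears later in this patch:

    /* init == true: extend the relation if necessary, don't read */
    buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
    Assert(BufferIsValid(buffer));
    page = (Page) BufferGetPage(buffer);
    PageInit(page, BufferGetPageSize(buffer), 0);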
@@ -5107,8 +5128,6 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
* required.
*/
- RestoreBkpBlocks(lsn, record, false);
-
xlrec = (xl_heap_multi_insert *) recdata;
recdata += SizeOfHeapMultiInsert;
@@ -5137,8 +5156,12 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
if (isinit)
{
@@ -5232,9 +5255,10 @@ static void
heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
{
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
- Buffer buffer;
bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ Buffer obuffer,
+ nbuffer;
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
@@ -5265,27 +5289,44 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /*
+ * In normal operation, it is important to lock the two pages in
+ * page-number order, to avoid possible deadlocks against other update
+ * operations going the other way. However, during WAL replay there can
+ * be no other update happening, so we don't need to worry about that. But
+ * we *do* need to worry that we don't expose an inconsistent state to Hot
+ * Standby queries --- so the original page can't be unlocked before we've
+ * added the new tuple to the new page.
+ */
+
+ if (record->xl_info & XLR_BKP_BLOCK(0))
{
+ obuffer = RestoreBackupBlock(lsn, record, 0, false, true);
if (samepage)
- return; /* backup block covered both changes */
+ {
+ /* backup block covered both changes, so we're done */
+ UnlockReleaseBuffer(obuffer);
+ return;
+ }
goto newt;
}
/* Deal with old tuple version */
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)),
- false);
- if (!BufferIsValid(buffer))
+ obuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)),
+ false);
+ if (!BufferIsValid(obuffer))
goto newt;
- page = (Page) BufferGetPage(buffer);
+ page = (Page) BufferGetPage(obuffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
- UnlockReleaseBuffer(buffer);
if (samepage)
+ {
+ UnlockReleaseBuffer(obuffer);
return;
+ }
goto newt;
}
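The deadlock-avoidance rule described in the new comment can be made concrete with a small hypothetical helper; lock_two_pages_in_order() is not a PostgreSQL function, just the shape of the rule for normal operation:

    /* Hypothetical sketch: lock two heap pages in block-number order so
     * concurrent cross-page updates cannot deadlock against each other.
     * LockBuffer and BufferGetBlockNumber are in storage/bufmgr.h. */
    static void
    lock_two_pages_in_order(Buffer a, Buffer b)
    {
        if (BufferGetBlockNumber(a) < BufferGetBlockNumber(b))
        {
            LockBuffer(a, BUFFER_LOCK_EXCLUSIVE);
            LockBuffer(b, BUFFER_LOCK_EXCLUSIVE);
        }
        else
        {
            LockBuffer(b, BUFFER_LOCK_EXCLUSIVE);
            LockBuffer(a, BUFFER_LOCK_EXCLUSIVE);
        }
    }

During replay no such ordering is needed, but as the comment says, the old page must stay locked until the new tuple is in place, so that Hot Standby queries never see the tuple missing from both pages.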
@@ -5323,11 +5364,14 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
* is already applied
*/
if (samepage)
+ {
+ nbuffer = obuffer;
goto newsame;
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ MarkBufferDirty(obuffer);
/* Deal with new tuple */
@@ -5349,31 +5393,38 @@ newt:;
FreeFakeRelcacheEntry(reln);
}
- if (record->xl_info & XLR_BKP_BLOCK_2)
+ if (record->xl_info & XLR_BKP_BLOCK(1))
+ {
+ (void) RestoreBackupBlock(lsn, record, 1, false, false);
+ if (BufferIsValid(obuffer))
+ UnlockReleaseBuffer(obuffer);
return;
+ }
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
{
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- true);
- Assert(BufferIsValid(buffer));
- page = (Page) BufferGetPage(buffer);
+ nbuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ true);
+ Assert(BufferIsValid(nbuffer));
+ page = (Page) BufferGetPage(nbuffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
+ PageInit(page, BufferGetPageSize(nbuffer), 0);
}
else
{
- buffer = XLogReadBuffer(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- false);
- if (!BufferIsValid(buffer))
+ nbuffer = XLogReadBuffer(xlrec->target.node,
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ false);
+ if (!BufferIsValid(nbuffer))
return;
- page = (Page) BufferGetPage(buffer);
+ page = (Page) BufferGetPage(nbuffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
- UnlockReleaseBuffer(buffer);
+ UnlockReleaseBuffer(nbuffer);
+ if (BufferIsValid(obuffer))
+ UnlockReleaseBuffer(obuffer);
return;
}
}
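With two buffers in play, every early return after this point has to account for which of them is still held. A distillation of the exit discipline the rewritten function follows (hedged, since some exit paths are elided between hunks):

    /* Release order on any exit: the new page first, then the old page
     * if it is a distinct buffer that is still held. */
    if (BufferIsValid(nbuffer))
        UnlockReleaseBuffer(nbuffer);
    if (BufferIsValid(obuffer) && obuffer != nbuffer)
        UnlockReleaseBuffer(obuffer);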
@@ -5418,11 +5469,14 @@ newsame:;
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
+ MarkBufferDirty(nbuffer);
+ UnlockReleaseBuffer(nbuffer);
+
+ if (BufferIsValid(obuffer) && obuffer != nbuffer)
+ UnlockReleaseBuffer(obuffer);
/*
- * If the page is running low on free space, update the FSM as well.
+ * If the new page is running low on free space, update the FSM as well.
* Arbitrarily, our definition of "low" is less than 20%. We can't do much
* better than that without knowing the fill-factor for the table.
*
@@ -5438,7 +5492,8 @@ newsame:;
*/
if (!hot_update && freespace < BLCKSZ / 5)
XLogRecordPageWithFreeSpace(xlrec->target.node,
- ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace);
+ ItemPointerGetBlockNumber(&(xlrec->newtid)),
+ freespace);
}
static void
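The 20% rule above, concretely: with the default 8 kB block size, the threshold is BLCKSZ / 5 = 1638 bytes, so the FSM hears about the new page only when less than about 1.6 kB remains:

    /* BLCKSZ defaults to 8192, so "low" means under 1638 free bytes. */
    if (!hot_update && freespace < BLCKSZ / 5)
        XLogRecordPageWithFreeSpace(xlrec->target.node,
                                    ItemPointerGetBlockNumber(&(xlrec->newtid)),
                                    freespace);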
@@ -5451,8 +5506,12 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
ItemId lp = NULL;
HeapTupleHeader htup;
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
@@ -5510,8 +5569,12 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
uint32 oldlen;
uint32 newlen;
- if (record->xl_info & XLR_BKP_BLOCK_1)
+ /* If we have a full-page image, restore it and we're done */
+ if (record->xl_info & XLR_BKP_BLOCK(0))
+ {
+ (void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
+ }
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
@@ -5560,8 +5623,6 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
* required. The ones in heap2 rmgr do.
*/
- RestoreBkpBlocks(lsn, record, false);
-
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP_INSERT:
@@ -5595,11 +5656,6 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- /*
- * Note that RestoreBkpBlocks() is called after conflict processing within
- * each record type handling function.
- */
-
switch (info & XLOG_HEAP_OPMASK)
{
case XLOG_HEAP2_FREEZE:
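These final dispatcher hunks are the payoff of the whole patch: the blanket RestoreBkpBlocks() call in heap_redo (safe there only because heap-rmgr records need no conflict processing) and the compensating comment in heap2_redo both disappear, since each handler now restores its own backup blocks after doing any Hot Standby conflict processing. A sketch of the ordering every handler now owns, with heap_xlog_something and the xlrec fields as placeholders:

    static void
    heap_xlog_something(XLogRecPtr lsn, XLogRecord *record)
    {
        /* 1. Resolve Hot Standby conflicts first ... */
        if (InHotStandby)
            ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
                                                xlrec->node);

        /* 2. ... only then restore any full-page image ... */
        if (record->xl_info & XLR_BKP_BLOCK(0))
        {
            (void) RestoreBackupBlock(lsn, record, 0, false, false);
            return;
        }

        /* 3. ... otherwise replay the change incrementally. */
    }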