Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 228
1 file changed, 142 insertions, 86 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 570cf95eaf7..64aecf251f2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -4620,6 +4620,9 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record)
 	 * conflict processing to occur before we begin index vacuum actions. see
 	 * vacuumlazy.c and also comments in btvacuumpage()
 	 */
+
+	/* Backup blocks are not used in cleanup_info records */
+	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 }
 
 /*
@@ -4652,10 +4655,15 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
 											xlrec->node);
 
-	RestoreBkpBlocks(lsn, record, true);
-
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/*
+	 * If we have a full-page image, restore it (using a cleanup lock) and
+	 * we're done.
+	 */
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+	{
+		(void) RestoreBackupBlock(lsn, record, 0, true, false);
 		return;
+	}
 
 	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
 	if (!BufferIsValid(buffer))
@@ -4721,15 +4729,16 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
 
-	RestoreBkpBlocks(lsn, record, false);
-
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/* If we have a full-page image, restore it and we're done */
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+	{
+		(void) RestoreBackupBlock(lsn, record, 0, false, false);
 		return;
+	}
 
-	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
+	buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
 	if (!BufferIsValid(buffer))
 		return;
-	LockBufferForCleanup(buffer);
 	page = (Page) BufferGetPage(buffer);
 
 	if (XLByteLE(lsn, PageGetLSN(page)))
@@ -4779,18 +4788,6 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 	Page		page;
 
 	/*
-	 * Read the heap page, if it still exists. If the heap file has been
-	 * dropped or truncated later in recovery, this might fail. In that case,
-	 * there's no point in doing anything further, since the visibility map
-	 * will have to be cleared out at the same time.
-	 */
-	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
-									RBM_NORMAL);
-	if (!BufferIsValid(buffer))
-		return;
-	page = (Page) BufferGetPage(buffer);
-
-	/*
 	 * If there are any Hot Standby transactions running that have an xmin
 	 * horizon old enough that this page isn't all-visible for them, they
 	 * might incorrectly decide that an index-only scan can skip a heap fetch.
@@ -4802,37 +4799,50 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
 
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
 	/*
-	 * We don't bump the LSN of the heap page when setting the visibility map
-	 * bit, because that would generate an unworkable volume of full-page
-	 * writes. This exposes us to torn page hazards, but since we're not
-	 * inspecting the existing page contents in any way, we don't care.
-	 *
-	 * However, all operations that clear the visibility map bit *do* bump the
-	 * LSN, and those operations will only be replayed if the XLOG LSN follows
-	 * the page LSN. Thus, if the page LSN has advanced past our XLOG
-	 * record's LSN, we mustn't mark the page all-visible, because the
-	 * subsequent update won't be replayed to clear the flag.
+	 * Read the heap page, if it still exists. If the heap file has been
+	 * dropped or truncated later in recovery, we don't need to update the
+	 * page, but we'd better still update the visibility map.
 	 */
-	if (!XLByteLE(lsn, PageGetLSN(page)))
+	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
+									RBM_NORMAL);
+	if (BufferIsValid(buffer))
 	{
-		PageSetAllVisible(page);
-		MarkBufferDirty(buffer);
-	}
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-	/* Done with heap page. */
-	UnlockReleaseBuffer(buffer);
+		page = (Page) BufferGetPage(buffer);
+
+		/*
+		 * We don't bump the LSN of the heap page when setting the visibility
+		 * map bit, because that would generate an unworkable volume of
+		 * full-page writes. This exposes us to torn page hazards, but since
+		 * we're not inspecting the existing page contents in any way, we
+		 * don't care.
+		 *
+		 * However, all operations that clear the visibility map bit *do* bump
+		 * the LSN, and those operations will only be replayed if the XLOG LSN
+		 * follows the page LSN. Thus, if the page LSN has advanced past our
+		 * XLOG record's LSN, we mustn't mark the page all-visible, because
+		 * the subsequent update won't be replayed to clear the flag.
+		 */
+		if (!XLByteLE(lsn, PageGetLSN(page)))
+		{
+			PageSetAllVisible(page);
+			MarkBufferDirty(buffer);
+		}
+
+		/* Done with heap page. */
+		UnlockReleaseBuffer(buffer);
+	}
 
 	/*
-	 * Even we skipped the heap page update due to the LSN interlock, it's
+	 * Even if we skipped the heap page update due to the LSN interlock, it's
 	 * still safe to update the visibility map. Any WAL record that clears
 	 * the visibility map bit does so before checking the page LSN, so any
 	 * bits that need to be cleared will still be cleared.
 	 */
-	if (record->xl_info & XLR_BKP_BLOCK_1)
-		RestoreBkpBlocks(lsn, record, false);
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+		(void) RestoreBackupBlock(lsn, record, 0, false, false);
 	else
 	{
 		Relation	reln;
@@ -4844,13 +4854,13 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 		/*
 		 * Don't set the bit if replay has already passed this point.
 		 *
-		 * It might be safe to do this unconditionally; if replay has past
+		 * It might be safe to do this unconditionally; if replay has passed
 		 * this point, we'll replay at least as far this time as we did
 		 * before, and if this bit needs to be cleared, the record responsible
 		 * for doing so should be again replayed, and clear it. For right
 		 * now, out of an abundance of conservatism, we use the same test here
-		 * we did for the heap page; if this results in a dropped bit, no real
-		 * harm is done; and the next VACUUM will fix it.
+		 * we did for the heap page. If this results in a dropped bit, no
+		 * real harm is done; and the next VACUUM will fix it.
		 */
 		if (!XLByteLE(lsn, PageGetLSN(BufferGetPage(vmbuffer))))
 			visibilitymap_set(reln, xlrec->block, lsn, vmbuffer,
@@ -4868,6 +4878,9 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
 	Buffer		buffer;
 	Page		page;
 
+	/* Backup blocks are not used in newpage records */
+	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
 	/*
 	 * Note: the NEWPAGE log record is used for both heaps and indexes, so do
 	 * not do anything that assumes we are touching a heap.
@@ -4923,8 +4936,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 		FreeFakeRelcacheEntry(reln);
 	}
 
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/* If we have a full-page image, restore it and we're done */
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+	{
+		(void) RestoreBackupBlock(lsn, record, 0, false, false);
 		return;
+	}
 
 	buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
 	if (!BufferIsValid(buffer))
@@ -5004,8 +5021,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 		FreeFakeRelcacheEntry(reln);
 	}
 
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/* If we have a full-page image, restore it and we're done */
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+	{
+		(void) RestoreBackupBlock(lsn, record, 0, false, false);
 		return;
+	}
 
 	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
 	{
@@ -5107,8 +5128,6 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 	 * required.
 	 */
 
-	RestoreBkpBlocks(lsn, record, false);
-
 	xlrec = (xl_heap_multi_insert *) recdata;
 	recdata += SizeOfHeapMultiInsert;
 
@@ -5137,8 +5156,12 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 		FreeFakeRelcacheEntry(reln);
 	}
 
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/* If we have a full-page image, restore it and we're done */
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+	{
+		(void) RestoreBackupBlock(lsn, record, 0, false, false);
 		return;
+	}
 
 	if (isinit)
 	{
@@ -5232,9 +5255,10 @@ static void
 heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 {
 	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-	Buffer		buffer;
 	bool		samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
 							ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+	Buffer		obuffer,
+				nbuffer;
 	Page		page;
 	OffsetNumber offnum;
 	ItemId		lp = NULL;
@@ -5265,27 +5289,44 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 		FreeFakeRelcacheEntry(reln);
 	}
 
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/*
+	 * In normal operation, it is important to lock the two pages in
+	 * page-number order, to avoid possible deadlocks against other update
+	 * operations going the other way. However, during WAL replay there can
+	 * be no other update happening, so we don't need to worry about that. But
+	 * we *do* need to worry that we don't expose an inconsistent state to Hot
+	 * Standby queries --- so the original page can't be unlocked before we've
+	 * added the new tuple to the new page.
+	 */
+
+	if (record->xl_info & XLR_BKP_BLOCK(0))
 	{
+		obuffer = RestoreBackupBlock(lsn, record, 0, false, true);
 		if (samepage)
-			return;			/* backup block covered both changes */
+		{
+			/* backup block covered both changes, so we're done */
+			UnlockReleaseBuffer(obuffer);
+			return;
+		}
 		goto newt;
 	}
 
 	/* Deal with old tuple version */
 
-	buffer = XLogReadBuffer(xlrec->target.node,
-							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
-							false);
-	if (!BufferIsValid(buffer))
+	obuffer = XLogReadBuffer(xlrec->target.node,
+							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
+							 false);
+	if (!BufferIsValid(obuffer))
 		goto newt;
-	page = (Page) BufferGetPage(buffer);
+	page = (Page) BufferGetPage(obuffer);
 
 	if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
 	{
-		UnlockReleaseBuffer(buffer);
 		if (samepage)
+		{
+			UnlockReleaseBuffer(obuffer);
 			return;
+		}
 		goto newt;
 	}
 
@@ -5323,11 +5364,14 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	 * is already applied
 	 */
 	if (samepage)
+	{
+		nbuffer = obuffer;
 		goto newsame;
+	}
+
 	PageSetLSN(page, lsn);
 	PageSetTLI(page, ThisTimeLineID);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+	MarkBufferDirty(obuffer);
 
 	/* Deal with new tuple */
 
@@ -5349,31 +5393,38 @@ newt:;
 		FreeFakeRelcacheEntry(reln);
 	}
 
-	if (record->xl_info & XLR_BKP_BLOCK_2)
+	if (record->xl_info & XLR_BKP_BLOCK(1))
+	{
+		(void) RestoreBackupBlock(lsn, record, 1, false, false);
+		if (BufferIsValid(obuffer))
+			UnlockReleaseBuffer(obuffer);
 		return;
+	}
 
 	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
 	{
-		buffer = XLogReadBuffer(xlrec->target.node,
-								ItemPointerGetBlockNumber(&(xlrec->newtid)),
-								true);
-		Assert(BufferIsValid(buffer));
-		page = (Page) BufferGetPage(buffer);
+		nbuffer = XLogReadBuffer(xlrec->target.node,
+								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
+								 true);
+		Assert(BufferIsValid(nbuffer));
+		page = (Page) BufferGetPage(nbuffer);
 
-		PageInit(page, BufferGetPageSize(buffer), 0);
+		PageInit(page, BufferGetPageSize(nbuffer), 0);
 	}
 	else
 	{
-		buffer = XLogReadBuffer(xlrec->target.node,
-								ItemPointerGetBlockNumber(&(xlrec->newtid)),
-								false);
-		if (!BufferIsValid(buffer))
+		nbuffer = XLogReadBuffer(xlrec->target.node,
								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
+								 false);
+		if (!BufferIsValid(nbuffer))
 			return;
-		page = (Page) BufferGetPage(buffer);
+		page = (Page) BufferGetPage(nbuffer);
 
 		if (XLByteLE(lsn, PageGetLSN(page)))	/* changes are applied */
 		{
-			UnlockReleaseBuffer(buffer);
+			UnlockReleaseBuffer(nbuffer);
+			if (BufferIsValid(obuffer))
+				UnlockReleaseBuffer(obuffer);
 			return;
 		}
 	}
@@ -5418,11 +5469,14 @@ newsame:;
 
 	PageSetLSN(page, lsn);
 	PageSetTLI(page, ThisTimeLineID);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+	MarkBufferDirty(nbuffer);
+	UnlockReleaseBuffer(nbuffer);
+
+	if (BufferIsValid(obuffer) && obuffer != nbuffer)
+		UnlockReleaseBuffer(obuffer);
 
 	/*
-	 * If the page is running low on free space, update the FSM as well.
+	 * If the new page is running low on free space, update the FSM as well.
 	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
 	 * better than that without knowing the fill-factor for the table.
 	 *
@@ -5438,7 +5492,8 @@ newsame:;
 	 */
 	if (!hot_update && freespace < BLCKSZ / 5)
 		XLogRecordPageWithFreeSpace(xlrec->target.node,
-							ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace);
+								ItemPointerGetBlockNumber(&(xlrec->newtid)),
+									freespace);
 }
 
 static void
@@ -5451,8 +5506,12 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
 	ItemId		lp = NULL;
 	HeapTupleHeader htup;
 
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/* If we have a full-page image, restore it and we're done */
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+	{
+		(void) RestoreBackupBlock(lsn, record, 0, false, false);
 		return;
+	}
 
 	buffer = XLogReadBuffer(xlrec->target.node,
 							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
@@ -5510,8 +5569,12 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
 	uint32		oldlen;
 	uint32		newlen;
 
-	if (record->xl_info & XLR_BKP_BLOCK_1)
+	/* If we have a full-page image, restore it and we're done */
+	if (record->xl_info & XLR_BKP_BLOCK(0))
+	{
+		(void) RestoreBackupBlock(lsn, record, 0, false, false);
 		return;
+	}
 
 	buffer = XLogReadBuffer(xlrec->target.node,
 							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
@@ -5560,8 +5623,6 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
 	 * required. The ones in heap2 rmgr do.
 	 */
 
-	RestoreBkpBlocks(lsn, record, false);
-
 	switch (info & XLOG_HEAP_OPMASK)
 	{
 		case XLOG_HEAP_INSERT:
@@ -5595,11 +5656,6 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
 {
 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
 
-	/*
-	 * Note that RestoreBkpBlocks() is called after conflict processing within
-	 * each record type handling function.
-	 */
-
 	switch (info & XLOG_HEAP_OPMASK)
 	{
 		case XLOG_HEAP2_FREEZE:
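The hunks above converge on one replay shape: each redo routine first checks whether the record carries a full-page image of the block it touches (XLR_BKP_BLOCK(n)), restores that image with RestoreBackupBlock() and returns if so, and otherwise reads the page itself and applies the change under the usual LSN interlock. The fragment below is an illustrative sketch of that pattern only, distilled from the calls visible in this diff; heap_xlog_something and xl_heap_something are hypothetical placeholders, not symbols added by this commit, and the record-specific apply step is elided.

/*
 * Illustrative sketch only: the common replay shape used by the heapam redo
 * routines after this commit. All functions and macros referenced here
 * appear in the diff above; the record type and the "apply change" step are
 * hypothetical placeholders.
 */
static void
heap_xlog_something(XLogRecPtr lsn, XLogRecord *record)
{
	xl_heap_something *xlrec = (xl_heap_something *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	/*
	 * If backup block 0 is a full-page image of our page, it already
	 * contains this record's change: restore it and return.
	 */
	if (record->xl_info & XLR_BKP_BLOCK(0))
	{
		(void) RestoreBackupBlock(lsn, record, 0, false, false);
		return;
	}

	/* No full-page image: read the page (it may have been dropped later) */
	buffer = XLogReadBuffer(xlrec->target.node,
							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
							false);
	if (!BufferIsValid(buffer))
		return;
	page = (Page) BufferGetPage(buffer);

	/* LSN interlock: skip if the change has already been applied */
	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		UnlockReleaseBuffer(buffer);
		return;
	}

	/* ... apply the record-specific change to "page" here ... */

	PageSetLSN(page, lsn);
	PageSetTLI(page, ThisTimeLineID);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}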