diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
| -rw-r--r-- | src/backend/access/heap/heapam.c | 147 |
1 files changed, 100 insertions, 47 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1ec1efcd112..4f4b5712a46 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -4132,6 +4132,9 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record) * conflict processing to occur before we begin index vacuum actions. see * vacuumlazy.c and also comments in btvacuumpage() */ + + /* Backup blocks are not used in cleanup_info records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); } /* @@ -4164,10 +4167,15 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); - RestoreBkpBlocks(lsn, record, true); - - if (record->xl_info & XLR_BKP_BLOCK_1) + /* + * If we have a full-page image, restore it (using a cleanup lock) and + * we're done. + */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, true, false); return; + } buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL); if (!BufferIsValid(buffer)) @@ -4233,15 +4241,16 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node); - RestoreBkpBlocks(lsn, record, false); - - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } - buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL); + buffer = XLogReadBuffer(xlrec->node, xlrec->block, false); if (!BufferIsValid(buffer)) return; - LockBufferForCleanup(buffer); page = (Page) BufferGetPage(buffer); if (XLByteLE(lsn, PageGetLSN(page))) @@ -4282,6 +4291,9 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* Backup blocks are not used in newpage records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + /* * Note: the NEWPAGE log record is used for both heaps and indexes, so do * not do anything that assumes we are touching a heap. @@ -4334,8 +4346,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(xlrec->target.node, blkno, false); if (!BufferIsValid(buffer)) @@ -4412,8 +4428,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } if (record->xl_info & XLOG_HEAP_INIT_PAGE) { @@ -4495,9 +4515,10 @@ static void heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) { xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); - Buffer buffer; bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) == ItemPointerGetBlockNumber(&(xlrec->target.tid))); + Buffer obuffer, + nbuffer; Page page; OffsetNumber offnum; ItemId lp = NULL; @@ -4525,27 +4546,44 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_1) + /* + * In normal operation, it is important to lock the two pages in + * page-number order, to avoid possible deadlocks against other update + * operations going the other way. However, during WAL replay there can + * be no other update happening, so we don't need to worry about that. But + * we *do* need to worry that we don't expose an inconsistent state to Hot + * Standby queries --- so the original page can't be unlocked before we've + * added the new tuple to the new page. + */ + + if (record->xl_info & XLR_BKP_BLOCK(0)) { + obuffer = RestoreBackupBlock(lsn, record, 0, false, true); if (samepage) - return; /* backup block covered both changes */ + { + /* backup block covered both changes, so we're done */ + UnlockReleaseBuffer(obuffer); + return; + } goto newt; } /* Deal with old tuple version */ - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - false); - if (!BufferIsValid(buffer)) + obuffer = XLogReadBuffer(xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + false); + if (!BufferIsValid(obuffer)) goto newt; - page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(obuffer); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { - UnlockReleaseBuffer(buffer); if (samepage) + { + UnlockReleaseBuffer(obuffer); return; + } goto newt; } @@ -4583,11 +4621,14 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * is already applied */ if (samepage) + { + nbuffer = obuffer; goto newsame; + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + MarkBufferDirty(obuffer); /* Deal with new tuple */ @@ -4605,31 +4646,38 @@ newt:; FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_2) + if (record->xl_info & XLR_BKP_BLOCK(1)) + { + (void) RestoreBackupBlock(lsn, record, 1, false, false); + if (BufferIsValid(obuffer)) + UnlockReleaseBuffer(obuffer); return; + } if (record->xl_info & XLOG_HEAP_INIT_PAGE) { - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), - true); - Assert(BufferIsValid(buffer)); - page = (Page) BufferGetPage(buffer); + nbuffer = XLogReadBuffer(xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->newtid)), + true); + Assert(BufferIsValid(nbuffer)); + page = (Page) BufferGetPage(nbuffer); - PageInit(page, BufferGetPageSize(buffer), 0); + PageInit(page, BufferGetPageSize(nbuffer), 0); } else { - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), - false); - if (!BufferIsValid(buffer)) + nbuffer = XLogReadBuffer(xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->newtid)), + false); + if (!BufferIsValid(nbuffer)) return; - page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(nbuffer); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { - UnlockReleaseBuffer(buffer); + UnlockReleaseBuffer(nbuffer); + if (BufferIsValid(obuffer)) + UnlockReleaseBuffer(obuffer); return; } } @@ -4674,11 +4722,14 @@ newsame:; PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + MarkBufferDirty(nbuffer); + UnlockReleaseBuffer(nbuffer); + + if (BufferIsValid(obuffer) && obuffer != nbuffer) + UnlockReleaseBuffer(obuffer); /* - * If the page is running low on free space, update the FSM as well. + * If the new page is running low on free space, update the FSM as well. * Arbitrarily, our definition of "low" is less than 20%. We can't do much * better than that without knowing the fill-factor for the table. * @@ -4694,7 +4745,8 @@ newsame:; */ if (!hot_update && freespace < BLCKSZ / 5) XLogRecordPageWithFreeSpace(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace); + ItemPointerGetBlockNumber(&(xlrec->newtid)), + freespace); } static void @@ -4707,8 +4759,12 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), @@ -4766,8 +4822,12 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) uint32 oldlen; uint32 newlen; - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), @@ -4816,8 +4876,6 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record) * required. The ones in heap2 rmgr do. */ - RestoreBkpBlocks(lsn, record, false); - switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP_INSERT: @@ -4851,11 +4909,6 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; - /* - * Note that RestoreBkpBlocks() is called after conflict processing within - * each record type handling function. - */ - switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP2_FREEZE: |
