Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 1060
1 file changed, 466 insertions, 594 deletions
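The hunks below rewrite heapam.c's WAL redo routines to use the XLogReadBufferForRedo() helper in place of the old hand-rolled sequence (test XLR_BKP_BLOCK, call RestoreBackupBlock, re-read the page with XLogReadBuffer, compare the page LSN against the record LSN). A minimal sketch of the target idiom follows; the xl_foo record layout, foo_xlog_redo name, and include list are illustrative assumptions, not part of this diff, and sketches of the variants used here appear after the diff.

#include "postgres.h"

#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"

/* hypothetical WAL record body: which block of which relation to replay */
typedef struct xl_foo
{
    RelFileNode node;
    BlockNumber block;
} xl_foo;

static void
foo_xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
    xl_foo     *xlrec = (xl_foo *) XLogRecGetData(record);
    Buffer      buffer;

    /*
     * XLogReadBufferForRedo() folds together the cases the old code spelled
     * out by hand: BLK_RESTORED (a full-page image was applied), BLK_DONE
     * (the page LSN shows the change was already applied), BLK_NOTFOUND
     * (the relation was dropped or truncated later in recovery), and
     * BLK_NEEDS_REDO.  Only the last requires any work here.
     */
    if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block,
                              &buffer) == BLK_NEEDS_REDO)
    {
        Page        page = BufferGetPage(buffer);

        /* ... apply the logged change to the page here ... */

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))          /* InvalidBuffer on BLK_NOTFOUND */
        UnlockReleaseBuffer(buffer);
}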
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 4d7575bc062..ae15c0b9556 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7134,15 +7134,13 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) { xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record); Buffer buffer; - Page page; - OffsetNumber *end; - OffsetNumber *redirected; - OffsetNumber *nowdead; - OffsetNumber *nowunused; - int nredirected; - int ndead; - int nunused; - Size freespace; + Size freespace = 0; + RelFileNode rnode; + BlockNumber blkno; + XLogRedoAction action; + + rnode = xlrec->node; + blkno = xlrec->block; /* * We're about to remove tuples. In Hot Standby mode, ensure that there's @@ -7153,65 +7151,63 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) * latestRemovedXid is invalid, skip conflict processing. */ if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, - xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and * we're done. */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - { - (void) RestoreBackupBlock(lsn, record, 0, true, false); - return; - } + action = XLogReadBufferForRedoExtended(lsn, record, 0, + rnode, MAIN_FORKNUM, blkno, + RBM_NORMAL, true, &buffer); + if (action == BLK_NEEDS_REDO) + { + Page page = (Page) BufferGetPage(buffer); + OffsetNumber *end; + OffsetNumber *redirected; + OffsetNumber *nowdead; + OffsetNumber *nowunused; + int nredirected; + int ndead; + int nunused; + + nredirected = xlrec->nredirected; + ndead = xlrec->ndead; + end = (OffsetNumber *) ((char *) xlrec + record->xl_len); + redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean); + nowdead = redirected + (nredirected * 2); + nowunused = nowdead + ndead; + nunused = (end - nowunused); + Assert(nunused >= 0); + + /* Update all item pointers per the record, and repair fragmentation */ + heap_page_prune_execute(buffer, + redirected, nredirected, + nowdead, ndead, + nowunused, nunused); + + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL); - if (!BufferIsValid(buffer)) - return; - LockBufferForCleanup(buffer); - page = (Page) BufferGetPage(buffer); + /* + * Note: we don't worry about updating the page's prunability hints. + * At worst this will cause an extra prune cycle to occur soon. + */ - if (lsn <= PageGetLSN(page)) - { - UnlockReleaseBuffer(buffer); - return; + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } - - nredirected = xlrec->nredirected; - ndead = xlrec->ndead; - end = (OffsetNumber *) ((char *) xlrec + record->xl_len); - redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean); - nowdead = redirected + (nredirected * 2); - nowunused = nowdead + ndead; - nunused = (end - nowunused); - Assert(nunused >= 0); - - /* Update all item pointers per the record, and repair fragmentation */ - heap_page_prune_execute(buffer, - redirected, nredirected, - nowdead, ndead, - nowunused, nunused); - - freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - - /* - * Note: we don't worry about updating the page's prunability hints. At - * worst this will cause an extra prune cycle to occur soon. 
- */ - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* * Update the FSM as well. * - * XXX: We don't get here if the page was restored from full page image. - * We don't bother to update the FSM in that case, it doesn't need to be + * XXX: Don't do this if the page was restored from full page image. We + * don't bother to update the FSM in that case, it doesn't need to be * totally accurate anyway. */ - XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace); + if (action == BLK_NEEDS_REDO) + XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace); } /* @@ -7226,6 +7222,14 @@ static void heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) { xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record); + Buffer buffer; + Page page; + RelFileNode rnode; + BlockNumber blkno; + XLogRedoAction action; + + rnode = xlrec->node; + blkno = xlrec->block; /* * If there are any Hot Standby transactions running that have an xmin @@ -7237,60 +7241,43 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); /* - * If heap block was backed up, restore it. This can only happen with - * checksums enabled. + * Read the heap page, if it still exists. If the heap file has dropped or + * truncated later in recovery, we don't need to update the page, but we'd + * better still update the visibility map. */ - if (record->xl_info & XLR_BKP_BLOCK(1)) + action = XLogReadBufferForRedo(lsn, record, 1, rnode, blkno, &buffer); + if (action == BLK_NEEDS_REDO) { - Assert(DataChecksumsEnabled()); - (void) RestoreBackupBlock(lsn, record, 1, false, false); + /* + * We don't bump the LSN of the heap page when setting the visibility + * map bit (unless checksums are enabled, in which case we must), + * because that would generate an unworkable volume of full-page + * writes. This exposes us to torn page hazards, but since we're not + * inspecting the existing page contents in any way, we don't care. + * + * However, all operations that clear the visibility map bit *do* bump + * the LSN, and those operations will only be replayed if the XLOG LSN + * follows the page LSN. Thus, if the page LSN has advanced past our + * XLOG record's LSN, we mustn't mark the page all-visible, because + * the subsequent update won't be replayed to clear the flag. + */ + page = BufferGetPage(buffer); + PageSetAllVisible(page); + MarkBufferDirty(buffer); } - else + else if (action == BLK_RESTORED) { - Buffer buffer; - Page page; - /* - * Read the heap page, if it still exists. If the heap file has been - * dropped or truncated later in recovery, we don't need to update the - * page, but we'd better still update the visibility map. + * If heap block was backed up, restore it. This can only happen with + * checksums enabled. */ - buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, - xlrec->block, RBM_NORMAL); - if (BufferIsValid(buffer)) - { - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - - page = (Page) BufferGetPage(buffer); - - /* - * We don't bump the LSN of the heap page when setting the - * visibility map bit (unless checksums are enabled, in which case - * we must), because that would generate an unworkable volume of - * full-page writes. 
This exposes us to torn page hazards, but - * since we're not inspecting the existing page contents in any - * way, we don't care. - * - * However, all operations that clear the visibility map bit *do* - * bump the LSN, and those operations will only be replayed if the - * XLOG LSN follows the page LSN. Thus, if the page LSN has - * advanced past our XLOG record's LSN, we mustn't mark the page - * all-visible, because the subsequent update won't be replayed to - * clear the flag. - */ - if (lsn > PageGetLSN(page)) - { - PageSetAllVisible(page); - MarkBufferDirty(buffer); - } - - /* Done with heap page. */ - UnlockReleaseBuffer(buffer); - } + Assert(DataChecksumsEnabled()); } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* * Even if we skipped the heap page update due to the LSN interlock, it's @@ -7305,8 +7292,8 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) Relation reln; Buffer vmbuffer = InvalidBuffer; - reln = CreateFakeRelcacheEntry(xlrec->node); - visibilitymap_pin(reln, xlrec->block, &vmbuffer); + reln = CreateFakeRelcacheEntry(rnode); + visibilitymap_pin(reln, blkno, &vmbuffer); /* * Don't set the bit if replay has already passed this point. @@ -7320,7 +7307,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record) * real harm is done; and the next VACUUM will fix it. */ if (lsn > PageGetLSN(BufferGetPage(vmbuffer))) - visibilitymap_set(reln, xlrec->block, InvalidBuffer, lsn, vmbuffer, + visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer, xlrec->cutoff_xid); ReleaseBuffer(vmbuffer); @@ -7347,42 +7334,30 @@ heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node); - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) + if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block, + &buffer) == BLK_NEEDS_REDO) { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } - - buffer = XLogReadBuffer(xlrec->node, xlrec->block, false); - if (!BufferIsValid(buffer)) - return; - - page = (Page) BufferGetPage(buffer); + page = BufferGetPage(buffer); - if (lsn <= PageGetLSN(page)) - { - UnlockReleaseBuffer(buffer); - return; - } + /* now execute freeze plan for each frozen tuple */ + for (ntup = 0; ntup < xlrec->ntuples; ntup++) + { + xl_heap_freeze_tuple *xlrec_tp; + ItemId lp; + HeapTupleHeader tuple; - /* now execute freeze plan for each frozen tuple */ - for (ntup = 0; ntup < xlrec->ntuples; ntup++) - { - xl_heap_freeze_tuple *xlrec_tp; - ItemId lp; - HeapTupleHeader tuple; + xlrec_tp = &xlrec->tuples[ntup]; + lp = PageGetItemId(page, xlrec_tp->offset); /* offsets are one-based */ + tuple = (HeapTupleHeader) PageGetItem(page, lp); - xlrec_tp = &xlrec->tuples[ntup]; - lp = PageGetItemId(page, xlrec_tp->offset); /* offsets are one-based */ - tuple = (HeapTupleHeader) PageGetItem(page, lp); + heap_execute_freeze_tuple(tuple, xlrec_tp); + } - heap_execute_freeze_tuple(tuple, xlrec_tp); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } /* @@ -7422,8 +7397,10 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; BlockNumber blkno; + RelFileNode target_node; blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid)); + target_node = xlrec->target.node; /* * The visibility map may need to be fixed even if the heap page is @@ 
-7431,7 +7408,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) */ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { - Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); + Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; visibilitymap_pin(reln, blkno, &vmbuffer); @@ -7440,52 +7417,41 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) + if (XLogReadBufferForRedo(lsn, record, 0, target_node, blkno, &buffer) + == BLK_NEEDS_REDO) { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } - - buffer = XLogReadBuffer(xlrec->target.node, blkno, false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(buffer); - if (lsn <= PageGetLSN(page)) /* changes are applied */ - { - UnlockReleaseBuffer(buffer); - return; - } + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_delete_redo: invalid lp"); - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "heap_delete_redo: invalid lp"); + htup = (HeapTupleHeader) PageGetItem(page, lp); - htup = (HeapTupleHeader) PageGetItem(page, lp); - - htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); - htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; - HeapTupleHeaderClearHotUpdated(htup); - fix_infomask_from_infobits(xlrec->infobits_set, - &htup->t_infomask, &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->xmax); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; + HeapTupleHeaderClearHotUpdated(htup); + fix_infomask_from_infobits(xlrec->infobits_set, + &htup->t_infomask, &htup->t_infomask2); + HeapTupleHeaderSetXmax(htup, xlrec->xmax); + HeapTupleHeaderSetCmax(htup, FirstCommandId, false); - /* Mark the page as a candidate for pruning */ - PageSetPrunable(page, record->xl_xid); + /* Mark the page as a candidate for pruning */ + PageSetPrunable(page, record->xl_xid); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); - /* Make sure there is no forward chain link in t_ctid */ - htup->t_ctid = xlrec->target.tid; - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + /* Make sure there is no forward chain link in t_ctid */ + htup->t_ctid = xlrec->target.tid; + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -7503,9 +7469,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) HeapTupleHeader htup; xl_heap_header xlhdr; uint32 newlen; - Size freespace; + Size freespace = 0; + RelFileNode target_node; BlockNumber blkno; + XLogRedoAction action; + target_node = xlrec->target.node; blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid)); /* @@ -7514,7 +7483,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) */ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { - Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); + 
Relation reln = CreateFakeRelcacheEntry(target_node); Buffer vmbuffer = InvalidBuffer; visibilitymap_pin(reln, blkno, &vmbuffer); @@ -7523,82 +7492,76 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } - + /* + * If we inserted the first and only tuple on the page, re-initialize + * the page from scratch. + */ if (record->xl_info & XLOG_HEAP_INIT_PAGE) { - buffer = XLogReadBuffer(xlrec->target.node, blkno, true); - Assert(BufferIsValid(buffer)); - page = (Page) BufferGetPage(buffer); - + XLogReadBufferForRedoExtended(lsn, record, 0, + target_node, MAIN_FORKNUM, blkno, + RBM_ZERO, false, &buffer); + page = BufferGetPage(buffer); PageInit(page, BufferGetPageSize(buffer), 0); + action = BLK_NEEDS_REDO; } else + action = XLogReadBufferForRedo(lsn, record, 0, target_node, blkno, + &buffer); + + if (action == BLK_NEEDS_REDO) { - buffer = XLogReadBuffer(xlrec->target.node, blkno, false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); + page = BufferGetPage(buffer); - if (lsn <= PageGetLSN(page)) /* changes are applied */ - { - UnlockReleaseBuffer(buffer); - return; - } - } + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (PageGetMaxOffsetNumber(page) + 1 < offnum) + elog(PANIC, "heap_insert_redo: invalid max offset number"); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) + 1 < offnum) - elog(PANIC, "heap_insert_redo: invalid max offset number"); - - newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader; - Assert(newlen <= MaxHeapTupleSize); - memcpy((char *) &xlhdr, - (char *) xlrec + SizeOfHeapInsert, - SizeOfHeapHeader); - htup = &tbuf.hdr; - MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); - /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ - memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), - (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader, - newlen); - newlen += offsetof(HeapTupleHeaderData, t_bits); - htup->t_infomask2 = xlhdr.t_infomask2; - htup->t_infomask = xlhdr.t_infomask; - htup->t_hoff = xlhdr.t_hoff; - HeapTupleHeaderSetXmin(htup, record->xl_xid); - HeapTupleHeaderSetCmin(htup, FirstCommandId); - htup->t_ctid = xlrec->target.tid; + newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader; + Assert(newlen <= MaxHeapTupleSize); + memcpy((char *) &xlhdr, + (char *) xlrec + SizeOfHeapInsert, + SizeOfHeapHeader); + htup = &tbuf.hdr; + MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); + /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ + memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), + (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader, + newlen); + newlen += offsetof(HeapTupleHeaderData, t_bits); + htup->t_infomask2 = xlhdr.t_infomask2; + htup->t_infomask = xlhdr.t_infomask; + htup->t_hoff = xlhdr.t_hoff; + HeapTupleHeaderSetXmin(htup, record->xl_xid); + HeapTupleHeaderSetCmin(htup, FirstCommandId); + htup->t_ctid = xlrec->target.tid; - offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); - if (offnum == InvalidOffsetNumber) - elog(PANIC, "heap_insert_redo: failed to add tuple"); + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); + if (offnum == InvalidOffsetNumber) + elog(PANIC, "heap_insert_redo: failed to add tuple"); - freespace = PageGetHeapFreeSpace(page); /* needed 
to update FSM below */ + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - PageSetLSN(page, lsn); + PageSetLSN(page, lsn); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* * If the page is running low on free space, update the FSM as well. * Arbitrarily, our definition of "low" is less than 20%. We can't do much * better than that without knowing the fill-factor for the table. * - * XXX: We don't get here if the page was restored from full page image. - * We don't bother to update the FSM in that case, it doesn't need to be + * XXX: Don't do this if the page was restored from full page image. We + * don't bother to update the FSM in that case, it doesn't need to be * totally accurate anyway. */ - if (freespace < BLCKSZ / 5) + if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace); } @@ -7610,6 +7573,8 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) { char *recdata = XLogRecGetData(record); xl_heap_multi_insert *xlrec; + RelFileNode rnode; + BlockNumber blkno; Buffer buffer; Page page; struct @@ -7619,10 +7584,10 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) } tbuf; HeapTupleHeader htup; uint32 newlen; - Size freespace; - BlockNumber blkno; + Size freespace = 0; int i; bool isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0; + XLogRedoAction action; /* * Insertion doesn't overwrite MVCC data, so no conflict processing is @@ -7632,6 +7597,9 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) xlrec = (xl_heap_multi_insert *) recdata; recdata += SizeOfHeapMultiInsert; + rnode = xlrec->node; + blkno = xlrec->blkno; + /* * If we're reinitializing the page, the tuples are stored in order from * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL @@ -7640,15 +7608,13 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) if (!isinit) recdata += sizeof(OffsetNumber) * xlrec->ntuples; - blkno = xlrec->blkno; - /* * The visibility map may need to be fixed even if the heap page is * already up-to-date. 
*/ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { - Relation reln = CreateFakeRelcacheEntry(xlrec->node); + Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; visibilitymap_pin(reln, blkno, &vmbuffer); @@ -7657,94 +7623,82 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } - if (isinit) { - buffer = XLogReadBuffer(xlrec->node, blkno, true); - Assert(BufferIsValid(buffer)); - page = (Page) BufferGetPage(buffer); - + XLogReadBufferForRedoExtended(lsn, record, 0, + rnode, MAIN_FORKNUM, blkno, + RBM_ZERO, false, &buffer); + page = BufferGetPage(buffer); PageInit(page, BufferGetPageSize(buffer), 0); + action = BLK_NEEDS_REDO; } else - { - buffer = XLogReadBuffer(xlrec->node, blkno, false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); + action = XLogReadBufferForRedo(lsn, record, 0, rnode, blkno, &buffer); - if (lsn <= PageGetLSN(page)) /* changes are applied */ - { - UnlockReleaseBuffer(buffer); - return; - } - } - - for (i = 0; i < xlrec->ntuples; i++) + if (action == BLK_NEEDS_REDO) { - OffsetNumber offnum; - xl_multi_insert_tuple *xlhdr; + page = BufferGetPage(buffer); + for (i = 0; i < xlrec->ntuples; i++) + { + OffsetNumber offnum; + xl_multi_insert_tuple *xlhdr; - if (isinit) - offnum = FirstOffsetNumber + i; - else - offnum = xlrec->offsets[i]; - if (PageGetMaxOffsetNumber(page) + 1 < offnum) - elog(PANIC, "heap_multi_insert_redo: invalid max offset number"); + if (isinit) + offnum = FirstOffsetNumber + i; + else + offnum = xlrec->offsets[i]; + if (PageGetMaxOffsetNumber(page) + 1 < offnum) + elog(PANIC, "heap_multi_insert_redo: invalid max offset number"); + + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata); + recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple; + + newlen = xlhdr->datalen; + Assert(newlen <= MaxHeapTupleSize); + htup = &tbuf.hdr; + MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); + /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ + memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), + (char *) recdata, + newlen); + recdata += newlen; + + newlen += offsetof(HeapTupleHeaderData, t_bits); + htup->t_infomask2 = xlhdr->t_infomask2; + htup->t_infomask = xlhdr->t_infomask; + htup->t_hoff = xlhdr->t_hoff; + HeapTupleHeaderSetXmin(htup, record->xl_xid); + HeapTupleHeaderSetCmin(htup, FirstCommandId); + ItemPointerSetBlockNumber(&htup->t_ctid, blkno); + ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); + + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); + if (offnum == InvalidOffsetNumber) + elog(PANIC, "heap_multi_insert_redo: failed to add tuple"); + } - xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata); - recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple; + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - newlen = xlhdr->datalen; - Assert(newlen <= MaxHeapTupleSize); - htup = &tbuf.hdr; - MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); - /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ - memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), - (char *) recdata, - newlen); - recdata += newlen; + PageSetLSN(page, lsn); - newlen += offsetof(HeapTupleHeaderData, t_bits); - htup->t_infomask2 = xlhdr->t_infomask2; - htup->t_infomask = xlhdr->t_infomask; - htup->t_hoff = xlhdr->t_hoff; - 
HeapTupleHeaderSetXmin(htup, record->xl_xid); - HeapTupleHeaderSetCmin(htup, FirstCommandId); - ItemPointerSetBlockNumber(&htup->t_ctid, blkno); - ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); - offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); - if (offnum == InvalidOffsetNumber) - elog(PANIC, "heap_multi_insert_redo: failed to add tuple"); + MarkBufferDirty(buffer); } - - freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - - PageSetLSN(page, lsn); - - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); - - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); /* * If the page is running low on free space, update the FSM as well. * Arbitrarily, our definition of "low" is less than 20%. We can't do much * better than that without knowing the fill-factor for the table. * - * XXX: We don't get here if the page was restored from full page image. - * We don't bother to update the FSM in that case, it doesn't need to be + * XXX: Don't do this if the page was restored from full page image. We + * don't bother to update the FSM in that case, it doesn't need to be * totally accurate anyway. */ - if (freespace < BLCKSZ / 5) + if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace); } @@ -7755,8 +7709,9 @@ static void heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) { xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); - bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) == - ItemPointerGetBlockNumber(&(xlrec->target.tid))); + RelFileNode rnode; + BlockNumber oldblk; + BlockNumber newblk; Buffer obuffer, nbuffer; Page page; @@ -7775,24 +7730,29 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) } tbuf; xl_heap_header_len xlhdr; uint32 newlen; - Size freespace; + Size freespace = 0; + XLogRedoAction oldaction; + XLogRedoAction newaction; /* initialize to keep the compiler quiet */ oldtup.t_data = NULL; oldtup.t_len = 0; + rnode = xlrec->target.node; + newblk = ItemPointerGetBlockNumber(&xlrec->newtid); + oldblk = ItemPointerGetBlockNumber(&xlrec->target.tid); + /* * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { - Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); - BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid); + Relation reln = CreateFakeRelcacheEntry(rnode); Buffer vmbuffer = InvalidBuffer; - visibilitymap_pin(reln, block, &vmbuffer); - visibilitymap_clear(reln, block, vmbuffer); + visibilitymap_pin(reln, oldblk, &vmbuffer); + visibilitymap_clear(reln, oldblk, vmbuffer); ReleaseBuffer(vmbuffer); FreeFakeRelcacheEntry(reln); } @@ -7807,84 +7767,67 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * added the new tuple to the new page. 
*/ - if (record->xl_info & XLR_BKP_BLOCK(0)) - { - obuffer = RestoreBackupBlock(lsn, record, 0, false, true); - if (samepage) - { - /* backup block covered both changes, so we're done */ - UnlockReleaseBuffer(obuffer); - return; - } - goto newt; - } - /* Deal with old tuple version */ - - obuffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - false); - if (!BufferIsValid(obuffer)) - goto newt; - page = (Page) BufferGetPage(obuffer); - - if (lsn <= PageGetLSN(page)) /* changes are applied */ + oldaction = XLogReadBufferForRedo(lsn, record, 0, rnode, oldblk, &obuffer); + if (oldaction == BLK_NEEDS_REDO) { - if (samepage) - { - UnlockReleaseBuffer(obuffer); - return; - } - goto newt; - } - - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); + page = (Page) BufferGetPage(obuffer); - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "heap_update_redo: invalid lp"); + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); - htup = (HeapTupleHeader) PageGetItem(page, lp); + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_update_redo: invalid lp"); - oldtup.t_data = htup; - oldtup.t_len = ItemIdGetLength(lp); + htup = (HeapTupleHeader) PageGetItem(page, lp); - htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); - htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; - if (hot_update) - HeapTupleHeaderSetHotUpdated(htup); - else - HeapTupleHeaderClearHotUpdated(htup); - fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask, - &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->old_xmax); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); - /* Set forward chain link in t_ctid */ - htup->t_ctid = xlrec->newtid; + oldtup.t_data = htup; + oldtup.t_len = ItemIdGetLength(lp); - /* Mark the page as a candidate for pruning */ - PageSetPrunable(page, record->xl_xid); + htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; + if (hot_update) + HeapTupleHeaderSetHotUpdated(htup); + else + HeapTupleHeaderClearHotUpdated(htup); + fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask, + &htup->t_infomask2); + HeapTupleHeaderSetXmax(htup, xlrec->old_xmax); + HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + /* Set forward chain link in t_ctid */ + htup->t_ctid = xlrec->newtid; + + /* Mark the page as a candidate for pruning */ + PageSetPrunable(page, record->xl_xid); + + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); - if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); + PageSetLSN(page, lsn); + MarkBufferDirty(obuffer); + } /* - * this test is ugly, but necessary to avoid thinking that insert change - * is already applied + * Read the page the new tuple goes into, if different from old. 
*/ - if (samepage) + if (oldblk == newblk) { nbuffer = obuffer; - goto newsame; + newaction = oldaction; } - - PageSetLSN(page, lsn); - MarkBufferDirty(obuffer); - - /* Deal with new tuple */ - -newt:; + else if (record->xl_info & XLOG_HEAP_INIT_PAGE) + { + XLogReadBufferForRedoExtended(lsn, record, 1, + rnode, MAIN_FORKNUM, newblk, + RBM_ZERO, false, &nbuffer); + page = (Page) BufferGetPage(nbuffer); + PageInit(page, BufferGetPageSize(nbuffer), 0); + newaction = BLK_NEEDS_REDO; + } + else + newaction = XLogReadBufferForRedo(lsn, record, 1, rnode, newblk, + &nbuffer); /* * The visibility map may need to be fixed even if the heap page is @@ -7893,144 +7836,110 @@ newt:; if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); - BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid); Buffer vmbuffer = InvalidBuffer; - visibilitymap_pin(reln, block, &vmbuffer); - visibilitymap_clear(reln, block, vmbuffer); + visibilitymap_pin(reln, newblk, &vmbuffer); + visibilitymap_clear(reln, newblk, vmbuffer); ReleaseBuffer(vmbuffer); FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK(1)) - { - (void) RestoreBackupBlock(lsn, record, 1, false, false); - if (BufferIsValid(obuffer)) - UnlockReleaseBuffer(obuffer); - return; - } - - if (record->xl_info & XLOG_HEAP_INIT_PAGE) + /* Deal with new tuple */ + if (newaction == BLK_NEEDS_REDO) { - nbuffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), - true); - Assert(BufferIsValid(nbuffer)); page = (Page) BufferGetPage(nbuffer); - PageInit(page, BufferGetPageSize(nbuffer), 0); - } - else - { - nbuffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), - false); - if (!BufferIsValid(nbuffer)) + offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); + if (PageGetMaxOffsetNumber(page) + 1 < offnum) + elog(PANIC, "heap_update_redo: invalid max offset number"); + + recdata = (char *) xlrec + SizeOfHeapUpdate; + + if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD) { - if (BufferIsValid(obuffer)) - UnlockReleaseBuffer(obuffer); - return; + Assert(newblk == oldblk); + memcpy(&prefixlen, recdata, sizeof(uint16)); + recdata += sizeof(uint16); } - page = (Page) BufferGetPage(nbuffer); - - if (lsn <= PageGetLSN(page)) /* changes are applied */ + if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD) { - UnlockReleaseBuffer(nbuffer); - if (BufferIsValid(obuffer)) - UnlockReleaseBuffer(obuffer); - return; + Assert(newblk == oldblk); + memcpy(&suffixlen, recdata, sizeof(uint16)); + recdata += sizeof(uint16); } - } -newsame:; + memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen); + recdata += SizeOfHeapHeaderLen; - offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); - if (PageGetMaxOffsetNumber(page) + 1 < offnum) - elog(PANIC, "heap_update_redo: invalid max offset number"); - - recdata = (char *) xlrec + SizeOfHeapUpdate; + Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize); + htup = &tbuf.hdr; + MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); - if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD) - { - Assert(samepage); - memcpy(&prefixlen, recdata, sizeof(uint16)); - recdata += sizeof(uint16); - } - if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD) - { - Assert(samepage); - memcpy(&suffixlen, recdata, sizeof(uint16)); - recdata += sizeof(uint16); - } + /* + * Reconstruct the new tuple using the prefix and/or suffix from the + * old tuple, and the data stored in the WAL record. 
+ */ + newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits); + if (prefixlen > 0) + { + int len; + + /* copy bitmap [+ padding] [+ oid] from WAL record */ + len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits); + memcpy(newp, recdata, len); + recdata += len; + newp += len; + + /* copy prefix from old tuple */ + memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen); + newp += prefixlen; + + /* copy new tuple data from WAL record */ + len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits)); + memcpy(newp, recdata, len); + recdata += len; + newp += len; + } + else + { + /* + * copy bitmap [+ padding] [+ oid] + data from record, all in one + * go + */ + memcpy(newp, recdata, xlhdr.t_len); + recdata += xlhdr.t_len; + newp += xlhdr.t_len; + } + /* copy suffix from old tuple */ + if (suffixlen > 0) + memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen); - memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen); - recdata += SizeOfHeapHeaderLen; + newlen = offsetof(HeapTupleHeaderData, t_bits) + xlhdr.t_len + prefixlen + suffixlen; + htup->t_infomask2 = xlhdr.header.t_infomask2; + htup->t_infomask = xlhdr.header.t_infomask; + htup->t_hoff = xlhdr.header.t_hoff; - Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize); - htup = &tbuf.hdr; - MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); + HeapTupleHeaderSetXmin(htup, record->xl_xid); + HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetXmax(htup, xlrec->new_xmax); + /* Make sure there is no forward chain link in t_ctid */ + htup->t_ctid = xlrec->newtid; - /* - * Reconstruct the new tuple using the prefix and/or suffix from the old - * tuple, and the data stored in the WAL record. - */ - newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits); - if (prefixlen > 0) - { - int len; + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); + if (offnum == InvalidOffsetNumber) + elog(PANIC, "heap_update_redo: failed to add tuple"); - /* copy bitmap [+ padding] [+ oid] from WAL record */ - len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits); - memcpy(newp, recdata, len); - recdata += len; - newp += len; + if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); - /* copy prefix from old tuple */ - memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen); - newp += prefixlen; + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - /* copy new tuple data from WAL record */ - len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits)); - memcpy(newp, recdata, len); - recdata += len; - newp += len; + PageSetLSN(page, lsn); + MarkBufferDirty(nbuffer); } - else - { - /* copy bitmap [+ padding] [+ oid] + data from record, all in one go */ - memcpy(newp, recdata, xlhdr.t_len); - recdata += xlhdr.t_len; - newp += xlhdr.t_len; - } - /* copy suffix from old tuple */ - if (suffixlen > 0) - memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen); - - newlen = offsetof(HeapTupleHeaderData, t_bits) +xlhdr.t_len + prefixlen + suffixlen; - htup->t_infomask2 = xlhdr.header.t_infomask2; - htup->t_infomask = xlhdr.header.t_infomask; - htup->t_hoff = xlhdr.header.t_hoff; - - HeapTupleHeaderSetXmin(htup, record->xl_xid); - HeapTupleHeaderSetCmin(htup, FirstCommandId); - HeapTupleHeaderSetXmax(htup, xlrec->new_xmax); - /* Make sure there is no forward chain link in t_ctid */ - htup->t_ctid = xlrec->newtid; - - offnum = 
PageAddItem(page, (Item) htup, newlen, offnum, true, true); - if (offnum == InvalidOffsetNumber) - elog(PANIC, "heap_update_redo: failed to add tuple"); - - if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); - - freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - - PageSetLSN(page, lsn); - MarkBufferDirty(nbuffer); - UnlockReleaseBuffer(nbuffer); - - if (BufferIsValid(obuffer) && obuffer != nbuffer) + if (BufferIsValid(nbuffer) && nbuffer != obuffer) + UnlockReleaseBuffer(nbuffer); + if (BufferIsValid(obuffer)) UnlockReleaseBuffer(obuffer); /* @@ -8044,11 +7953,11 @@ newsame:; * as it did before the update, assuming the new tuple is about the same * size as the old one. * - * XXX: We don't get here if the page was restored from full page image. - * We don't bother to update the FSM in that case, it doesn't need to be + * XXX: Don't do this if the page was restored from full page image. We + * don't bother to update the FSM in that case, it doesn't need to be * totally accurate anyway. */ - if (!hot_update && freespace < BLCKSZ / 5) + if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5) XLogRecordPageWithFreeSpace(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace); @@ -8064,53 +7973,41 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } - - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); - - if (lsn <= PageGetLSN(page)) /* changes are applied */ + if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, + ItemPointerGetBlockNumber(&xlrec->target.tid), + &buffer) == BLK_NEEDS_REDO) { - UnlockReleaseBuffer(buffer); - return; - } + page = (Page) BufferGetPage(buffer); - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "heap_lock_redo: invalid lp"); + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_lock_redo: invalid lp"); - htup = (HeapTupleHeader) PageGetItem(page, lp); + htup = (HeapTupleHeader) PageGetItem(page, lp); - fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, - &htup->t_infomask2); + fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, + &htup->t_infomask2); - /* - * Clear relevant update flags, but only if the modified infomask says - * there's no update. - */ - if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask)) - { - HeapTupleHeaderClearHotUpdated(htup); - /* Make sure there is no forward chain link in t_ctid */ - htup->t_ctid = xlrec->target.tid; + /* + * Clear relevant update flags, but only if the modified infomask says + * there's no update. 
+ */ + if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask)) + { + HeapTupleHeaderClearHotUpdated(htup); + /* Make sure there is no forward chain link in t_ctid */ + htup->t_ctid = xlrec->target.tid; + } + HeapTupleHeaderSetXmax(htup, xlrec->locking_xid); + HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); } - HeapTupleHeaderSetXmax(htup, xlrec->locking_xid); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -8124,42 +8021,29 @@ heap_xlog_lock_updated(XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) - { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } - - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); - - if (lsn <= PageGetLSN(page)) /* changes are applied */ + if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + &buffer) == BLK_NEEDS_REDO) { - UnlockReleaseBuffer(buffer); - return; - } - - offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); + page = BufferGetPage(buffer); + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "heap_xlog_lock_updated: invalid lp"); + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_xlog_lock_updated: invalid lp"); - htup = (HeapTupleHeader) PageGetItem(page, lp); + htup = (HeapTupleHeader) PageGetItem(page, lp); - fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, - &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->xmax); + fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, + &htup->t_infomask2); + HeapTupleHeaderSetXmax(htup, xlrec->xmax); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } static void @@ -8174,47 +8058,35 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) uint32 oldlen; uint32 newlen; - /* If we have a full-page image, restore it and we're done */ - if (record->xl_info & XLR_BKP_BLOCK(0)) + if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + &buffer) == BLK_NEEDS_REDO) { - (void) RestoreBackupBlock(lsn, record, 0, false, false); - return; - } + page = BufferGetPage(buffer); - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - false); - if (!BufferIsValid(buffer)) - return; - page = (Page) BufferGetPage(buffer); + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); - if (lsn <= PageGetLSN(page)) /* changes are applied */ - { - UnlockReleaseBuffer(buffer); - return; - } + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "heap_inplace_redo: invalid lp"); - offnum = 
ItemPointerGetOffsetNumber(&(xlrec->target.tid)); - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); + htup = (HeapTupleHeader) PageGetItem(page, lp); - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "heap_inplace_redo: invalid lp"); + oldlen = ItemIdGetLength(lp) - htup->t_hoff; + newlen = record->xl_len - SizeOfHeapInplace; + if (oldlen != newlen) + elog(PANIC, "heap_inplace_redo: wrong tuple length"); - htup = (HeapTupleHeader) PageGetItem(page, lp); - - oldlen = ItemIdGetLength(lp) - htup->t_hoff; - newlen = record->xl_len - SizeOfHeapInplace; - if (oldlen != newlen) - elog(PANIC, "heap_inplace_redo: wrong tuple length"); - - memcpy((char *) htup + htup->t_hoff, - (char *) xlrec + SizeOfHeapInplace, - newlen); + memcpy((char *) htup + htup->t_hoff, + (char *) xlrec + SizeOfHeapInplace, + newlen); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); } void
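Two variants from the hunks above deserve a closer look. heap_xlog_clean needs a cleanup lock on the buffer and an FSM update afterwards, which is why it uses the extended helper and keeps the returned action around. Reduced to a sketch with the same hypothetical xl_foo layout as before:

#include "storage/freespace.h"

static void
foo_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
{
    xl_foo     *xlrec = (xl_foo *) XLogRecGetData(record);
    Buffer      buffer;
    Size        freespace = 0;
    XLogRedoAction action;

    /* the 'true' argument requests a cleanup lock, which pruning requires */
    action = XLogReadBufferForRedoExtended(lsn, record, 0,
                                           xlrec->node, MAIN_FORKNUM,
                                           xlrec->block, RBM_NORMAL,
                                           true, &buffer);
    if (action == BLK_NEEDS_REDO)
    {
        Page        page = BufferGetPage(buffer);

        /* ... redirect/remove item pointers per the record ... */

        freespace = PageGetHeapFreeSpace(page);
        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /*
     * Update the FSM only if the change was actually replayed; after a
     * full-page restore it is skipped, since the FSM need not be exact.
     */
    if (action == BLK_NEEDS_REDO)
        XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
}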
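The insert paths (heap_xlog_insert, heap_xlog_multi_insert, and the new-tuple half of heap_xlog_update) keep one special case: when XLOG_HEAP_INIT_PAGE is set, the block is read with RBM_ZERO and re-initialized rather than compared against its LSN, and redo is then unconditional. A sketch of that branch, under the same assumptions (XLOG_HEAP_INIT_PAGE comes from access/heapam_xlog.h; the record layout is still hypothetical):

#include "access/heapam_xlog.h"

static void
foo_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
{
    xl_foo     *xlrec = (xl_foo *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    XLogRedoAction action;

    if (record->xl_info & XLOG_HEAP_INIT_PAGE)
    {
        /* page is created from scratch: zero it instead of reading it in */
        XLogReadBufferForRedoExtended(lsn, record, 0,
                                      xlrec->node, MAIN_FORKNUM,
                                      xlrec->block, RBM_ZERO,
                                      false, &buffer);
        page = BufferGetPage(buffer);
        PageInit(page, BufferGetPageSize(buffer), 0);
        action = BLK_NEEDS_REDO;    /* a freshly zeroed page always needs redo */
    }
    else
        action = XLogReadBufferForRedo(lsn, record, 0, xlrec->node,
                                       xlrec->block, &buffer);

    if (action == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(buffer);

        /* ... add the logged tuple(s) to the page ... */

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}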