diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 88 |
1 files changed, 75 insertions, 13 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 25d9fdea3a0..7dcc6015de9 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -57,6 +57,7 @@ #include "storage/bufmgr.h" #include "storage/freespace.h" #include "storage/lmgr.h" +#include "storage/predicate.h" #include "storage/procarray.h" #include "storage/smgr.h" #include "storage/standby.h" @@ -261,20 +262,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) { if (ItemIdIsNormal(lpp)) { + HeapTupleData loctup; bool valid; + loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + loctup.t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(loctup.t_self), page, lineoff); + if (all_visible) valid = true; else - { - HeapTupleData loctup; + valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); - loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - loctup.t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(loctup.t_self), page, lineoff); + CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer); - valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); - } if (valid) scan->rs_vistuples[ntup++] = lineoff; } @@ -468,12 +469,16 @@ heapgettup(HeapScanDesc scan, snapshot, scan->rs_cbuf); + CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf); + if (valid && key != NULL) HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), nkeys, key, valid); if (valid) { + if (!scan->rs_relpredicatelocked) + PredicateLockTuple(scan->rs_rd, tuple); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); return; } @@ -741,12 +746,16 @@ heapgettup_pagemode(HeapScanDesc scan, nkeys, key, valid); if (valid) { + if (!scan->rs_relpredicatelocked) + PredicateLockTuple(scan->rs_rd, tuple); scan->rs_cindex = lineindex; return; } } else { + if (!scan->rs_relpredicatelocked) + PredicateLockTuple(scan->rs_rd, tuple); scan->rs_cindex = lineindex; return; } @@ -1213,6 +1222,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, scan->rs_strategy = NULL; /* set in initscan */ scan->rs_allow_strat = allow_strat; scan->rs_allow_sync = allow_sync; + scan->rs_relpredicatelocked = false; /* * we can use page-at-a-time mode if it's an MVCC-safe snapshot @@ -1459,8 +1469,13 @@ heap_fetch(Relation relation, */ valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); + if (valid) + PredicateLockTuple(relation, tuple); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + CheckForSerializableConflictOut(valid, relation, tuple, buffer); + if (valid) { /* @@ -1506,13 +1521,15 @@ heap_fetch(Relation relation, * heap_fetch, we do not report any pgstats count; caller may do so if wanted. */ bool -heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, - bool *all_dead) +heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, + Snapshot snapshot, bool *all_dead) { Page dp = (Page) BufferGetPage(buffer); TransactionId prev_xmax = InvalidTransactionId; OffsetNumber offnum; bool at_chain_start; + bool valid; + bool match_found; if (all_dead) *all_dead = true; @@ -1522,6 +1539,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer)); offnum = ItemPointerGetOffsetNumber(tid); at_chain_start = true; + match_found = false; /* Scan through possible multiple members of HOT-chain */ for (;;) @@ -1552,6 +1570,8 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp); heapTuple.t_len = ItemIdGetLength(lp); + heapTuple.t_tableOid = relation->rd_id; + heapTuple.t_self = *tid; /* * Shouldn't see a HEAP_ONLY tuple at chain start. @@ -1569,12 +1589,18 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, break; /* If it's visible per the snapshot, we must return it */ - if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer)) + valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer); + CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer); + if (valid) { ItemPointerSetOffsetNumber(tid, offnum); + PredicateLockTuple(relation, &heapTuple); if (all_dead) *all_dead = false; - return true; + if (IsolationIsSerializable()) + match_found = true; + else + return true; } /* @@ -1603,7 +1629,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot, break; /* end of chain */ } - return false; + return match_found; } /* @@ -1622,7 +1648,7 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot, buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(buffer, BUFFER_LOCK_SHARE); - result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead); + result = heap_hot_search_buffer(tid, relation, buffer, snapshot, all_dead); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); return result; @@ -1729,6 +1755,7 @@ heap_get_latest_tid(Relation relation, * result candidate. */ valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer); + CheckForSerializableConflictOut(valid, relation, &tp, buffer); if (valid) *tid = ctid; @@ -1893,6 +1920,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, buffer = RelationGetBufferForTuple(relation, heaptup->t_len, InvalidBuffer, options, bistate); + /* + * We're about to do the actual insert -- check for conflict at the + * relation or buffer level first, to avoid possibly having to roll + * back work we've just done. + */ + CheckForSerializableConflictIn(relation, NULL, buffer); + /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); @@ -2193,6 +2227,12 @@ l1: return result; } + /* + * We're about to do the actual delete -- check for conflict first, + * to avoid possibly having to roll back work we've just done. + */ + CheckForSerializableConflictIn(relation, &tp, buffer); + /* replace cid with a combo cid if necessary */ HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo); @@ -2546,6 +2586,12 @@ l2: return result; } + /* + * We're about to do the actual update -- check for conflict first, + * to avoid possibly having to roll back work we've just done. + */ + CheckForSerializableConflictIn(relation, &oldtup, buffer); + /* Fill in OID and transaction status data for newtup */ if (relation->rd_rel->relhasoids) { @@ -2691,6 +2737,16 @@ l2: } /* + * We're about to create the new tuple -- check for conflict first, + * to avoid possibly having to roll back work we've just done. + * + * NOTE: For a tuple insert, we only need to check for table locks, since + * predicate locking at the index level will cover ranges for anything + * except a table scan. Therefore, only provide the relation. + */ + CheckForSerializableConflictIn(relation, NULL, InvalidBuffer); + + /* * At this point newbuf and buffer are both pinned and locked, and newbuf * has enough space for the new tuple. If they are the same buffer, only * one pin is held. @@ -2799,6 +2855,12 @@ l2: END_CRIT_SECTION(); + /* + * Any existing SIREAD locks on the old tuple must be linked to the new + * tuple for conflict detection purposes. + */ + PredicateLockTupleRowVersionLink(relation, &oldtup, newtup); + if (newbuf != buffer) LockBuffer(newbuf, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); |