summaryrefslogtreecommitdiff
path: root/src/backend/access/heap/heapam.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--src/backend/access/heap/heapam.c88
1 files changed, 75 insertions, 13 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 25d9fdea3a0..7dcc6015de9 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -57,6 +57,7 @@
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
+#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "storage/standby.h"
@@ -261,20 +262,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
{
if (ItemIdIsNormal(lpp))
{
+ HeapTupleData loctup;
bool valid;
+ loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+ loctup.t_len = ItemIdGetLength(lpp);
+ ItemPointerSet(&(loctup.t_self), page, lineoff);
+
if (all_visible)
valid = true;
else
- {
- HeapTupleData loctup;
+ valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
- loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
- loctup.t_len = ItemIdGetLength(lpp);
- ItemPointerSet(&(loctup.t_self), page, lineoff);
+ CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer);
- valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
- }
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
}
@@ -468,12 +469,16 @@ heapgettup(HeapScanDesc scan,
snapshot,
scan->rs_cbuf);
+ CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf);
+
if (valid && key != NULL)
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
nkeys, key, valid);
if (valid)
{
+ if (!scan->rs_relpredicatelocked)
+ PredicateLockTuple(scan->rs_rd, tuple);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return;
}
@@ -741,12 +746,16 @@ heapgettup_pagemode(HeapScanDesc scan,
nkeys, key, valid);
if (valid)
{
+ if (!scan->rs_relpredicatelocked)
+ PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
}
else
{
+ if (!scan->rs_relpredicatelocked)
+ PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
@@ -1213,6 +1222,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
scan->rs_strategy = NULL; /* set in initscan */
scan->rs_allow_strat = allow_strat;
scan->rs_allow_sync = allow_sync;
+ scan->rs_relpredicatelocked = false;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1459,8 +1469,13 @@ heap_fetch(Relation relation,
*/
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
+ if (valid)
+ PredicateLockTuple(relation, tuple);
+
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ CheckForSerializableConflictOut(valid, relation, tuple, buffer);
+
if (valid)
{
/*
@@ -1506,13 +1521,15 @@ heap_fetch(Relation relation,
* heap_fetch, we do not report any pgstats count; caller may do so if wanted.
*/
bool
-heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
- bool *all_dead)
+heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
+ Snapshot snapshot, bool *all_dead)
{
Page dp = (Page) BufferGetPage(buffer);
TransactionId prev_xmax = InvalidTransactionId;
OffsetNumber offnum;
bool at_chain_start;
+ bool valid;
+ bool match_found;
if (all_dead)
*all_dead = true;
@@ -1522,6 +1539,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
offnum = ItemPointerGetOffsetNumber(tid);
at_chain_start = true;
+ match_found = false;
/* Scan through possible multiple members of HOT-chain */
for (;;)
@@ -1552,6 +1570,8 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
heapTuple.t_len = ItemIdGetLength(lp);
+ heapTuple.t_tableOid = relation->rd_id;
+ heapTuple.t_self = *tid;
/*
* Shouldn't see a HEAP_ONLY tuple at chain start.
@@ -1569,12 +1589,18 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break;
/* If it's visible per the snapshot, we must return it */
- if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer))
+ valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer);
+ CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer);
+ if (valid)
{
ItemPointerSetOffsetNumber(tid, offnum);
+ PredicateLockTuple(relation, &heapTuple);
if (all_dead)
*all_dead = false;
- return true;
+ if (IsolationIsSerializable())
+ match_found = true;
+ else
+ return true;
}
/*
@@ -1603,7 +1629,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break; /* end of chain */
}
- return false;
+ return match_found;
}
/*
@@ -1622,7 +1648,7 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
- result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead);
+ result = heap_hot_search_buffer(tid, relation, buffer, snapshot, all_dead);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return result;
@@ -1729,6 +1755,7 @@ heap_get_latest_tid(Relation relation,
* result candidate.
*/
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
+ CheckForSerializableConflictOut(valid, relation, &tp, buffer);
if (valid)
*tid = ctid;
@@ -1893,6 +1920,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
InvalidBuffer, options, bistate);
+ /*
+ * We're about to do the actual insert -- check for conflict at the
+ * relation or buffer level first, to avoid possibly having to roll
+ * back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, NULL, buffer);
+
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
@@ -2193,6 +2227,12 @@ l1:
return result;
}
+ /*
+ * We're about to do the actual delete -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, &tp, buffer);
+
/* replace cid with a combo cid if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
@@ -2546,6 +2586,12 @@ l2:
return result;
}
+ /*
+ * We're about to do the actual update -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ */
+ CheckForSerializableConflictIn(relation, &oldtup, buffer);
+
/* Fill in OID and transaction status data for newtup */
if (relation->rd_rel->relhasoids)
{
@@ -2691,6 +2737,16 @@ l2:
}
/*
+ * We're about to create the new tuple -- check for conflict first,
+ * to avoid possibly having to roll back work we've just done.
+ *
+ * NOTE: For a tuple insert, we only need to check for table locks, since
+ * predicate locking at the index level will cover ranges for anything
+ * except a table scan. Therefore, only provide the relation.
+ */
+ CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+
+ /*
* At this point newbuf and buffer are both pinned and locked, and newbuf
* has enough space for the new tuple. If they are the same buffer, only
* one pin is held.
@@ -2799,6 +2855,12 @@ l2:
END_CRIT_SECTION();
+ /*
+ * Any existing SIREAD locks on the old tuple must be linked to the new
+ * tuple for conflict detection purposes.
+ */
+ PredicateLockTupleRowVersionLink(relation, &oldtup, newtup);
+
if (newbuf != buffer)
LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);