Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c  2548
1 files changed, 0 insertions, 2548 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
deleted file mode 100644
index ada50e2730f..00000000000
--- a/src/backend/access/heap/heapam.c
+++ /dev/null
@@ -1,2548 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * heapam.c
- * heap access method code
- *
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.139 2002/06/20 20:29:24 momjian Exp $
- *
- *
- * INTERFACE ROUTINES
- * relation_open - open any relation by relation OID
- * relation_openrv - open any relation specified by a RangeVar
- * relation_openr - open a system relation by name
- * relation_close - close any relation
- * heap_open - open a heap relation by relation OID
- * heap_openrv - open a heap relation specified by a RangeVar
- * heap_openr - open a system heap relation by name
- * heap_close - (now just a macro for relation_close)
- * heap_beginscan - begin relation scan
- * heap_rescan - restart a relation scan
- * heap_endscan - end relation scan
- * heap_getnext - retrieve next tuple in scan
- * heap_fetch - retrieve tuple with tid
- * heap_insert - insert tuple into a relation
- * heap_delete - delete a tuple from a relation
- * heap_update - replace a tuple in a relation with another tuple
- * heap_markpos - mark scan position
- * heap_restrpos - restore position to marked location
- *
- * NOTES
- * This file contains the heap_ routines which implement
- * the POSTGRES heap access method used for all POSTGRES
- * relations.
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/heapam.h"
-#include "access/hio.h"
-#include "access/tuptoaster.h"
-#include "access/valid.h"
-#include "access/xlogutils.h"
-#include "catalog/catalog.h"
-#include "catalog/namespace.h"
-#include "miscadmin.h"
-#include "utils/inval.h"
-#include "utils/relcache.h"
-#include "pgstat.h"
-
-
-/* comments are in heap_update */
-static xl_heaptid _locked_tuple_;
-static void _heap_unlock_tuple(void *data);
-static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
- ItemPointerData from, Buffer newbuf, HeapTuple newtup, bool move);
-
-
-/* ----------------------------------------------------------------
- * heap support routines
- * ----------------------------------------------------------------
- */
-
-/* ----------------
- * initscan - scan code common to heap_beginscan and heap_rescan
- * ----------------
- */
-static void
-initscan(HeapScanDesc scan, ScanKey key)
-{
- /*
- * Make sure we have an up-to-date idea of the number of blocks in the
- * relation.
- * It is sufficient to do this once at scan start, since any tuples
- * added while the scan is in progress will be invisible to my
- * transaction anyway...
- */
- scan->rs_rd->rd_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
-
- scan->rs_ctup.t_datamcxt = NULL;
- scan->rs_ctup.t_data = NULL;
- scan->rs_cbuf = InvalidBuffer;
-
- /* we don't have a marked position... */
- ItemPointerSetInvalid(&(scan->rs_mctid));
-
- /*
- * copy the scan key, if appropriate
- */
- if (key != NULL)
- memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
-}
-
-/* ----------------
- * heapgettup - fetch next heap tuple
- *
- * routine used by heap_getnext() which does most of the
- * real work in scanning tuples.
- *
- * The passed-in *buffer must be either InvalidBuffer or the pinned
- * current page of the scan. If we have to move to another page,
- * we will unpin this buffer (if valid). On return, *buffer is either
- * InvalidBuffer or the ID of a pinned buffer.
- * ----------------
- */
-static void
-heapgettup(Relation relation,
- int dir,
- HeapTuple tuple,
- Buffer *buffer,
- Snapshot snapshot,
- int nkeys,
- ScanKey key)
-{
- ItemId lpp;
- Page dp;
- BlockNumber page;
- BlockNumber pages;
- int lines;
- OffsetNumber lineoff;
- int linesleft;
- ItemPointer tid;
-
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_heapgettup);
- IncrHeapAccessStat(global_heapgettup);
-
- tid = (tuple->t_data == NULL) ? (ItemPointer) NULL : &(tuple->t_self);
-
- /*
- * debugging stuff
- *
- * check validity of arguments, here and for other functions too.
- * Note: no locking manipulations needed -- this is a local function.
- */
-#ifdef HEAPDEBUGALL
- if (ItemPointerIsValid(tid))
- {
- elog(LOG, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
- RelationGetRelationName(relation), tid, tid->ip_blkid,
- tid->ip_posid, dir);
- }
- else
- {
- elog(LOG, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
- RelationGetRelationName(relation), tid, dir);
- }
- elog(LOG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x)", buffer, nkeys, key);
-
- elog(LOG, "heapgettup: relation(%c)=`%s', %p",
- relation->rd_rel->relkind, RelationGetRelationName(relation),
- snapshot);
-#endif   /* defined(HEAPDEBUGALL) */
-
- if (!ItemPointerIsValid(tid))
- {
- Assert(!PointerIsValid(tid));
- tid = NULL;
- }
-
- tuple->t_tableOid = relation->rd_id;
-
- /*
- * return null immediately if relation is empty
- */
- if ((pages = relation->rd_nblocks) == 0)
- {
- if (BufferIsValid(*buffer))
- ReleaseBuffer(*buffer);
- *buffer = InvalidBuffer;
- tuple->t_datamcxt = NULL;
- tuple->t_data = NULL;
- return;
- }
-
- /*
- * calculate next starting lineoff, given scan direction
- */
- if (dir == 0)
- {
- /*
- * ``no movement'' scan direction: refetch same tuple
- */
- if (tid == NULL)
- {
- if (BufferIsValid(*buffer))
- ReleaseBuffer(*buffer);
- *buffer = InvalidBuffer;
- tuple->t_datamcxt = NULL;
- tuple->t_data = NULL;
- return;
- }
-
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- ItemPointerGetBlockNumber(tid));
- if (!BufferIsValid(*buffer))
- elog(ERROR, "heapgettup: failed ReadBuffer");
-
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
- dp = (Page) BufferGetPage(*buffer);
- lineoff = ItemPointerGetOffsetNumber(tid);
- lpp = PageGetItemId(dp, lineoff);
-
- tuple->t_datamcxt = NULL;
- tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
- tuple->t_len = ItemIdGetLength(lpp);
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-
- return;
- }
- else if (dir < 0)
- {
- /*
- * reverse scan direction
- */
- if (tid == NULL)
- {
- page = pages - 1; /* final page */
- }
- else
- {
- page = ItemPointerGetBlockNumber(tid); /* current page */
- }
-
- Assert(page < pages);
-
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- page);
- if (!BufferIsValid(*buffer))
- elog(ERROR, "heapgettup: failed ReadBuffer");
-
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
- dp = (Page) BufferGetPage(*buffer);
- lines = PageGetMaxOffsetNumber(dp);
- if (tid == NULL)
- {
- lineoff = lines; /* final offnum */
- }
- else
- {
- lineoff = /* previous offnum */
- OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
- }
- /* page and lineoff now reference the physically previous tid */
- }
- else
- {
- /*
- * forward scan direction
- */
- if (tid == NULL)
- {
- page = 0; /* first page */
- lineoff = FirstOffsetNumber; /* first offnum */
- }
- else
- {
- page = ItemPointerGetBlockNumber(tid); /* current page */
- lineoff = /* next offnum */
- OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
- }
-
- Assert(page < pages);
-
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- page);
- if (!BufferIsValid(*buffer))
- elog(ERROR, "heapgettup: failed ReadBuffer");
-
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
- dp = (Page) BufferGetPage(*buffer);
- lines = PageGetMaxOffsetNumber(dp);
- /* page and lineoff now reference the physically next tid */
- }
-
- /* 'dir' is now non-zero */
-
- /*
- * calculate line pointer and number of remaining items to check on
- * this page.
- */
- lpp = PageGetItemId(dp, lineoff);
- if (dir < 0)
- linesleft = lineoff - 1;
- else
- linesleft = lines - lineoff;
-
- /*
- * advance the scan until we find a qualifying tuple or run out of
- * stuff to scan
- */
- for (;;)
- {
- while (linesleft >= 0)
- {
- if (ItemIdIsUsed(lpp))
- {
- bool valid;
-
- tuple->t_datamcxt = NULL;
- tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
- tuple->t_len = ItemIdGetLength(lpp);
- ItemPointerSet(&(tuple->t_self), page, lineoff);
-
- /*
- * if current tuple qualifies, return it.
- */
- HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
- snapshot, nkeys, key, valid);
- if (valid)
- {
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- return;
- }
- }
-
- /*
- * otherwise move to the next item on the page
- */
- --linesleft;
- if (dir < 0)
- {
- --lpp; /* move back in this page's ItemId array */
- --lineoff;
- }
- else
- {
- ++lpp; /* move forward in this page's ItemId
- * array */
- ++lineoff;
- }
- }
-
- /*
- * if we get here, it means we've exhausted the items on this page
- * and it's time to move to the next.
- */
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-
- /*
- * return NULL if we've exhausted all the pages
- */
- if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
- {
- if (BufferIsValid(*buffer))
- ReleaseBuffer(*buffer);
- *buffer = InvalidBuffer;
- tuple->t_datamcxt = NULL;
- tuple->t_data = NULL;
- return;
- }
-
- page = (dir < 0) ? (page - 1) : (page + 1);
-
- Assert(page < pages);
-
- *buffer = ReleaseAndReadBuffer(*buffer,
- relation,
- page);
- if (!BufferIsValid(*buffer))
- elog(ERROR, "heapgettup: failed ReadBuffer");
-
- LockBuffer(*buffer, BUFFER_LOCK_SHARE);
- dp = (Page) BufferGetPage(*buffer);
- lines = PageGetMaxOffsetNumber((Page) dp);
- linesleft = lines - 1;
- if (dir < 0)
- {
- lineoff = lines;
- lpp = PageGetItemId(dp, lines);
- }
- else
- {
- lineoff = FirstOffsetNumber;
- lpp = PageGetItemId(dp, FirstOffsetNumber);
- }
- }
-}
-
-
-#if defined(DISABLE_COMPLEX_MACRO)
-/*
- * This is formatted so oddly so that the correspondence to the macro
- * definition in access/heapam.h is maintained.
- */
-Datum
-fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
- bool *isnull)
-{
- return (
- (attnum) > 0 ?
- (
- ((isnull) ? (*(isnull) = false) : (dummyret) NULL),
- HeapTupleNoNulls(tup) ?
- (
- (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
- (
- fetchatt((tupleDesc)->attrs[(attnum) - 1],
- (char *) (tup)->t_data + (tup)->t_data->t_hoff +
- (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
- )
- :
- nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
- )
- :
- (
- att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
- (
- ((isnull) ? (*(isnull) = true) : (dummyret) NULL),
- (Datum) NULL
- )
- :
- (
- nocachegetattr((tup), (attnum), (tupleDesc), (isnull))
- )
- )
- )
- :
- (
- (Datum) NULL
- )
- );
-}
-#endif /* defined(DISABLE_COMPLEX_MACRO) */
-
-
-/* ----------------------------------------------------------------
- * heap access method interface
- * ----------------------------------------------------------------
- */
-
-/* ----------------
- * relation_open - open any relation by relation OID
- *
- * If lockmode is not "NoLock", the specified kind of lock is
- * obtained on the relation. (Generally, NoLock should only be
- * used if the caller knows it has some appropriate lock on the
- * relation already.)
- *
- * An error is raised if the relation does not exist.
- *
- * NB: a "relation" is anything with a pg_class entry. The caller is
- * expected to check whether the relkind is something it can handle.
- * ----------------
- */
-Relation
-relation_open(Oid relationId, LOCKMODE lockmode)
-{
- Relation r;
-
- Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
-
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_open);
- IncrHeapAccessStat(global_open);
-
- /* The relcache does all the real work... */
- r = RelationIdGetRelation(relationId);
-
- if (!RelationIsValid(r))
- elog(ERROR, "Relation %u does not exist", relationId);
-
- if (lockmode != NoLock)
- LockRelation(r, lockmode);
-
- return r;
-}
-
-/* ----------------
- * relation_openrv - open any relation specified by a RangeVar
- *
- * As above, but the relation is specified by a RangeVar.
- * ----------------
- */
-Relation
-relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
-{
- Oid relOid;
-
- /*
- * In bootstrap mode, don't do any namespace processing.
- */
- if (IsBootstrapProcessingMode())
- {
- Assert(relation->schemaname == NULL);
- return relation_openr(relation->relname, lockmode);
- }
-
- /*
- * Check for shared-cache-inval messages before trying to open the
- * relation. This is needed to cover the case where the name
- * identifies a rel that has been dropped and recreated since the
- * start of our transaction: if we don't flush the old syscache entry
- * then we'll latch onto that entry and suffer an error when we do
- * LockRelation. Note that relation_open does not need to do this,
- * since a relation's OID never changes.
- *
- * We skip this if asked for NoLock, on the assumption that the caller
- * has already ensured some appropriate lock is held.
- */
- if (lockmode != NoLock)
- AcceptInvalidationMessages();
-
- /* Look up the appropriate relation using namespace search */
- relOid = RangeVarGetRelid(relation, false);
-
- /* Let relation_open do the rest */
- return relation_open(relOid, lockmode);
-}
-
-/* ----------------
- * relation_openr - open a system relation specified by name.
- *
- * As above, but the relation is specified by an unqualified name;
- * it is assumed to live in the system catalog namespace.
- * ----------------
- */
-Relation
-relation_openr(const char *sysRelationName, LOCKMODE lockmode)
-{
- Relation r;
-
- Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
-
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_openr);
- IncrHeapAccessStat(global_openr);
-
- /*
- * We assume we should not need to worry about the rel's OID changing,
- * hence no need for AcceptInvalidationMessages here.
- */
-
- /* The relcache does all the real work... */
- r = RelationSysNameGetRelation(sysRelationName);
-
- if (!RelationIsValid(r))
- elog(ERROR, "Relation \"%s\" does not exist", sysRelationName);
-
- if (lockmode != NoLock)
- LockRelation(r, lockmode);
-
- return r;
-}
-
-/* ----------------
- * relation_close - close any relation
- *
- * If lockmode is not "NoLock", we first release the specified lock.
- *
- * Note that it is often sensible to hold a lock beyond relation_close;
- * in that case, the lock is released automatically at xact end.
- * ----------------
- */
-void
-relation_close(Relation relation, LOCKMODE lockmode)
-{
- Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
-
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_close);
- IncrHeapAccessStat(global_close);
-
- if (lockmode != NoLock)
- UnlockRelation(relation, lockmode);
-
- /* The relcache does the real work... */
- RelationClose(relation);
-}
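
/*
 * [Editor's note: illustrative sketch, not part of the original file.]
 * Typical pairing of relation_open/relation_close as described above.
 * The function name and OID parameter are hypothetical; AccessShareLock
 * is the usual lock for read-only access.
 */
static void
example_read_relation(Oid relid)
{
	Relation	rel;

	rel = relation_open(relid, AccessShareLock);
	/* ... inspect rel->rd_rel, scan tuples, etc. ... */
	relation_close(rel, AccessShareLock);	/* also releases the lock */
}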
-
-
-/* ----------------
- * heap_open - open a heap relation by relation OID
- *
- * This is essentially relation_open plus check that the relation
- * is not an index or special relation. (The caller should also check
- * that it's not a view before assuming it has storage.)
- * ----------------
- */
-Relation
-heap_open(Oid relationId, LOCKMODE lockmode)
-{
- Relation r;
-
- r = relation_open(relationId, lockmode);
-
- if (r->rd_rel->relkind == RELKIND_INDEX)
- elog(ERROR, "%s is an index relation",
- RelationGetRelationName(r));
- else if (r->rd_rel->relkind == RELKIND_SPECIAL)
- elog(ERROR, "%s is a special relation",
- RelationGetRelationName(r));
-
- pgstat_initstats(&r->pgstat_info, r);
-
- return r;
-}
-
-/* ----------------
- * heap_openrv - open a heap relation specified
- * by a RangeVar node
- *
- * As above, but relation is specified by a RangeVar.
- * ----------------
- */
-Relation
-heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
-{
- Relation r;
-
- r = relation_openrv(relation, lockmode);
-
- if (r->rd_rel->relkind == RELKIND_INDEX)
- elog(ERROR, "%s is an index relation",
- RelationGetRelationName(r));
- else if (r->rd_rel->relkind == RELKIND_SPECIAL)
- elog(ERROR, "%s is a special relation",
- RelationGetRelationName(r));
-
- pgstat_initstats(&r->pgstat_info, r);
-
- return r;
-}
-
-/* ----------------
- * heap_openr - open a system heap relation specified by name.
- *
- * As above, but the relation is specified by an unqualified name;
- * it is assumed to live in the system catalog namespace.
- * ----------------
- */
-Relation
-heap_openr(const char *sysRelationName, LOCKMODE lockmode)
-{
- Relation r;
-
- r = relation_openr(sysRelationName, lockmode);
-
- if (r->rd_rel->relkind == RELKIND_INDEX)
- elog(ERROR, "%s is an index relation",
- RelationGetRelationName(r));
- else if (r->rd_rel->relkind == RELKIND_SPECIAL)
- elog(ERROR, "%s is a special relation",
- RelationGetRelationName(r));
-
- pgstat_initstats(&r->pgstat_info, r);
-
- return r;
-}
-
-
-/* ----------------
- * heap_beginscan - begin relation scan
- * ----------------
- */
-HeapScanDesc
-heap_beginscan(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key)
-{
- HeapScanDesc scan;
-
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_beginscan);
- IncrHeapAccessStat(global_beginscan);
-
- /*
- * sanity checks
- */
- if (!RelationIsValid(relation))
- elog(ERROR, "heap_beginscan: !RelationIsValid(relation)");
-
- /*
- * increment relation ref count while scanning relation
- *
- * This is just to make really sure the relcache entry won't go away
- * while the scan has a pointer to it. Caller should be holding the
- * rel open anyway, so this is redundant in all normal scenarios...
- */
- RelationIncrementReferenceCount(relation);
-
- /* XXX someday assert SelfTimeQual if relkind == RELKIND_UNCATALOGED */
- if (relation->rd_rel->relkind == RELKIND_UNCATALOGED)
- snapshot = SnapshotSelf;
-
- /*
- * allocate and initialize scan descriptor
- */
- scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
-
- scan->rs_rd = relation;
- scan->rs_snapshot = snapshot;
- scan->rs_nkeys = nkeys;
-
- /*
- * we do this here instead of in initscan() because heap_rescan also
- * calls initscan() and we don't want to allocate memory again
- */
- if (nkeys > 0)
- scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
- else
- scan->rs_key = NULL;
-
- pgstat_initstats(&scan->rs_pgstat_info, relation);
-
- initscan(scan, key);
-
- return scan;
-}
-
-/* ----------------
- * heap_rescan - restart a relation scan
- * ----------------
- */
-void
-heap_rescan(HeapScanDesc scan,
- ScanKey key)
-{
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_rescan);
- IncrHeapAccessStat(global_rescan);
-
- /*
- * unpin scan buffers
- */
- if (BufferIsValid(scan->rs_cbuf))
- ReleaseBuffer(scan->rs_cbuf);
-
- /*
- * reinitialize scan descriptor
- */
- initscan(scan, key);
-
- pgstat_reset_heap_scan(&scan->rs_pgstat_info);
-}
-
-/* ----------------
- * heap_endscan - end relation scan
- *
- * See how to integrate with index scans.
- * Check handling of reldesc caching.
- * ----------------
- */
-void
-heap_endscan(HeapScanDesc scan)
-{
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_endscan);
- IncrHeapAccessStat(global_endscan);
-
- /* Note: no locking manipulations needed */
-
- /*
- * unpin scan buffers
- */
- if (BufferIsValid(scan->rs_cbuf))
- ReleaseBuffer(scan->rs_cbuf);
-
- /*
- * decrement relation reference count and free scan descriptor storage
- */
- RelationDecrementReferenceCount(scan->rs_rd);
-
- if (scan->rs_key)
- pfree(scan->rs_key);
-
- pfree(scan);
-}
-
-/* ----------------
- * heap_getnext - retrieve next tuple in scan
- *
- * Fix to work with index relations.
- * We don't return the buffer anymore, but you can get it from the
- * returned HeapTuple.
- * ----------------
- */
-
-#ifdef HEAPDEBUGALL
-#define HEAPDEBUG_1 \
- elog(LOG, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
- RelationGetRelationName(scan->rs_rd), scan->rs_nkeys, (int) direction)
-
-#define HEAPDEBUG_2 \
- elog(LOG, "heap_getnext returning EOS")
-
-#define HEAPDEBUG_3 \
- elog(LOG, "heap_getnext returning tuple")
-#else
-#define HEAPDEBUG_1
-#define HEAPDEBUG_2
-#define HEAPDEBUG_3
-#endif /* !defined(HEAPDEBUGALL) */
-
-
-HeapTuple
-heap_getnext(HeapScanDesc scan, ScanDirection direction)
-{
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_getnext);
- IncrHeapAccessStat(global_getnext);
-
- /* Note: no locking manipulations needed */
-
- /*
- * argument checks
- */
- if (scan == NULL)
- elog(ERROR, "heap_getnext: NULL relscan");
-
- HEAPDEBUG_1; /* heap_getnext( info ) */
-
- /*
- * Note: we depend here on the -1/0/1 encoding of ScanDirection.
- */
- heapgettup(scan->rs_rd,
- (int) direction,
- &(scan->rs_ctup),
- &(scan->rs_cbuf),
- scan->rs_snapshot,
- scan->rs_nkeys,
- scan->rs_key);
-
- if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
- {
- HEAPDEBUG_2; /* heap_getnext returning EOS */
- return NULL;
- }
-
- pgstat_count_heap_scan(&scan->rs_pgstat_info);
-
- /*
- * if we get here it means we have a new current scan tuple, so point
- * to the proper return buffer and return the tuple.
- */
-
- HEAPDEBUG_3; /* heap_getnext returning tuple */
-
- if (scan->rs_ctup.t_data != NULL)
- pgstat_count_heap_getnext(&scan->rs_pgstat_info);
-
- return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
-}
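
/*
 * [Editor's note: illustrative sketch, not part of the original file.]
 * The canonical sequential-scan loop built from heap_beginscan,
 * heap_getnext and heap_endscan.  SnapshotNow is used here for
 * simplicity; real callers pass whatever snapshot suits them.
 */
static void
example_scan(Relation rel)
{
	HeapScanDesc scan;
	HeapTuple	tuple;

	scan = heap_beginscan(rel, SnapshotNow, 0, (ScanKey) NULL);
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		/* tuple is valid only until the next heap_getnext call */
	}
	heap_endscan(scan);
}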
-
-/*
- * heap_fetch - retrieve tuple with given tid
- *
- * On entry, tuple->t_self is the TID to fetch. We pin the buffer holding
- * the tuple, fill in the remaining fields of *tuple, and check the tuple
- * against the specified snapshot.
- *
- * If successful (tuple found and passes snapshot time qual), then *userbuf
- * is set to the buffer holding the tuple and TRUE is returned. The caller
- * must unpin the buffer when done with the tuple.
- *
- * If the tuple is not found, then tuple->t_data is set to NULL, *userbuf
- * is set to InvalidBuffer, and FALSE is returned.
- *
- * If the tuple is found but fails the time qual check, then FALSE will be
- * returned. When the caller specifies keep_buf = true, we retain the pin
- * on the buffer and return it in *userbuf (so the caller can still access
- * the tuple); when keep_buf = false, the pin is released and *userbuf is set
- * to InvalidBuffer.
- *
- * It is somewhat inconsistent that we elog() on invalid block number but
- * return false on invalid item number. This is historical. The only
- * justification I can see is that the caller can relatively easily check the
- * block number for validity, but cannot check the item number without reading
- * the page himself.
- */
-bool
-heap_fetch(Relation relation,
- Snapshot snapshot,
- HeapTuple tuple,
- Buffer *userbuf,
- bool keep_buf,
- PgStat_Info *pgstat_info)
-{
- ItemPointer tid = &(tuple->t_self);
- ItemId lp;
- Buffer buffer;
- PageHeader dp;
- OffsetNumber offnum;
- bool valid;
-
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_fetch);
- IncrHeapAccessStat(global_fetch);
-
- /*
- * get the buffer from the relation descriptor. Note that this does a
- * buffer pin.
- */
- buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-
- if (!BufferIsValid(buffer))
- elog(ERROR, "heap_fetch: ReadBuffer(%s, %lu) failed",
- RelationGetRelationName(relation),
- (unsigned long) ItemPointerGetBlockNumber(tid));
-
- /*
- * Need share lock on buffer to examine tuple commit status.
- */
- LockBuffer(buffer, BUFFER_LOCK_SHARE);
-
- /*
- * get the item line pointer corresponding to the requested tid
- */
- dp = (PageHeader) BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(tid);
- lp = PageGetItemId(dp, offnum);
-
- /*
- * must check for deleted tuple (see for example analyze.c, which is
- * careful to pass an offnum in range, but doesn't know if the offnum
- * actually corresponds to an undeleted tuple).
- */
- if (!ItemIdIsUsed(lp))
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- *userbuf = InvalidBuffer;
- tuple->t_datamcxt = NULL;
- tuple->t_data = NULL;
- return false;
- }
-
- /*
- * fill in *tuple fields
- */
- tuple->t_datamcxt = NULL;
- tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
- tuple->t_len = ItemIdGetLength(lp);
- tuple->t_tableOid = relation->rd_id;
-
- /*
- * check time qualification of tuple, then release lock
- */
- HeapTupleSatisfies(tuple, relation, buffer, dp,
- snapshot, 0, (ScanKey) NULL, valid);
-
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
- if (valid)
- {
- /*
- * All checks passed, so return the tuple as valid. Caller is now
- * responsible for releasing the buffer.
- */
- *userbuf = buffer;
-
- /*
- * Count the successful fetch in *pgstat_info if given,
- * otherwise in the relation's default statistics area.
- */
- if (pgstat_info != NULL)
- pgstat_count_heap_fetch(pgstat_info);
- else
- pgstat_count_heap_fetch(&relation->pgstat_info);
-
- return true;
- }
-
- /* Tuple failed time qual, but maybe caller wants to see it anyway. */
- if (keep_buf)
- {
- *userbuf = buffer;
-
- return false;
- }
-
- /* Okay to release pin on buffer. */
- ReleaseBuffer(buffer);
-
- *userbuf = InvalidBuffer;
-
- return false;
-}
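
/*
 * [Editor's note: illustrative sketch, not part of the original file.]
 * Fetching a tuple by TID per the contract documented above.  With
 * keep_buf = false, the buffer need only be released on success.
 */
static bool
example_fetch(Relation rel, ItemPointer tid)
{
	HeapTupleData tuple;
	Buffer		buffer;

	tuple.t_self = *tid;
	if (heap_fetch(rel, SnapshotNow, &tuple, &buffer, false, NULL))
	{
		/* ... use tuple.t_data while the pin on 'buffer' is held ... */
		ReleaseBuffer(buffer);
		return true;
	}
	return false;				/* missing, or failed the snapshot check */
}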
-
-/*
- * heap_get_latest_tid - get the latest tid of a specified tuple
- */
-ItemPointer
-heap_get_latest_tid(Relation relation,
- Snapshot snapshot,
- ItemPointer tid)
-{
- ItemId lp = NULL;
- Buffer buffer;
- PageHeader dp;
- OffsetNumber offnum;
- HeapTupleData tp;
- HeapTupleHeader t_data;
- ItemPointerData ctid;
- bool invalidBlock,
- linkend,
- valid;
-
- /*
- * get the buffer from the relation descriptor. Note that this does a
- * buffer pin.
- */
-
- buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-
- if (!BufferIsValid(buffer))
- elog(ERROR, "heap_get_latest_tid: %s relation: ReadBuffer(%lx) failed",
- RelationGetRelationName(relation), (long) tid);
-
- LockBuffer(buffer, BUFFER_LOCK_SHARE);
-
- /*
- * get the item line pointer corresponding to the requested tid
- */
- dp = (PageHeader) BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(tid);
- invalidBlock = true;
- if (!PageIsNew(dp))
- {
- lp = PageGetItemId(dp, offnum);
- if (ItemIdIsUsed(lp))
- invalidBlock = false;
- }
- if (invalidBlock)
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- return NULL;
- }
-
- /*
- * more sanity checks
- */
-
- tp.t_datamcxt = NULL;
- t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
- tp.t_len = ItemIdGetLength(lp);
- tp.t_self = *tid;
- ctid = tp.t_data->t_ctid;
-
- /*
- * check time qualification of tid
- */
-
- HeapTupleSatisfies(&tp, relation, buffer, dp,
- snapshot, 0, (ScanKey) NULL, valid);
-
- linkend = true;
- if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
- !ItemPointerEquals(tid, &ctid))
- linkend = false;
-
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
-
- if (!valid)
- {
- if (linkend)
- return NULL;
- heap_get_latest_tid(relation, snapshot, &ctid);
- *tid = ctid;
- }
-
- return tid;
-}
-
-/*
- * heap_insert - insert tuple into a heap
- *
- * The new tuple is stamped with current transaction ID and the specified
- * command ID.
- */
-Oid
-heap_insert(Relation relation, HeapTuple tup, CommandId cid)
-{
- Buffer buffer;
-
- /* increment access statistics */
- IncrHeapAccessStat(local_insert);
- IncrHeapAccessStat(global_insert);
-
- if (relation->rd_rel->relhasoids)
- {
- /*
- * If the object id of this tuple has already been assigned, trust
- * the caller. There are a couple of ways this can happen. At
- * initial db creation, the backend program sets oids for tuples.
- * When we define an index, we set the oid. Finally, in the
- * future, we may allow users to set their own object ids in order
- * to support a persistent object store (objects need to contain
- * pointers to one another).
- */
- if (!OidIsValid(tup->t_data->t_oid))
- tup->t_data->t_oid = newoid();
- else
- CheckMaxObjectId(tup->t_data->t_oid);
- }
-
- HeapTupleHeaderSetXmin(tup->t_data, GetCurrentTransactionId());
- HeapTupleHeaderSetCmin(tup->t_data, cid);
- HeapTupleHeaderSetXmaxInvalid(tup->t_data);
- HeapTupleHeaderSetCmax(tup->t_data, FirstCommandId);
- tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
- tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
- tup->t_tableOid = relation->rd_id;
-
-#ifdef TUPLE_TOASTER_ACTIVE
-
- /*
- * If the new tuple is too big for storage or contains already toasted
- * attributes from some other relation, invoke the toaster.
- */
- if (HeapTupleHasExtended(tup) ||
- (MAXALIGN(tup->t_len) > TOAST_TUPLE_THRESHOLD))
- heap_tuple_toast_attrs(relation, tup, NULL);
-#endif
-
- /* Find buffer to insert this tuple into */
- buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);
-
- /* NO ELOG(ERROR) from here till changes are logged */
- START_CRIT_SECTION();
- RelationPutHeapTuple(relation, buffer, tup);
-
- pgstat_count_heap_insert(&relation->pgstat_info);
-
- /* XLOG stuff */
- {
- xl_heap_insert xlrec;
- xl_heap_header xlhdr;
- XLogRecPtr recptr;
- XLogRecData rdata[3];
- Page page = BufferGetPage(buffer);
- uint8 info = XLOG_HEAP_INSERT;
-
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tup->t_self;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapInsert;
- rdata[0].next = &(rdata[1]);
-
- xlhdr.t_oid = tup->t_data->t_oid;
- xlhdr.t_natts = tup->t_data->t_natts;
- xlhdr.t_hoff = tup->t_data->t_hoff;
- xlhdr.mask = tup->t_data->t_infomask;
- rdata[1].buffer = buffer;
- rdata[1].data = (char *) &xlhdr;
- rdata[1].len = SizeOfHeapHeader;
- rdata[1].next = &(rdata[2]);
-
- rdata[2].buffer = buffer;
- rdata[2].data = (char *) tup->t_data + offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].len = tup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].next = NULL;
-
- /* If this is the first and only tuple on the page... */
- if (ItemPointerGetOffsetNumber(&(tup->t_self)) == FirstOffsetNumber &&
- PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
- {
- info |= XLOG_HEAP_INIT_PAGE;
- rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
- }
-
- recptr = XLogInsert(RM_HEAP_ID, info, rdata);
-
- PageSetLSN(page, recptr);
- PageSetSUI(page, ThisStartUpID);
- }
- END_CRIT_SECTION();
-
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- WriteBuffer(buffer);
-
- /*
- * If tuple is cachable, mark it for invalidation from the caches in case
- * we abort. Note it is OK to do this after WriteBuffer releases the
- * buffer, because the "tup" data structure is all in local memory,
- * not in the shared buffer.
- */
- CacheInvalidateHeapTuple(relation, tup);
-
- return tup->t_data->t_oid;
-}
-
-/*
- * simple_heap_insert - insert a tuple
- *
- * Currently, this routine differs from heap_insert only in supplying
- * a default command ID. But it should be used rather than using
- * heap_insert directly in most places where we are modifying system catalogs.
- */
-Oid
-simple_heap_insert(Relation relation, HeapTuple tup)
-{
- return heap_insert(relation, tup, GetCurrentCommandId());
-}
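
/*
 * [Editor's note: illustrative sketch, not part of the original file.]
 * Inserting a tuple with simple_heap_insert.  The tuple is assumed to
 * have been built already (e.g. with heap_formtuple); index updates
 * and error handling remain the caller's responsibility.
 */
static Oid
example_insert(Relation rel, HeapTuple tup)
{
	Oid			newOid;

	newOid = simple_heap_insert(rel, tup);	/* stamps xmin/cmin, WAL-logs */
	return newOid;
}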
-
-/*
- * heap_delete - delete a tuple
- *
- * NB: do not call this directly unless you are prepared to deal with
- * concurrent-update conditions. Use simple_heap_delete instead.
- */
-int
-heap_delete(Relation relation, ItemPointer tid,
- ItemPointer ctid, CommandId cid)
-{
- ItemId lp;
- HeapTupleData tp;
- PageHeader dp;
- Buffer buffer;
- int result;
-
- /* increment access statistics */
- IncrHeapAccessStat(local_delete);
- IncrHeapAccessStat(global_delete);
-
- Assert(ItemPointerIsValid(tid));
-
- buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-
- if (!BufferIsValid(buffer))
- elog(ERROR, "heap_delete: failed ReadBuffer");
-
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
- dp = (PageHeader) BufferGetPage(buffer);
- lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
- tp.t_datamcxt = NULL;
- tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
- tp.t_len = ItemIdGetLength(lp);
- tp.t_self = *tid;
- tp.t_tableOid = relation->rd_id;
-
-l1:
- result = HeapTupleSatisfiesUpdate(&tp, cid);
-
- if (result == HeapTupleInvisible)
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- elog(ERROR, "heap_delete: (am)invalid tid");
- }
- else if (result == HeapTupleBeingUpdated)
- {
- TransactionId xwait = HeapTupleHeaderGetXmax(tp.t_data);
-
- /* sleep until concurrent transaction ends */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
-
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- if (TransactionIdDidAbort(xwait))
- goto l1;
-
- /*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
- */
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), xwait))
- goto l1;
- if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
- {
- tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(buffer);
- }
- /* if tuple was marked for update but not updated... */
- if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
- result = HeapTupleMayBeUpdated;
- else
- result = HeapTupleUpdated;
- }
- if (result != HeapTupleMayBeUpdated)
- {
- Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
- *ctid = tp.t_data->t_ctid;
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- return result;
- }
-
- START_CRIT_SECTION();
- /* store transaction information of xact deleting the tuple */
- tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
- HeapTupleHeaderSetXmax(tp.t_data, GetCurrentTransactionId());
- HeapTupleHeaderSetCmax(tp.t_data, cid);
- /* XLOG stuff */
- {
- xl_heap_delete xlrec;
- XLogRecPtr recptr;
- XLogRecData rdata[2];
-
- xlrec.target.node = relation->rd_node;
- xlrec.target.tid = tp.t_self;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapDelete;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].buffer = buffer;
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].next = NULL;
-
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
-
- PageSetLSN(dp, recptr);
- PageSetSUI(dp, ThisStartUpID);
- }
- END_CRIT_SECTION();
-
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
-#ifdef TUPLE_TOASTER_ACTIVE
-
- /*
- * If the relation has toastable attributes, we need to delete no
- * longer needed items there too. We have to do this before
- * WriteBuffer because we need to look at the contents of the tuple,
- * but it's OK to release the context lock on the buffer first.
- */
- if (HeapTupleHasExtended(&tp))
- heap_tuple_toast_attrs(relation, NULL, &(tp));
-#endif
-
- pgstat_count_heap_delete(&relation->pgstat_info);
-
- /*
- * Mark tuple for invalidation from system caches at next command
- * boundary. We have to do this before WriteBuffer because we need to
- * look at the contents of the tuple, so we need to hold our refcount
- * on the buffer.
- */
- CacheInvalidateHeapTuple(relation, &tp);
-
- WriteBuffer(buffer);
-
- return HeapTupleMayBeUpdated;
-}
-
-/*
- * simple_heap_delete - delete a tuple
- *
- * This routine may be used to delete a tuple when concurrent updates of
- * the target tuple are not expected (for example, because we have a lock
- * on the relation associated with the tuple). Any failure is reported
- * via elog().
- */
-void
-simple_heap_delete(Relation relation, ItemPointer tid)
-{
- ItemPointerData ctid;
- int result;
-
- result = heap_delete(relation, tid, &ctid, GetCurrentCommandId());
- switch (result)
- {
- case HeapTupleSelfUpdated:
- /* Tuple was already updated in current command? */
- elog(ERROR, "simple_heap_delete: tuple already updated by self");
- break;
-
- case HeapTupleMayBeUpdated:
- /* done successfully */
- break;
-
- case HeapTupleUpdated:
- elog(ERROR, "simple_heap_delete: tuple concurrently updated");
- break;
-
- default:
- elog(ERROR, "Unknown status %u from heap_delete", result);
- break;
- }
-}
-
-/*
- * heap_update - replace a tuple
- *
- * NB: do not call this directly unless you are prepared to deal with
- * concurrent-update conditions. Use simple_heap_update instead.
- */
-int
-heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
- ItemPointer ctid, CommandId cid)
-{
- ItemId lp;
- HeapTupleData oldtup;
- PageHeader dp;
- Buffer buffer,
- newbuf;
- bool need_toast,
- already_marked;
- Size newtupsize,
- pagefree;
- int result;
-
- /* increment access statistics */
- IncrHeapAccessStat(local_replace);
- IncrHeapAccessStat(global_replace);
-
- Assert(ItemPointerIsValid(otid));
-
- buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
- if (!BufferIsValid(buffer))
- elog(ERROR, "heap_update: failed ReadBuffer");
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
- dp = (PageHeader) BufferGetPage(buffer);
- lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));
-
- oldtup.t_datamcxt = NULL;
- oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
- oldtup.t_len = ItemIdGetLength(lp);
- oldtup.t_self = *otid;
-
- /*
- * Note: beyond this point, use oldtup not otid to refer to old tuple.
- * otid may very well point at newtup->t_self, which we will overwrite
- * with the new tuple's location, so there's great risk of confusion
- * if we use otid anymore.
- */
-
-l2:
- result = HeapTupleSatisfiesUpdate(&oldtup, cid);
-
- if (result == HeapTupleInvisible)
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- elog(ERROR, "heap_update: (am)invalid tid");
- }
- else if (result == HeapTupleBeingUpdated)
- {
- TransactionId xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
-
- /* sleep until concurrent transaction ends */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
-
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- if (TransactionIdDidAbort(xwait))
- goto l2;
-
- /*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
- */
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), xwait))
- goto l2;
- if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
- {
- oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(buffer);
- }
- /* if tuple was marked for update but not updated... */
- if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
- result = HeapTupleMayBeUpdated;
- else
- result = HeapTupleUpdated;
- }
- if (result != HeapTupleMayBeUpdated)
- {
- Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
- *ctid = oldtup.t_data->t_ctid;
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- return result;
- }
-
- /* Fill in OID and transaction status data for newtup */
- newtup->t_data->t_oid = oldtup.t_data->t_oid;
- newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
- newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
- HeapTupleHeaderSetXmin(newtup->t_data, GetCurrentTransactionId());
- HeapTupleHeaderSetCmin(newtup->t_data, cid);
- HeapTupleHeaderSetXmaxInvalid(newtup->t_data);
-
- /*
- * If the toaster needs to be activated, OR if the new tuple will not
- * fit on the same page as the old, then we need to release the
- * context lock (but not the pin!) on the old tuple's buffer while we
- * are off doing TOAST and/or table-file-extension work. We must mark
- * the old tuple to show that it's already being updated, else other
- * processes may try to update it themselves. To avoid a second XLOG
- * record, we use an xact mgr hook to unlock the old tuple without
- * reading the log if the xact aborts before the update is logged. In
- * the event of a crash prior to logging, TQUAL routines will see the
- * HEAP_XMAX_UNLOGGED flag...
- *
- * NOTE: this trick is useless currently but saved for future when we'll
- * implement UNDO and will re-use transaction IDs after postmaster
- * startup.
- *
- * We need to invoke the toaster if there are already any toasted values
- * present, or if the new tuple is over-threshold.
- */
- need_toast = (HeapTupleHasExtended(&oldtup) ||
- HeapTupleHasExtended(newtup) ||
- (MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD));
-
- newtupsize = MAXALIGN(newtup->t_len);
- pagefree = PageGetFreeSpace((Page) dp);
-
- if (need_toast || newtupsize > pagefree)
- {
- _locked_tuple_.node = relation->rd_node;
- _locked_tuple_.tid = oldtup.t_self;
- XactPushRollback(_heap_unlock_tuple, (void *) &_locked_tuple_);
-
- oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE);
- oldtup.t_data->t_infomask |= HEAP_XMAX_UNLOGGED;
- HeapTupleHeaderSetXmax(oldtup.t_data, GetCurrentTransactionId());
- HeapTupleHeaderSetCmax(oldtup.t_data, cid);
- already_marked = true;
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
- /* Let the toaster do its thing */
- if (need_toast)
- {
- heap_tuple_toast_attrs(relation, newtup, &oldtup);
- newtupsize = MAXALIGN(newtup->t_len);
- }
-
- /*
- * Now, do we need a new page for the tuple, or not? This is a
- * bit tricky since someone else could have added tuples to the
- * page while we weren't looking. We have to recheck the
- * available space after reacquiring the buffer lock. But don't
- * bother to do that if the former amount of free space is still
- * not enough; it's unlikely there's more free now than before.
- *
- * What's more, if we need to get a new page, we will need to acquire
- * buffer locks on both old and new pages. To avoid deadlock
- * against some other backend trying to get the same two locks in
- * the other order, we must be consistent about the order we get
- * the locks in. We use the rule "lock the lower-numbered page of
- * the relation first". To implement this, we must do
- * RelationGetBufferForTuple while not holding the lock on the old
- * page, and we must rely on it to get the locks on both pages in
- * the correct order.
- */
- if (newtupsize > pagefree)
- {
- /* Assume there's no chance to put newtup on same page. */
- newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
- buffer);
- }
- else
- {
- /* Re-acquire the lock on the old tuple's page. */
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- /* Re-check using the up-to-date free space */
- pagefree = PageGetFreeSpace((Page) dp);
- if (newtupsize > pagefree)
- {
- /*
- * Rats, it doesn't fit anymore. We must now unlock and
- * relock to avoid deadlock. Fortunately, this path
- * should seldom be taken.
- */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
- buffer);
- }
- else
- {
- /* OK, it fits here, so we're done. */
- newbuf = buffer;
- }
- }
- }
- else
- {
- /* No TOAST work needed, and it'll fit on same page */
- already_marked = false;
- newbuf = buffer;
- }
-
- pgstat_count_heap_update(&relation->pgstat_info);
-
- /*
- * At this point newbuf and buffer are both pinned and locked, and
- * newbuf has enough space for the new tuple. If they are the same
- * buffer, only one pin is held.
- */
-
- /* NO ELOG(ERROR) from here till changes are logged */
- START_CRIT_SECTION();
-
- RelationPutHeapTuple(relation, newbuf, newtup); /* insert new tuple */
-
- if (already_marked)
- {
- oldtup.t_data->t_infomask &= ~HEAP_XMAX_UNLOGGED;
- XactPopRollback();
- }
- else
- {
- oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE);
- HeapTupleHeaderSetXmax(oldtup.t_data, GetCurrentTransactionId());
- HeapTupleHeaderSetCmax(oldtup.t_data, cid);
- }
-
- /* record address of new tuple in t_ctid of old one */
- oldtup.t_data->t_ctid = newtup->t_self;
-
- /* XLOG stuff */
- {
- XLogRecPtr recptr = log_heap_update(relation, buffer, oldtup.t_self,
- newbuf, newtup, false);
-
- if (newbuf != buffer)
- {
- PageSetLSN(BufferGetPage(newbuf), recptr);
- PageSetSUI(BufferGetPage(newbuf), ThisStartUpID);
- }
- PageSetLSN(BufferGetPage(buffer), recptr);
- PageSetSUI(BufferGetPage(buffer), ThisStartUpID);
- }
-
- END_CRIT_SECTION();
-
- if (newbuf != buffer)
- LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
- /*
- * Mark old tuple for invalidation from system caches at next command
- * boundary. We have to do this before WriteBuffer because we need to
- * look at the contents of the tuple, so we need to hold our refcount.
- */
- CacheInvalidateHeapTuple(relation, &oldtup);
-
- if (newbuf != buffer)
- WriteBuffer(newbuf);
- WriteBuffer(buffer);
-
- /*
- * If new tuple is cachable, mark it for invalidation from the caches in
- * case we abort. Note it is OK to do this after WriteBuffer releases
- * the buffer, because the "newtup" data structure is all in local
- * memory, not in the shared buffer.
- */
- CacheInvalidateHeapTuple(relation, newtup);
-
- return HeapTupleMayBeUpdated;
-}
-
-/*
- * simple_heap_update - replace a tuple
- *
- * This routine may be used to update a tuple when concurrent updates of
- * the target tuple are not expected (for example, because we have a lock
- * on the relation associated with the tuple). Any failure is reported
- * via elog().
- */
-void
-simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
-{
- ItemPointerData ctid;
- int result;
-
- result = heap_update(relation, otid, tup, &ctid, GetCurrentCommandId());
- switch (result)
- {
- case HeapTupleSelfUpdated:
- /* Tuple was already updated in current command? */
- elog(ERROR, "simple_heap_update: tuple already updated by self");
- break;
-
- case HeapTupleMayBeUpdated:
- /* done successfully */
- break;
-
- case HeapTupleUpdated:
- elog(ERROR, "simple_heap_update: tuple concurrently updated");
- break;
-
- default:
- elog(ERROR, "Unknown status %u from heap_update", result);
- break;
- }
-}
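
/*
 * [Editor's note: illustrative sketch, not part of the original file.]
 * Replacing a tuple with simple_heap_update, which elog()s on any
 * concurrent-update condition.  Per the note in heap_update, the new
 * tuple's t_self is overwritten with its new location.
 */
static void
example_update(Relation rel, ItemPointer otid, HeapTuple newtup)
{
	simple_heap_update(rel, otid, newtup);
	/* newtup->t_self now points at the new tuple version */
}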
-
-/*
- * heap_mark4update - mark a tuple for update
- */
-int
-heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
- CommandId cid)
-{
- ItemPointer tid = &(tuple->t_self);
- ItemId lp;
- PageHeader dp;
- int result;
-
- /* increment access statistics */
- IncrHeapAccessStat(local_mark4update);
- IncrHeapAccessStat(global_mark4update);
-
- *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-
- if (!BufferIsValid(*buffer))
- elog(ERROR, "heap_mark4update: failed ReadBuffer");
-
- LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
-
- dp = (PageHeader) BufferGetPage(*buffer);
- lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
- tuple->t_datamcxt = NULL;
- tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
- tuple->t_len = ItemIdGetLength(lp);
-
-l3:
- result = HeapTupleSatisfiesUpdate(tuple, cid);
-
- if (result == HeapTupleInvisible)
- {
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(*buffer);
- elog(ERROR, "heap_mark4update: (am)invalid tid");
- }
- else if (result == HeapTupleBeingUpdated)
- {
- TransactionId xwait = HeapTupleHeaderGetXmax(tuple->t_data);
-
- /* sleep until concurrent transaction ends */
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
-
- LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
- if (TransactionIdDidAbort(xwait))
- goto l3;
-
- /*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
- */
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), xwait))
- goto l3;
- if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
- {
- tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(*buffer);
- }
- /* if tuple was marked for update but not updated... */
- if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
- result = HeapTupleMayBeUpdated;
- else
- result = HeapTupleUpdated;
- }
- if (result != HeapTupleMayBeUpdated)
- {
- Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
- tuple->t_self = tuple->t_data->t_ctid;
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- return result;
- }
-
- /*
- * XLOG stuff: no logging is required as long as we have no
- * savepoints. For savepoints, a private log could be used...
- */
- ((PageHeader) BufferGetPage(*buffer))->pd_sui = ThisStartUpID;
-
- /* store transaction information of xact marking the tuple */
- tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
- tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;
- HeapTupleHeaderSetXmax(tuple->t_data, GetCurrentTransactionId());
- HeapTupleHeaderSetCmax(tuple->t_data, cid);
-
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-
- WriteNoReleaseBuffer(*buffer);
-
- return HeapTupleMayBeUpdated;
-}
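
/*
 * [Editor's note: illustrative sketch, not part of the original file.]
 * Marking a tuple for update (the SELECT ... FOR UPDATE path).  The
 * caller must have set tuple->t_self to the target TID; in every
 * return case the caller still holds a pin on the buffer and must
 * release it.
 */
static int
example_mark4update(Relation rel, HeapTuple tuple)
{
	Buffer		buffer;
	int			result;

	result = heap_mark4update(rel, tuple, &buffer, GetCurrentCommandId());
	ReleaseBuffer(buffer);
	return result;				/* HeapTupleMayBeUpdated on success */
}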
-
-/* ----------------
- * heap_markpos - mark scan position
- *
- * Note:
- * Only one mark should be maintained per scan at one time.
- * Check if this can be done generally--say calls to get the
- * next/previous tuple and NEVER pass struct scandesc to the
- * user AM's. Now, the mark is sent to the executor for safekeeping.
- * Probably can store this info into a GENERAL scan structure.
- *
- * May be best to change this call to store the marked position
- * (up to 2?) in the scan structure itself.
- * Fix to use the proper caching structure.
- * ----------------
- */
-void
-heap_markpos(HeapScanDesc scan)
-{
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_markpos);
- IncrHeapAccessStat(global_markpos);
-
- /* Note: no locking manipulations needed */
-
- if (scan->rs_ctup.t_data != NULL)
- scan->rs_mctid = scan->rs_ctup.t_self;
- else
- ItemPointerSetInvalid(&scan->rs_mctid);
-}
-
-/* ----------------
- * heap_restrpos - restore position to marked location
- *
- * Note: there are bad side effects here. If we were past the end
- * of a relation when heapmarkpos is called, then if the relation is
- * extended via insert, then the next call to heaprestrpos will
- * cause the added tuples to be visible when the scan continues.
- * Problems also arise if the TID's are rearranged!!!
- *
- * XXX might be better to do direct access instead of
- * using the generality of heapgettup().
- *
- * XXX It is very possible that when a scan is restored, a tuple
- * XXX which previously qualified may fail for time range purposes, unless
- * XXX some form of locking exists (i.e., portals currently can act funny).
- * ----------------
- */
-void
-heap_restrpos(HeapScanDesc scan)
-{
- /*
- * increment access statistics
- */
- IncrHeapAccessStat(local_restrpos);
- IncrHeapAccessStat(global_restrpos);
-
- /* XXX no amrestrpos checking that ammarkpos called */
-
- /* Note: no locking manipulations needed */
-
- /*
- * unpin scan buffers
- */
- if (BufferIsValid(scan->rs_cbuf))
- ReleaseBuffer(scan->rs_cbuf);
- scan->rs_cbuf = InvalidBuffer;
-
- if (!ItemPointerIsValid(&scan->rs_mctid))
- {
- scan->rs_ctup.t_datamcxt = NULL;
- scan->rs_ctup.t_data = NULL;
- }
- else
- {
- scan->rs_ctup.t_self = scan->rs_mctid;
- scan->rs_ctup.t_datamcxt = NULL;
- scan->rs_ctup.t_data = (HeapTupleHeader) 0x1; /* for heapgettup */
- heapgettup(scan->rs_rd,
- 0,
- &(scan->rs_ctup),
- &(scan->rs_cbuf),
- scan->rs_snapshot,
- 0,
- (ScanKey) NULL);
- }
-}
-
-XLogRecPtr
-log_heap_clean(Relation reln, Buffer buffer, char *unused, int unlen)
-{
- xl_heap_clean xlrec;
- XLogRecPtr recptr;
- XLogRecData rdata[3];
-
- xlrec.node = reln->rd_node;
- xlrec.block = BufferGetBlockNumber(buffer);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapClean;
- rdata[0].next = &(rdata[1]);
-
- if (unlen > 0)
- {
- rdata[1].buffer = buffer;
- rdata[1].data = unused;
- rdata[1].len = unlen;
- rdata[1].next = &(rdata[2]);
- }
- else
- rdata[0].next = &(rdata[2]);
-
- rdata[2].buffer = buffer;
- rdata[2].data = NULL;
- rdata[2].len = 0;
- rdata[2].next = NULL;
-
- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CLEAN, rdata);
-
- return (recptr);
-}
-
-static XLogRecPtr
-log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
- Buffer newbuf, HeapTuple newtup, bool move)
-{
- /*
- * Note: xlhdr is declared to have adequate size and correct alignment
- * for an xl_heap_header. However the two tids, if present at all,
- * will be packed in with no wasted space after the xl_heap_header;
- * they aren't necessarily aligned as implied by this struct
- * declaration.
- */
- struct
- {
- xl_heap_header hdr;
- TransactionId tid1;
- TransactionId tid2;
- } xlhdr;
- int hsize = SizeOfHeapHeader;
- xl_heap_update xlrec;
- XLogRecPtr recptr;
- XLogRecData rdata[4];
- Page page = BufferGetPage(newbuf);
- uint8 info = (move) ? XLOG_HEAP_MOVE : XLOG_HEAP_UPDATE;
-
- xlrec.target.node = reln->rd_node;
- xlrec.target.tid = from;
- xlrec.newtid = newtup->t_self;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = SizeOfHeapUpdate;
- rdata[0].next = &(rdata[1]);
-
- rdata[1].buffer = oldbuf;
- rdata[1].data = NULL;
- rdata[1].len = 0;
- rdata[1].next = &(rdata[2]);
-
- xlhdr.hdr.t_oid = newtup->t_data->t_oid;
- xlhdr.hdr.t_natts = newtup->t_data->t_natts;
- xlhdr.hdr.t_hoff = newtup->t_data->t_hoff;
- xlhdr.hdr.mask = newtup->t_data->t_infomask;
- if (move) /* remember xmin & xmax */
- {
- TransactionId xmax;
- TransactionId xmin;
-
- if (newtup->t_data->t_infomask & HEAP_XMAX_INVALID ||
- newtup->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
- xmax = InvalidTransactionId;
- else
- xmax = HeapTupleHeaderGetXmax(newtup->t_data);
- xmin = HeapTupleHeaderGetXmin(newtup->t_data);
- memcpy((char *) &xlhdr + hsize, &xmax, sizeof(TransactionId));
- memcpy((char *) &xlhdr + hsize + sizeof(TransactionId),
- &xmin, sizeof(TransactionId));
- hsize += 2 * sizeof(TransactionId);
- }
- rdata[2].buffer = newbuf;
- rdata[2].data = (char *) &xlhdr;
- rdata[2].len = hsize;
- rdata[2].next = &(rdata[3]);
-
- rdata[3].buffer = newbuf;
- rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].next = NULL;
-
- /* If the new tuple is the first and only tuple on the page... */
- if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
- PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
- {
- info |= XLOG_HEAP_INIT_PAGE;
- rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
- }
-
- recptr = XLogInsert(RM_HEAP_ID, info, rdata);
-
- return (recptr);
-}
-
-XLogRecPtr
-log_heap_move(Relation reln, Buffer oldbuf, ItemPointerData from,
- Buffer newbuf, HeapTuple newtup)
-{
- return (log_heap_update(reln, oldbuf, from, newbuf, newtup, true));
-}
-
-static void
-heap_xlog_clean(bool redo, XLogRecPtr lsn, XLogRecord *record)
-{
- xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
- Relation reln;
- Buffer buffer;
- Page page;
-
- if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
- return;
-
- reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->node);
-
- if (!RelationIsValid(reln))
- return;
-
- buffer = XLogReadBuffer(false, reln, xlrec->block);
- if (!BufferIsValid(buffer))
- elog(PANIC, "heap_clean_redo: no block");
-
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "heap_clean_redo: uninitialized page");
-
- if (XLByteLE(lsn, PageGetLSN(page)))
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
-
- if (record->xl_len > SizeOfHeapClean)
- {
- OffsetNumber unbuf[BLCKSZ / sizeof(OffsetNumber)];
- OffsetNumber *unused = unbuf;
- char *unend;
- ItemId lp;
-
- Assert((record->xl_len - SizeOfHeapClean) <= BLCKSZ);
- memcpy((char *) unbuf,
- (char *) xlrec + SizeOfHeapClean,
- record->xl_len - SizeOfHeapClean);
- unend = (char *) unbuf + (record->xl_len - SizeOfHeapClean);
-
- while ((char *) unused < unend)
- {
- lp = ((PageHeader) page)->pd_linp + *unused;
- lp->lp_flags &= ~LP_USED;
- unused++;
- }
- }
-
- PageRepairFragmentation(page, NULL);
- UnlockAndWriteBuffer(buffer);
-}
-
-static void
-heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
-{
- xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
- Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
- Buffer buffer;
- Page page;
- OffsetNumber offnum;
- ItemId lp = NULL;
- HeapTupleHeader htup;
-
- if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
- return;
-
- if (!RelationIsValid(reln))
- return;
-
- buffer = XLogReadBuffer(false, reln,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)));
- if (!BufferIsValid(buffer))
- elog(PANIC, "heap_delete_%sdo: no block", (redo) ? "re" : "un");
-
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "heap_delete_%sdo: uninitialized page", (redo) ? "re" : "un");
-
- if (redo)
- {
- if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
- }
- else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied
- * ?! */
- elog(PANIC, "heap_delete_undo: bad page LSN");
-
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
-
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
- elog(PANIC, "heap_delete_%sdo: invalid lp", (redo) ? "re" : "un");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- if (redo)
- {
- htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
- HeapTupleHeaderSetXmax(htup, record->xl_xid);
- HeapTupleHeaderSetCmax(htup, FirstCommandId);
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
- return;
- }
-
- elog(PANIC, "heap_delete_undo: unimplemented");
-}
-
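-/*
- * heap_xlog_insert - replay a XLOG_HEAP_INSERT record: reconstruct the
- * tuple from the logged header and data, then re-add it to the page at
- * the logged offset. Undo is not implemented.
- */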
-static void
-heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
-{
- xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
- Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
- Buffer buffer;
- Page page;
- OffsetNumber offnum;
-
- if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
- return;
-
- if (!RelationIsValid(reln))
- return;
-
- buffer = XLogReadBuffer(redo, reln,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)));
- if (!BufferIsValid(buffer))
- return;
-
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page) &&
- (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
- elog(PANIC, "heap_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
-
- if (redo)
- {
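- /*
- * tbuf provides correctly aligned workspace, with room for a
- * maximum-size tuple, in which to reassemble the tuple before
- * PageAddItem copies it onto the page.
- */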
- struct
- {
- HeapTupleHeaderData hdr;
- char data[MaxTupleSize];
- } tbuf;
- HeapTupleHeader htup;
- xl_heap_header xlhdr;
- uint32 newlen;
-
- if (record->xl_info & XLOG_HEAP_INIT_PAGE)
- PageInit(page, BufferGetPageSize(buffer), 0);
-
- if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
-
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- if (PageGetMaxOffsetNumber(page) + 1 < offnum)
- elog(PANIC, "heap_insert_redo: invalid max offset number");
-
- newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
- Assert(newlen <= MaxTupleSize);
- memcpy((char *) &xlhdr,
- (char *) xlrec + SizeOfHeapInsert,
- SizeOfHeapHeader);
- memcpy((char *) &tbuf + offsetof(HeapTupleHeaderData, t_bits),
- (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
- newlen);
- newlen += offsetof(HeapTupleHeaderData, t_bits);
- htup = &tbuf.hdr;
- htup->t_oid = xlhdr.t_oid;
- htup->t_natts = xlhdr.t_natts;
- htup->t_hoff = xlhdr.t_hoff;
- htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask;
- HeapTupleHeaderSetXmin(htup, record->xl_xid);
- HeapTupleHeaderSetCmin(htup, FirstCommandId);
- HeapTupleHeaderSetXmax(htup, InvalidTransactionId);
- HeapTupleHeaderSetCmax(htup, FirstCommandId);
-
- offnum = PageAddItem(page, (Item) htup, newlen, offnum,
- LP_USED | OverwritePageMode);
- if (offnum == InvalidOffsetNumber)
- elog(PANIC, "heap_insert_redo: failed to add tuple");
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID); /* stamp page with current startup ID */
- UnlockAndWriteBuffer(buffer);
- return;
- }
-
- /* undo insert */
- if (XLByteLT(PageGetLSN(page), lsn)) /* changes not applied?! */
- elog(PANIC, "heap_insert_undo: bad page LSN");
-
- elog(PANIC, "heap_insert_undo: unimplemented");
-}
-
-/*
- * heap_xlog_update - handles both UPDATE and MOVE records. A MOVE
- * (tuple relocation, evidently from VACUUM FULL) keeps the original
- * xmin/xmax, which travel in the record after the header, and flags
- * the tuples with HEAP_MOVED_OFF/HEAP_MOVED_IN instead.
- */
-static void
-heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
-{
- xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
- Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
- Buffer buffer;
- bool samepage =
- (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
- ItemPointerGetBlockNumber(&(xlrec->target.tid)));
- Page page;
- OffsetNumber offnum;
- ItemId lp = NULL;
- HeapTupleHeader htup;
-
- if (!RelationIsValid(reln))
- return;
-
- if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
- goto newt;
-
- /* Deal with old tuple version */
-
- buffer = XLogReadBuffer(false, reln,
- ItemPointerGetBlockNumber(&(xlrec->target.tid)));
- if (!BufferIsValid(buffer))
- elog(PANIC, "heap_update_%sdo: no block", (redo) ? "re" : "un");
-
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "heap_update_%sdo: uninitialized old page", (redo) ? "re" : "un");
-
- if (redo)
- {
- if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
- {
- UnlockAndReleaseBuffer(buffer);
- if (samepage)
- return;
- goto newt;
- }
- }
- else if (XLByteLT(PageGetLSN(page), lsn)) /* changes not applied?! */
- elog(PANIC, "heap_update_undo: bad old tuple page LSN");
-
- offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
-
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
- elog(PANIC, "heap_update_%sdo: invalid lp", (redo) ? "re" : "un");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- if (redo)
- {
- if (move)
- {
- htup->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
- htup->t_infomask |= HEAP_MOVED_OFF;
- HeapTupleHeaderSetXvac(htup, record->xl_xid);
- }
- else
- {
- htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
- HeapTupleHeaderSetXmax(htup, record->xl_xid);
- HeapTupleHeaderSetCmax(htup, FirstCommandId);
- }
- if (samepage)
- goto newsame;
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID);
- UnlockAndWriteBuffer(buffer);
- goto newt;
- }
-
- elog(PANIC, "heap_update_undo: unimplemented");
-
- /* Deal with new tuple */
-
-newt:;
-
- if (redo &&
- ((record->xl_info & XLR_BKP_BLOCK_2) ||
- ((record->xl_info & XLR_BKP_BLOCK_1) && samepage)))
- return;
-
- buffer = XLogReadBuffer(redo, reln,
- ItemPointerGetBlockNumber(&(xlrec->newtid)));
- if (!BufferIsValid(buffer))
- return;
-
- page = (Page) BufferGetPage(buffer);
-
-newsame:;
- if (PageIsNew((PageHeader) page) &&
- (!redo || !(record->xl_info & XLOG_HEAP_INIT_PAGE)))
- elog(PANIC, "heap_update_%sdo: uninitialized page", (redo) ? "re" : "un");
-
- if (redo)
- {
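- /* Aligned workspace in which to reassemble the new tuple version. */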
- struct
- {
- HeapTupleHeaderData hdr;
- char data[MaxTupleSize];
- } tbuf;
- xl_heap_header xlhdr;
- int hsize;
- uint32 newlen;
-
- if (record->xl_info & XLOG_HEAP_INIT_PAGE)
- PageInit(page, BufferGetPageSize(buffer), 0);
-
- if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
- {
- UnlockAndReleaseBuffer(buffer);
- return;
- }
-
- offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
- if (PageGetMaxOffsetNumber(page) + 1 < offnum)
- elog(PANIC, "heap_update_redo: invalid max offset number");
-
- hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
- if (move)
- hsize += (2 * sizeof(TransactionId));
-
- newlen = record->xl_len - hsize;
- Assert(newlen <= MaxTupleSize);
- memcpy((char *) &xlhdr,
- (char *) xlrec + SizeOfHeapUpdate,
- SizeOfHeapHeader);
- memcpy((char *) &tbuf + offsetof(HeapTupleHeaderData, t_bits),
- (char *) xlrec + hsize,
- newlen);
- newlen += offsetof(HeapTupleHeaderData, t_bits);
- htup = &tbuf.hdr;
- htup->t_oid = xlhdr.t_oid;
- htup->t_natts = xlhdr.t_natts;
- htup->t_hoff = xlhdr.t_hoff;
- if (move)
- {
- TransactionId xmax;
- TransactionId xmin;
-
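- /*
- * MOVE records log the original xmax and xmin immediately after
- * the update header; pull them back out (cf. log_heap_update).
- */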
- hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
- memcpy(&xmax, (char *) xlrec + hsize, sizeof(TransactionId));
- memcpy(&xmin, (char *) xlrec + hsize + sizeof(TransactionId), sizeof(TransactionId));
- htup->t_infomask = xlhdr.mask;
- htup->t_infomask &= ~(HEAP_XMIN_COMMITTED |
- HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
- htup->t_infomask |= HEAP_MOVED_IN;
- HeapTupleHeaderSetXmin(htup, xmin);
- HeapTupleHeaderSetXmax(htup, xmax);
- HeapTupleHeaderSetXvac(htup, record->xl_xid);
- }
- else
- {
- htup->t_infomask = HEAP_XMAX_INVALID | xlhdr.mask;
- HeapTupleHeaderSetXmin(htup, record->xl_xid);
- HeapTupleHeaderSetCmin(htup, FirstCommandId);
- HeapTupleHeaderSetXmaxInvalid(htup);
- HeapTupleHeaderSetCmax(htup, FirstCommandId);
- }
-
- offnum = PageAddItem(page, (Item) htup, newlen, offnum,
- LP_USED | OverwritePageMode);
- if (offnum == InvalidOffsetNumber)
- elog(PANIC, "heap_update_redo: failed to add tuple");
- PageSetLSN(page, lsn);
- PageSetSUI(page, ThisStartUpID); /* stamp page with current startup ID */
- UnlockAndWriteBuffer(buffer);
- return;
- }
-
- /* undo */
- if (XLByteLT(PageGetLSN(page), lsn)) /* changes not applied?! */
- elog(PANIC, "heap_update_undo: bad new tuple page LSN");
-
- elog(PANIC, "heap_update_undo: unimplemented");
-}
-
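-/*
- * _heap_unlock_tuple - abort-time callback that clears an unlogged
- * xmax lock left on a tuple; see the comments in heap_update.
- */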
-static void
-_heap_unlock_tuple(void *data)
-{
- xl_heaptid *xltid = (xl_heaptid *) data;
- Relation reln = XLogOpenRelation(false, RM_HEAP_ID, xltid->node);
- Buffer buffer;
- Page page;
- OffsetNumber offnum;
- ItemId lp;
- HeapTupleHeader htup;
-
- if (!RelationIsValid(reln))
- elog(PANIC, "_heap_unlock_tuple: can't open relation");
-
- buffer = XLogReadBuffer(false, reln,
- ItemPointerGetBlockNumber(&(xltid->tid)));
- if (!BufferIsValid(buffer))
- elog(PANIC, "_heap_unlock_tuple: can't read buffer");
-
- page = (Page) BufferGetPage(buffer);
- if (PageIsNew((PageHeader) page))
- elog(PANIC, "_heap_unlock_tuple: uninitialized page");
-
- offnum = ItemPointerGetOffsetNumber(&(xltid->tid));
- if (offnum > PageGetMaxOffsetNumber(page))
- elog(PANIC, "_heap_unlock_tuple: invalid itemid");
- lp = PageGetItemId(page, offnum);
-
- if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
- elog(PANIC, "_heap_unlock_tuple: unused/deleted tuple in rollback");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(htup), GetCurrentTransactionId()))
- elog(PANIC, "_heap_unlock_tuple: invalid xmax in rollback");
- htup->t_infomask &= ~HEAP_XMAX_UNLOGGED;
- htup->t_infomask |= HEAP_XMAX_INVALID;
- UnlockAndWriteBuffer(buffer);
- return;
-}
-
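-/*
- * heap_redo/heap_undo - resource-manager entry points; dispatch on the
- * operation bits of xl_info.
- */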
-void
-heap_redo(XLogRecPtr lsn, XLogRecord *record)
-{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
-
- info &= XLOG_HEAP_OPMASK;
- if (info == XLOG_HEAP_INSERT)
- heap_xlog_insert(true, lsn, record);
- else if (info == XLOG_HEAP_DELETE)
- heap_xlog_delete(true, lsn, record);
- else if (info == XLOG_HEAP_UPDATE)
- heap_xlog_update(true, lsn, record, false);
- else if (info == XLOG_HEAP_MOVE)
- heap_xlog_update(true, lsn, record, true);
- else if (info == XLOG_HEAP_CLEAN)
- heap_xlog_clean(true, lsn, record);
- else
- elog(PANIC, "heap_redo: unknown op code %u", info);
-}
-
-void
-heap_undo(XLogRecPtr lsn, XLogRecord *record)
-{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
-
- info &= XLOG_HEAP_OPMASK;
- if (info == XLOG_HEAP_INSERT)
- heap_xlog_insert(false, lsn, record);
- else if (info == XLOG_HEAP_DELETE)
- heap_xlog_delete(false, lsn, record);
- else if (info == XLOG_HEAP_UPDATE)
- heap_xlog_update(false, lsn, record, false);
- else if (info == XLOG_HEAP_MOVE)
- heap_xlog_update(false, lsn, record, true);
- else if (info == XLOG_HEAP_CLEAN)
- heap_xlog_clean(false, lsn, record);
- else
- elog(PANIC, "heap_undo: unknown op code %u", info);
-}
-
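-/*
- * out_target - append a description of the target relation/tid to buf.
- */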
-static void
-out_target(char *buf, xl_heaptid *target)
-{
- sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
- target->node.tblNode, target->node.relNode,
- ItemPointerGetBlockNumber(&(target->tid)),
- ItemPointerGetOffsetNumber(&(target->tid)));
-}
-
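-/*
- * heap_desc - format a human-readable description of a heap WAL record.
- */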
-void
-heap_desc(char *buf, uint8 xl_info, char *rec)
-{
- uint8 info = xl_info & ~XLR_INFO_MASK;
-
- info &= XLOG_HEAP_OPMASK;
- if (info == XLOG_HEAP_INSERT)
- {
- xl_heap_insert *xlrec = (xl_heap_insert *) rec;
-
- strcat(buf, "insert: ");
- out_target(buf, &(xlrec->target));
- }
- else if (info == XLOG_HEAP_DELETE)
- {
- xl_heap_delete *xlrec = (xl_heap_delete *) rec;
-
- strcat(buf, "delete: ");
- out_target(buf, &(xlrec->target));
- }
- else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE)
- {
- xl_heap_update *xlrec = (xl_heap_update *) rec;
-
- if (info == XLOG_HEAP_UPDATE)
- strcat(buf, "update: ");
- else
- strcat(buf, "move: ");
- out_target(buf, &(xlrec->target));
- sprintf(buf + strlen(buf), "; new %u/%u",
- ItemPointerGetBlockNumber(&(xlrec->newtid)),
- ItemPointerGetOffsetNumber(&(xlrec->newtid)));
- }
- else if (info == XLOG_HEAP_CLEAN)
- {
- xl_heap_clean *xlrec = (xl_heap_clean *) rec;
-
- sprintf(buf + strlen(buf), "clean: node %u/%u; blk %u",
- xlrec->node.tblNode, xlrec->node.relNode, xlrec->block);
- }
- else
- strcat(buf, "UNKNOWN");
-}