summaryrefslogtreecommitdiff
path: root/src/backend/utils/adt/txid.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/adt/txid.c')
-rw-r--r--src/backend/utils/adt/txid.c583
1 files changed, 583 insertions, 0 deletions
diff --git a/src/backend/utils/adt/txid.c b/src/backend/utils/adt/txid.c
new file mode 100644
index 00000000000..f4267dfbdb4
--- /dev/null
+++ b/src/backend/utils/adt/txid.c
@@ -0,0 +1,583 @@
+/*-------------------------------------------------------------------------
+ * txid.c
+ *
+ * Export internal transaction IDs to user level.
+ *
+ * Note that only top-level transaction IDs are ever converted to TXID.
+ * This is important because TXIDs frequently persist beyond the global
+ * xmin horizon, or may even be shipped to other machines, so we cannot
+ * rely on being able to correlate subtransaction IDs with their parents
+ * via functions such as SubTransGetTopmostTransaction().
+ *
+ *
+ * Copyright (c) 2003-2007, PostgreSQL Global Development Group
+ * Author: Jan Wieck, Afilias USA INC.
+ * 64-bit txids: Marko Kreen, Skype Technologies
+ *
+ * $PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.1 2007/10/13 23:06:26 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/xact.h"
+#include "funcapi.h"
+#include "libpq/pqformat.h"
+#include "utils/builtins.h"
+
+
+#ifndef INT64_IS_BUSTED
+/* txid will be signed int8 in database, so must limit to 63 bits */
+#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
+#else
+/* we only really have 32 bits to work with :-( */
+#define MAX_TXID UINT64CONST(0x7FFFFFFF)
+#endif
+
+/* Use unsigned variant internally */
+typedef uint64 txid;
+
+/* sprintf format code for uint64 */
+#define TXID_FMT UINT64_FORMAT
+
+/*
+ * If defined, use bsearch() function for searching for txids in snapshots
+ * that have more than the specified number of values.
+ */
+#define USE_BSEARCH_IF_NXIP_GREATER 30
+
+
+/*
+ * Snapshot containing 8byte txids.
+ */
+typedef struct
+{
+ /*
+ * 4-byte length hdr, should not be touched directly.
+ *
+ * Explicit embedding is ok as we want always correct
+ * alignment anyway.
+ */
+ int32 __varsz;
+
+ uint32 nxip; /* number of txids in xip array */
+ txid xmin;
+ txid xmax;
+ txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */
+} TxidSnapshot;
+
+#define TXID_SNAPSHOT_SIZE(nxip) \
+ (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))
+
+/*
+ * Epoch values from xact.c
+ */
+typedef struct
+{
+ TransactionId last_xid;
+ uint32 epoch;
+} TxidEpoch;
+
+
+/*
+ * Fetch epoch data from xact.c.
+ */
+static void
+load_xid_epoch(TxidEpoch *state)
+{
+ GetNextXidAndEpoch(&state->last_xid, &state->epoch);
+}
+
+/*
+ * do a TransactionId -> txid conversion for an XID near the given epoch
+ */
+static txid
+convert_xid(TransactionId xid, const TxidEpoch *state)
+{
+#ifndef INT64_IS_BUSTED
+ uint64 epoch;
+
+ /* return special xid's as-is */
+ if (!TransactionIdIsNormal(xid))
+ return (txid) xid;
+
+ /* xid can be on either side when near wrap-around */
+ epoch = (uint64) state->epoch;
+ if (xid > state->last_xid &&
+ TransactionIdPrecedes(xid, state->last_xid))
+ epoch--;
+ else if (xid < state->last_xid &&
+ TransactionIdFollows(xid, state->last_xid))
+ epoch++;
+
+ return (epoch << 32) | xid;
+#else /* INT64_IS_BUSTED */
+ /* we can't do anything with the epoch, so ignore it */
+ return (txid) xid & MAX_TXID;
+#endif /* INT64_IS_BUSTED */
+}
+
+/*
+ * txid comparator for qsort/bsearch
+ */
+static int
+cmp_txid(const void *aa, const void *bb)
+{
+ txid a = *(const txid *) aa;
+ txid b = *(const txid *) bb;
+
+ if (a < b)
+ return -1;
+ if (a > b)
+ return 1;
+ return 0;
+}
+
+/*
+ * sort a snapshot's txids, so we can use bsearch() later.
+ *
+ * For consistency of on-disk representation, we always sort even if bsearch
+ * will not be used.
+ */
+static void
+sort_snapshot(TxidSnapshot *snap)
+{
+ if (snap->nxip > 1)
+ qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid);
+}
+
+/*
+ * check txid visibility.
+ */
+static bool
+is_visible_txid(txid value, const TxidSnapshot *snap)
+{
+ if (value < snap->xmin)
+ return true;
+ else if (value >= snap->xmax)
+ return false;
+#ifdef USE_BSEARCH_IF_NXIP_GREATER
+ else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
+ {
+ void *res;
+
+ res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid);
+ /* if found, transaction is still in progress */
+ return (res) ? false : true;
+ }
+#endif
+ else
+ {
+ uint32 i;
+
+ for (i = 0; i < snap->nxip; i++)
+ {
+ if (value == snap->xip[i])
+ return false;
+ }
+ return true;
+ }
+}
+
+/*
+ * helper functions to use StringInfo for TxidSnapshot creation.
+ */
+
+static StringInfo
+buf_init(txid xmin, txid xmax)
+{
+ TxidSnapshot snap;
+ StringInfo buf;
+
+ snap.xmin = xmin;
+ snap.xmax = xmax;
+ snap.nxip = 0;
+
+ buf = makeStringInfo();
+ appendBinaryStringInfo(buf, (char *)&snap, TXID_SNAPSHOT_SIZE(0));
+ return buf;
+}
+
+static void
+buf_add_txid(StringInfo buf, txid xid)
+{
+ TxidSnapshot *snap = (TxidSnapshot *)buf->data;
+
+ /* do this before possible realloc */
+ snap->nxip++;
+
+ appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid));
+}
+
+static TxidSnapshot *
+buf_finalize(StringInfo buf)
+{
+ TxidSnapshot *snap = (TxidSnapshot *)buf->data;
+
+ SET_VARSIZE(snap, buf->len);
+
+ /* buf is not needed anymore */
+ buf->data = NULL;
+ pfree(buf);
+
+ return snap;
+}
+
+/*
+ * simple number parser.
+ *
+ * We return 0 on error, which is invalid value for txid.
+ */
+static txid
+str2txid(const char *s, const char **endp)
+{
+ txid val = 0;
+ txid cutoff = MAX_TXID / 10;
+ txid cutlim = MAX_TXID % 10;
+
+ for (; *s; s++)
+ {
+ unsigned d;
+
+ if (*s < '0' || *s > '9')
+ break;
+ d = *s - '0';
+
+ /*
+ * check for overflow
+ */
+ if (val > cutoff || (val == cutoff && d > cutlim))
+ {
+ val = 0;
+ break;
+ }
+
+ val = val * 10 + d;
+ }
+ if (endp)
+ *endp = s;
+ return val;
+}
+
+/*
+ * parse snapshot from cstring
+ */
+static TxidSnapshot *
+parse_snapshot(const char *str)
+{
+ txid xmin;
+ txid xmax;
+ txid last_val = 0, val;
+ const char *str_start = str;
+ const char *endp;
+ StringInfo buf;
+
+ xmin = str2txid(str, &endp);
+ if (*endp != ':')
+ goto bad_format;
+ str = endp + 1;
+
+ xmax = str2txid(str, &endp);
+ if (*endp != ':')
+ goto bad_format;
+ str = endp + 1;
+
+ /* it should look sane */
+ if (xmin == 0 || xmax == 0 || xmin > xmax)
+ goto bad_format;
+
+ /* allocate buffer */
+ buf = buf_init(xmin, xmax);
+
+ /* loop over values */
+ while (*str != '\0')
+ {
+ /* read next value */
+ val = str2txid(str, &endp);
+ str = endp;
+
+ /* require the input to be in order */
+ if (val < xmin || val >= xmax || val <= last_val)
+ goto bad_format;
+
+ buf_add_txid(buf, val);
+ last_val = val;
+
+ if (*str == ',')
+ str++;
+ else if (*str != '\0')
+ goto bad_format;
+ }
+
+ return buf_finalize(buf);
+
+bad_format:
+ elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start);
+ return NULL;
+}
+
+/*
+ * Public functions.
+ *
+ * txid_current() and txid_current_snapshot() are the only ones that
+ * communicate with core xid machinery. All the others work on data
+ * returned by them.
+ */
+
+/*
+ * txid_current() returns int8
+ *
+ * Return the current toplevel transaction ID as TXID
+ */
+Datum
+txid_current(PG_FUNCTION_ARGS)
+{
+ txid val;
+ TxidEpoch state;
+
+ load_xid_epoch(&state);
+
+ val = convert_xid(GetTopTransactionId(), &state);
+
+ PG_RETURN_INT64(val);
+}
+
+/*
+ * txid_current_snapshot() returns txid_snapshot
+ *
+ * Return current snapshot in TXID format
+ *
+ * Note that only top-transaction XIDs are included in the snapshot.
+ */
+Datum
+txid_current_snapshot(PG_FUNCTION_ARGS)
+{
+ TxidSnapshot *snap;
+ uint32 nxip, i, size;
+ TxidEpoch state;
+ Snapshot cur;
+
+ cur = ActiveSnapshot;
+ if (cur == NULL)
+ elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");
+
+ load_xid_epoch(&state);
+
+ /* allocate */
+ nxip = cur->xcnt;
+ size = TXID_SNAPSHOT_SIZE(nxip);
+ snap = palloc(size);
+ SET_VARSIZE(snap, size);
+
+ /* fill */
+ snap->xmin = convert_xid(cur->xmin, &state);
+ snap->xmax = convert_xid(cur->xmax, &state);
+ snap->nxip = nxip;
+ for (i = 0; i < nxip; i++)
+ snap->xip[i] = convert_xid(cur->xip[i], &state);
+
+ /* we want them guaranteed to be in ascending order */
+ sort_snapshot(snap);
+
+ PG_RETURN_POINTER(snap);
+}
+
+/*
+ * txid_snapshot_in(cstring) returns txid_snapshot
+ *
+ * input function for type txid_snapshot
+ */
+Datum
+txid_snapshot_in(PG_FUNCTION_ARGS)
+{
+ char *str = PG_GETARG_CSTRING(0);
+ TxidSnapshot *snap;
+
+ snap = parse_snapshot(str);
+
+ PG_RETURN_POINTER(snap);
+}
+
+/*
+ * txid_snapshot_out(txid_snapshot) returns cstring
+ *
+ * output function for type txid_snapshot
+ */
+Datum
+txid_snapshot_out(PG_FUNCTION_ARGS)
+{
+ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
+ StringInfoData str;
+ uint32 i;
+
+ initStringInfo(&str);
+
+ appendStringInfo(&str, TXID_FMT ":", snap->xmin);
+ appendStringInfo(&str, TXID_FMT ":", snap->xmax);
+
+ for (i = 0; i < snap->nxip; i++)
+ {
+ if (i > 0)
+ appendStringInfoChar(&str, ',');
+ appendStringInfo(&str, TXID_FMT, snap->xip[i]);
+ }
+
+ PG_RETURN_CSTRING(str.data);
+}
+
+/*
+ * txid_snapshot_recv(internal) returns txid_snapshot
+ *
+ * binary input function for type txid_snapshot
+ *
+ * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
+ */
+Datum
+txid_snapshot_recv(PG_FUNCTION_ARGS)
+{
+ StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+ TxidSnapshot *snap;
+ txid last = 0;
+ int nxip;
+ int i;
+ int avail;
+ int expect;
+ txid xmin, xmax;
+
+ /*
+ * load nxip and check for nonsense.
+ *
+ * (nxip > avail) check is against int overflows in 'expect'.
+ */
+ nxip = pq_getmsgint(buf, 4);
+ avail = buf->len - buf->cursor;
+ expect = 8 + 8 + nxip * 8;
+ if (nxip < 0 || nxip > avail || expect > avail)
+ goto bad_format;
+
+ xmin = pq_getmsgint64(buf);
+ xmax = pq_getmsgint64(buf);
+ if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
+ goto bad_format;
+
+ snap = palloc(TXID_SNAPSHOT_SIZE(nxip));
+ snap->xmin = xmin;
+ snap->xmax = xmax;
+ snap->nxip = nxip;
+ SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
+
+ for (i = 0; i < nxip; i++)
+ {
+ txid cur = pq_getmsgint64(buf);
+ if (cur <= last || cur < xmin || cur >= xmax)
+ goto bad_format;
+ snap->xip[i] = cur;
+ last = cur;
+ }
+ PG_RETURN_POINTER(snap);
+
+bad_format:
+ elog(ERROR, "invalid snapshot data");
+ return (Datum)NULL;
+}
+
+/*
+ * txid_snapshot_send(txid_snapshot) returns bytea
+ *
+ * binary output function for type txid_snapshot
+ *
+ * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
+ */
+Datum
+txid_snapshot_send(PG_FUNCTION_ARGS)
+{
+ TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0);
+ StringInfoData buf;
+ uint32 i;
+
+ pq_begintypsend(&buf);
+ pq_sendint(&buf, snap->nxip, 4);
+ pq_sendint64(&buf, snap->xmin);
+ pq_sendint64(&buf, snap->xmax);
+ for (i = 0; i < snap->nxip; i++)
+ pq_sendint64(&buf, snap->xip[i]);
+ PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+}
+
+/*
+ * txid_visible_in_snapshot(int8, txid_snapshot) returns bool
+ *
+ * is txid visible in snapshot ?
+ */
+Datum
+txid_visible_in_snapshot(PG_FUNCTION_ARGS)
+{
+ txid value = PG_GETARG_INT64(0);
+ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1);
+
+ PG_RETURN_BOOL(is_visible_txid(value, snap));
+}
+
+/*
+ * txid_snapshot_xmin(txid_snapshot) returns int8
+ *
+ * return snapshot's xmin
+ */
+Datum
+txid_snapshot_xmin(PG_FUNCTION_ARGS)
+{
+ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
+
+ PG_RETURN_INT64(snap->xmin);
+}
+
+/*
+ * txid_snapshot_xmax(txid_snapshot) returns int8
+ *
+ * return snapshot's xmax
+ */
+Datum
+txid_snapshot_xmax(PG_FUNCTION_ARGS)
+{
+ TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
+
+ PG_RETURN_INT64(snap->xmax);
+}
+
+/*
+ * txid_snapshot_xip(txid_snapshot) returns setof int8
+ *
+ * return in-progress TXIDs in snapshot.
+ */
+Datum
+txid_snapshot_xip(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *fctx;
+ TxidSnapshot *snap;
+ txid value;
+
+ /* on first call initialize snap_state and get copy of snapshot */
+ if (SRF_IS_FIRSTCALL()) {
+ TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);
+
+ fctx = SRF_FIRSTCALL_INIT();
+
+ /* make a copy of user snapshot */
+ snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
+ memcpy(snap, arg, VARSIZE(arg));
+
+ fctx->user_fctx = snap;
+ }
+
+ /* return values one-by-one */
+ fctx = SRF_PERCALL_SETUP();
+ snap = fctx->user_fctx;
+ if (fctx->call_cntr < snap->nxip) {
+ value = snap->xip[fctx->call_cntr];
+ SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
+ } else {
+ SRF_RETURN_DONE(fctx);
+ }
+}