diff options
Diffstat (limited to 'src/backend/utils/adt/txid.c')
-rw-r--r-- | src/backend/utils/adt/txid.c | 583 |
1 files changed, 583 insertions, 0 deletions
diff --git a/src/backend/utils/adt/txid.c b/src/backend/utils/adt/txid.c new file mode 100644 index 00000000000..f4267dfbdb4 --- /dev/null +++ b/src/backend/utils/adt/txid.c @@ -0,0 +1,583 @@ +/*------------------------------------------------------------------------- + * txid.c + * + * Export internal transaction IDs to user level. + * + * Note that only top-level transaction IDs are ever converted to TXID. + * This is important because TXIDs frequently persist beyond the global + * xmin horizon, or may even be shipped to other machines, so we cannot + * rely on being able to correlate subtransaction IDs with their parents + * via functions such as SubTransGetTopmostTransaction(). + * + * + * Copyright (c) 2003-2007, PostgreSQL Global Development Group + * Author: Jan Wieck, Afilias USA INC. + * 64-bit txids: Marko Kreen, Skype Technologies + * + * $PostgreSQL: pgsql/src/backend/utils/adt/txid.c,v 1.1 2007/10/13 23:06:26 tgl Exp $ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/transam.h" +#include "access/xact.h" +#include "funcapi.h" +#include "libpq/pqformat.h" +#include "utils/builtins.h" + + +#ifndef INT64_IS_BUSTED +/* txid will be signed int8 in database, so must limit to 63 bits */ +#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF) +#else +/* we only really have 32 bits to work with :-( */ +#define MAX_TXID UINT64CONST(0x7FFFFFFF) +#endif + +/* Use unsigned variant internally */ +typedef uint64 txid; + +/* sprintf format code for uint64 */ +#define TXID_FMT UINT64_FORMAT + +/* + * If defined, use bsearch() function for searching for txids in snapshots + * that have more than the specified number of values. + */ +#define USE_BSEARCH_IF_NXIP_GREATER 30 + + +/* + * Snapshot containing 8byte txids. + */ +typedef struct +{ + /* + * 4-byte length hdr, should not be touched directly. + * + * Explicit embedding is ok as we want always correct + * alignment anyway. + */ + int32 __varsz; + + uint32 nxip; /* number of txids in xip array */ + txid xmin; + txid xmax; + txid xip[1]; /* in-progress txids, xmin <= xip[i] < xmax */ +} TxidSnapshot; + +#define TXID_SNAPSHOT_SIZE(nxip) \ + (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip)) + +/* + * Epoch values from xact.c + */ +typedef struct +{ + TransactionId last_xid; + uint32 epoch; +} TxidEpoch; + + +/* + * Fetch epoch data from xact.c. + */ +static void +load_xid_epoch(TxidEpoch *state) +{ + GetNextXidAndEpoch(&state->last_xid, &state->epoch); +} + +/* + * do a TransactionId -> txid conversion for an XID near the given epoch + */ +static txid +convert_xid(TransactionId xid, const TxidEpoch *state) +{ +#ifndef INT64_IS_BUSTED + uint64 epoch; + + /* return special xid's as-is */ + if (!TransactionIdIsNormal(xid)) + return (txid) xid; + + /* xid can be on either side when near wrap-around */ + epoch = (uint64) state->epoch; + if (xid > state->last_xid && + TransactionIdPrecedes(xid, state->last_xid)) + epoch--; + else if (xid < state->last_xid && + TransactionIdFollows(xid, state->last_xid)) + epoch++; + + return (epoch << 32) | xid; +#else /* INT64_IS_BUSTED */ + /* we can't do anything with the epoch, so ignore it */ + return (txid) xid & MAX_TXID; +#endif /* INT64_IS_BUSTED */ +} + +/* + * txid comparator for qsort/bsearch + */ +static int +cmp_txid(const void *aa, const void *bb) +{ + txid a = *(const txid *) aa; + txid b = *(const txid *) bb; + + if (a < b) + return -1; + if (a > b) + return 1; + return 0; +} + +/* + * sort a snapshot's txids, so we can use bsearch() later. + * + * For consistency of on-disk representation, we always sort even if bsearch + * will not be used. + */ +static void +sort_snapshot(TxidSnapshot *snap) +{ + if (snap->nxip > 1) + qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid); +} + +/* + * check txid visibility. + */ +static bool +is_visible_txid(txid value, const TxidSnapshot *snap) +{ + if (value < snap->xmin) + return true; + else if (value >= snap->xmax) + return false; +#ifdef USE_BSEARCH_IF_NXIP_GREATER + else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER) + { + void *res; + + res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid); + /* if found, transaction is still in progress */ + return (res) ? false : true; + } +#endif + else + { + uint32 i; + + for (i = 0; i < snap->nxip; i++) + { + if (value == snap->xip[i]) + return false; + } + return true; + } +} + +/* + * helper functions to use StringInfo for TxidSnapshot creation. + */ + +static StringInfo +buf_init(txid xmin, txid xmax) +{ + TxidSnapshot snap; + StringInfo buf; + + snap.xmin = xmin; + snap.xmax = xmax; + snap.nxip = 0; + + buf = makeStringInfo(); + appendBinaryStringInfo(buf, (char *)&snap, TXID_SNAPSHOT_SIZE(0)); + return buf; +} + +static void +buf_add_txid(StringInfo buf, txid xid) +{ + TxidSnapshot *snap = (TxidSnapshot *)buf->data; + + /* do this before possible realloc */ + snap->nxip++; + + appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid)); +} + +static TxidSnapshot * +buf_finalize(StringInfo buf) +{ + TxidSnapshot *snap = (TxidSnapshot *)buf->data; + + SET_VARSIZE(snap, buf->len); + + /* buf is not needed anymore */ + buf->data = NULL; + pfree(buf); + + return snap; +} + +/* + * simple number parser. + * + * We return 0 on error, which is invalid value for txid. + */ +static txid +str2txid(const char *s, const char **endp) +{ + txid val = 0; + txid cutoff = MAX_TXID / 10; + txid cutlim = MAX_TXID % 10; + + for (; *s; s++) + { + unsigned d; + + if (*s < '0' || *s > '9') + break; + d = *s - '0'; + + /* + * check for overflow + */ + if (val > cutoff || (val == cutoff && d > cutlim)) + { + val = 0; + break; + } + + val = val * 10 + d; + } + if (endp) + *endp = s; + return val; +} + +/* + * parse snapshot from cstring + */ +static TxidSnapshot * +parse_snapshot(const char *str) +{ + txid xmin; + txid xmax; + txid last_val = 0, val; + const char *str_start = str; + const char *endp; + StringInfo buf; + + xmin = str2txid(str, &endp); + if (*endp != ':') + goto bad_format; + str = endp + 1; + + xmax = str2txid(str, &endp); + if (*endp != ':') + goto bad_format; + str = endp + 1; + + /* it should look sane */ + if (xmin == 0 || xmax == 0 || xmin > xmax) + goto bad_format; + + /* allocate buffer */ + buf = buf_init(xmin, xmax); + + /* loop over values */ + while (*str != '\0') + { + /* read next value */ + val = str2txid(str, &endp); + str = endp; + + /* require the input to be in order */ + if (val < xmin || val >= xmax || val <= last_val) + goto bad_format; + + buf_add_txid(buf, val); + last_val = val; + + if (*str == ',') + str++; + else if (*str != '\0') + goto bad_format; + } + + return buf_finalize(buf); + +bad_format: + elog(ERROR, "invalid input for txid_snapshot: \"%s\"", str_start); + return NULL; +} + +/* + * Public functions. + * + * txid_current() and txid_current_snapshot() are the only ones that + * communicate with core xid machinery. All the others work on data + * returned by them. + */ + +/* + * txid_current() returns int8 + * + * Return the current toplevel transaction ID as TXID + */ +Datum +txid_current(PG_FUNCTION_ARGS) +{ + txid val; + TxidEpoch state; + + load_xid_epoch(&state); + + val = convert_xid(GetTopTransactionId(), &state); + + PG_RETURN_INT64(val); +} + +/* + * txid_current_snapshot() returns txid_snapshot + * + * Return current snapshot in TXID format + * + * Note that only top-transaction XIDs are included in the snapshot. + */ +Datum +txid_current_snapshot(PG_FUNCTION_ARGS) +{ + TxidSnapshot *snap; + uint32 nxip, i, size; + TxidEpoch state; + Snapshot cur; + + cur = ActiveSnapshot; + if (cur == NULL) + elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL"); + + load_xid_epoch(&state); + + /* allocate */ + nxip = cur->xcnt; + size = TXID_SNAPSHOT_SIZE(nxip); + snap = palloc(size); + SET_VARSIZE(snap, size); + + /* fill */ + snap->xmin = convert_xid(cur->xmin, &state); + snap->xmax = convert_xid(cur->xmax, &state); + snap->nxip = nxip; + for (i = 0; i < nxip; i++) + snap->xip[i] = convert_xid(cur->xip[i], &state); + + /* we want them guaranteed to be in ascending order */ + sort_snapshot(snap); + + PG_RETURN_POINTER(snap); +} + +/* + * txid_snapshot_in(cstring) returns txid_snapshot + * + * input function for type txid_snapshot + */ +Datum +txid_snapshot_in(PG_FUNCTION_ARGS) +{ + char *str = PG_GETARG_CSTRING(0); + TxidSnapshot *snap; + + snap = parse_snapshot(str); + + PG_RETURN_POINTER(snap); +} + +/* + * txid_snapshot_out(txid_snapshot) returns cstring + * + * output function for type txid_snapshot + */ +Datum +txid_snapshot_out(PG_FUNCTION_ARGS) +{ + TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); + StringInfoData str; + uint32 i; + + initStringInfo(&str); + + appendStringInfo(&str, TXID_FMT ":", snap->xmin); + appendStringInfo(&str, TXID_FMT ":", snap->xmax); + + for (i = 0; i < snap->nxip; i++) + { + if (i > 0) + appendStringInfoChar(&str, ','); + appendStringInfo(&str, TXID_FMT, snap->xip[i]); + } + + PG_RETURN_CSTRING(str.data); +} + +/* + * txid_snapshot_recv(internal) returns txid_snapshot + * + * binary input function for type txid_snapshot + * + * format: int4 nxip, int8 xmin, int8 xmax, int8 xip + */ +Datum +txid_snapshot_recv(PG_FUNCTION_ARGS) +{ + StringInfo buf = (StringInfo) PG_GETARG_POINTER(0); + TxidSnapshot *snap; + txid last = 0; + int nxip; + int i; + int avail; + int expect; + txid xmin, xmax; + + /* + * load nxip and check for nonsense. + * + * (nxip > avail) check is against int overflows in 'expect'. + */ + nxip = pq_getmsgint(buf, 4); + avail = buf->len - buf->cursor; + expect = 8 + 8 + nxip * 8; + if (nxip < 0 || nxip > avail || expect > avail) + goto bad_format; + + xmin = pq_getmsgint64(buf); + xmax = pq_getmsgint64(buf); + if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID) + goto bad_format; + + snap = palloc(TXID_SNAPSHOT_SIZE(nxip)); + snap->xmin = xmin; + snap->xmax = xmax; + snap->nxip = nxip; + SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip)); + + for (i = 0; i < nxip; i++) + { + txid cur = pq_getmsgint64(buf); + if (cur <= last || cur < xmin || cur >= xmax) + goto bad_format; + snap->xip[i] = cur; + last = cur; + } + PG_RETURN_POINTER(snap); + +bad_format: + elog(ERROR, "invalid snapshot data"); + return (Datum)NULL; +} + +/* + * txid_snapshot_send(txid_snapshot) returns bytea + * + * binary output function for type txid_snapshot + * + * format: int4 nxip, int8 xmin, int8 xmax, int8 xip + */ +Datum +txid_snapshot_send(PG_FUNCTION_ARGS) +{ + TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0); + StringInfoData buf; + uint32 i; + + pq_begintypsend(&buf); + pq_sendint(&buf, snap->nxip, 4); + pq_sendint64(&buf, snap->xmin); + pq_sendint64(&buf, snap->xmax); + for (i = 0; i < snap->nxip; i++) + pq_sendint64(&buf, snap->xip[i]); + PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); +} + +/* + * txid_visible_in_snapshot(int8, txid_snapshot) returns bool + * + * is txid visible in snapshot ? + */ +Datum +txid_visible_in_snapshot(PG_FUNCTION_ARGS) +{ + txid value = PG_GETARG_INT64(0); + TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1); + + PG_RETURN_BOOL(is_visible_txid(value, snap)); +} + +/* + * txid_snapshot_xmin(txid_snapshot) returns int8 + * + * return snapshot's xmin + */ +Datum +txid_snapshot_xmin(PG_FUNCTION_ARGS) +{ + TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); + + PG_RETURN_INT64(snap->xmin); +} + +/* + * txid_snapshot_xmax(txid_snapshot) returns int8 + * + * return snapshot's xmax + */ +Datum +txid_snapshot_xmax(PG_FUNCTION_ARGS) +{ + TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); + + PG_RETURN_INT64(snap->xmax); +} + +/* + * txid_snapshot_xip(txid_snapshot) returns setof int8 + * + * return in-progress TXIDs in snapshot. + */ +Datum +txid_snapshot_xip(PG_FUNCTION_ARGS) +{ + FuncCallContext *fctx; + TxidSnapshot *snap; + txid value; + + /* on first call initialize snap_state and get copy of snapshot */ + if (SRF_IS_FIRSTCALL()) { + TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); + + fctx = SRF_FIRSTCALL_INIT(); + + /* make a copy of user snapshot */ + snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg)); + memcpy(snap, arg, VARSIZE(arg)); + + fctx->user_fctx = snap; + } + + /* return values one-by-one */ + fctx = SRF_PERCALL_SETUP(); + snap = fctx->user_fctx; + if (fctx->call_cntr < snap->nxip) { + value = snap->xip[fctx->call_cntr]; + SRF_RETURN_NEXT(fctx, Int64GetDatum(value)); + } else { + SRF_RETURN_DONE(fctx); + } +} |