diff options
| -rw-r--r-- | contrib/txid/Makefile | 10 | ||||
| -rw-r--r-- | contrib/txid/README.txid | 27 | ||||
| -rw-r--r-- | contrib/txid/txid.c | 263 | ||||
| -rw-r--r-- | contrib/txid/txid.sql.in | 45 | ||||
| -rw-r--r-- | contrib/txid/uninstall_txid.sql | 10 | 
5 files changed, 224 insertions, 131 deletions
| diff --git a/contrib/txid/Makefile b/contrib/txid/Makefile index 5e356535f5e..2d71ee72690 100644 --- a/contrib/txid/Makefile +++ b/contrib/txid/Makefile @@ -1,3 +1,4 @@ +# $PostgreSQL: pgsql/contrib/txid/Makefile,v 1.2 2007/10/11 19:54:17 tgl Exp $  MODULES = txid  DATA_built = txid.sql @@ -5,7 +6,6 @@ DATA = uninstall_txid.sql  DOCS = README.txid  REGRESS = txid -  ifdef USE_PGXS  PG_CONFIG = pg_config  PGXS := $(shell $(PG_CONFIG) --pgxs) @@ -16,11 +16,3 @@ top_builddir = ../..  include $(top_builddir)/src/Makefile.global  include $(top_srcdir)/contrib/contrib-global.mk  endif - - -test: install -	$(MAKE) installcheck || { less regression.diffs; exit 1; } - -ack: -	cp results/* expected/ - diff --git a/contrib/txid/README.txid b/contrib/txid/README.txid index 03d805d3ceb..7b39235bf37 100644 --- a/contrib/txid/README.txid +++ b/contrib/txid/README.txid @@ -1,8 +1,7 @@ +txid - export transaction IDs to user level +=========================================== -txid - export transaction id's to user level -============================================ - -The goal is to make PostgreSQL internal transaction ID and snapshot +The goal is to make PostgreSQL's internal transaction ID and snapshot  data usable externally.  This allows very efficient queue  implementation done inside database. @@ -32,7 +31,7 @@ txid_snapshot_xmax( snap ) returns int8  txid_snapshot_xip( snap ) setof int8    List of in-progress TXID's in snapshot, that are invisible. -  Values are between xmin and xmax. +  Values are between xmin (inclusive) and xmax (exclusive).  txid_visible_in_snapshot(id, snap) returns bool @@ -73,8 +72,8 @@ fetching possible txids below snap1.xmax explicitly:       AND NOT txid_visible_in_snapshot(ev_txid, :snap1)       AND txid_visible_in_snapshot(ev_txid, :snap2); -Note that although the above queries work, the PostgreSQL fails to -plan them correctly.  For actual usage the values for txid_snapshot_xmin, +Note that although the above queries work, PostgreSQL fails to +plan them efficiently.  For actual usage the values for txid_snapshot_xmin,  txid_snapshot_xmax and txid_snapshot_xip should be filled in directly,  only then will they use index. @@ -92,20 +91,16 @@ To see example code for that it's best to see pgq.batch_event_sql() function in  Dumping and restoring data containing TXIDs.  -------------------------------------------- -[towrite: reason for epoch increase] - -You can look at current epoch with query: - -  SELECT txid_current() >> 32 as epoch; +When reloading TXID data you will typically want to be sure that the current +XID counter is beyond the reloaded data.  The easiest way to do this is to +increase the XID epoch to beyond the largest one in the input data. -So new epoch should be: +You can look at current epoch with queries such as: -  SELECT (txid_current() >> 32) + 1 as newepoch; +  SELECT MAX(txid) >> 32 as epoch FROM ...;  Epoch can be changed with pg_resetxlog command:    pg_resetxlog -e NEWEPOCH DATADIR  Database needs to be shut down for that moment. - - diff --git a/contrib/txid/txid.c b/contrib/txid/txid.c index 015e1fb9368..9a89a0631ae 100644 --- a/contrib/txid/txid.c +++ b/contrib/txid/txid.c @@ -1,12 +1,21 @@  /*-------------------------------------------------------------------------   * txid.c   * - *	Export backend internal tranasction id's to user level. + *	Export internal transaction IDs to user level. + * + * Note that only top-level transaction IDs are ever converted to TXID. + * This is important because TXIDs frequently persist beyond the global + * xmin horizon, or may even be shipped to other machines, so we cannot + * rely on being able to correlate subtransaction IDs with their parents + * via functions such as SubTransGetTopmostTransaction(). + *   *   *	Copyright (c) 2003-2007, PostgreSQL Global Development Group   *	Author: Jan Wieck, Afilias USA INC. - *   *	64-bit txids: Marko Kreen, Skype Technologies + * + *	$PostgreSQL: pgsql/contrib/txid/txid.c,v 1.4 2007/10/11 19:54:17 tgl Exp $ + *   *-------------------------------------------------------------------------   */ @@ -15,32 +24,33 @@  #include "access/transam.h"  #include "access/xact.h"  #include "funcapi.h" +#include "libpq/pqformat.h" + -#ifdef PG_MODULE_MAGIC  PG_MODULE_MAGIC; -#endif  #ifdef INT64_IS_BUSTED  #error txid needs working int64  #endif -/* txid will be signed int8 in database */ +/* txid will be signed int8 in database, so must limit to 63 bits */  #define MAX_TXID   UINT64CONST(0x7FFFFFFFFFFFFFFF) +/* Use unsigned variant internally */ +typedef uint64 txid; + +/* sprintf format code for uint64 */ +#define TXID_FMT UINT64_FORMAT +  /* - * If defined, use bsearch() function for searching - * txid's inside snapshots that have more than given values. + * If defined, use bsearch() function for searching for txids in snapshots + * that have more than the specified number of values.   */  #define USE_BSEARCH_IF_NXIP_GREATER 30 -/* format code for uint64 to appendStringInfo */ -#define TXID_FMT UINT64_FORMAT - -/* Use unsigned variant internally */ -typedef uint64 txid;  /* - * Snapshot for 8byte txids. + * Snapshot containing 8byte txids.   */  typedef struct  { @@ -55,22 +65,27 @@ typedef struct      uint32      nxip;		/* number of txids in xip array */      txid		xmin;      txid		xmax; -    txid		xip[1];		/* in-progress txids */ +    txid		xip[1];		/* in-progress txids, xmin <= xip[i] < xmax */  }   TxidSnapshot; -#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip)) +#define TXID_SNAPSHOT_SIZE(nxip) \ +	(offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip))  /* - * Epoch values from backend. + * Epoch values from xact.c   */ -typedef struct { -	uint64		last_value; -	uint64		epoch; +typedef struct +{ +	TransactionId	last_xid; +	uint32			epoch;  }	TxidEpoch; +  /* public functions */  Datum       txid_snapshot_in(PG_FUNCTION_ARGS);  Datum       txid_snapshot_out(PG_FUNCTION_ARGS); +Datum       txid_snapshot_recv(PG_FUNCTION_ARGS); +Datum       txid_snapshot_send(PG_FUNCTION_ARGS);  Datum       txid_current(PG_FUNCTION_ARGS);  Datum       txid_current_snapshot(PG_FUNCTION_ARGS);  Datum       txid_snapshot_xmin(PG_FUNCTION_ARGS); @@ -81,6 +96,8 @@ Datum       txid_visible_in_snapshot(PG_FUNCTION_ARGS);  /* public function tags */  PG_FUNCTION_INFO_V1(txid_snapshot_in);  PG_FUNCTION_INFO_V1(txid_snapshot_out); +PG_FUNCTION_INFO_V1(txid_snapshot_recv); +PG_FUNCTION_INFO_V1(txid_snapshot_send);  PG_FUNCTION_INFO_V1(txid_current);  PG_FUNCTION_INFO_V1(txid_current_snapshot);  PG_FUNCTION_INFO_V1(txid_snapshot_xmin); @@ -88,62 +105,61 @@ PG_FUNCTION_INFO_V1(txid_snapshot_xmax);  PG_FUNCTION_INFO_V1(txid_snapshot_xip);  PG_FUNCTION_INFO_V1(txid_visible_in_snapshot); +  /* - * do a TransactionId -> txid conversion + * Fetch epoch data from xact.c.   */ -static txid -convert_xid(TransactionId xid, const TxidEpoch *state) +static void +load_xid_epoch(TxidEpoch *state)  { -	uint64 epoch; - -	/* return special xid's as-is */ -	if (xid < FirstNormalTransactionId) -		return xid; - -	/* xid can on both sides on wrap-around */ -	epoch = state->epoch; -	if (TransactionIdPrecedes(xid, state->last_value)) { -		if (xid > state->last_value) -			epoch--; -	} else if (TransactionIdFollows(xid, state->last_value)) { -		if (xid < state->last_value) -			epoch++; -	} -	return (epoch << 32) | xid; +	GetNextXidAndEpoch(&state->last_xid, &state->epoch);  }  /* - * Fetch epoch data from backend. + * do a TransactionId -> txid conversion for an XID near the given epoch   */ -static void -load_xid_epoch(TxidEpoch *state) +static txid +convert_xid(TransactionId xid, const TxidEpoch *state)  { -	TransactionId	xid; -	uint32			epoch; +	uint64 epoch; -	GetNextXidAndEpoch(&xid, &epoch); +	/* return special xid's as-is */ +	if (!TransactionIdIsNormal(xid)) +		return (txid) xid; + +	/* xid can be on either side when near wrap-around */ +	epoch = (uint64) state->epoch; +	if (xid > state->last_xid && +		TransactionIdPrecedes(xid, state->last_xid)) +		epoch--; +	else if (xid < state->last_xid && +			 TransactionIdFollows(xid, state->last_xid)) +		epoch++; -	state->epoch = epoch; -	state->last_value = xid; +	return (epoch << 32) | xid;  }  /* - * compare txid in memory. + * txid comparator for qsort/bsearch   */  static int  cmp_txid(const void *aa, const void *bb)  { -	const uint64 *a = aa; -	const uint64 *b = bb; -	if (*a < *b) +	txid	a = *(const txid *) aa; +	txid	b = *(const txid *) bb; + +	if (a < b)  		return -1; -	if (*a > *b) +	if (a > b)  		return 1;  	return 0;  }  /* - * order txids, for bsearch(). + * sort a snapshot's txids, so we can use bsearch() later. + * + * For consistency of on-disk representation, we always sort even if bsearch + * will not be used.   */  static void  sort_snapshot(TxidSnapshot *snap) @@ -166,13 +182,16 @@ is_visible_txid(txid value, const TxidSnapshot *snap)  	else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)  	{  		void *res; +  		res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid); +		/* if found, transaction is still in progress */  		return (res) ? false : true;  	}  #endif  	else  	{ -		int i; +		uint32 i; +  		for (i = 0; i < snap->nxip; i++)  		{  			if (value == snap->xip[i]) @@ -206,7 +225,7 @@ buf_add_txid(StringInfo buf, txid xid)  {  	TxidSnapshot *snap = (TxidSnapshot *)buf->data; -	/* do it before possible realloc */ +	/* do this before possible realloc */  	snap->nxip++;  	appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid)); @@ -216,6 +235,7 @@ static TxidSnapshot *  buf_finalize(StringInfo buf)  {  	TxidSnapshot *snap = (TxidSnapshot *)buf->data; +  	SET_VARSIZE(snap, buf->len);  	/* buf is not needed anymore */ @@ -319,13 +339,17 @@ bad_format:  }  /* - * Public functions + * Public functions. + * + * txid_current() and txid_current_snapshot() are the only ones that + * communicate with core xid machinery.  All the others work on data + * returned by them.   */  /*   * txid_current() returns int8   * - *		Return the current transaction ID + *		Return the current toplevel transaction ID as TXID   */  Datum  txid_current(PG_FUNCTION_ARGS) @@ -343,19 +367,21 @@ txid_current(PG_FUNCTION_ARGS)  /*   * txid_current_snapshot() returns txid_snapshot   * - *		Return current snapshot + *		Return current snapshot in TXID format + * + * Note that only top-transaction XIDs are included in the snapshot.   */  Datum  txid_current_snapshot(PG_FUNCTION_ARGS)  {  	TxidSnapshot *snap; -	unsigned nxip, i, size; +	uint32 nxip, i, size;  	TxidEpoch state;  	Snapshot cur; -	cur = SerializableSnapshot; +	cur = ActiveSnapshot;  	if (cur == NULL) -		elog(ERROR, "get_current_snapshot: SerializableSnapshot == NULL"); +		elog(ERROR, "txid_current_snapshot: ActiveSnapshot == NULL");  	load_xid_epoch(&state); @@ -372,7 +398,7 @@ txid_current_snapshot(PG_FUNCTION_ARGS)  	for (i = 0; i < nxip; i++)  		snap->xip[i] = convert_xid(cur->xip[i], &state); -	/* we want them guaranteed ascending order */ +	/* we want them guaranteed to be in ascending order */  	sort_snapshot(snap);  	PG_RETURN_POINTER(snap); @@ -386,8 +412,8 @@ txid_current_snapshot(PG_FUNCTION_ARGS)  Datum  txid_snapshot_in(PG_FUNCTION_ARGS)  { -	TxidSnapshot *snap;  	char	   *str = PG_GETARG_CSTRING(0); +	TxidSnapshot *snap;  	snap = parse_snapshot(str); @@ -402,11 +428,9 @@ txid_snapshot_in(PG_FUNCTION_ARGS)  Datum  txid_snapshot_out(PG_FUNCTION_ARGS)  { -	TxidSnapshot   *snap; +	TxidSnapshot   *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);  	StringInfoData	str; -	int				i; - -	snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); +	uint32			i;  	initStringInfo(&str); @@ -415,16 +439,92 @@ txid_snapshot_out(PG_FUNCTION_ARGS)  	for (i = 0; i < snap->nxip; i++)  	{ -		appendStringInfo(&str, "%s" TXID_FMT, -						 ((i > 0) ? "," : ""), -						 snap->xip[i]); +		if (i > 0) +			appendStringInfoChar(&str, ','); +		appendStringInfo(&str, TXID_FMT, snap->xip[i]);  	} -	PG_FREE_IF_COPY(snap, 0); -  	PG_RETURN_CSTRING(str.data);  } +/* + * txid_snapshot_recv(internal) returns txid_snapshot + * + *		binary input function for type txid_snapshot + * + *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip + */ +Datum +txid_snapshot_recv(PG_FUNCTION_ARGS) +{ +	StringInfo  buf = (StringInfo) PG_GETARG_POINTER(0); +	TxidSnapshot *snap; +	txid last = 0; +	int nxip; +	int i; +	int avail; +	int expect; +	txid xmin, xmax; + +	/* +	 * load nxip and check for nonsense. +	 * +	 * (nxip > avail) check is against int overflows in 'expect'. +	 */ +	nxip = pq_getmsgint(buf, 4); +	avail = buf->len - buf->cursor; +	expect = 8 + 8 + nxip * 8; +	if (nxip < 0 || nxip > avail || expect > avail) +		goto bad_format; + +	xmin = pq_getmsgint64(buf); +	xmax = pq_getmsgint64(buf); +	if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID) +		goto bad_format; + +	snap = palloc(TXID_SNAPSHOT_SIZE(nxip)); +	snap->xmin = xmin; +	snap->xmax = xmax; +	snap->nxip = nxip; +	SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip)); + +	for (i = 0; i < nxip; i++) +	{ +		txid cur =  pq_getmsgint64(buf); +		if (cur <= last || cur < xmin || cur >= xmax) +			goto bad_format; +		snap->xip[i] = cur; +		last = cur; +	} +	PG_RETURN_POINTER(snap); + +bad_format: +	elog(ERROR, "invalid snapshot data"); +	return (Datum)NULL; +} + +/* + * txid_snapshot_send(txid_snapshot) returns bytea + * + *		binary output function for type txid_snapshot + * + *		format: int4 nxip, int8 xmin, int8 xmax, int8 xip + */ +Datum +txid_snapshot_send(PG_FUNCTION_ARGS) +{ +	TxidSnapshot *snap = (TxidSnapshot *)PG_GETARG_VARLENA_P(0); +	StringInfoData buf; +	uint32 i; + +	pq_begintypsend(&buf); +	pq_sendint(&buf, snap->nxip, 4); +	pq_sendint64(&buf, snap->xmin); +	pq_sendint64(&buf, snap->xmax); +	for (i = 0; i < snap->nxip; i++) +		pq_sendint64(&buf, snap->xip[i]); +	PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); +}  /*   * txid_visible_in_snapshot(int8, txid_snapshot) returns bool @@ -436,12 +536,8 @@ txid_visible_in_snapshot(PG_FUNCTION_ARGS)  {  	txid value = PG_GETARG_INT64(0);  	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1); -	int			res; -	res = is_visible_txid(value, snap) ? true : false; - -	PG_FREE_IF_COPY(snap, 1); -	PG_RETURN_BOOL(res); +	PG_RETURN_BOOL(is_visible_txid(value, snap));  }  /* @@ -453,9 +549,8 @@ Datum  txid_snapshot_xmin(PG_FUNCTION_ARGS)  {  	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); -	txid res = snap->xmin; -	PG_FREE_IF_COPY(snap, 0); -	PG_RETURN_INT64(res); + +	PG_RETURN_INT64(snap->xmin);  }  /* @@ -467,9 +562,8 @@ Datum  txid_snapshot_xmax(PG_FUNCTION_ARGS)  {  	TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); -	txid res = snap->xmax; -	PG_FREE_IF_COPY(snap, 0); -	PG_RETURN_INT64(res); + +	PG_RETURN_INT64(snap->xmax);  }  /* @@ -486,15 +580,13 @@ txid_snapshot_xip(PG_FUNCTION_ARGS)  	/* on first call initialize snap_state and get copy of snapshot */  	if (SRF_IS_FIRSTCALL()) { -		TxidSnapshot *arg; +		TxidSnapshot *arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);  		fctx = SRF_FIRSTCALL_INIT();  		/* make a copy of user snapshot */ -		arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0);  		snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));  		memcpy(snap, arg, VARSIZE(arg)); -		PG_FREE_IF_COPY(arg, 0);  		fctx->user_fctx = snap;  	} @@ -509,4 +601,3 @@ txid_snapshot_xip(PG_FUNCTION_ARGS)  		SRF_RETURN_DONE(fctx);  	}  } - diff --git a/contrib/txid/txid.sql.in b/contrib/txid/txid.sql.in index 3c36dc27459..ce5cded738b 100644 --- a/contrib/txid/txid.sql.in +++ b/contrib/txid/txid.sql.in @@ -5,21 +5,38 @@  --  --	Copyright (c) 2003-2007, PostgreSQL Global Development Group  --	Author: Jan Wieck, Afilias USA INC. ---  --	64-bit txids: Marko Kreen, Skype Technologies +-- +--	$PostgreSQL: pgsql/contrib/txid/txid.sql.in,v 1.2 2007/10/11 19:54:17 tgl Exp $ +--  -- ---------- +-- Adjust this setting to control where the objects get created. +SET search_path = public; + +BEGIN; +  --  -- A special transaction snapshot data type for faster visibility checks  -- -CREATE OR REPLACE FUNCTION txid_snapshot_in(cstring) +CREATE TYPE txid_snapshot; + +CREATE FUNCTION txid_snapshot_in(cstring)  	RETURNS txid_snapshot  	AS 'MODULE_PATHNAME' LANGUAGE C  	IMMUTABLE STRICT; -CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot) +CREATE FUNCTION txid_snapshot_out(txid_snapshot)  	RETURNS cstring  	AS 'MODULE_PATHNAME' LANGUAGE C  	IMMUTABLE STRICT; +CREATE FUNCTION txid_snapshot_recv(internal) +	RETURNS txid_snapshot +	AS 'MODULE_PATHNAME' LANGUAGE C +	IMMUTABLE STRICT; +CREATE FUNCTION txid_snapshot_send(txid_snapshot) +	RETURNS bytea +	AS 'MODULE_PATHNAME' LANGUAGE C +	IMMUTABLE STRICT;  --  -- The data type itself @@ -27,42 +44,44 @@ CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot)  CREATE TYPE txid_snapshot (  	INPUT = txid_snapshot_in,  	OUTPUT = txid_snapshot_out, +	RECEIVE = txid_snapshot_recv, +	SEND = txid_snapshot_send,  	INTERNALLENGTH = variable,  	STORAGE = extended,  	ALIGNMENT = double  ); -CREATE OR REPLACE FUNCTION txid_current() +-- +-- Functions for txid +-- +CREATE FUNCTION txid_current()  	RETURNS bigint  	AS 'MODULE_PATHNAME', 'txid_current' LANGUAGE C  	STABLE; -CREATE OR REPLACE FUNCTION txid_current_snapshot() +CREATE FUNCTION txid_current_snapshot()  	RETURNS txid_snapshot  	AS 'MODULE_PATHNAME', 'txid_current_snapshot' LANGUAGE C  	STABLE; -CREATE OR REPLACE FUNCTION txid_snapshot_xmin(txid_snapshot) +CREATE FUNCTION txid_snapshot_xmin(txid_snapshot)  	RETURNS bigint  	AS 'MODULE_PATHNAME', 'txid_snapshot_xmin' LANGUAGE C  	IMMUTABLE STRICT; -CREATE OR REPLACE FUNCTION txid_snapshot_xmax(txid_snapshot) +CREATE FUNCTION txid_snapshot_xmax(txid_snapshot)  	RETURNS bigint  	AS 'MODULE_PATHNAME', 'txid_snapshot_xmax' LANGUAGE C  	IMMUTABLE STRICT; -CREATE OR REPLACE FUNCTION txid_snapshot_xip(txid_snapshot) +CREATE FUNCTION txid_snapshot_xip(txid_snapshot)  	RETURNS setof bigint  	AS 'MODULE_PATHNAME', 'txid_snapshot_xip' LANGUAGE C  	IMMUTABLE STRICT; - --- --- Special comparision functions for visibility checks --- -CREATE OR REPLACE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot) +CREATE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot)  	RETURNS boolean  	AS 'MODULE_PATHNAME', 'txid_visible_in_snapshot' LANGUAGE C  	IMMUTABLE STRICT; +COMMIT; diff --git a/contrib/txid/uninstall_txid.sql b/contrib/txid/uninstall_txid.sql index a28da298455..ac126d60e06 100644 --- a/contrib/txid/uninstall_txid.sql +++ b/contrib/txid/uninstall_txid.sql @@ -1,4 +1,4 @@ - +SET search_path = public;  DROP FUNCTION txid_current();  DROP FUNCTION txid_current_snapshot(); @@ -6,10 +6,6 @@ DROP FUNCTION txid_snapshot_xmin(txid_snapshot);  DROP FUNCTION txid_snapshot_xmax(txid_snapshot);  DROP FUNCTION txid_snapshot_xip(txid_snapshot);  DROP FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot); -DROP FUNCTION txid_not_visible_in_snapshot(bigint, txid_snapshot); - -DROP TYPE txid_snapshot cascade; --- need cascade to drop those: --- DROP FUNCTION txid_snapshot_in(cstring); --- DROP FUNCTION txid_snapshot_out(txid_snapshot); +DROP TYPE txid_snapshot CASCADE; +-- need cascade to drop the I/O functions | 
