summaryrefslogtreecommitdiff
path: root/src/backend/storage/ipc
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2009-12-19 01:32:45 +0000
committerSimon Riggs <simon@2ndQuadrant.com>2009-12-19 01:32:45 +0000
commitefc16ea520679d713d98a2c7bf1453c4ff7b91ec (patch)
tree6a39d2af0704a36281dc7df3ec10823eb3e6de75 /src/backend/storage/ipc
parent78a09145e0f8322e625bbc7d69fcb865ce4f3034 (diff)
Allow read only connections during recovery, known as Hot Standby.
Enabled by recovery_connections = on (default) and forcing archive recovery using a recovery.conf. Recovery processing now emulates the original transactions as they are replayed, providing full locking and MVCC behaviour for read only queries. Recovery must enter consistent state before connections are allowed, so there is a delay, typically short, before connections succeed. Replay of recovering transactions can conflict and in some cases deadlock with queries during recovery; these result in query cancellation after max_standby_delay seconds have expired. Infrastructure changes have minor effects on normal running, though introduce four new types of WAL record. New test mode "make standbycheck" allows regression tests of static command behaviour on a standby server while in recovery. Typical and extreme dynamic behaviours have been checked via code inspection and manual testing. Few port specific behaviours have been utilised, though primary testing has been on Linux only so far. This commit is the basic patch. Additional changes will follow in this release to enhance some aspects of behaviour, notably improved handling of conflicts, deadlock detection and query cancellation. Changes to VACUUM FULL are also required. Simon Riggs, with significant and lengthy review by Heikki Linnakangas, including streamlined redesign of snapshot creation and two-phase commit. Important contributions from Florian Pflug, Mark Kirkwood, Merlin Moncure, Greg Stark, Gianni Ciolli, Gabriele Bartolini, Hannu Krosing, Robert Haas, Tatsuo Ishii, Hiroyuki Yamada plus support and feedback from many other community members.
Diffstat (limited to 'src/backend/storage/ipc')
-rw-r--r--src/backend/storage/ipc/Makefile4
-rw-r--r--src/backend/storage/ipc/procarray.c1127
-rw-r--r--src/backend/storage/ipc/sinvaladt.c18
-rw-r--r--src/backend/storage/ipc/standby.c717
4 files changed, 1846 insertions, 20 deletions
diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index 20ac1e75e45..1d897c5afba 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -1,7 +1,7 @@
#
# Makefile for storage/ipc
#
-# $PostgreSQL: pgsql/src/backend/storage/ipc/Makefile,v 1.22 2009/07/31 20:26:23 tgl Exp $
+# $PostgreSQL: pgsql/src/backend/storage/ipc/Makefile,v 1.23 2009/12/19 01:32:35 sriggs Exp $
#
subdir = src/backend/storage/ipc
@@ -16,6 +16,6 @@ endif
endif
OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \
- sinval.o sinvaladt.o
+ sinval.o sinvaladt.o standby.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 9a3d2f62606..c4ddf8f2bd8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -17,13 +17,27 @@
* as are the myProcLocks lists. They can be distinguished from regular
* backend PGPROCs at need by checking for pid == 0.
*
+ * During recovery, we also keep a list of XIDs representing transactions
+ * that are known to be running at current point in WAL recovery. This
+ * list is kept in the KnownAssignedXids array, and updated by watching
+ * the sequence of arriving xids. This is very important because if we leave
+ * those xids out of the snapshot then they will appear to be already complete.
+ * Later, when they have actually completed this could lead to confusion as to
+ * whether those xids are visible or not, blowing a huge hole in MVCC.
+ * We need 'em.
+ *
+ * It is theoretically possible for a FATAL error to explode before writing
+ * an abort record. This could tie up KnownAssignedXids indefinitely, so
+ * we prune the array when a valid list of running xids arrives. These quirks,
+ * if they do ever exist in reality will not effect the correctness of
+ * snapshots.
*
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.51 2009/07/29 15:57:11 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.52 2009/12/19 01:32:35 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,14 +45,18 @@
#include <signal.h>
+#include "access/clog.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/twophase.h"
#include "miscadmin.h"
#include "storage/procarray.h"
+#include "storage/standby.h"
+#include "utils/builtins.h"
#include "utils/snapmgr.h"
+static RunningTransactionsData CurrentRunningXactsData;
/* Our shared memory area */
typedef struct ProcArrayStruct
@@ -46,6 +64,14 @@ typedef struct ProcArrayStruct
int numProcs; /* number of valid procs entries */
int maxProcs; /* allocated size of procs array */
+ int numKnownAssignedXids; /* current number of known assigned xids */
+ int maxKnownAssignedXids; /* allocated size of known assigned xids */
+ /*
+ * Highest subxid that overflowed KnownAssignedXids array. Similar to
+ * overflowing cached subxids in PGPROC entries.
+ */
+ TransactionId lastOverflowedXid;
+
/*
* We declare procs[] as 1 entry because C wants a fixed-size array, but
* actually it is maxProcs entries long.
@@ -55,6 +81,24 @@ typedef struct ProcArrayStruct
static ProcArrayStruct *procArray;
+/*
+ * Bookkeeping for tracking emulated transactions in recovery
+ */
+static HTAB *KnownAssignedXidsHash;
+static TransactionId latestObservedXid = InvalidTransactionId;
+
+/*
+ * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
+ * the highest xid that might still be running that we don't have in
+ * KnownAssignedXids.
+ */
+static TransactionId standbySnapshotPendingXmin;
+
+/*
+ * Oldest transaction still running according to the running-xacts snapshot
+ * we initialized standby mode from.
+ */
+static TransactionId snapshotOldestActiveXid;
#ifdef XIDCACHE_DEBUG
@@ -90,6 +134,17 @@ static void DisplayXidCache(void);
#define xc_slow_answer_inc() ((void) 0)
#endif /* XIDCACHE_DEBUG */
+/* Primitives for KnownAssignedXids array handling for standby */
+static Size KnownAssignedXidsShmemSize(int size);
+static void KnownAssignedXidsInit(int size);
+static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
+static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+ TransactionId xmax);
+static bool KnownAssignedXidsExist(TransactionId xid);
+static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
+static void KnownAssignedXidsRemove(TransactionId xid);
+static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
+static void KnownAssignedXidsDisplay(int trace_level);
/*
* Report shared-memory space needed by CreateSharedProcArray.
@@ -100,8 +155,22 @@ ProcArrayShmemSize(void)
Size size;
size = offsetof(ProcArrayStruct, procs);
- size = add_size(size, mul_size(sizeof(PGPROC *),
- add_size(MaxBackends, max_prepared_xacts)));
+
+ /* Normal processing - MyProc slots */
+#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
+ size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));
+
+ /*
+ * During recovery processing we have a data structure called KnownAssignedXids,
+ * created in shared memory. Local data structures are also created in various
+ * backends during GetSnapshotData(), TransactionIdIsInProgress() and
+ * GetRunningTransactionData(). All of the main structures created in those
+ * functions must be identically sized, since we may at times copy the whole
+ * of the data structures around. We refer to this as TOTAL_MAX_CACHED_SUBXIDS.
+ */
+#define TOTAL_MAX_CACHED_SUBXIDS ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
+ if (XLogRequestRecoveryConnections)
+ size = add_size(size, KnownAssignedXidsShmemSize(TOTAL_MAX_CACHED_SUBXIDS));
return size;
}
@@ -116,15 +185,21 @@ CreateSharedProcArray(void)
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
- ShmemInitStruct("Proc Array", ProcArrayShmemSize(), &found);
+ ShmemInitStruct("Proc Array",
+ mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS),
+ &found);
if (!found)
{
/*
* We're the first - initialize.
*/
+ /* Normal processing */
procArray->numProcs = 0;
- procArray->maxProcs = MaxBackends + max_prepared_xacts;
+ procArray->maxProcs = PROCARRAY_MAXPROCS;
+
+ if (XLogRequestRecoveryConnections)
+ KnownAssignedXidsInit(TOTAL_MAX_CACHED_SUBXIDS);
}
}
@@ -302,6 +377,7 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId;
+ proc->recoveryConflictMode = 0;
/* redundant, but just in case */
proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
@@ -312,6 +388,220 @@ ProcArrayClearTransaction(PGPROC *proc)
proc->subxids.overflowed = false;
}
+void
+ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
+{
+ snapshotOldestActiveXid = oldestActiveXid;
+}
+
+/*
+ * ProcArrayApplyRecoveryInfo -- apply recovery info about xids
+ *
+ * Takes us through 3 states: Uninitialized, Pending and Ready.
+ * Normal case is to go all the way to Ready straight away, though there
+ * are atypical cases where we need to take it in steps.
+ *
+ * Use the data about running transactions on master to create the initial
+ * state of KnownAssignedXids. We also these records to regularly prune
+ * KnownAssignedXids because we know it is possible that some transactions
+ * with FATAL errors do not write abort records, which could cause eventual
+ * overflow.
+ *
+ * Only used during recovery. Notice the signature is very similar to a
+ * _redo function and its difficult to decide exactly where this code should
+ * reside.
+ */
+void
+ProcArrayApplyRecoveryInfo(RunningTransactions running)
+{
+ int xid_index; /* main loop */
+ TransactionId *xids;
+ int nxids;
+
+ Assert(standbyState >= STANDBY_INITIALIZED);
+
+ /*
+ * Remove stale transactions, if any.
+ */
+ ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
+ StandbyReleaseOldLocks(running->oldestRunningXid);
+
+ /*
+ * If our snapshot is already valid, nothing else to do...
+ */
+ if (standbyState == STANDBY_SNAPSHOT_READY)
+ return;
+
+ /*
+ * If our initial RunningXactData had an overflowed snapshot then we
+ * knew we were missing some subxids from our snapshot. We can use
+ * this data as an initial snapshot, but we cannot yet mark it valid.
+ * We know that the missing subxids are equal to or earlier than
+ * nextXid. After we initialise we continue to apply changes during
+ * recovery, so once the oldestRunningXid is later than the nextXid
+ * from the initial snapshot we know that we no longer have missing
+ * information and can mark the snapshot as valid.
+ */
+ if (standbyState == STANDBY_SNAPSHOT_PENDING)
+ {
+ if (TransactionIdPrecedes(standbySnapshotPendingXmin,
+ running->oldestRunningXid))
+ {
+ standbyState = STANDBY_SNAPSHOT_READY;
+ elog(trace_recovery(DEBUG2),
+ "running xact data now proven complete");
+ elog(trace_recovery(DEBUG2),
+ "recovery snapshots are now enabled");
+ }
+ return;
+ }
+
+ /*
+ * OK, we need to initialise from the RunningXactData record
+ */
+ latestObservedXid = running->nextXid;
+ TransactionIdRetreat(latestObservedXid);
+
+ /*
+ * If the snapshot overflowed, then we still initialise with what we
+ * know, but the recovery snapshot isn't fully valid yet because we
+ * know there are some subxids missing (ergo we don't know which ones)
+ */
+ if (!running->subxid_overflow)
+ {
+ standbyState = STANDBY_SNAPSHOT_READY;
+ standbySnapshotPendingXmin = InvalidTransactionId;
+ }
+ else
+ {
+ standbyState = STANDBY_SNAPSHOT_PENDING;
+ standbySnapshotPendingXmin = latestObservedXid;
+ ereport(LOG,
+ (errmsg("consistent state delayed because recovery snapshot incomplete")));
+ }
+
+ nxids = running->xcnt;
+ xids = running->xids;
+
+ KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+ /*
+ * Scan through the incoming array of RunningXacts and collect xids.
+ * We don't use SubtransSetParent because it doesn't matter yet. If
+ * we aren't overflowed then all xids will fit in snapshot and so we
+ * don't need subtrans. If we later overflow, an xid assignment record
+ * will add xids to subtrans. If RunningXacts is overflowed then we
+ * don't have enough information to correctly update subtrans anyway.
+ */
+
+ /*
+ * Nobody else is running yet, but take locks anyhow
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /* Reset latestCompletedXid */
+ ShmemVariableCache->latestCompletedXid = running->nextXid;
+ TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
+
+ /*
+ * Add our new xids into the array
+ */
+ for (xid_index = 0; xid_index < running->xcnt; xid_index++)
+ {
+ TransactionId xid = running->xids[xid_index];
+
+ /*
+ * The running-xacts snapshot can contain xids that did finish between
+ * when the snapshot was taken and when it was written to WAL. Such
+ * transactions are not running anymore, so ignore them.
+ */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ continue;
+
+ KnownAssignedXidsAdd(&xid, 1);
+ }
+
+ KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+
+ /*
+ * Update lastOverflowedXid if the snapshot had overflown. We don't know
+ * the exact value for this, so conservatively assume that it's nextXid-1
+ */
+ if (running->subxid_overflow &&
+ TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid))
+ procArray->lastOverflowedXid = latestObservedXid;
+ else if (TransactionIdFollows(running->oldestRunningXid,
+ procArray->lastOverflowedXid))
+ procArray->lastOverflowedXid = InvalidTransactionId;
+
+ LWLockRelease(ProcArrayLock);
+
+ /* nextXid must be beyond any observed xid */
+ if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid))
+ ShmemVariableCache->nextXid = running->nextXid;
+
+ elog(trace_recovery(DEBUG2),
+ "running transaction data initialized");
+ if (standbyState == STANDBY_SNAPSHOT_READY)
+ elog(trace_recovery(DEBUG2),
+ "recovery snapshots are now enabled");
+}
+
+void
+ProcArrayApplyXidAssignment(TransactionId topxid,
+ int nsubxids, TransactionId *subxids)
+{
+ TransactionId max_xid;
+ int i;
+
+ if (standbyState < STANDBY_SNAPSHOT_PENDING)
+ return;
+
+ max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
+
+ /*
+ * Mark all the subtransactions as observed.
+ *
+ * NOTE: This will fail if the subxid contains too many previously
+ * unobserved xids to fit into known-assigned-xids. That shouldn't happen
+ * as the code stands, because xid-assignment records should never contain
+ * more than PGPROC_MAX_CACHED_SUBXIDS entries.
+ */
+ RecordKnownAssignedTransactionIds(max_xid);
+
+ /*
+ * Notice that we update pg_subtrans with the top-level xid, rather
+ * than the parent xid. This is a difference between normal
+ * processing and recovery, yet is still correct in all cases. The
+ * reason is that subtransaction commit is not marked in clog until
+ * commit processing, so all aborted subtransactions have already been
+ * clearly marked in clog. As a result we are able to refer directly
+ * to the top-level transaction's state rather than skipping through
+ * all the intermediate states in the subtransaction tree. This
+ * should be the first time we have attempted to SubTransSetParent().
+ */
+ for (i = 0; i < nsubxids; i++)
+ SubTransSetParent(subxids[i], topxid, false);
+
+ /*
+ * Uses same locking as transaction commit
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * Remove from known-assigned-xacts.
+ */
+ for (i = 0; i < nsubxids; i++)
+ KnownAssignedXidsRemove(subxids[i]);
+
+ /*
+ * Advance lastOverflowedXid when required.
+ */
+ if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
+ procArray->lastOverflowedXid = max_xid;
+
+ LWLockRelease(ProcArrayLock);
+}
/*
* TransactionIdIsInProgress -- is given transaction running in some backend
@@ -384,8 +674,15 @@ TransactionIdIsInProgress(TransactionId xid)
*/
if (xids == NULL)
{
- xids = (TransactionId *)
- malloc(arrayP->maxProcs * sizeof(TransactionId));
+ /*
+ * In hot standby mode, reserve enough space to hold all xids in
+ * the known-assigned list. If we later finish recovery, we no longer
+ * need the bigger array, but we don't bother to shrink it.
+ */
+ int maxxids = RecoveryInProgress() ?
+ arrayP->maxProcs : TOTAL_MAX_CACHED_SUBXIDS;
+
+ xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
if (xids == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -465,11 +762,35 @@ TransactionIdIsInProgress(TransactionId xid)
xids[nxids++] = pxid;
}
+ /* In hot standby mode, check the known-assigned-xids list. */
+ if (RecoveryInProgress())
+ {
+ /* none of the PGPROC entries should have XIDs in hot standby mode */
+ Assert(nxids == 0);
+
+ if (KnownAssignedXidsExist(xid))
+ {
+ LWLockRelease(ProcArrayLock);
+ /* XXX: should we have a separate counter for this? */
+ /* xc_by_main_xid_inc(); */
+ return true;
+ }
+
+ /*
+ * If the KnownAssignedXids overflowed, we have to check
+ * pg_subtrans too. Copy all xids from KnownAssignedXids that are
+ * lower than xid, since if xid is a subtransaction its parent will
+ * always have a lower value.
+ */
+ if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
+ nxids = KnownAssignedXidsGet(xids, xid);
+ }
+
LWLockRelease(ProcArrayLock);
/*
* If none of the relevant caches overflowed, we know the Xid is not
- * running without looking at pg_subtrans.
+ * running without even looking at pg_subtrans.
*/
if (nxids == 0)
{
@@ -590,6 +911,9 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
TransactionId result;
int index;
+ /* Cannot look for individual databases during recovery */
+ Assert(allDbs || !RecoveryInProgress());
+
LWLockAcquire(ProcArrayLock, LW_SHARED);
/*
@@ -635,6 +959,13 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
LWLockRelease(ProcArrayLock);
+ /*
+ * Compute the cutoff XID, being careful not to generate a "permanent" XID
+ */
+ result -= vacuum_defer_cleanup_age;
+ if (!TransactionIdIsNormal(result))
+ result = FirstNormalTransactionId;
+
return result;
}
@@ -656,7 +987,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
* but since PGPROC has only a limited cache area for subxact XIDs, full
* information may not be available. If we find any overflowed subxid arrays,
* we have to mark the snapshot's subxid data as overflowed, and extra work
- * will need to be done to determine what's running (see XidInMVCCSnapshot()
+ * *may* need to be done to determine what's running (see XidInMVCCSnapshot()
* in tqual.c).
*
* We also update the following backend-global variables:
@@ -681,6 +1012,7 @@ GetSnapshotData(Snapshot snapshot)
int index;
int count = 0;
int subcount = 0;
+ bool suboverflowed = false;
Assert(snapshot != NULL);
@@ -698,7 +1030,8 @@ GetSnapshotData(Snapshot snapshot)
if (snapshot->xip == NULL)
{
/*
- * First call for this snapshot
+ * First call for this snapshot. Snapshot is same size whether
+ * or not we are in recovery, see later comments.
*/
snapshot->xip = (TransactionId *)
malloc(arrayP->maxProcs * sizeof(TransactionId));
@@ -708,13 +1041,15 @@ GetSnapshotData(Snapshot snapshot)
errmsg("out of memory")));
Assert(snapshot->subxip == NULL);
snapshot->subxip = (TransactionId *)
- malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
if (snapshot->subxip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
+ snapshot->takenDuringRecovery = RecoveryInProgress();
+
/*
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* going to set MyProc->xmin.
@@ -763,6 +1098,7 @@ GetSnapshotData(Snapshot snapshot)
*/
if (TransactionIdIsNormal(xid))
{
+ Assert(!snapshot->takenDuringRecovery);
if (TransactionIdFollowsOrEquals(xid, xmax))
continue;
if (proc != MyProc)
@@ -785,16 +1121,17 @@ GetSnapshotData(Snapshot snapshot)
*
* Again, our own XIDs are not included in the snapshot.
*/
- if (subcount >= 0 && proc != MyProc)
+ if (!suboverflowed && proc != MyProc)
{
if (proc->subxids.overflowed)
- subcount = -1; /* overflowed */
+ suboverflowed = true;
else
{
int nxids = proc->subxids.nxids;
if (nxids > 0)
{
+ Assert(!snapshot->takenDuringRecovery);
memcpy(snapshot->subxip + subcount,
(void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
@@ -804,6 +1141,40 @@ GetSnapshotData(Snapshot snapshot)
}
}
+ /*
+ * If in recovery get any known assigned xids.
+ */
+ if (snapshot->takenDuringRecovery)
+ {
+ Assert(count == 0);
+
+ /*
+ * We store all xids directly into subxip[]. Here's why:
+ *
+ * In recovery we don't know which xids are top-level and which are
+ * subxacts, a design choice that greatly simplifies xid processing.
+ *
+ * It seems like we would want to try to put xids into xip[] only,
+ * but that is fairly small. We would either need to make that bigger
+ * or to increase the rate at which we WAL-log xid assignment;
+ * neither is an appealing choice.
+ *
+ * We could try to store xids into xip[] first and then into subxip[]
+ * if there are too many xids. That only works if the snapshot doesn't
+ * overflow because we do not search subxip[] in that case. A simpler
+ * way is to just store all xids in the subxact array because this
+ * is by far the bigger array. We just leave the xip array empty.
+ *
+ * Either way we need to change the way XidInMVCCSnapshot() works
+ * depending upon when the snapshot was taken, or change normal
+ * snapshot processing so it matches.
+ */
+ subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin, xmax);
+
+ if (TransactionIdPrecedes(xmin, procArray->lastOverflowedXid))
+ suboverflowed = true;
+ }
+
if (!TransactionIdIsValid(MyProc->xmin))
MyProc->xmin = TransactionXmin = xmin;
@@ -818,13 +1189,16 @@ GetSnapshotData(Snapshot snapshot)
globalxmin = xmin;
/* Update global variables too */
- RecentGlobalXmin = globalxmin;
+ RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
+ if (!TransactionIdIsNormal(RecentGlobalXmin))
+ RecentGlobalXmin = FirstNormalTransactionId;
RecentXmin = xmin;
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->subxcnt = subcount;
+ snapshot->suboverflowed = suboverflowed;
snapshot->curcid = GetCurrentCommandId(false);
@@ -840,6 +1214,129 @@ GetSnapshotData(Snapshot snapshot)
}
/*
+ * GetRunningTransactionData -- returns information about running transactions.
+ *
+ * Similar to GetSnapshotData but returning more information. We include
+ * all PGPROCs with an assigned TransactionId, even VACUUM processes.
+ *
+ * This is never executed during recovery so there is no need to look at
+ * KnownAssignedXids.
+ *
+ * We don't worry about updating other counters, we want to keep this as
+ * simple as possible and leave GetSnapshotData() as the primary code for
+ * that bookkeeping.
+ */
+RunningTransactions
+GetRunningTransactionData(void)
+{
+ ProcArrayStruct *arrayP = procArray;
+ RunningTransactions CurrentRunningXacts = (RunningTransactions) &CurrentRunningXactsData;
+ TransactionId latestCompletedXid;
+ TransactionId oldestRunningXid;
+ TransactionId *xids;
+ int index;
+ int count;
+ int subcount;
+ bool suboverflowed;
+
+ Assert(!RecoveryInProgress());
+
+ /*
+ * Allocating space for maxProcs xids is usually overkill; numProcs would
+ * be sufficient. But it seems better to do the malloc while not holding
+ * the lock, so we can't look at numProcs. Likewise, we allocate much
+ * more subxip storage than is probably needed.
+ *
+ * Should only be allocated for bgwriter, since only ever executed
+ * during checkpoints.
+ */
+ if (CurrentRunningXacts->xids == NULL)
+ {
+ /*
+ * First call
+ */
+ CurrentRunningXacts->xids = (TransactionId *)
+ malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ if (CurrentRunningXacts->xids == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ xids = CurrentRunningXacts->xids;
+
+ count = subcount = 0;
+ suboverflowed = false;
+
+ /*
+ * Ensure that no xids enter or leave the procarray while we obtain
+ * snapshot.
+ */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ LWLockAcquire(XidGenLock, LW_SHARED);
+
+ latestCompletedXid = ShmemVariableCache->latestCompletedXid;
+
+ oldestRunningXid = ShmemVariableCache->nextXid;
+ /*
+ * Spin over procArray collecting all xids and subxids.
+ */
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ volatile PGPROC *proc = arrayP->procs[index];
+ TransactionId xid;
+ int nxids;
+
+ /* Fetch xid just once - see GetNewTransactionId */
+ xid = proc->xid;
+
+ /*
+ * We don't need to store transactions that don't have a TransactionId
+ * yet because they will not show as running on a standby server.
+ */
+ if (!TransactionIdIsValid(xid))
+ continue;
+
+ xids[count++] = xid;
+
+ if (TransactionIdPrecedes(xid, oldestRunningXid))
+ oldestRunningXid = xid;
+
+ /*
+ * Save subtransaction XIDs. Other backends can't add or remove entries
+ * while we're holding XidGenLock.
+ */
+ nxids = proc->subxids.nxids;
+ if (nxids > 0)
+ {
+ memcpy(&xids[count], (void *) proc->subxids.xids,
+ nxids * sizeof(TransactionId));
+ count += nxids;
+ subcount += nxids;
+
+ if (proc->subxids.overflowed)
+ suboverflowed = true;
+
+ /*
+ * Top-level XID of a transaction is always greater than any of
+ * its subxids, so we don't need to check if any of the subxids
+ * are smaller than oldestRunningXid
+ */
+ }
+ }
+
+ CurrentRunningXacts->xcnt = count;
+ CurrentRunningXacts->subxid_overflow = suboverflowed;
+ CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
+ CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
+
+ LWLockRelease(XidGenLock);
+ LWLockRelease(ProcArrayLock);
+
+ return CurrentRunningXacts;
+}
+
+/*
* GetTransactionsInCommit -- Get the XIDs of transactions that are committing
*
* Constructs an array of XIDs of transactions that are currently in commit
@@ -1101,6 +1598,154 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
return vxids;
}
+/*
+ * GetConflictingVirtualXIDs -- returns an array of currently active VXIDs.
+ *
+ * The array is palloc'd and is terminated with an invalid VXID.
+ *
+ * Usage is limited to conflict resolution during recovery on standby servers.
+ * limitXmin is supplied as either latestRemovedXid, or InvalidTransactionId
+ * in cases where we cannot accurately determine a value for latestRemovedXid.
+ * If limitXmin is InvalidTransactionId then we know that the very
+ * latest xid that might have caused a cleanup record will be
+ * latestCompletedXid, so we set limitXmin to be latestCompletedXid instead.
+ * We then skip any backends with xmin > limitXmin. This means that
+ * cleanup records don't conflict with some recent snapshots.
+ *
+ * We replace InvalidTransactionId with latestCompletedXid here because
+ * this is the most convenient place to do that, while we hold ProcArrayLock.
+ * The originator of the cleanup record wanted to avoid checking the value of
+ * latestCompletedXid since doing so would be a performance issue during
+ * normal running, so we check it essentially for free on the standby.
+ *
+ * If dbOid is valid we skip backends attached to other databases. Some
+ * callers choose to skipExistingConflicts.
+ *
+ * Be careful to *not* pfree the result from this function. We reuse
+ * this array sufficiently often that we use malloc for the result.
+ */
+VirtualTransactionId *
+GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+ bool skipExistingConflicts)
+{
+ static VirtualTransactionId *vxids;
+ ProcArrayStruct *arrayP = procArray;
+ int count = 0;
+ int index;
+
+ /*
+ * If not first time through, get workspace to remember main XIDs in. We
+ * malloc it permanently to avoid repeated palloc/pfree overhead.
+ * Allow result space, remembering room for a terminator.
+ */
+ if (vxids == NULL)
+ {
+ vxids = (VirtualTransactionId *)
+ malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
+ if (vxids == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ /*
+ * If we don't know the TransactionId that created the conflict, set
+ * it to latestCompletedXid which is the latest possible value.
+ */
+ if (!TransactionIdIsValid(limitXmin))
+ limitXmin = ShmemVariableCache->latestCompletedXid;
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ volatile PGPROC *proc = arrayP->procs[index];
+
+ /* Exclude prepared transactions */
+ if (proc->pid == 0)
+ continue;
+
+ if (skipExistingConflicts && proc->recoveryConflictMode > 0)
+ continue;
+
+ if (!OidIsValid(dbOid) ||
+ proc->databaseId == dbOid)
+ {
+ /* Fetch xmin just once - can't change on us, but good coding */
+ TransactionId pxmin = proc->xmin;
+
+ /*
+ * We ignore an invalid pxmin because this means that backend
+ * has no snapshot and cannot get another one while we hold exclusive lock.
+ */
+ if (TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin))
+ {
+ VirtualTransactionId vxid;
+
+ GET_VXID_FROM_PGPROC(vxid, *proc);
+ if (VirtualTransactionIdIsValid(vxid))
+ vxids[count++] = vxid;
+ }
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ /* add the terminator */
+ vxids[count].backendId = InvalidBackendId;
+ vxids[count].localTransactionId = InvalidLocalTransactionId;
+
+ return vxids;
+}
+
+/*
+ * CancelVirtualTransaction - used in recovery conflict processing
+ *
+ * Returns pid of the process signaled, or 0 if not found.
+ */
+pid_t
+CancelVirtualTransaction(VirtualTransactionId vxid, int cancel_mode)
+{
+ ProcArrayStruct *arrayP = procArray;
+ int index;
+ pid_t pid = 0;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ VirtualTransactionId procvxid;
+ PGPROC *proc = arrayP->procs[index];
+
+ GET_VXID_FROM_PGPROC(procvxid, *proc);
+
+ if (procvxid.backendId == vxid.backendId &&
+ procvxid.localTransactionId == vxid.localTransactionId)
+ {
+ /*
+ * Issue orders for the proc to read next time it receives SIGINT
+ */
+ if (proc->recoveryConflictMode < cancel_mode)
+ proc->recoveryConflictMode = cancel_mode;
+
+ pid = proc->pid;
+ break;
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ if (pid != 0)
+ {
+ /*
+ * Kill the pid if it's still here. If not, that's what we wanted
+ * so ignore any errors.
+ */
+ kill(pid, SIGINT);
+ }
+
+ return pid;
+}
/*
* CountActiveBackends --- count backends (other than myself) that are in
@@ -1400,3 +2045,457 @@ DisplayXidCache(void)
}
#endif /* XIDCACHE_DEBUG */
+
+/* ----------------------------------------------
+ * KnownAssignedTransactions sub-module
+ * ----------------------------------------------
+ */
+
+/*
+ * In Hot Standby mode, we maintain a list of transactions that are (or were)
+ * running in the master at the current point in WAL.
+ *
+ * RecordKnownAssignedTransactionIds() should be run for *every* WAL record
+ * type apart from XLOG_XACT_RUNNING_XACTS, since that initialises the first
+ * snapshot so that RecordKnownAssignedTransactionIds() can be callsed. Uses
+ * local variables, so should only be called by Startup process.
+ *
+ * We record all xids that we know have been assigned. That includes
+ * all the xids on the WAL record, plus all unobserved xids that
+ * we can deduce have been assigned. We can deduce the existence of
+ * unobserved xids because we know xids are in sequence, with no gaps.
+ *
+ * During recovery we do not fret too much about the distinction between
+ * top-level xids and subtransaction xids. We hold both together in
+ * a hash table called KnownAssignedXids. In backends, this is copied into
+ * snapshots in GetSnapshotData(), taking advantage
+ * of the fact that XidInMVCCSnapshot() doesn't care about the distinction
+ * either. Subtransaction xids are effectively treated as top-level xids
+ * and in the typical case pg_subtrans is *not* maintained (and that
+ * does not effect visibility).
+ *
+ * KnownAssignedXids expands as new xids are observed or inferred, and
+ * contracts when transaction completion records arrive. We have room in a
+ * snapshot to hold maxProcs * (1 + PGPROC_MAX_CACHED_SUBXIDS) xids, so
+ * every transaction must report their subtransaction xids in a special
+ * WAL assignment record every PGPROC_MAX_CACHED_SUBXIDS. This allows us
+ * to remove the subtransaction xids and update pg_subtrans instead. Snapshots
+ * are still correct yet we don't overflow SnapshotData structure. When we do
+ * this we need
+ * to keep track of which xids caused the snapshot to overflow. We do that
+ * by simply tracking the lastOverflowedXid - if it is within the bounds of
+ * the KnownAssignedXids then we know the snapshot overflowed. (Note that
+ * subxid overflow occurs on primary when 65th subxid arrives, whereas on
+ * standby it occurs when 64th subxid arrives - that is not an error).
+ *
+ * Should FATAL errors result in a backend on primary disappearing before
+ * it can write an abort record then we just leave those xids in
+ * KnownAssignedXids. They actually aborted but we think they were running;
+ * the distinction is irrelevant because either way any changes done by the
+ * transaction are not visible to backends in the standby.
+ * We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to
+ * ensure we do not overflow.
+ *
+ * If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove
+ * xids that are not present.
+ */
+void
+RecordKnownAssignedTransactionIds(TransactionId xid)
+{
+ /*
+ * Skip processing if the current snapshot is not initialized.
+ */
+ if (standbyState < STANDBY_SNAPSHOT_PENDING)
+ return;
+
+ /*
+ * We can see WAL records before the running-xacts snapshot that
+ * contain XIDs that are not in the running-xacts snapshot, but that we
+ * know to have finished before the running-xacts snapshot was taken.
+ * Don't waste precious shared memory by keeping them in the hash table.
+ *
+ * We can also see WAL records before the running-xacts snapshot that
+ * contain XIDs that are not in the running-xacts snapshot for a different
+ * reason: the transaction started *after* the running-xacts snapshot
+ * was taken, but before it was written to WAL. We must be careful to
+ * not ignore such XIDs. Because such a transaction started after the
+ * running-xacts snapshot was taken, it must have an XID larger than
+ * the oldest XID according to the running-xacts snapshot.
+ */
+ if (TransactionIdPrecedes(xid, snapshotOldestActiveXid))
+ return;
+
+ ereport(trace_recovery(DEBUG4),
+ (errmsg("record known xact %u latestObservedXid %u",
+ xid, latestObservedXid)));
+
+ /*
+ * When a newly observed xid arrives, it is frequently the case
+ * that it is *not* the next xid in sequence. When this occurs, we
+ * must treat the intervening xids as running also.
+ */
+ if (TransactionIdFollows(xid, latestObservedXid))
+ {
+ TransactionId next_expected_xid = latestObservedXid;
+ TransactionIdAdvance(next_expected_xid);
+
+ /*
+ * Locking requirement is currently higher than for xid assignment
+ * in normal running. However, we only get called here for new
+ * high xids - so on a multi-processor where it is common that xids
+ * arrive out of order the average number of locks per assignment
+ * will actually reduce. So not too worried about this locking.
+ *
+ * XXX It does seem possible that we could add a whole range
+ * of numbers atomically to KnownAssignedXids, if we use a sorted
+ * list for KnownAssignedXids. But that design also increases the
+ * length of time we hold lock when we process commits/aborts, so
+ * on balance don't worry about this.
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
+ {
+ if (TransactionIdPrecedes(next_expected_xid, xid))
+ ereport(trace_recovery(DEBUG4),
+ (errmsg("recording unobserved xid %u (latestObservedXid %u)",
+ next_expected_xid, latestObservedXid)));
+ KnownAssignedXidsAdd(&next_expected_xid, 1);
+
+ /*
+ * Extend clog and subtrans like we do in GetNewTransactionId()
+ * during normal operation
+ */
+ ExtendCLOG(next_expected_xid);
+ ExtendSUBTRANS(next_expected_xid);
+
+ TransactionIdAdvance(next_expected_xid);
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ latestObservedXid = xid;
+ }
+
+ /* nextXid must be beyond any observed xid */
+ if (TransactionIdFollowsOrEquals(latestObservedXid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = latestObservedXid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+}
+
+void
+ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
+ TransactionId *subxids)
+{
+ int i;
+ TransactionId max_xid;
+
+ if (standbyState == STANDBY_DISABLED)
+ return;
+
+ max_xid = TransactionIdLatest(xid, nsubxids, subxids);
+
+ /*
+ * Uses same locking as transaction commit
+ */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ if (TransactionIdIsValid(xid))
+ KnownAssignedXidsRemove(xid);
+ for (i = 0; i < nsubxids; i++)
+ KnownAssignedXidsRemove(subxids[i]);
+
+ /* Like in ProcArrayRemove, advance latestCompletedXid */
+ if (TransactionIdFollowsOrEquals(max_xid,
+ ShmemVariableCache->latestCompletedXid))
+ ShmemVariableCache->latestCompletedXid = max_xid;
+
+ LWLockRelease(ProcArrayLock);
+}
+
+void
+ExpireAllKnownAssignedTransactionIds(void)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ KnownAssignedXidsRemoveMany(InvalidTransactionId, false);
+ LWLockRelease(ProcArrayLock);
+}
+
+void
+ExpireOldKnownAssignedTransactionIds(TransactionId xid)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ KnownAssignedXidsRemoveMany(xid, true);
+ LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * Private module functions to manipulate KnownAssignedXids
+ *
+ * There are 3 main users of the KnownAssignedXids data structure:
+ *
+ * * backends taking snapshots
+ * * startup process adding new knownassigned xids
+ * * startup process removing xids as transactions end
+ *
+ * If we make KnownAssignedXids a simple sorted array then the first two
+ * operations are fast, but the last one is at least O(N). If we make
+ * KnownAssignedXids a hash table then the last two operations are fast,
+ * though we have to do more work at snapshot time. Doing more work at
+ * commit could slow down taking snapshots anyway because of lwlock
+ * contention. Scanning the hash table is O(N) on the max size of the array,
+ * so performs poorly in comparison when we have very low numbers of
+ * write transactions to process. But at least it is constant overhead
+ * and a sequential memory scan will utilise hardware memory readahead
+ * to give much improved performance. In any case the emphasis must be on
+ * having the standby process changes quickly so that it can provide
+ * high availability. So we choose to implement as a hash table.
+ */
+
+static Size
+KnownAssignedXidsShmemSize(int size)
+{
+ return hash_estimate_size(size, sizeof(TransactionId));
+}
+
+static void
+KnownAssignedXidsInit(int size)
+{
+ HASHCTL info;
+
+ /* assume no locking is needed yet */
+
+ info.keysize = sizeof(TransactionId);
+ info.entrysize = sizeof(TransactionId);
+ info.hash = tag_hash;
+
+ KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash",
+ size, size,
+ &info,
+ HASH_ELEM | HASH_FUNCTION);
+
+ if (!KnownAssignedXidsHash)
+ elog(FATAL, "could not initialize known assigned xids hash table");
+
+ procArray->numKnownAssignedXids = 0;
+ procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
+ procArray->lastOverflowedXid = InvalidTransactionId;
+}
+
+/*
+ * Add xids into KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsAdd(TransactionId *xids, int nxids)
+{
+ TransactionId *result;
+ bool found;
+ int i;
+
+ for (i = 0; i < nxids; i++)
+ {
+ Assert(TransactionIdIsValid(xids[i]));
+
+ elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]);
+
+ procArray->numKnownAssignedXids++;
+ if (procArray->numKnownAssignedXids > procArray->maxKnownAssignedXids)
+ {
+ KnownAssignedXidsDisplay(LOG);
+ LWLockRelease(ProcArrayLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("too many KnownAssignedXids")));
+ }
+
+ result = (TransactionId *) hash_search(KnownAssignedXidsHash, &xids[i], HASH_ENTER,
+ &found);
+
+ if (!result)
+ {
+ LWLockRelease(ProcArrayLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory")));
+ }
+
+ if (found)
+ {
+ KnownAssignedXidsDisplay(LOG);
+ LWLockRelease(ProcArrayLock);
+ elog(ERROR, "found duplicate KnownAssignedXid %u", xids[i]);
+ }
+ }
+}
+
+/*
+ * Is an xid present in KnownAssignedXids?
+ *
+ * Must be called while holding ProcArrayLock in shared mode
+ */
+static bool
+KnownAssignedXidsExist(TransactionId xid)
+{
+ bool found;
+ (void) hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, &found);
+ return found;
+}
+
+/*
+ * Remove one xid from anywhere in KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsRemove(TransactionId xid)
+{
+ bool found;
+
+ Assert(TransactionIdIsValid(xid));
+
+ elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
+
+ (void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &found);
+
+ if (found)
+ procArray->numKnownAssignedXids--;
+ Assert(procArray->numKnownAssignedXids >= 0);
+
+ /*
+ * We can fail to find an xid if the xid came from a subtransaction
+ * that aborts, though the xid hadn't yet been reported and no WAL records
+ * have been written using the subxid. In that case the abort record will
+ * contain that subxid and we haven't seen it before.
+ *
+ * If we fail to find it for other reasons it might be a problem, but
+ * it isn't much use to log that it happened, since we can't divine much
+ * from just an isolated xid value.
+ */
+}
+
+/*
+ * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
+ * We filter out anything higher than xmax.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
+{
+ TransactionId xtmp = InvalidTransactionId;
+
+ return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
+}
+
+/*
+ * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
+ * to the lowest xid value seen if not already lower.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+ TransactionId xmax)
+{
+ HASH_SEQ_STATUS status;
+ TransactionId *knownXid;
+ int count = 0;
+
+ hash_seq_init(&status, KnownAssignedXidsHash);
+ while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+ {
+ /*
+ * Filter out anything higher than xmax
+ */
+ if (TransactionIdPrecedes(xmax, *knownXid))
+ continue;
+
+ *xarray = *knownXid;
+ xarray++;
+ count++;
+
+ /* update xmin if required */
+ if (TransactionIdPrecedes(*knownXid, *xmin))
+ *xmin = *knownXid;
+ }
+
+ return count;
+}
+
+/*
+ * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
+ * then clear the whole table.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts)
+{
+ TransactionId *knownXid;
+ HASH_SEQ_STATUS status;
+
+ if (TransactionIdIsValid(xid))
+ elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid);
+ else
+ elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
+
+ hash_seq_init(&status, KnownAssignedXidsHash);
+ while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+ {
+ TransactionId removeXid = *knownXid;
+ bool found;
+
+ if (!TransactionIdIsValid(xid) || TransactionIdPrecedes(removeXid, xid))
+ {
+ if (keepPreparedXacts && StandbyTransactionIdIsPrepared(xid))
+ continue;
+ else
+ {
+ (void) hash_search(KnownAssignedXidsHash, &removeXid,
+ HASH_REMOVE, &found);
+ if (found)
+ procArray->numKnownAssignedXids--;
+ Assert(procArray->numKnownAssignedXids >= 0);
+ }
+ }
+ }
+}
+
+/*
+ * Display KnownAssignedXids to provide debug trail
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+void
+KnownAssignedXidsDisplay(int trace_level)
+{
+ HASH_SEQ_STATUS status;
+ TransactionId *knownXid;
+ StringInfoData buf;
+ TransactionId *xids;
+ int nxids;
+ int i;
+
+ xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
+ nxids = 0;
+
+ hash_seq_init(&status, KnownAssignedXidsHash);
+ while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
+ xids[nxids++] = *knownXid;
+
+ qsort(xids, nxids, sizeof(TransactionId), xidComparator);
+
+ initStringInfo(&buf);
+
+ for (i = 0; i < nxids; i++)
+ appendStringInfo(&buf, "%u ", xids[i]);
+
+ elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data);
+
+ pfree(buf.data);
+}
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index dfa0ad7b5eb..e33664fc488 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.79 2009/07/31 20:26:23 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.80 2009/12/19 01:32:35 sriggs Exp $
*
*-------------------------------------------------------------------------
*/
@@ -145,6 +145,13 @@ typedef struct ProcState
bool signaled; /* backend has been sent catchup signal */
/*
+ * Backend only sends invalidations, never receives them. This only makes sense
+ * for Startup process during recovery because it doesn't maintain a relcache,
+ * yet it fires inval messages to allow query backends to see schema changes.
+ */
+ bool sendOnly; /* backend only sends, never receives */
+
+ /*
* Next LocalTransactionId to use for each idle backend slot. We keep
* this here because it is indexed by BackendId and it is convenient to
* copy the value to and from local memory when MyBackendId is set. It's
@@ -249,7 +256,7 @@ CreateSharedInvalidationState(void)
* Initialize a new backend to operate on the sinval buffer
*/
void
-SharedInvalBackendInit(void)
+SharedInvalBackendInit(bool sendOnly)
{
int index;
ProcState *stateP = NULL;
@@ -308,6 +315,7 @@ SharedInvalBackendInit(void)
stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false;
stateP->signaled = false;
+ stateP->sendOnly = sendOnly;
LWLockRelease(SInvalWriteLock);
@@ -579,7 +587,9 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
/*
* Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
* furthest-back backend that needs signaling (if any), and reset any
- * backends that are too far back.
+ * backends that are too far back. Note that because we ignore sendOnly
+ * backends here it is possible for them to keep sending messages without
+ * a problem even when they are the only active backend.
*/
min = segP->maxMsgNum;
minsig = min - SIG_THRESHOLD;
@@ -591,7 +601,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
int n = stateP->nextMsgNum;
/* Ignore if inactive or already in reset state */
- if (stateP->procPid == 0 || stateP->resetState)
+ if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
continue;
/*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
new file mode 100644
index 00000000000..38bc005820b
--- /dev/null
+++ b/src/backend/storage/ipc/standby.c
@@ -0,0 +1,717 @@
+/*-------------------------------------------------------------------------
+ *
+ * standby.c
+ * Misc functions used in Hot Standby mode.
+ *
+ * InitRecoveryTransactionEnvironment()
+ * ShutdownRecoveryTransactionEnvironment()
+ *
+ * ResolveRecoveryConflictWithVirtualXIDs()
+ *
+ * All functions for handling RM_STANDBY_ID, which relate to
+ * AccessExclusiveLocks and starting snapshots for Hot Standby mode.
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.1 2009/12/19 01:32:35 sriggs Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/sinvaladt.h"
+#include "storage/standby.h"
+#include "utils/ps_status.h"
+
+int vacuum_defer_cleanup_age;
+
+static List *RecoveryLockList;
+
+static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
+static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+
+/*
+ * InitRecoveryTransactionEnvironment
+ * Initiallize tracking of in-progress transactions in master
+ *
+ * We need to issue shared invalidations and hold locks. Holding locks
+ * means others may want to wait on us, so we need to make lock table
+ * inserts to appear like a transaction. We could create and delete
+ * lock table entries for each transaction but its simpler just to create
+ * one permanent entry and leave it there all the time. Locks are then
+ * acquired and released as needed. Yes, this means you can see the
+ * Startup process in pg_locks once we have run this.
+ */
+void
+InitRecoveryTransactionEnvironment(void)
+{
+ VirtualTransactionId vxid;
+
+ /*
+ * Initialise shared invalidation management for Startup process,
+ * being careful to register ourselves as a sendOnly process so
+ * we don't need to read messages, nor will we get signalled
+ * when the queue starts filling up.
+ */
+ SharedInvalBackendInit(true);
+
+ /*
+ * Record the PID and PGPROC structure of the startup process.
+ */
+ PublishStartupProcessInformation();
+
+ /*
+ * Lock a virtual transaction id for Startup process.
+ *
+ * We need to do GetNextLocalTransactionId() because
+ * SharedInvalBackendInit() leaves localTransactionid invalid and
+ * the lock manager doesn't like that at all.
+ *
+ * Note that we don't need to run XactLockTableInsert() because nobody
+ * needs to wait on xids. That sounds a little strange, but table locks
+ * are held by vxids and row level locks are held by xids. All queries
+ * hold AccessShareLocks so never block while we write or lock new rows.
+ */
+ vxid.backendId = MyBackendId;
+ vxid.localTransactionId = GetNextLocalTransactionId();
+ VirtualXactLockTableInsert(vxid);
+
+ standbyState = STANDBY_INITIALIZED;
+}
+
+/*
+ * ShutdownRecoveryTransactionEnvironment
+ * Shut down transaction tracking
+ *
+ * Prepare to switch from hot standby mode to normal operation. Shut down
+ * recovery-time transaction tracking.
+ */
+void
+ShutdownRecoveryTransactionEnvironment(void)
+{
+ /* Mark all tracked in-progress transactions as finished. */
+ ExpireAllKnownAssignedTransactionIds();
+
+ /* Release all locks the tracked transactions were holding */
+ StandbyReleaseAllLocks();
+}
+
+
+/*
+ * -----------------------------------------------------
+ * Standby wait timers and backend cancel logic
+ * -----------------------------------------------------
+ */
+
+#define STANDBY_INITIAL_WAIT_US 1000
+static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
+
+/*
+ * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
+ * We wait here for a while then return. If we decide we can't wait any
+ * more then we return true, if we can wait some more return false.
+ */
+static bool
+WaitExceedsMaxStandbyDelay(void)
+{
+ long delay_secs;
+ int delay_usecs;
+
+ /* max_standby_delay = -1 means wait forever, if necessary */
+ if (MaxStandbyDelay < 0)
+ return false;
+
+ /* Are we past max_standby_delay? */
+ TimestampDifference(GetLatestXLogTime(), GetCurrentTimestamp(),
+ &delay_secs, &delay_usecs);
+ if (delay_secs > MaxStandbyDelay)
+ return true;
+
+ /*
+ * Sleep, then do bookkeeping.
+ */
+ pg_usleep(standbyWait_us);
+
+ /*
+ * Progressively increase the sleep times.
+ */
+ standbyWait_us *= 2;
+ if (standbyWait_us > 1000000)
+ standbyWait_us = 1000000;
+ if (standbyWait_us > MaxStandbyDelay * 1000000 / 4)
+ standbyWait_us = MaxStandbyDelay * 1000000 / 4;
+
+ return false;
+}
+
+/*
+ * This is the main executioner for any query backend that conflicts with
+ * recovery processing. Judgement has already been passed on it within
+ * a specific rmgr. Here we just issue the orders to the procs. The procs
+ * then throw the required error as instructed.
+ *
+ * We may ask for a specific cancel_mode, typically ERROR or FATAL.
+ */
+void
+ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
+ char *reason, int cancel_mode)
+{
+ char waitactivitymsg[100];
+
+ Assert(cancel_mode > 0);
+
+ while (VirtualTransactionIdIsValid(*waitlist))
+ {
+ long wait_s;
+ int wait_us; /* wait in microseconds (us) */
+ TimestampTz waitStart;
+ bool logged;
+
+ waitStart = GetCurrentTimestamp();
+ standbyWait_us = STANDBY_INITIAL_WAIT_US;
+ logged = false;
+
+ /* wait until the virtual xid is gone */
+ while(!ConditionalVirtualXactLockTableWait(*waitlist))
+ {
+ /*
+ * Report if we have been waiting for a while now...
+ */
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampDifference(waitStart, now, &wait_s, &wait_us);
+ if (!logged && (wait_s > 0 || wait_us > 500000))
+ {
+ const char *oldactivitymsg;
+ int len;
+
+ oldactivitymsg = get_ps_display(&len);
+ snprintf(waitactivitymsg, sizeof(waitactivitymsg),
+ "waiting for max_standby_delay (%u ms)",
+ MaxStandbyDelay);
+ set_ps_display(waitactivitymsg, false);
+ if (len > 100)
+ len = 100;
+ memcpy(waitactivitymsg, oldactivitymsg, len);
+
+ ereport(trace_recovery(DEBUG5),
+ (errmsg("virtual transaction %u/%u is blocking %s",
+ waitlist->backendId,
+ waitlist->localTransactionId,
+ reason)));
+
+ pgstat_report_waiting(true);
+
+ logged = true;
+ }
+
+ /* Is it time to kill it? */
+ if (WaitExceedsMaxStandbyDelay())
+ {
+ pid_t pid;
+
+ /*
+ * Now find out who to throw out of the balloon.
+ */
+ Assert(VirtualTransactionIdIsValid(*waitlist));
+ pid = CancelVirtualTransaction(*waitlist, cancel_mode);
+
+ if (pid != 0)
+ {
+ /*
+ * Startup process debug messages
+ */
+ switch (cancel_mode)
+ {
+ case CONFLICT_MODE_FATAL:
+ elog(trace_recovery(DEBUG1),
+ "recovery disconnects session with pid %d because of conflict with %s",
+ pid,
+ reason);
+ break;
+ case CONFLICT_MODE_ERROR:
+ elog(trace_recovery(DEBUG1),
+ "recovery cancels virtual transaction %u/%u pid %d because of conflict with %s",
+ waitlist->backendId,
+ waitlist->localTransactionId,
+ pid,
+ reason);
+ break;
+ default:
+ /* No conflict pending, so fall through */
+ break;
+ }
+
+ /*
+ * Wait awhile for it to die so that we avoid flooding an
+ * unresponsive backend when system is heavily loaded.
+ */
+ pg_usleep(5000);
+ }
+ }
+ }
+
+ /* Reset ps display */
+ if (logged)
+ {
+ set_ps_display(waitactivitymsg, false);
+ pgstat_report_waiting(false);
+ }
+
+ /* The virtual transaction is gone now, wait for the next one */
+ waitlist++;
+ }
+}
+
+/*
+ * -----------------------------------------------------
+ * Locking in Recovery Mode
+ * -----------------------------------------------------
+ *
+ * All locks are held by the Startup process using a single virtual
+ * transaction. This implementation is both simpler and in some senses,
+ * more correct. The locks held mean "some original transaction held
+ * this lock, so query access is not allowed at this time". So the Startup
+ * process is the proxy by which the original locks are implemented.
+ *
+ * We only keep track of AccessExclusiveLocks, which are only ever held by
+ * one transaction on one relation, and don't worry about lock queuing.
+ *
+ * We keep a single dynamically expandible list of locks in local memory,
+ * RelationLockList, so we can keep track of the various entried made by
+ * the Startup process's virtual xid in the shared lock table.
+ *
+ * List elements use type xl_rel_lock, since the WAL record type exactly
+ * matches the information that we need to keep track of.
+ *
+ * We use session locks rather than normal locks so we don't need
+ * ResourceOwners.
+ */
+
+
+void
+StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
+{
+ xl_standby_lock *newlock;
+ LOCKTAG locktag;
+ bool report_memory_error = false;
+ int num_attempts = 0;
+
+ /* Already processed? */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ return;
+
+ elog(trace_recovery(DEBUG4),
+ "adding recovery lock: db %d rel %d", dbOid, relOid);
+
+ /* dbOid is InvalidOid when we are locking a shared relation. */
+ Assert(OidIsValid(relOid));
+
+ newlock = palloc(sizeof(xl_standby_lock));
+ newlock->xid = xid;
+ newlock->dbOid = dbOid;
+ newlock->relOid = relOid;
+ RecoveryLockList = lappend(RecoveryLockList, newlock);
+
+ /*
+ * Attempt to acquire the lock as requested.
+ */
+ SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
+
+ /*
+ * Wait for lock to clear or kill anyone in our way.
+ */
+ while (LockAcquireExtended(&locktag, AccessExclusiveLock,
+ true, true, report_memory_error)
+ == LOCKACQUIRE_NOT_AVAIL)
+ {
+ VirtualTransactionId *backends;
+
+ /*
+ * If blowing away everybody with conflicting locks doesn't work,
+ * after the first two attempts then we just start blowing everybody
+ * away until it does work. We do this because its likely that we
+ * either have too many locks and we just can't get one at all,
+ * or that there are many people crowding for the same table.
+ * Recovery must win; the end justifies the means.
+ */
+ if (++num_attempts < 3)
+ backends = GetLockConflicts(&locktag, AccessExclusiveLock);
+ else
+ {
+ backends = GetConflictingVirtualXIDs(InvalidTransactionId,
+ InvalidOid,
+ true);
+ report_memory_error = true;
+ }
+
+ ResolveRecoveryConflictWithVirtualXIDs(backends,
+ "exclusive lock",
+ CONFLICT_MODE_ERROR);
+ }
+}
+
+static void
+StandbyReleaseLocks(TransactionId xid)
+{
+ ListCell *cell,
+ *prev,
+ *next;
+
+ /*
+ * Release all matching locks and remove them from list
+ */
+ prev = NULL;
+ for (cell = list_head(RecoveryLockList); cell; cell = next)
+ {
+ xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+ next = lnext(cell);
+
+ if (!TransactionIdIsValid(xid) || lock->xid == xid)
+ {
+ LOCKTAG locktag;
+
+ elog(trace_recovery(DEBUG4),
+ "releasing recovery lock: xid %u db %d rel %d",
+ lock->xid, lock->dbOid, lock->relOid);
+ SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+ if (!LockRelease(&locktag, AccessExclusiveLock, true))
+ elog(trace_recovery(LOG),
+ "RecoveryLockList contains entry for lock "
+ "no longer recorded by lock manager "
+ "xid %u database %d relation %d",
+ lock->xid, lock->dbOid, lock->relOid);
+
+ RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
+ pfree(lock);
+ }
+ else
+ prev = cell;
+ }
+}
+
+/*
+ * Release locks for a transaction tree, starting at xid down, from
+ * RecoveryLockList.
+ *
+ * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
+ * to remove any AccessExclusiveLocks requested by a transaction.
+ */
+void
+StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
+{
+ int i;
+
+ StandbyReleaseLocks(xid);
+
+ for (i = 0; i < nsubxids; i++)
+ StandbyReleaseLocks(subxids[i]);
+}
+
+/*
+ * StandbyReleaseOldLocks
+ * Release standby locks held by XIDs < removeXid
+ * In some cases, keep prepared transactions.
+ */
+static void
+StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts)
+{
+ ListCell *cell,
+ *prev,
+ *next;
+ LOCKTAG locktag;
+
+ /*
+ * Release all matching locks.
+ */
+ prev = NULL;
+ for (cell = list_head(RecoveryLockList); cell; cell = next)
+ {
+ xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+ next = lnext(cell);
+
+ if (!TransactionIdIsValid(removeXid) || TransactionIdPrecedes(lock->xid, removeXid))
+ {
+ if (keepPreparedXacts && StandbyTransactionIdIsPrepared(lock->xid))
+ continue;
+ elog(trace_recovery(DEBUG4),
+ "releasing recovery lock: xid %u db %d rel %d",
+ lock->xid, lock->dbOid, lock->relOid);
+ SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+ if (!LockRelease(&locktag, AccessExclusiveLock, true))
+ elog(trace_recovery(LOG),
+ "RecoveryLockList contains entry for lock "
+ "no longer recorded by lock manager "
+ "xid %u database %d relation %d",
+ lock->xid, lock->dbOid, lock->relOid);
+ RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
+ pfree(lock);
+ }
+ else
+ prev = cell;
+ }
+}
+
+/*
+ * Called at end of recovery and when we see a shutdown checkpoint.
+ */
+void
+StandbyReleaseAllLocks(void)
+{
+ elog(trace_recovery(DEBUG2), "release all standby locks");
+ StandbyReleaseLocksMany(InvalidTransactionId, false);
+}
+
+/*
+ * StandbyReleaseOldLocks
+ * Release standby locks held by XIDs < removeXid, as long
+ * as their not prepared transactions.
+ */
+void
+StandbyReleaseOldLocks(TransactionId removeXid)
+{
+ StandbyReleaseLocksMany(removeXid, true);
+}
+
+/*
+ * --------------------------------------------------------------------
+ * Recovery handling for Rmgr RM_STANDBY_ID
+ *
+ * These record types will only be created if XLogStandbyInfoActive()
+ * --------------------------------------------------------------------
+ */
+
+void
+standby_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
+
+ /* Do nothing if we're not in standby mode */
+ if (standbyState == STANDBY_DISABLED)
+ return;
+
+ if (info == XLOG_STANDBY_LOCK)
+ {
+ xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
+ int i;
+
+ for (i = 0; i < xlrec->nlocks; i++)
+ StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
+ xlrec->locks[i].dbOid,
+ xlrec->locks[i].relOid);
+ }
+ else if (info == XLOG_RUNNING_XACTS)
+ {
+ xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
+ RunningTransactionsData running;
+
+ running.xcnt = xlrec->xcnt;
+ running.subxid_overflow = xlrec->subxid_overflow;
+ running.nextXid = xlrec->nextXid;
+ running.oldestRunningXid = xlrec->oldestRunningXid;
+ running.xids = xlrec->xids;
+
+ ProcArrayApplyRecoveryInfo(&running);
+ }
+ else
+ elog(PANIC, "relation_redo: unknown op code %u", info);
+}
+
+static void
+standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
+{
+ int i;
+
+ appendStringInfo(buf,
+ " nextXid %u oldestRunningXid %u",
+ xlrec->nextXid,
+ xlrec->oldestRunningXid);
+ if (xlrec->xcnt > 0)
+ {
+ appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
+ for (i = 0; i < xlrec->xcnt; i++)
+ appendStringInfo(buf, " %u", xlrec->xids[i]);
+ }
+
+ if (xlrec->subxid_overflow)
+ appendStringInfo(buf, "; subxid ovf");
+}
+
+void
+standby_desc(StringInfo buf, uint8 xl_info, char *rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+
+ if (info == XLOG_STANDBY_LOCK)
+ {
+ xl_standby_locks *xlrec = (xl_standby_locks *) rec;
+ int i;
+
+ appendStringInfo(buf, "AccessExclusive locks:");
+
+ for (i = 0; i < xlrec->nlocks; i++)
+ appendStringInfo(buf, " xid %u db %d rel %d",
+ xlrec->locks[i].xid, xlrec->locks[i].dbOid,
+ xlrec->locks[i].relOid);
+ }
+ else if (info == XLOG_RUNNING_XACTS)
+ {
+ xl_running_xacts *xlrec = (xl_running_xacts *) rec;
+
+ appendStringInfo(buf, " running xacts:");
+ standby_desc_running_xacts(buf, xlrec);
+ }
+ else
+ appendStringInfo(buf, "UNKNOWN");
+}
+
+/*
+ * Log details of the current snapshot to WAL. This allows the snapshot state
+ * to be reconstructed on the standby.
+ */
+void
+LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
+{
+ RunningTransactions running;
+ xl_standby_lock *locks;
+ int nlocks;
+
+ Assert(XLogStandbyInfoActive());
+
+ /*
+ * Get details of any AccessExclusiveLocks being held at the moment.
+ */
+ locks = GetRunningTransactionLocks(&nlocks);
+ if (nlocks > 0)
+ LogAccessExclusiveLocks(nlocks, locks);
+
+ /*
+ * Log details of all in-progress transactions. This should be the last
+ * record we write, because standby will open up when it sees this.
+ */
+ running = GetRunningTransactionData();
+ LogCurrentRunningXacts(running);
+
+ *oldestActiveXid = running->oldestRunningXid;
+ *nextXid = running->nextXid;
+}
+
+/*
+ * Record an enhanced snapshot of running transactions into WAL.
+ *
+ * The definitions of RunningTransactionData and xl_xact_running_xacts
+ * are similar. We keep them separate because xl_xact_running_xacts
+ * is a contiguous chunk of memory and never exists fully until it is
+ * assembled in WAL.
+ */
+static void
+LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
+{
+ xl_running_xacts xlrec;
+ XLogRecData rdata[2];
+ int lastrdata = 0;
+ XLogRecPtr recptr;
+
+ xlrec.xcnt = CurrRunningXacts->xcnt;
+ xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
+ xlrec.nextXid = CurrRunningXacts->nextXid;
+ xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
+
+ /* Header */
+ rdata[0].data = (char *) (&xlrec);
+ rdata[0].len = MinSizeOfXactRunningXacts;
+ rdata[0].buffer = InvalidBuffer;
+
+ /* array of TransactionIds */
+ if (xlrec.xcnt > 0)
+ {
+ rdata[0].next = &(rdata[1]);
+ rdata[1].data = (char *) CurrRunningXacts->xids;
+ rdata[1].len = xlrec.xcnt * sizeof(TransactionId);
+ rdata[1].buffer = InvalidBuffer;
+ lastrdata = 1;
+ }
+
+ rdata[lastrdata].next = NULL;
+
+ recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS, rdata);
+
+ if (CurrRunningXacts->subxid_overflow)
+ ereport(trace_recovery(DEBUG2),
+ (errmsg("snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u next xid %u)",
+ CurrRunningXacts->xcnt,
+ recptr.xlogid, recptr.xrecoff,
+ CurrRunningXacts->oldestRunningXid,
+ CurrRunningXacts->nextXid)));
+ else
+ ereport(trace_recovery(DEBUG2),
+ (errmsg("snapshot of %u running transaction ids (lsn %X/%X oldest xid %u next xid %u)",
+ CurrRunningXacts->xcnt,
+ recptr.xlogid, recptr.xrecoff,
+ CurrRunningXacts->oldestRunningXid,
+ CurrRunningXacts->nextXid)));
+
+}
+
+/*
+ * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
+ * logged, as described in backend/storage/lmgr/README.
+ */
+static void
+LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
+{
+ XLogRecData rdata[2];
+ xl_standby_locks xlrec;
+
+ xlrec.nlocks = nlocks;
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = offsetof(xl_standby_locks, locks);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = &rdata[1];
+
+ rdata[1].data = (char *) locks;
+ rdata[1].len = nlocks * sizeof(xl_standby_lock);
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].next = NULL;
+
+ (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK, rdata);
+}
+
+/*
+ * Individual logging of AccessExclusiveLocks for use during LockAcquire()
+ */
+void
+LogAccessExclusiveLock(Oid dbOid, Oid relOid)
+{
+ xl_standby_lock xlrec;
+
+ /*
+ * Ensure that a TransactionId has been assigned to this transaction.
+ * We don't actually need the xid yet but if we don't do this then
+ * RecordTransactionCommit() and RecordTransactionAbort() will optimise
+ * away the transaction completion record which recovery relies upon to
+ * release locks. It's a hack, but for a corner case not worth adding
+ * code for into the main commit path.
+ */
+ xlrec.xid = GetTopTransactionId();
+
+ /*
+ * Decode the locktag back to the original values, to avoid
+ * sending lots of empty bytes with every message. See
+ * lock.h to check how a locktag is defined for LOCKTAG_RELATION
+ */
+ xlrec.dbOid = dbOid;
+ xlrec.relOid = relOid;
+
+ LogAccessExclusiveLocks(1, &xlrec);
+}