summaryrefslogtreecommitdiff
path: root/src/backend/utils/time/snapmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/time/snapmgr.c')
-rw-r--r--src/backend/utils/time/snapmgr.c404
1 files changed, 404 insertions, 0 deletions
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index b88e0120041..19504c35987 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -46,14 +46,18 @@
#include "access/transam.h"
#include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
#include "lib/pairingheap.h"
#include "miscadmin.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
+#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/resowner_private.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@@ -61,6 +65,64 @@
/*
+ * GUC parameters
+ */
+int old_snapshot_threshold; /* number of minutes, -1 disables */
+
+/*
+ * Structure for dealing with old_snapshot_threshold implementation.
+ */
+typedef struct OldSnapshotControlData
+{
+ /*
+ * Variables for old snapshot handling are shared among processes and are
+ * only allowed to move forward.
+ */
+ slock_t mutex_current; /* protect current timestamp */
+ int64 current_timestamp; /* latest snapshot timestamp */
+ slock_t mutex_latest_xmin; /* protect latest snapshot xmin */
+ TransactionId latest_xmin; /* latest snapshot xmin */
+ slock_t mutex_threshold; /* protect threshold fields */
+ int64 threshold_timestamp; /* earlier snapshot is old */
+ TransactionId threshold_xid; /* earlier xid may be gone */
+
+ /*
+ * Keep one xid per minute for old snapshot error handling.
+ *
+ * Use a circular buffer with a head offset, a count of entries currently
+ * used, and a timestamp corresponding to the xid at the head offset. A
+ * count_used value of zero means that there are no times stored; a
+ * count_used value of old_snapshot_threshold means that the buffer is
+ * full and the head must be advanced to add new entries. Use timestamps
+ * aligned to minute boundaries, since that seems less surprising than
+ * aligning based on the first usage timestamp.
+ *
+ * It is OK if the xid for a given time slot is from earlier than
+ * calculated by adding the number of minutes corresponding to the
+ * (possibly wrapped) distance from the head offset to the time of the
+ * head entry, since that just results in the vacuuming of old tuples
+ * being slightly less aggressive. It would not be OK for it to be off in
+ * the other direction, since it might result in vacuuming tuples that are
+ * still expected to be there.
+ *
+ * Use of an SLRU was considered but not chosen because it is more
+ * heavyweight than is needed for this, and would probably not be any less
+ * code to implement.
+ *
+ * Persistence is not needed.
+ */
+ int head_offset; /* subscript of oldest tracked time */
+ int64 head_timestamp; /* time corresponding to head xid */
+ int count_used; /* how many slots are in use */
+ TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
+} OldSnapshotControlData;
+
+typedef struct OldSnapshotControlData *OldSnapshotControl;
+
+static volatile OldSnapshotControl oldSnapshotControl;
+
+
+/*
* CurrentSnapshot points to the only snapshot taken in transaction-snapshot
* mode, and to the latest one taken in a read-committed transaction.
* SecondarySnapshot is a snapshot that's always up-to-date as of the current
@@ -153,6 +215,7 @@ static Snapshot FirstXactSnapshot = NULL;
static List *exportedSnapshots = NIL;
/* Prototypes for local functions */
+static int64 AlignTimestampToMinuteBoundary(int64 ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
@@ -174,6 +237,49 @@ typedef struct SerializedSnapshotData
CommandId curcid;
} SerializedSnapshotData;
+Size
+SnapMgrShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(OldSnapshotControlData, xid_by_minute);
+ if (old_snapshot_threshold > 0)
+ size = add_size(size, mul_size(sizeof(TransactionId),
+ old_snapshot_threshold));
+
+ return size;
+}
+
+/*
+ * Initialize for managing old snapshot detection.
+ */
+void
+SnapMgrInit(void)
+{
+ bool found;
+
+ /*
+ * Create or attach to the OldSnapshotControl structure.
+ */
+ oldSnapshotControl = (OldSnapshotControl)
+ ShmemInitStruct("OldSnapshotControlData",
+ SnapMgrShmemSize(), &found);
+
+ if (!found)
+ {
+ SpinLockInit(&oldSnapshotControl->mutex_current);
+ oldSnapshotControl->current_timestamp = 0;
+ SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
+ oldSnapshotControl->latest_xmin = InvalidTransactionId;
+ SpinLockInit(&oldSnapshotControl->mutex_threshold);
+ oldSnapshotControl->threshold_timestamp = 0;
+ oldSnapshotControl->threshold_xid = InvalidTransactionId;
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->head_timestamp = 0;
+ oldSnapshotControl->count_used = 0;
+ }
+}
+
/*
* GetTransactionSnapshot
* Get the appropriate snapshot for a new query in a transaction.
@@ -1405,6 +1511,304 @@ ThereAreNoPriorRegisteredSnapshots(void)
return false;
}
+
+/*
+ * Return an int64 timestamp which is exactly on a minute boundary.
+ *
+ * If the argument is already aligned, return that value, otherwise move to
+ * the next minute boundary following the given time.
+ */
+static int64
+AlignTimestampToMinuteBoundary(int64 ts)
+{
+ int64 retval = ts + (USECS_PER_MINUTE - 1);
+
+ return retval - (retval % USECS_PER_MINUTE);
+}
+
+/*
+ * Get current timestamp for snapshots as int64 that never moves backward.
+ */
+int64
+GetSnapshotCurrentTimestamp(void)
+{
+ int64 now = GetCurrentIntegerTimestamp();
+
+ /*
+ * Don't let time move backward; if it hasn't advanced, use the old value.
+ */
+ SpinLockAcquire(&oldSnapshotControl->mutex_current);
+ if (now <= oldSnapshotControl->current_timestamp)
+ now = oldSnapshotControl->current_timestamp;
+ else
+ oldSnapshotControl->current_timestamp = now;
+ SpinLockRelease(&oldSnapshotControl->mutex_current);
+
+ return now;
+}
+
+/*
+ * Get timestamp through which vacuum may have processed based on last stored
+ * value for threshold_timestamp.
+ *
+ * XXX: So far, we never trust that a 64-bit value can be read atomically; if
+ * that ever changes, we could get rid of the spinlock here.
+ */
+int64
+GetOldSnapshotThresholdTimestamp(void)
+{
+ int64 threshold_timestamp;
+
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ threshold_timestamp = oldSnapshotControl->threshold_timestamp;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+ return threshold_timestamp;
+}
+
+static void
+SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit)
+{
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ oldSnapshotControl->threshold_timestamp = ts;
+ oldSnapshotControl->threshold_xid = xlimit;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+}
+
+/*
+ * TransactionIdLimitedForOldSnapshots
+ *
+ * Apply old snapshot limit, if any. This is intended to be called for page
+ * pruning and table vacuuming, to allow old_snapshot_threshold to override
+ * the normal global xmin value. Actual testing for snapshot too old will be
+ * based on whether a snapshot timestamp is prior to the threshold timestamp
+ * set in this function.
+ */
+TransactionId
+TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ Relation relation)
+{
+ if (TransactionIdIsNormal(recentXmin)
+ && old_snapshot_threshold >= 0
+ && RelationNeedsWAL(relation)
+ && !IsCatalogRelation(relation)
+ && !RelationIsAccessibleInLogicalDecoding(relation))
+ {
+ int64 ts = GetSnapshotCurrentTimestamp();
+ TransactionId xlimit = recentXmin;
+ TransactionId latest_xmin = oldSnapshotControl->latest_xmin;
+ bool same_ts_as_threshold = false;
+
+ /*
+ * Zero threshold always overrides to latest xmin, if valid. Without
+ * some heuristic it will find its own snapshot too old on, for
+ * example, a simple UPDATE -- which would make it useless for most
+ * testing, but there is no principled way to ensure that it doesn't
+ * fail in this way. Use a five-second delay to try to get useful
+ * testing behavior, but this may need adjustment.
+ */
+ if (old_snapshot_threshold == 0)
+ {
+ if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
+ && TransactionIdFollows(latest_xmin, xlimit))
+ xlimit = latest_xmin;
+
+ ts -= 5 * USECS_PER_SEC;
+ SetOldSnapshotThresholdTimestamp(ts, xlimit);
+
+ return xlimit;
+ }
+
+ ts = AlignTimestampToMinuteBoundary(ts)
+ - (old_snapshot_threshold * USECS_PER_MINUTE);
+
+ /* Check for fast exit without LW locking. */
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ if (ts == oldSnapshotControl->threshold_timestamp)
+ {
+ xlimit = oldSnapshotControl->threshold_xid;
+ same_ts_as_threshold = true;
+ }
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+ if (!same_ts_as_threshold)
+ {
+ LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
+
+ if (oldSnapshotControl->count_used > 0
+ && ts >= oldSnapshotControl->head_timestamp)
+ {
+ int offset;
+
+ offset = ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE);
+ if (offset > oldSnapshotControl->count_used - 1)
+ offset = oldSnapshotControl->count_used - 1;
+ offset = (oldSnapshotControl->head_offset + offset)
+ % old_snapshot_threshold;
+ xlimit = oldSnapshotControl->xid_by_minute[offset];
+
+ if (NormalTransactionIdFollows(xlimit, recentXmin))
+ SetOldSnapshotThresholdTimestamp(ts, xlimit);
+ }
+
+ LWLockRelease(OldSnapshotTimeMapLock);
+ }
+
+ /*
+ * Failsafe protection against vacuuming work of active transaction.
+ *
+ * This is not an assertion because we avoid the spinlock for
+ * performance, leaving open the possibility that xlimit could advance
+ * and be more current; but it seems prudent to apply this limit. It
+ * might make pruning a tiny bit less agressive than it could be, but
+ * protects against data loss bugs.
+ */
+ if (TransactionIdIsNormal(latest_xmin)
+ && TransactionIdPrecedes(latest_xmin, xlimit))
+ xlimit = latest_xmin;
+
+ if (NormalTransactionIdFollows(xlimit, recentXmin))
+ return xlimit;
+ }
+
+ return recentXmin;
+}
+
+/*
+ * Take care of the circular buffer that maps time to xid.
+ */
+void
+MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
+{
+ int64 ts;
+
+ /* Fast exit when old_snapshot_threshold is not used. */
+ if (old_snapshot_threshold < 0)
+ return;
+
+ /* Keep track of the latest xmin seen by any process. */
+ SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+ if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin))
+ oldSnapshotControl->latest_xmin = xmin;
+ SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
+
+ /* No further tracking needed for 0 (used for testing). */
+ if (old_snapshot_threshold == 0)
+ return;
+
+ /*
+ * We don't want to do something stupid with unusual values, but we don't
+ * want to litter the log with warnings or break otherwise normal
+ * processing for this feature; so if something seems unreasonable, just
+ * log at DEBUG level and return without doing anything.
+ */
+ if (whenTaken < 0)
+ {
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
+ (long) whenTaken);
+ return;
+ }
+ if (!TransactionIdIsNormal(xmin))
+ {
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with xmin = %lu",
+ (unsigned long) xmin);
+ return;
+ }
+
+ ts = AlignTimestampToMinuteBoundary(whenTaken);
+
+ LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
+
+ Assert(oldSnapshotControl->head_offset >= 0);
+ Assert(oldSnapshotControl->head_offset < old_snapshot_threshold);
+ Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
+ Assert(oldSnapshotControl->count_used >= 0);
+ Assert(oldSnapshotControl->count_used <= old_snapshot_threshold);
+
+ if (oldSnapshotControl->count_used == 0)
+ {
+ /* set up first entry for empty mapping */
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->head_timestamp = ts;
+ oldSnapshotControl->count_used = 1;
+ oldSnapshotControl->xid_by_minute[0] = xmin;
+ }
+ else if (ts < oldSnapshotControl->head_timestamp)
+ {
+ /* old ts; log it at DEBUG */
+ LWLockRelease(OldSnapshotTimeMapLock);
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
+ (long) whenTaken);
+ return;
+ }
+ else if (ts <= (oldSnapshotControl->head_timestamp +
+ ((oldSnapshotControl->count_used - 1)
+ * USECS_PER_MINUTE)))
+ {
+ /* existing mapping; advance xid if possible */
+ int bucket = (oldSnapshotControl->head_offset
+ + ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE))
+ % old_snapshot_threshold;
+
+ if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
+ oldSnapshotControl->xid_by_minute[bucket] = xmin;
+ }
+ else
+ {
+ /* We need a new bucket, but it might not be the very next one. */
+ int advance = ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE);
+
+ oldSnapshotControl->head_timestamp = ts;
+
+ if (advance >= old_snapshot_threshold)
+ {
+ /* Advance is so far that all old data is junk; start over. */
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->count_used = 1;
+ oldSnapshotControl->xid_by_minute[0] = xmin;
+ }
+ else
+ {
+ /* Store the new value in one or more buckets. */
+ int i;
+
+ for (i = 0; i < advance; i++)
+ {
+ if (oldSnapshotControl->count_used == old_snapshot_threshold)
+ {
+ /* Map full and new value replaces old head. */
+ int old_head = oldSnapshotControl->head_offset;
+
+ if (old_head == (old_snapshot_threshold - 1))
+ oldSnapshotControl->head_offset = 0;
+ else
+ oldSnapshotControl->head_offset = old_head + 1;
+ oldSnapshotControl->xid_by_minute[old_head] = xmin;
+ }
+ else
+ {
+ /* Extend map to unused entry. */
+ int new_tail = (oldSnapshotControl->head_offset
+ + oldSnapshotControl->count_used)
+ % old_snapshot_threshold;
+
+ oldSnapshotControl->count_used++;
+ oldSnapshotControl->xid_by_minute[new_tail] = xmin;
+ }
+ }
+ }
+ }
+
+ LWLockRelease(OldSnapshotTimeMapLock);
+}
+
+
/*
* Setup a snapshot that replaces normal catalog snapshots that allows catalog
* access to behave just like it did at a certain point in the past.