diff options
Diffstat (limited to 'src/backend/utils/time/snapmgr.c')
-rw-r--r-- | src/backend/utils/time/snapmgr.c | 605 |
1 files changed, 601 insertions, 4 deletions
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 518aaf1af0c..50fb78057d8 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -15,12 +15,16 @@ * handle this reference as an internally-tracked registration, so that this * module is entirely lower-level than ResourceOwners. * + * Likewise, any snapshots that have been exported by pg_export_snapshot + * have regd_count = 1 and are counted in RegisteredSnapshots, but are not + * tracked by any resource owner. + * * These arrangements let us reset MyProc->xmin when there are no snapshots * referenced by this transaction. (One possible improvement would be to be * able to advance Xmin when the snapshot with the earliest Xmin is no longer * referenced. That's a bit harder though, it requires more locking, and - * anyway it should be rather uncommon to keep snapshots referenced for too - * long.) + * anyway it should be rather uncommon to keep temporary snapshots referenced + * for too long.) * * * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group @@ -33,12 +37,16 @@ */ #include "postgres.h" +#include <sys/stat.h> +#include <unistd.h> + #include "access/transam.h" #include "access/xact.h" +#include "miscadmin.h" #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" -#include "utils/memutils.h" +#include "utils/builtins.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/tqual.h" @@ -111,6 +119,15 @@ bool FirstSnapshotSet = false; */ static Snapshot FirstXactSnapshot = NULL; +/* Define pathname of exported-snapshot files */ +#define SNAPSHOT_EXPORT_DIR "pg_snapshots" +#define XactExportFilePath(path, xid, num, suffix) \ + snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \ + xid, num, suffix) + +/* Current xact's exported snapshots (a list of Snapshot structs) */ +static List *exportedSnapshots = NIL; + static Snapshot CopySnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); @@ -139,7 +156,8 @@ GetTransactionSnapshot(void) * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must * make a copy of it rather than returning CurrentSnapshotData - * directly. + * directly. Furthermore, if we're running in serializable mode, + * predicate.c needs to wrap the snapshot fetch in its own processing. */ if (IsolationUsesXactSnapshot()) { @@ -204,6 +222,88 @@ SnapshotSetCommandId(CommandId curcid) } /* + * SetTransactionSnapshot + * Set the transaction's snapshot from an imported MVCC snapshot. + * + * Note that this is very closely tied to GetTransactionSnapshot --- it + * must take care of all the same considerations as the first-snapshot case + * in GetTransactionSnapshot. + */ +static void +SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) +{ + /* Caller should have checked this already */ + Assert(!FirstSnapshotSet); + + Assert(RegisteredSnapshots == 0); + Assert(FirstXactSnapshot == NULL); + + /* + * Even though we are not going to use the snapshot it computes, we must + * call GetSnapshotData, for two reasons: (1) to be sure that + * CurrentSnapshotData's XID arrays have been allocated, and (2) to update + * RecentXmin and RecentGlobalXmin. (We could alternatively include those + * two variables in exported snapshot files, but it seems better to have + * snapshot importers compute reasonably up-to-date values for them.) + */ + CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData); + + /* + * Now copy appropriate fields from the source snapshot. + */ + CurrentSnapshot->xmin = sourcesnap->xmin; + CurrentSnapshot->xmax = sourcesnap->xmax; + CurrentSnapshot->xcnt = sourcesnap->xcnt; + Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount()); + memcpy(CurrentSnapshot->xip, sourcesnap->xip, + sourcesnap->xcnt * sizeof(TransactionId)); + CurrentSnapshot->subxcnt = sourcesnap->subxcnt; + Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount()); + memcpy(CurrentSnapshot->subxip, sourcesnap->subxip, + sourcesnap->subxcnt * sizeof(TransactionId)); + CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed; + CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery; + /* NB: curcid should NOT be copied, it's a local matter */ + + /* + * Now we have to fix what GetSnapshotData did with MyProc->xmin and + * TransactionXmin. There is a race condition: to make sure we are not + * causing the global xmin to go backwards, we have to test that the + * source transaction is still running, and that has to be done atomically. + * So let procarray.c do it. + * + * Note: in serializable mode, predicate.c will do this a second time. + * It doesn't seem worth contorting the logic here to avoid two calls, + * especially since it's not clear that predicate.c *must* do this. + */ + if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import the requested snapshot"), + errdetail("The source transaction %u is not running anymore.", + sourcexid))); + + /* + * In transaction-snapshot mode, the first snapshot must live until end of + * xact, so we must make a copy of it. Furthermore, if we're running in + * serializable mode, predicate.c needs to do its own processing. + */ + if (IsolationUsesXactSnapshot()) + { + if (IsolationIsSerializable()) + SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid); + /* Make a saved copy */ + CurrentSnapshot = CopySnapshot(CurrentSnapshot); + FirstXactSnapshot = CurrentSnapshot; + /* Mark it as "registered" in FirstXactSnapshot */ + FirstXactSnapshot->regd_count++; + RegisteredSnapshots++; + } + + FirstSnapshotSet = true; +} + +/* * CopySnapshot * Copy the given snapshot. * @@ -558,6 +658,42 @@ AtEOXact_Snapshot(bool isCommit) } FirstXactSnapshot = NULL; + /* + * If we exported any snapshots, clean them up. + */ + if (exportedSnapshots != NIL) + { + TransactionId myxid = GetTopTransactionId(); + int i; + char buf[MAXPGPATH]; + + /* + * Get rid of the files. Unlink failure is only a WARNING because + * (1) it's too late to abort the transaction, and (2) leaving a + * leaked file around has little real consequence anyway. + */ + for (i = 1; i <= list_length(exportedSnapshots); i++) + { + XactExportFilePath(buf, myxid, i, ""); + if (unlink(buf)) + elog(WARNING, "could not unlink file \"%s\": %m", buf); + } + + /* + * As with the FirstXactSnapshot, we needn't spend any effort on + * cleaning up the per-snapshot data structures, but we do need to + * adjust the RegisteredSnapshots count to prevent a warning below. + * + * Note: you might be thinking "why do we have the exportedSnapshots + * list at all? All we need is a counter!". You're right, but we do + * it this way in case we ever feel like improving xmin management. + */ + Assert(RegisteredSnapshots >= list_length(exportedSnapshots)); + RegisteredSnapshots -= list_length(exportedSnapshots); + + exportedSnapshots = NIL; + } + /* On commit, complain about leftover snapshots */ if (isCommit) { @@ -586,3 +722,464 @@ AtEOXact_Snapshot(bool isCommit) SnapshotResetXmin(); } + + +/* + * ExportSnapshot + * Export the snapshot to a file so that other backends can import it. + * Returns the token (the file name) that can be used to import this + * snapshot. + */ +static char * +ExportSnapshot(Snapshot snapshot) +{ + TransactionId topXid; + TransactionId *children; + int nchildren; + int addTopXid; + StringInfoData buf; + FILE *f; + int i; + MemoryContext oldcxt; + char path[MAXPGPATH]; + char pathtmp[MAXPGPATH]; + + /* + * It's tempting to call RequireTransactionChain here, since it's not + * very useful to export a snapshot that will disappear immediately + * afterwards. However, we haven't got enough information to do that, + * since we don't know if we're at top level or not. For example, we + * could be inside a plpgsql function that is going to fire off other + * transactions via dblink. Rather than disallow perfectly legitimate + * usages, don't make a check. + * + * Also note that we don't make any restriction on the transaction's + * isolation level; however, importers must check the level if they + * are serializable. + */ + + /* + * This will assign a transaction ID if we do not yet have one. + */ + topXid = GetTopTransactionId(); + + /* + * We cannot export a snapshot from a subtransaction because there's no + * easy way for importers to verify that the same subtransaction is still + * running. + */ + if (IsSubTransaction()) + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("cannot export a snapshot from a subtransaction"))); + + /* + * We do however allow previous committed subtransactions to exist. + * Importers of the snapshot must see them as still running, so get their + * XIDs to add them to the snapshot. + */ + nchildren = xactGetCommittedChildren(&children); + + /* + * Copy the snapshot into TopTransactionContext, add it to the + * exportedSnapshots list, and mark it pseudo-registered. We do this to + * ensure that the snapshot's xmin is honored for the rest of the + * transaction. (Right now, because SnapshotResetXmin is so stupid, this + * is overkill; but later we might make that routine smarter.) + */ + snapshot = CopySnapshot(snapshot); + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + exportedSnapshots = lappend(exportedSnapshots, snapshot); + MemoryContextSwitchTo(oldcxt); + + snapshot->regd_count++; + RegisteredSnapshots++; + + /* + * Fill buf with a text serialization of the snapshot, plus identification + * data about this transaction. The format expected by ImportSnapshot + * is pretty rigid: each line must be fieldname:value. + */ + initStringInfo(&buf); + + appendStringInfo(&buf, "xid:%u\n", topXid); + appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId); + appendStringInfo(&buf, "iso:%d\n", XactIsoLevel); + appendStringInfo(&buf, "ro:%d\n", XactReadOnly); + + appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin); + appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax); + + /* + * We must include our own top transaction ID in the top-xid data, since + * by definition we will still be running when the importing transaction + * adopts the snapshot, but GetSnapshotData never includes our own XID in + * the snapshot. (There must, therefore, be enough room to add it.) + * + * However, it could be that our topXid is after the xmax, in which case + * we shouldn't include it because xip[] members are expected to be before + * xmax. (We need not make the same check for subxip[] members, see + * snapshot.h.) + */ + addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0; + appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid); + for (i = 0; i < snapshot->xcnt; i++) + appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]); + if (addTopXid) + appendStringInfo(&buf, "xip:%u\n", topXid); + + /* + * Similarly, we add our subcommitted child XIDs to the subxid data. + * Here, we have to cope with possible overflow. + */ + if (snapshot->suboverflowed || + snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount()) + appendStringInfoString(&buf, "sof:1\n"); + else + { + appendStringInfoString(&buf, "sof:0\n"); + appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren); + for (i = 0; i < snapshot->subxcnt; i++) + appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]); + for (i = 0; i < nchildren; i++) + appendStringInfo(&buf, "sxp:%u\n", children[i]); + } + appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery); + + /* + * Now write the text representation into a file. We first write to a + * ".tmp" filename, and rename to final filename if no error. This + * ensures that no other backend can read an incomplete file + * (ImportSnapshot won't allow it because of its valid-characters check). + */ + XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp"); + if (!(f = AllocateFile(pathtmp, PG_BINARY_W))) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", pathtmp))); + + if (fwrite(buf.data, buf.len, 1, f) != 1) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", pathtmp))); + + /* no fsync() since file need not survive a system crash */ + + if (FreeFile(f)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", pathtmp))); + + /* + * Now that we have written everything into a .tmp file, rename the file + * to remove the .tmp suffix. + */ + XactExportFilePath(path, topXid, list_length(exportedSnapshots), ""); + + if (rename(pathtmp, path) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + pathtmp, path))); + + /* + * The basename of the file is what we return from pg_export_snapshot(). + * It's already in path in a textual format and we know that the path + * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash + * and pstrdup it so as not to return the address of a local variable. + */ + return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1); +} + +/* + * pg_export_snapshot + * SQL-callable wrapper for ExportSnapshot. + */ +Datum +pg_export_snapshot(PG_FUNCTION_ARGS) +{ + char *snapshotName; + + snapshotName = ExportSnapshot(GetActiveSnapshot()); + PG_RETURN_TEXT_P(cstring_to_text(snapshotName)); +} + + +/* + * Parsing subroutines for ImportSnapshot: parse a line with the given + * prefix followed by a value, and advance *s to the next line. The + * filename is provided for use in error messages. + */ +static int +parseIntFromText(const char *prefix, char **s, const char *filename) +{ + char *ptr = *s; + int prefixlen = strlen(prefix); + int val; + + if (strncmp(ptr, prefix, prefixlen) != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr += prefixlen; + if (sscanf(ptr, "%d", &val) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr = strchr(ptr, '\n'); + if (!ptr) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + *s = ptr + 1; + return val; +} + +static TransactionId +parseXidFromText(const char *prefix, char **s, const char *filename) +{ + char *ptr = *s; + int prefixlen = strlen(prefix); + TransactionId val; + + if (strncmp(ptr, prefix, prefixlen) != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr += prefixlen; + if (sscanf(ptr, "%u", &val) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr = strchr(ptr, '\n'); + if (!ptr) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + *s = ptr + 1; + return val; +} + +/* + * ImportSnapshot + * Import a previously exported snapshot. The argument should be a + * filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file. + * This is called by "SET TRANSACTION SNAPSHOT 'foo'". + */ +void +ImportSnapshot(const char *idstr) +{ + char path[MAXPGPATH]; + FILE *f; + struct stat stat_buf; + char *filebuf; + int xcnt; + int i; + TransactionId src_xid; + Oid src_dbid; + int src_isolevel; + bool src_readonly; + SnapshotData snapshot; + + /* + * Must be at top level of a fresh transaction. Note in particular that + * we check we haven't acquired an XID --- if we have, it's conceivable + * that the snapshot would show it as not running, making for very + * screwy behavior. + */ + if (FirstSnapshotSet || + GetTopTransactionIdIfAny() != InvalidTransactionId || + IsSubTransaction()) + ereport(ERROR, + (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), + errmsg("SET TRANSACTION SNAPSHOT must be called before any query"))); + + /* + * If we are in read committed mode then the next query would execute + * with a new snapshot thus making this function call quite useless. + */ + if (!IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ"))); + + /* + * Verify the identifier: only 0-9, A-F and hyphens are allowed. We do + * this mainly to prevent reading arbitrary files. + */ + if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid snapshot identifier \"%s\"", idstr))); + + /* OK, read the file */ + snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr); + + f = AllocateFile(path, PG_BINARY_R); + if (!f) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid snapshot identifier \"%s\"", idstr))); + + /* get the size of the file so that we know how much memory we need */ + if (fstat(fileno(f), &stat_buf)) + elog(ERROR, "could not stat file \"%s\": %m", path); + + /* and read the file into a palloc'd string */ + filebuf = (char *) palloc(stat_buf.st_size + 1); + if (fread(filebuf, stat_buf.st_size, 1, f) != 1) + elog(ERROR, "could not read file \"%s\": %m", path); + + filebuf[stat_buf.st_size] = '\0'; + + FreeFile(f); + + /* + * Construct a snapshot struct by parsing the file content. + */ + memset(&snapshot, 0, sizeof(snapshot)); + + src_xid = parseXidFromText("xid:", &filebuf, path); + /* we abuse parseXidFromText a bit here ... */ + src_dbid = parseXidFromText("dbid:", &filebuf, path); + src_isolevel = parseIntFromText("iso:", &filebuf, path); + src_readonly = parseIntFromText("ro:", &filebuf, path); + + snapshot.xmin = parseXidFromText("xmin:", &filebuf, path); + snapshot.xmax = parseXidFromText("xmax:", &filebuf, path); + + snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path); + + /* sanity-check the xid count before palloc */ + if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", path))); + + snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId)); + for (i = 0; i < xcnt; i++) + snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path); + + snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path); + + if (!snapshot.suboverflowed) + { + snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path); + + /* sanity-check the xid count before palloc */ + if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", path))); + + snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId)); + for (i = 0; i < xcnt; i++) + snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path); + } + else + { + snapshot.subxcnt = 0; + snapshot.subxip = NULL; + } + + snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path); + + /* + * Do some additional sanity checking, just to protect ourselves. We + * don't trouble to check the array elements, just the most critical + * fields. + */ + if (!TransactionIdIsNormal(src_xid) || + !OidIsValid(src_dbid) || + !TransactionIdIsNormal(snapshot.xmin) || + !TransactionIdIsNormal(snapshot.xmax)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", path))); + + /* + * If we're serializable, the source transaction must be too, otherwise + * predicate.c has problems (SxactGlobalXmin could go backwards). Also, + * a non-read-only transaction can't adopt a snapshot from a read-only + * transaction, as predicate.c handles the cases very differently. + */ + if (IsolationIsSerializable()) + { + if (src_isolevel != XACT_SERIALIZABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction"))); + if (src_readonly && !XactReadOnly) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction"))); + } + + /* + * We cannot import a snapshot that was taken in a different database, + * because vacuum calculates OldestXmin on a per-database basis; so the + * source transaction's xmin doesn't protect us from data loss. This + * restriction could be removed if the source transaction were to mark + * its xmin as being globally applicable. But that would require some + * additional syntax, since that has to be known when the snapshot is + * initially taken. (See pgsql-hackers discussion of 2011-10-21.) + */ + if (src_dbid != MyDatabaseId) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot import a snapshot from a different database"))); + + /* OK, install the snapshot */ + SetTransactionSnapshot(&snapshot, src_xid); +} + +/* + * XactHasExportedSnapshots + * Test whether current transaction has exported any snapshots. + */ +bool +XactHasExportedSnapshots(void) +{ + return (exportedSnapshots != NIL); +} + +/* + * DeleteAllExportedSnapshotFiles + * Clean up any files that have been left behind by a crashed backend + * that had exported snapshots before it died. + * + * This should be called during database startup or crash recovery. + */ +void +DeleteAllExportedSnapshotFiles(void) +{ + char buf[MAXPGPATH]; + DIR *s_dir; + struct dirent *s_de; + + if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR))) + { + /* + * We really should have that directory in a sane cluster setup. But + * then again if we don't, it's not fatal enough to make it FATAL. + * Since we're running in the postmaster, LOG is our best bet. + */ + elog(LOG, "could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR); + return; + } + + while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL) + { + if (strcmp(s_de->d_name, ".") == 0 || + strcmp(s_de->d_name, "..") == 0) + continue; + + snprintf(buf, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name); + /* Again, unlink failure is not worthy of FATAL */ + if (unlink(buf)) + elog(LOG, "could not unlink file \"%s\": %m", buf); + } + + FreeDir(s_dir); +} |