summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/twophase.c
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2009-12-19 01:32:45 +0000
committerSimon Riggs <simon@2ndQuadrant.com>2009-12-19 01:32:45 +0000
commitefc16ea520679d713d98a2c7bf1453c4ff7b91ec (patch)
tree6a39d2af0704a36281dc7df3ec10823eb3e6de75 /src/backend/access/transam/twophase.c
parent78a09145e0f8322e625bbc7d69fcb865ce4f3034 (diff)
Allow read only connections during recovery, known as Hot Standby.
Enabled by recovery_connections = on (default) and forcing archive recovery using a recovery.conf. Recovery processing now emulates the original transactions as they are replayed, providing full locking and MVCC behaviour for read only queries. Recovery must enter consistent state before connections are allowed, so there is a delay, typically short, before connections succeed. Replay of recovering transactions can conflict and in some cases deadlock with queries during recovery; these result in query cancellation after max_standby_delay seconds have expired. Infrastructure changes have minor effects on normal running, though introduce four new types of WAL record. New test mode "make standbycheck" allows regression tests of static command behaviour on a standby server while in recovery. Typical and extreme dynamic behaviours have been checked via code inspection and manual testing. Few port specific behaviours have been utilised, though primary testing has been on Linux only so far. This commit is the basic patch. Additional changes will follow in this release to enhance some aspects of behaviour, notably improved handling of conflicts, deadlock detection and query cancellation. Changes to VACUUM FULL are also required. Simon Riggs, with significant and lengthy review by Heikki Linnakangas, including streamlined redesign of snapshot creation and two-phase commit. Important contributions from Florian Pflug, Mark Kirkwood, Merlin Moncure, Greg Stark, Gianni Ciolli, Gabriele Bartolini, Hannu Krosing, Robert Haas, Tatsuo Ishii, Hiroyuki Yamada plus support and feedback from many other community members.
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r--src/backend/access/transam/twophase.c185
1 files changed, 157 insertions, 28 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index db5795324b2..4c3a1b901cb 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.56 2009/11/23 09:58:36 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.57 2009/12/19 01:32:33 sriggs Exp $
*
* NOTES
* Each global transaction is associated with a global transaction
@@ -57,6 +57,7 @@
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/procarray.h"
+#include "storage/sinvaladt.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -144,7 +145,10 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels);
+ RelFileNode *rels,
+ int ninvalmsgs,
+ SharedInvalidationMessage *invalmsgs,
+ bool initfileinval);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
@@ -736,10 +740,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
* 2. TransactionId[] (subtransactions)
* 3. RelFileNode[] (files to be deleted at commit)
* 4. RelFileNode[] (files to be deleted at abort)
- * 5. TwoPhaseRecordOnDisk
- * 6. ...
- * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
- * 8. CRC32
+ * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
+ * 6. TwoPhaseRecordOnDisk
+ * 7. ...
+ * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
+ * 9. CRC32
*
* Each segment except the final CRC32 is MAXALIGN'd.
*/
@@ -760,6 +765,8 @@ typedef struct TwoPhaseFileHeader
int32 nsubxacts; /* number of following subxact XIDs */
int32 ncommitrels; /* number of delete-on-commit rels */
int32 nabortrels; /* number of delete-on-abort rels */
+ int32 ninvalmsgs; /* number of cache invalidation messages */
+ bool initfileinval; /* does relcache init file need invalidation? */
char gid[GIDSIZE]; /* GID for transaction */
} TwoPhaseFileHeader;
@@ -835,6 +842,7 @@ StartPrepare(GlobalTransaction gxact)
TransactionId *children;
RelFileNode *commitrels;
RelFileNode *abortrels;
+ SharedInvalidationMessage *invalmsgs;
/* Initialize linked list */
records.head = palloc0(sizeof(XLogRecData));
@@ -859,11 +867,16 @@ StartPrepare(GlobalTransaction gxact)
hdr.nsubxacts = xactGetCommittedChildren(&children);
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
+ hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
+ &hdr.initfileinval);
StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
- /* Add the additional info about subxacts and deletable files */
+ /*
+ * Add the additional info about subxacts, deletable files and
+ * cache invalidation messages.
+ */
if (hdr.nsubxacts > 0)
{
save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
@@ -880,6 +893,12 @@ StartPrepare(GlobalTransaction gxact)
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
pfree(abortrels);
}
+ if (hdr.ninvalmsgs > 0)
+ {
+ save_state_data(invalmsgs,
+ hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
+ pfree(invalmsgs);
+ }
}
/*
@@ -1071,7 +1090,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
* contents of the file. Otherwise return NULL.
*/
static char *
-ReadTwoPhaseFile(TransactionId xid)
+ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
{
char path[MAXPGPATH];
char *buf;
@@ -1087,10 +1106,11 @@ ReadTwoPhaseFile(TransactionId xid)
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
{
- ereport(WARNING,
- (errcode_for_file_access(),
- errmsg("could not open two-phase state file \"%s\": %m",
- path)));
+ if (give_warnings)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not open two-phase state file \"%s\": %m",
+ path)));
return NULL;
}
@@ -1103,10 +1123,11 @@ ReadTwoPhaseFile(TransactionId xid)
if (fstat(fd, &stat))
{
close(fd);
- ereport(WARNING,
- (errcode_for_file_access(),
- errmsg("could not stat two-phase state file \"%s\": %m",
- path)));
+ if (give_warnings)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not stat two-phase state file \"%s\": %m",
+ path)));
return NULL;
}
@@ -1134,10 +1155,11 @@ ReadTwoPhaseFile(TransactionId xid)
if (read(fd, buf, stat.st_size) != stat.st_size)
{
close(fd);
- ereport(WARNING,
- (errcode_for_file_access(),
- errmsg("could not read two-phase state file \"%s\": %m",
- path)));
+ if (give_warnings)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not read two-phase state file \"%s\": %m",
+ path)));
pfree(buf);
return NULL;
}
@@ -1166,6 +1188,30 @@ ReadTwoPhaseFile(TransactionId xid)
return buf;
}
+/*
+ * Confirms an xid is prepared, during recovery
+ */
+bool
+StandbyTransactionIdIsPrepared(TransactionId xid)
+{
+ char *buf;
+ TwoPhaseFileHeader *hdr;
+ bool result;
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* Read and validate file */
+ buf = ReadTwoPhaseFile(xid, false);
+ if (buf == NULL)
+ return false;
+
+ /* Check header also */
+ hdr = (TwoPhaseFileHeader *) buf;
+ result = TransactionIdEquals(hdr->xid, xid);
+ pfree(buf);
+
+ return result;
+}
/*
* FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
@@ -1184,6 +1230,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
RelFileNode *abortrels;
RelFileNode *delrels;
int ndelrels;
+ SharedInvalidationMessage *invalmsgs;
int i;
/*
@@ -1196,7 +1243,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* Read and validate the state file
*/
- buf = ReadTwoPhaseFile(xid);
+ buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
@@ -1215,6 +1262,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
abortrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ invalmsgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
/* compute latestXid among all children */
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
@@ -1230,7 +1279,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
if (isCommit)
RecordTransactionCommitPrepared(xid,
hdr->nsubxacts, children,
- hdr->ncommitrels, commitrels);
+ hdr->ncommitrels, commitrels,
+ hdr->ninvalmsgs, invalmsgs,
+ hdr->initfileinval);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
@@ -1277,6 +1328,18 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
smgrclose(srel);
}
+ /*
+ * Handle cache invalidation messages.
+ *
+ * Relcache init file invalidation requires processing both
+ * before and after we send the SI messages. See AtEOXact_Inval()
+ */
+ if (hdr->initfileinval)
+ RelationCacheInitFileInvalidate(true);
+ SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
+ if (hdr->initfileinval)
+ RelationCacheInitFileInvalidate(false);
+
/* And now do the callbacks */
if (isCommit)
ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
@@ -1528,14 +1591,21 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
* Our other responsibility is to determine and return the oldest valid XID
* among the prepared xacts (if none, return ShmemVariableCache->nextXid).
* This is needed to synchronize pg_subtrans startup properly.
+ *
+ * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
+ * top-level xids is stored in *xids_p. The number of entries in the array
+ * is returned in *nxids_p.
*/
TransactionId
-PrescanPreparedTransactions(void)
+PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
TransactionId result = origNextXid;
DIR *cldir;
struct dirent *clde;
+ TransactionId *xids = NULL;
+ int nxids = 0;
+ int allocsize = 0;
cldir = AllocateDir(TWOPHASE_DIR);
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
@@ -1567,7 +1637,7 @@ PrescanPreparedTransactions(void)
*/
/* Read and validate file */
- buf = ReadTwoPhaseFile(xid);
+ buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
{
ereport(WARNING,
@@ -1615,11 +1685,36 @@ PrescanPreparedTransactions(void)
}
}
+
+ if (xids_p)
+ {
+ if (nxids == allocsize)
+ {
+ if (nxids == 0)
+ {
+ allocsize = 10;
+ xids = palloc(allocsize * sizeof(TransactionId));
+ }
+ else
+ {
+ allocsize = allocsize * 2;
+ xids = repalloc(xids, allocsize * sizeof(TransactionId));
+ }
+ }
+ xids[nxids++] = xid;
+ }
+
pfree(buf);
}
}
FreeDir(cldir);
+ if (xids_p)
+ {
+ *xids_p = xids;
+ *nxids_p = nxids;
+ }
+
return result;
}
@@ -1636,6 +1731,7 @@ RecoverPreparedTransactions(void)
char dir[MAXPGPATH];
DIR *cldir;
struct dirent *clde;
+ bool overwriteOK = false;
snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
@@ -1666,7 +1762,7 @@ RecoverPreparedTransactions(void)
}
/* Read and validate file */
- buf = ReadTwoPhaseFile(xid);
+ buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
{
ereport(WARNING,
@@ -1687,6 +1783,15 @@ RecoverPreparedTransactions(void)
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+
+ /*
+ * It's possible that SubTransSetParent has been set before, if the
+ * prepared transaction generated xid assignment records. Test
+ * here must match one used in AssignTransactionId().
+ */
+ if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
+ overwriteOK = true;
/*
* Reconstruct subtrans state for the transaction --- needed
@@ -1696,7 +1801,7 @@ RecoverPreparedTransactions(void)
* hierarchy, but there's no need to restore that exactly.
*/
for (i = 0; i < hdr->nsubxacts; i++)
- SubTransSetParent(subxids[i], xid);
+ SubTransSetParent(subxids[i], xid, overwriteOK);
/*
* Recreate its GXACT and dummy PGPROC
@@ -1719,6 +1824,14 @@ RecoverPreparedTransactions(void)
*/
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+ /*
+ * Release locks held by the standby process after we process each
+ * prepared transaction. As a result, we don't need too many
+ * additional locks at any one time.
+ */
+ if (InHotStandby)
+ StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+
pfree(buf);
}
}
@@ -1739,9 +1852,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels)
+ RelFileNode *rels,
+ int ninvalmsgs,
+ SharedInvalidationMessage *invalmsgs,
+ bool initfileinval)
{
- XLogRecData rdata[3];
+ XLogRecData rdata[4];
int lastrdata = 0;
xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
@@ -1754,8 +1870,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Emit the XLOG commit record */
xlrec.xid = xid;
xlrec.crec.xact_time = GetCurrentTimestamp();
+ xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
+ xlrec.crec.nmsgs = 0;
xlrec.crec.nrels = nrels;
xlrec.crec.nsubxacts = nchildren;
+ xlrec.crec.nmsgs = ninvalmsgs;
+
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommitPrepared;
rdata[0].buffer = InvalidBuffer;
@@ -1777,6 +1897,15 @@ RecordTransactionCommitPrepared(TransactionId xid,
rdata[2].buffer = InvalidBuffer;
lastrdata = 2;
}
+ /* dump cache invalidation messages */
+ if (ninvalmsgs > 0)
+ {
+ rdata[lastrdata].next = &(rdata[3]);
+ rdata[3].data = (char *) invalmsgs;
+ rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
+ rdata[3].buffer = InvalidBuffer;
+ lastrdata = 3;
+ }
rdata[lastrdata].next = NULL;
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);