summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/twophase.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r--src/backend/access/transam/twophase.c185
1 files changed, 157 insertions, 28 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index db5795324b2..4c3a1b901cb 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.56 2009/11/23 09:58:36 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.57 2009/12/19 01:32:33 sriggs Exp $
*
* NOTES
* Each global transaction is associated with a global transaction
@@ -57,6 +57,7 @@
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/procarray.h"
+#include "storage/sinvaladt.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@@ -144,7 +145,10 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels);
+ RelFileNode *rels,
+ int ninvalmsgs,
+ SharedInvalidationMessage *invalmsgs,
+ bool initfileinval);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
@@ -736,10 +740,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
* 2. TransactionId[] (subtransactions)
* 3. RelFileNode[] (files to be deleted at commit)
* 4. RelFileNode[] (files to be deleted at abort)
- * 5. TwoPhaseRecordOnDisk
- * 6. ...
- * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
- * 8. CRC32
+ * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
+ * 6. TwoPhaseRecordOnDisk
+ * 7. ...
+ * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
+ * 9. CRC32
*
* Each segment except the final CRC32 is MAXALIGN'd.
*/
@@ -760,6 +765,8 @@ typedef struct TwoPhaseFileHeader
int32 nsubxacts; /* number of following subxact XIDs */
int32 ncommitrels; /* number of delete-on-commit rels */
int32 nabortrels; /* number of delete-on-abort rels */
+ int32 ninvalmsgs; /* number of cache invalidation messages */
+ bool initfileinval; /* does relcache init file need invalidation? */
char gid[GIDSIZE]; /* GID for transaction */
} TwoPhaseFileHeader;
@@ -835,6 +842,7 @@ StartPrepare(GlobalTransaction gxact)
TransactionId *children;
RelFileNode *commitrels;
RelFileNode *abortrels;
+ SharedInvalidationMessage *invalmsgs;
/* Initialize linked list */
records.head = palloc0(sizeof(XLogRecData));
@@ -859,11 +867,16 @@ StartPrepare(GlobalTransaction gxact)
hdr.nsubxacts = xactGetCommittedChildren(&children);
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL);
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL);
+ hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
+ &hdr.initfileinval);
StrNCpy(hdr.gid, gxact->gid, GIDSIZE);
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
- /* Add the additional info about subxacts and deletable files */
+ /*
+ * Add the additional info about subxacts, deletable files and
+ * cache invalidation messages.
+ */
if (hdr.nsubxacts > 0)
{
save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
@@ -880,6 +893,12 @@ StartPrepare(GlobalTransaction gxact)
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
pfree(abortrels);
}
+ if (hdr.ninvalmsgs > 0)
+ {
+ save_state_data(invalmsgs,
+ hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
+ pfree(invalmsgs);
+ }
}
/*
@@ -1071,7 +1090,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
* contents of the file. Otherwise return NULL.
*/
static char *
-ReadTwoPhaseFile(TransactionId xid)
+ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
{
char path[MAXPGPATH];
char *buf;
@@ -1087,10 +1106,11 @@ ReadTwoPhaseFile(TransactionId xid)
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
{
- ereport(WARNING,
- (errcode_for_file_access(),
- errmsg("could not open two-phase state file \"%s\": %m",
- path)));
+ if (give_warnings)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not open two-phase state file \"%s\": %m",
+ path)));
return NULL;
}
@@ -1103,10 +1123,11 @@ ReadTwoPhaseFile(TransactionId xid)
if (fstat(fd, &stat))
{
close(fd);
- ereport(WARNING,
- (errcode_for_file_access(),
- errmsg("could not stat two-phase state file \"%s\": %m",
- path)));
+ if (give_warnings)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not stat two-phase state file \"%s\": %m",
+ path)));
return NULL;
}
@@ -1134,10 +1155,11 @@ ReadTwoPhaseFile(TransactionId xid)
if (read(fd, buf, stat.st_size) != stat.st_size)
{
close(fd);
- ereport(WARNING,
- (errcode_for_file_access(),
- errmsg("could not read two-phase state file \"%s\": %m",
- path)));
+ if (give_warnings)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not read two-phase state file \"%s\": %m",
+ path)));
pfree(buf);
return NULL;
}
@@ -1166,6 +1188,30 @@ ReadTwoPhaseFile(TransactionId xid)
return buf;
}
+/*
+ * Confirms an xid is prepared, during recovery.
+ *
+ * Returns true if the two-phase state file for xid can be read via
+ * ReadTwoPhaseFile() and the xid recorded in its header equals xid;
+ * returns false otherwise.  give_warnings is passed as false so that a
+ * state file that cannot be opened, stat'ed or read does not raise a
+ * WARNING here.  The buffer returned by ReadTwoPhaseFile() is freed
+ * before returning.
+ */
+bool
+StandbyTransactionIdIsPrepared(TransactionId xid)
+{
+ char *buf;
+ TwoPhaseFileHeader *hdr;
+ bool result;
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* Read and validate file; a NULL result means no usable state file */
+ buf = ReadTwoPhaseFile(xid, false);
+ if (buf == NULL)
+ return false;
+
+ /* Check header also: the file must have been written for this xid */
+ hdr = (TwoPhaseFileHeader *) buf;
+ result = TransactionIdEquals(hdr->xid, xid);
+ pfree(buf);
+
+ return result;
+}
/*
* FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
@@ -1184,6 +1230,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
RelFileNode *abortrels;
RelFileNode *delrels;
int ndelrels;
+ SharedInvalidationMessage *invalmsgs;
int i;
/*
@@ -1196,7 +1243,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* Read and validate the state file
*/
- buf = ReadTwoPhaseFile(xid);
+ buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
@@ -1215,6 +1262,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
abortrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ invalmsgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
/* compute latestXid among all children */
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
@@ -1230,7 +1279,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
if (isCommit)
RecordTransactionCommitPrepared(xid,
hdr->nsubxacts, children,
- hdr->ncommitrels, commitrels);
+ hdr->ncommitrels, commitrels,
+ hdr->ninvalmsgs, invalmsgs,
+ hdr->initfileinval);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
@@ -1277,6 +1328,18 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
smgrclose(srel);
}
+ /*
+ * Handle cache invalidation messages.
+ *
+ * Relcache init file invalidation requires processing both
+ * before and after we send the SI messages. See AtEOXact_Inval()
+ */
+ if (hdr->initfileinval)
+ RelationCacheInitFileInvalidate(true);
+ SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
+ if (hdr->initfileinval)
+ RelationCacheInitFileInvalidate(false);
+
/* And now do the callbacks */
if (isCommit)
ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
@@ -1528,14 +1591,21 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
* Our other responsibility is to determine and return the oldest valid XID
* among the prepared xacts (if none, return ShmemVariableCache->nextXid).
* This is needed to synchronize pg_subtrans startup properly.
+ *
+ * If xids_p is not NULL, a pointer to a palloc'd array of all top-level
+ * xids is stored in *xids_p (NULL if none were found), and the number of
+ * entries in the array is stored in *nxids_p, which must then be non-NULL.
*/
TransactionId
-PrescanPreparedTransactions(void)
+PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
{
TransactionId origNextXid = ShmemVariableCache->nextXid;
TransactionId result = origNextXid;
DIR *cldir;
struct dirent *clde;
+ TransactionId *xids = NULL;
+ int nxids = 0;
+ int allocsize = 0;
cldir = AllocateDir(TWOPHASE_DIR);
while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
@@ -1567,7 +1637,7 @@ PrescanPreparedTransactions(void)
*/
/* Read and validate file */
- buf = ReadTwoPhaseFile(xid);
+ buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
{
ereport(WARNING,
@@ -1615,11 +1685,36 @@ PrescanPreparedTransactions(void)
}
}
+
+ if (xids_p)
+ {
+ if (nxids == allocsize)
+ {
+ if (nxids == 0)
+ {
+ allocsize = 10;
+ xids = palloc(allocsize * sizeof(TransactionId));
+ }
+ else
+ {
+ allocsize = allocsize * 2;
+ xids = repalloc(xids, allocsize * sizeof(TransactionId));
+ }
+ }
+ xids[nxids++] = xid;
+ }
+
pfree(buf);
}
}
FreeDir(cldir);
+ if (xids_p)
+ {
+ *xids_p = xids;
+ *nxids_p = nxids;
+ }
+
return result;
}
@@ -1636,6 +1731,7 @@ RecoverPreparedTransactions(void)
char dir[MAXPGPATH];
DIR *cldir;
struct dirent *clde;
+ bool overwriteOK = false;
snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
@@ -1666,7 +1762,7 @@ RecoverPreparedTransactions(void)
}
/* Read and validate file */
- buf = ReadTwoPhaseFile(xid);
+ buf = ReadTwoPhaseFile(xid, true);
if (buf == NULL)
{
ereport(WARNING,
@@ -1687,6 +1783,15 @@ RecoverPreparedTransactions(void)
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+
+ /*
+ * It's possible that SubTransSetParent has been set before, if the
+ * prepared transaction generated xid assignment records. Test
+ * here must match one used in AssignTransactionId().
+ */
+ if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
+ overwriteOK = true;
/*
* Reconstruct subtrans state for the transaction --- needed
@@ -1696,7 +1801,7 @@ RecoverPreparedTransactions(void)
* hierarchy, but there's no need to restore that exactly.
*/
for (i = 0; i < hdr->nsubxacts; i++)
- SubTransSetParent(subxids[i], xid);
+ SubTransSetParent(subxids[i], xid, overwriteOK);
/*
* Recreate its GXACT and dummy PGPROC
@@ -1719,6 +1824,14 @@ RecoverPreparedTransactions(void)
*/
ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+ /*
+ * Release locks held by the standby process after we process each
+ * prepared transaction. As a result, we don't need too many
+ * additional locks at any one time.
+ */
+ if (InHotStandby)
+ StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+
pfree(buf);
}
}
@@ -1739,9 +1852,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels)
+ RelFileNode *rels,
+ int ninvalmsgs,
+ SharedInvalidationMessage *invalmsgs,
+ bool initfileinval)
{
- XLogRecData rdata[3];
+ XLogRecData rdata[4];
int lastrdata = 0;
xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
@@ -1754,8 +1870,12 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Emit the XLOG commit record */
xlrec.xid = xid;
xlrec.crec.xact_time = GetCurrentTimestamp();
+ xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
+ xlrec.crec.nmsgs = 0;
xlrec.crec.nrels = nrels;
xlrec.crec.nsubxacts = nchildren;
+ xlrec.crec.nmsgs = ninvalmsgs;
+
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommitPrepared;
rdata[0].buffer = InvalidBuffer;
@@ -1777,6 +1897,15 @@ RecordTransactionCommitPrepared(TransactionId xid,
rdata[2].buffer = InvalidBuffer;
lastrdata = 2;
}
+ /* dump cache invalidation messages */
+ if (ninvalmsgs > 0)
+ {
+ rdata[lastrdata].next = &(rdata[3]);
+ rdata[3].data = (char *) invalmsgs;
+ rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
+ rdata[3].buffer = InvalidBuffer;
+ lastrdata = 3;
+ }
rdata[lastrdata].next = NULL;
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);