diff options
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r-- | src/backend/access/transam/twophase.c | 185 |
1 files changed, 157 insertions, 28 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index db5795324b2..4c3a1b901cb 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.56 2009/11/23 09:58:36 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.57 2009/12/19 01:32:33 sriggs Exp $ * * NOTES * Each global transaction is associated with a global transaction @@ -57,6 +57,7 @@ #include "pgstat.h" #include "storage/fd.h" #include "storage/procarray.h" +#include "storage/sinvaladt.h" #include "storage/smgr.h" #include "utils/builtins.h" #include "utils/memutils.h" @@ -144,7 +145,10 @@ static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels); + RelFileNode *rels, + int ninvalmsgs, + SharedInvalidationMessage *invalmsgs, + bool initfileinval); static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, @@ -736,10 +740,11 @@ TwoPhaseGetDummyProc(TransactionId xid) * 2. TransactionId[] (subtransactions) * 3. RelFileNode[] (files to be deleted at commit) * 4. RelFileNode[] (files to be deleted at abort) - * 5. TwoPhaseRecordOnDisk - * 6. ... - * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID) - * 8. CRC32 + * 5. SharedInvalidationMessage[] (inval messages to be sent at commit) + * 6. TwoPhaseRecordOnDisk + * 7. ... + * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID) + * 9. CRC32 * * Each segment except the final CRC32 is MAXALIGN'd. */ @@ -760,6 +765,8 @@ typedef struct TwoPhaseFileHeader int32 nsubxacts; /* number of following subxact XIDs */ int32 ncommitrels; /* number of delete-on-commit rels */ int32 nabortrels; /* number of delete-on-abort rels */ + int32 ninvalmsgs; /* number of cache invalidation messages */ + bool initfileinval; /* does relcache init file need invalidation? */ char gid[GIDSIZE]; /* GID for transaction */ } TwoPhaseFileHeader; @@ -835,6 +842,7 @@ StartPrepare(GlobalTransaction gxact) TransactionId *children; RelFileNode *commitrels; RelFileNode *abortrels; + SharedInvalidationMessage *invalmsgs; /* Initialize linked list */ records.head = palloc0(sizeof(XLogRecData)); @@ -859,11 +867,16 @@ StartPrepare(GlobalTransaction gxact) hdr.nsubxacts = xactGetCommittedChildren(&children); hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels, NULL); hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels, NULL); + hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs, + &hdr.initfileinval); StrNCpy(hdr.gid, gxact->gid, GIDSIZE); save_state_data(&hdr, sizeof(TwoPhaseFileHeader)); - /* Add the additional info about subxacts and deletable files */ + /* + * Add the additional info about subxacts, deletable files and + * cache invalidation messages. + */ if (hdr.nsubxacts > 0) { save_state_data(children, hdr.nsubxacts * sizeof(TransactionId)); @@ -880,6 +893,12 @@ StartPrepare(GlobalTransaction gxact) save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode)); pfree(abortrels); } + if (hdr.ninvalmsgs > 0) + { + save_state_data(invalmsgs, + hdr.ninvalmsgs * sizeof(SharedInvalidationMessage)); + pfree(invalmsgs); + } } /* @@ -1071,7 +1090,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, * contents of the file. Otherwise return NULL. */ static char * -ReadTwoPhaseFile(TransactionId xid) +ReadTwoPhaseFile(TransactionId xid, bool give_warnings) { char path[MAXPGPATH]; char *buf; @@ -1087,10 +1106,11 @@ ReadTwoPhaseFile(TransactionId xid) fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (fd < 0) { - ereport(WARNING, - (errcode_for_file_access(), - errmsg("could not open two-phase state file \"%s\": %m", - path))); + if (give_warnings) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not open two-phase state file \"%s\": %m", + path))); return NULL; } @@ -1103,10 +1123,11 @@ ReadTwoPhaseFile(TransactionId xid) if (fstat(fd, &stat)) { close(fd); - ereport(WARNING, - (errcode_for_file_access(), - errmsg("could not stat two-phase state file \"%s\": %m", - path))); + if (give_warnings) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not stat two-phase state file \"%s\": %m", + path))); return NULL; } @@ -1134,10 +1155,11 @@ ReadTwoPhaseFile(TransactionId xid) if (read(fd, buf, stat.st_size) != stat.st_size) { close(fd); - ereport(WARNING, - (errcode_for_file_access(), - errmsg("could not read two-phase state file \"%s\": %m", - path))); + if (give_warnings) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not read two-phase state file \"%s\": %m", + path))); pfree(buf); return NULL; } @@ -1166,6 +1188,30 @@ ReadTwoPhaseFile(TransactionId xid) return buf; } +/* + * Confirms an xid is prepared, during recovery + */ +bool +StandbyTransactionIdIsPrepared(TransactionId xid) +{ + char *buf; + TwoPhaseFileHeader *hdr; + bool result; + + Assert(TransactionIdIsValid(xid)); + + /* Read and validate file */ + buf = ReadTwoPhaseFile(xid, false); + if (buf == NULL) + return false; + + /* Check header also */ + hdr = (TwoPhaseFileHeader *) buf; + result = TransactionIdEquals(hdr->xid, xid); + pfree(buf); + + return result; +} /* * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED @@ -1184,6 +1230,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) RelFileNode *abortrels; RelFileNode *delrels; int ndelrels; + SharedInvalidationMessage *invalmsgs; int i; /* @@ -1196,7 +1243,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* * Read and validate the state file */ - buf = ReadTwoPhaseFile(xid); + buf = ReadTwoPhaseFile(xid, true); if (buf == NULL) ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), @@ -1215,6 +1262,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit) bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); abortrels = (RelFileNode *) bufptr; bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + invalmsgs = (SharedInvalidationMessage *) bufptr; + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); /* compute latestXid among all children */ latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children); @@ -1230,7 +1279,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) if (isCommit) RecordTransactionCommitPrepared(xid, hdr->nsubxacts, children, - hdr->ncommitrels, commitrels); + hdr->ncommitrels, commitrels, + hdr->ninvalmsgs, invalmsgs, + hdr->initfileinval); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, @@ -1277,6 +1328,18 @@ FinishPreparedTransaction(const char *gid, bool isCommit) smgrclose(srel); } + /* + * Handle cache invalidation messages. + * + * Relcache init file invalidation requires processing both + * before and after we send the SI messages. See AtEOXact_Inval() + */ + if (hdr->initfileinval) + RelationCacheInitFileInvalidate(true); + SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs); + if (hdr->initfileinval) + RelationCacheInitFileInvalidate(false); + /* And now do the callbacks */ if (isCommit) ProcessRecords(bufptr, xid, twophase_postcommit_callbacks); @@ -1528,14 +1591,21 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) * Our other responsibility is to determine and return the oldest valid XID * among the prepared xacts (if none, return ShmemVariableCache->nextXid). * This is needed to synchronize pg_subtrans startup properly. + * + * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all + * top-level xids is stored in *xids_p. The number of entries in the array + * is returned in *nxids_p. */ TransactionId -PrescanPreparedTransactions(void) +PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) { TransactionId origNextXid = ShmemVariableCache->nextXid; TransactionId result = origNextXid; DIR *cldir; struct dirent *clde; + TransactionId *xids = NULL; + int nxids = 0; + int allocsize = 0; cldir = AllocateDir(TWOPHASE_DIR); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) @@ -1567,7 +1637,7 @@ PrescanPreparedTransactions(void) */ /* Read and validate file */ - buf = ReadTwoPhaseFile(xid); + buf = ReadTwoPhaseFile(xid, true); if (buf == NULL) { ereport(WARNING, @@ -1615,11 +1685,36 @@ PrescanPreparedTransactions(void) } } + + if (xids_p) + { + if (nxids == allocsize) + { + if (nxids == 0) + { + allocsize = 10; + xids = palloc(allocsize * sizeof(TransactionId)); + } + else + { + allocsize = allocsize * 2; + xids = repalloc(xids, allocsize * sizeof(TransactionId)); + } + } + xids[nxids++] = xid; + } + pfree(buf); } } FreeDir(cldir); + if (xids_p) + { + *xids_p = xids; + *nxids_p = nxids; + } + return result; } @@ -1636,6 +1731,7 @@ RecoverPreparedTransactions(void) char dir[MAXPGPATH]; DIR *cldir; struct dirent *clde; + bool overwriteOK = false; snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR); @@ -1666,7 +1762,7 @@ RecoverPreparedTransactions(void) } /* Read and validate file */ - buf = ReadTwoPhaseFile(xid); + buf = ReadTwoPhaseFile(xid, true); if (buf == NULL) { ereport(WARNING, @@ -1687,6 +1783,15 @@ RecoverPreparedTransactions(void) bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); + + /* + * It's possible that SubTransSetParent has been set before, if the + * prepared transaction generated xid assignment records. Test + * here must match one used in AssignTransactionId(). + */ + if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS) + overwriteOK = true; /* * Reconstruct subtrans state for the transaction --- needed @@ -1696,7 +1801,7 @@ RecoverPreparedTransactions(void) * hierarchy, but there's no need to restore that exactly. */ for (i = 0; i < hdr->nsubxacts; i++) - SubTransSetParent(subxids[i], xid); + SubTransSetParent(subxids[i], xid, overwriteOK); /* * Recreate its GXACT and dummy PGPROC @@ -1719,6 +1824,14 @@ RecoverPreparedTransactions(void) */ ProcessRecords(bufptr, xid, twophase_recover_callbacks); + /* + * Release locks held by the standby process after we process each + * prepared transaction. As a result, we don't need too many + * additional locks at any one time. + */ + if (InHotStandby) + StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids); + pfree(buf); } } @@ -1739,9 +1852,12 @@ RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels) + RelFileNode *rels, + int ninvalmsgs, + SharedInvalidationMessage *invalmsgs, + bool initfileinval) { - XLogRecData rdata[3]; + XLogRecData rdata[4]; int lastrdata = 0; xl_xact_commit_prepared xlrec; XLogRecPtr recptr; @@ -1754,8 +1870,12 @@ RecordTransactionCommitPrepared(TransactionId xid, /* Emit the XLOG commit record */ xlrec.xid = xid; xlrec.crec.xact_time = GetCurrentTimestamp(); + xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0; + xlrec.crec.nmsgs = 0; xlrec.crec.nrels = nrels; xlrec.crec.nsubxacts = nchildren; + xlrec.crec.nmsgs = ninvalmsgs; + rdata[0].data = (char *) (&xlrec); rdata[0].len = MinSizeOfXactCommitPrepared; rdata[0].buffer = InvalidBuffer; @@ -1777,6 +1897,15 @@ RecordTransactionCommitPrepared(TransactionId xid, rdata[2].buffer = InvalidBuffer; lastrdata = 2; } + /* dump cache invalidation messages */ + if (ninvalmsgs > 0) + { + rdata[lastrdata].next = &(rdata[3]); + rdata[3].data = (char *) invalmsgs; + rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage); + rdata[3].buffer = InvalidBuffer; + lastrdata = 3; + } rdata[lastrdata].next = NULL; recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata); |