diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 332 |
1 files changed, 223 insertions, 109 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 2352313b051..27f6354987d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.194 2005/05/31 19:10:28 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.195 2005/06/02 05:55:28 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -434,6 +434,7 @@ static void exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg); static bool recoveryStopsHere(XLogRecord *record, bool *includeThis); +static void SetBkpBlock(BkpBlock *bkpb, Buffer buffer); static bool AdvanceXLInsertBuffer(void); static void XLogWrite(XLogwrtRqst WriteRqst); static int XLogFileInit(uint32 log, uint32 seg, @@ -499,8 +500,10 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS]; - XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS]; - crc64 rdata_crc; + XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; + XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; + XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; + pg_crc32 rdata_crc; uint32 len, write_len; unsigned i; @@ -531,8 +534,10 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) /* * Here we scan the rdata list, determine which buffers must be backed * up, and compute the CRC values for the data. Note that the record - * header isn't added into the CRC yet since we don't know the final - * length or info bits quite yet. + * header isn't added into the CRC initially since we don't know the + * final length or info bits quite yet. Thus, the CRC will represent + * the CRC of the whole record in the order "rdata, then backup blocks, + * then record header". * * We may have to loop back to here if a race condition is detected * below. We could prevent the race by doing all this work while @@ -553,7 +558,7 @@ begin:; dtbuf_bkp[i] = false; } - INIT_CRC64(rdata_crc); + INIT_CRC32(rdata_crc); len = 0; for (rdt = rdata;;) { @@ -561,7 +566,7 @@ begin:; { /* Simple data, just include it */ len += rdt->len; - COMP_CRC64(rdata_crc, rdt->data, rdt->len); + COMP_CRC32(rdata_crc, rdt->data, rdt->len); } else { @@ -576,7 +581,7 @@ begin:; else if (rdt->data) { len += rdt->len; - COMP_CRC64(rdata_crc, rdt->data, rdt->len); + COMP_CRC32(rdata_crc, rdt->data, rdt->len); } break; } @@ -591,26 +596,14 @@ begin:; dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer)); if (XLByteLE(dtbuf_lsn[i], RedoRecPtr)) { - crc64 dtcrc; - dtbuf_bkp[i] = true; + SetBkpBlock(&(dtbuf_xlg[i]), rdt->buffer); rdt->data = NULL; - INIT_CRC64(dtcrc); - COMP_CRC64(dtcrc, - BufferGetBlock(dtbuf[i]), - BLCKSZ); - dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]); - dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]); - COMP_CRC64(dtcrc, - (char *) &(dtbuf_xlg[i]) + sizeof(crc64), - sizeof(BkpBlock) - sizeof(crc64)); - FIN_CRC64(dtcrc); - dtbuf_xlg[i].crc = dtcrc; } else if (rdt->data) { len += rdt->len; - COMP_CRC64(rdata_crc, rdt->data, rdt->len); + COMP_CRC32(rdata_crc, rdt->data, rdt->len); } break; } @@ -626,6 +619,39 @@ begin:; } /* + * Now add the backup block headers and data into the CRC + */ + for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + { + if (dtbuf_bkp[i]) + { + BkpBlock *bkpb = &(dtbuf_xlg[i]); + char *page; + + COMP_CRC32(rdata_crc, + (char *) bkpb, + sizeof(BkpBlock)); + page = (char *) BufferGetBlock(dtbuf[i]); + if (bkpb->hole_length == 0) + { + COMP_CRC32(rdata_crc, + page, + BLCKSZ); + } + else + { + /* must skip the hole */ + COMP_CRC32(rdata_crc, + page, + bkpb->hole_offset); + COMP_CRC32(rdata_crc, + page + (bkpb->hole_offset + bkpb->hole_length), + BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); + } + } + } + + /* * NOTE: the test for len == 0 here is somewhat fishy, since in theory * all of the rmgr data might have been suppressed in favor of backup * blocks. Currently, all callers of XLogInsert provide at least some @@ -713,23 +739,49 @@ begin:; write_len = len; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { + BkpBlock *bkpb; + char *page; + if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i])) continue; info |= XLR_SET_BKP_BLOCK(i); - rdt->next = &(dtbuf_rdt[2 * i]); + bkpb = &(dtbuf_xlg[i]); + page = (char *) BufferGetBlock(dtbuf[i]); + + rdt->next = &(dtbuf_rdt1[i]); + rdt = rdt->next; - dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]); - dtbuf_rdt[2 * i].len = sizeof(BkpBlock); + rdt->data = (char *) bkpb; + rdt->len = sizeof(BkpBlock); write_len += sizeof(BkpBlock); - rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]); + rdt->next = &(dtbuf_rdt2[i]); + rdt = rdt->next; - dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]); - dtbuf_rdt[2 * i + 1].len = BLCKSZ; - write_len += BLCKSZ; - dtbuf_rdt[2 * i + 1].next = NULL; + if (bkpb->hole_length == 0) + { + rdt->data = page; + rdt->len = BLCKSZ; + write_len += BLCKSZ; + rdt->next = NULL; + } + else + { + /* must skip the hole */ + rdt->data = page; + rdt->len = bkpb->hole_offset; + write_len += bkpb->hole_offset; + + rdt->next = &(dtbuf_rdt3[i]); + rdt = rdt->next; + + rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); + rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); + write_len += rdt->len; + rdt->next = NULL; + } } /* @@ -752,14 +804,15 @@ begin:; record->xl_prev = Insert->PrevRecord; record->xl_xid = GetCurrentTransactionIdIfAny(); + record->xl_tot_len = SizeOfXLogRecord + write_len; record->xl_len = len; /* doesn't include backup blocks */ record->xl_info = info; record->xl_rmid = rmid; - /* Now we can finish computing the main CRC */ - COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64), - SizeOfXLogRecord - sizeof(crc64)); - FIN_CRC64(rdata_crc); + /* Now we can finish computing the record's CRC */ + COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32), + SizeOfXLogRecord - sizeof(pg_crc32)); + FIN_CRC32(rdata_crc); record->xl_crc = rdata_crc; /* Compute record's XLOG location */ @@ -885,6 +938,46 @@ begin:; } /* + * Fill a BkpBlock struct given a buffer containing the page to be saved + * + * This is nontrivial only because it has to decide whether to apply "hole + * compression". + */ +static void +SetBkpBlock(BkpBlock *bkpb, Buffer buffer) +{ + PageHeader page; + uint16 offset; + uint16 length; + + /* Save page identity info */ + bkpb->node = BufferGetFileNode(buffer); + bkpb->block = BufferGetBlockNumber(buffer); + + /* Test whether there is a "hole" containing zeroes in the page */ + page = (PageHeader) BufferGetBlock(buffer); + offset = page->pd_lower; + /* Check if pd_lower appears sane at all */ + if (offset >= SizeOfPageHeaderData && offset < BLCKSZ) + { + char *spd = (char *) page + offset; + char *epd = (char *) page + BLCKSZ; + char *pd = spd; + + while (pd < epd && *pd == '\0') + pd++; + + length = pd - spd; + if (length == 0) + offset = 0; + } + else + offset = length = 0; + bkpb->hole_offset = offset; + bkpb->hole_length = length; +} + +/* * XLogArchiveNotify * * Create an archive notification file @@ -2276,7 +2369,7 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) continue; - memcpy((char *) &bkpb, blk, sizeof(BkpBlock)); + memcpy(&bkpb, blk, sizeof(BkpBlock)); blk += sizeof(BkpBlock); reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node); @@ -2287,7 +2380,21 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) if (BufferIsValid(buffer)) { page = (Page) BufferGetPage(buffer); - memcpy((char *) page, blk, BLCKSZ); + + if (bkpb.hole_length == 0) + { + memcpy((char *) page, blk, BLCKSZ); + } + else + { + /* must zero-fill the hole */ + MemSet((char *) page, 0, BLCKSZ); + memcpy((char *) page, blk, bkpb.hole_offset); + memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), + blk + bkpb.hole_offset, + BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -2295,7 +2402,7 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) } } - blk += BLCKSZ; + blk += BLCKSZ - bkpb.hole_length; } } @@ -2309,53 +2416,61 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn) static bool RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) { - crc64 crc; - crc64 cbuf; + pg_crc32 crc; int i; uint32 len = record->xl_len; + BkpBlock bkpb; char *blk; - /* Check CRC of rmgr data and record header */ - INIT_CRC64(crc); - COMP_CRC64(crc, XLogRecGetData(record), len); - COMP_CRC64(crc, (char *) record + sizeof(crc64), - SizeOfXLogRecord - sizeof(crc64)); - FIN_CRC64(crc); + /* First the rmgr data */ + INIT_CRC32(crc); + COMP_CRC32(crc, XLogRecGetData(record), len); - if (!EQ_CRC64(record->xl_crc, crc)) - { - ereport(emode, - (errmsg("incorrect resource manager data checksum in record at %X/%X", - recptr.xlogid, recptr.xrecoff))); - return (false); - } - - /* Check CRCs of backup blocks, if any */ + /* Add in the backup blocks, if any */ blk = (char *) XLogRecGetData(record) + len; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { + uint32 blen; + if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) continue; - INIT_CRC64(crc); - COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ); - COMP_CRC64(crc, blk + sizeof(crc64), - sizeof(BkpBlock) - sizeof(crc64)); - FIN_CRC64(crc); - memcpy((char *) &cbuf, blk, sizeof(crc64)); /* don't assume - * alignment */ - - if (!EQ_CRC64(cbuf, crc)) + memcpy(&bkpb, blk, sizeof(BkpBlock)); + if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ) { ereport(emode, - (errmsg("incorrect checksum of backup block %d in record at %X/%X", - i + 1, recptr.xlogid, recptr.xrecoff))); - return (false); + (errmsg("incorrect hole size in record at %X/%X", + recptr.xlogid, recptr.xrecoff))); + return false; } - blk += sizeof(BkpBlock) + BLCKSZ; + blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length; + COMP_CRC32(crc, blk, blen); + blk += blen; + } + + /* Check that xl_tot_len agrees with our calculation */ + if (blk != (char *) record + record->xl_tot_len) + { + ereport(emode, + (errmsg("incorrect total length in record at %X/%X", + recptr.xlogid, recptr.xrecoff))); + return false; } - return (true); + /* Finally include the record header */ + COMP_CRC32(crc, (char *) record + sizeof(pg_crc32), + SizeOfXLogRecord - sizeof(pg_crc32)); + FIN_CRC32(crc); + + if (!EQ_CRC32(record->xl_crc, crc)) + { + ereport(emode, + (errmsg("incorrect resource manager data checksum in record at %X/%X", + recptr.xlogid, recptr.xrecoff))); + return false; + } + + return true; } /* @@ -2382,7 +2497,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) uint32 targetPageOff; uint32 targetRecOff; uint32 pageHeaderSize; - unsigned i; if (readBuf == NULL) { @@ -2518,6 +2632,15 @@ got_record:; RecPtr->xlogid, RecPtr->xrecoff))); goto next_record_is_invalid; } + if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || + record->xl_tot_len > SizeOfXLogRecord + record->xl_len + + XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) + { + ereport(emode, + (errmsg("invalid record length at %X/%X", + RecPtr->xlogid, RecPtr->xrecoff))); + goto next_record_is_invalid; + } if (record->xl_rmid > RM_MAX_ID) { ereport(emode, @@ -2558,24 +2681,13 @@ got_record:; } /* - * Compute total length of record including any appended backup - * blocks. - */ - total_len = SizeOfXLogRecord + record->xl_len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) - continue; - total_len += sizeof(BkpBlock) + BLCKSZ; - } - - /* * Allocate or enlarge readRecordBuf as needed. To avoid useless * small increases, round its size to a multiple of BLCKSZ, and make * sure it's at least 4*BLCKSZ to start with. (That is enough for all * "normal" records, but very large commit or abort records might need * more space.) */ + total_len = record->xl_tot_len; if (total_len > readRecordBufSize) { uint32 newSize = total_len; @@ -2666,15 +2778,15 @@ got_record:; goto next_record_is_invalid; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); if (BLCKSZ - SizeOfXLogRecord >= pageHeaderSize + - SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len)) + MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len)) { nextRecord = (XLogRecord *) ((char *) contrecord + - SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len)); + MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len)); } EndRecPtr.xlogid = readId; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + - pageHeaderSize + SizeOfXLogContRecord + - MAXALIGN(contrecord->xl_rem_len); + pageHeaderSize + + MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len); ReadRecPtr = *RecPtr; return record; } @@ -3194,11 +3306,11 @@ WriteControlFile(void) StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN); /* Contents are protected with a CRC */ - INIT_CRC64(ControlFile->crc); - COMP_CRC64(ControlFile->crc, - (char *) ControlFile + sizeof(crc64), - sizeof(ControlFileData) - sizeof(crc64)); - FIN_CRC64(ControlFile->crc); + INIT_CRC32(ControlFile->crc); + COMP_CRC32(ControlFile->crc, + (char *) ControlFile, + offsetof(ControlFileData, crc)); + FIN_CRC32(ControlFile->crc); /* * We write out BLCKSZ bytes into pg_control, zero-padding the excess @@ -3247,7 +3359,7 @@ WriteControlFile(void) static void ReadControlFile(void) { - crc64 crc; + pg_crc32 crc; int fd; /* @@ -3281,13 +3393,13 @@ ReadControlFile(void) ControlFile->pg_control_version, PG_CONTROL_VERSION), errhint("It looks like you need to initdb."))); /* Now check the CRC. */ - INIT_CRC64(crc); - COMP_CRC64(crc, - (char *) ControlFile + sizeof(crc64), - sizeof(ControlFileData) - sizeof(crc64)); - FIN_CRC64(crc); + INIT_CRC32(crc); + COMP_CRC32(crc, + (char *) ControlFile, + offsetof(ControlFileData, crc)); + FIN_CRC32(crc); - if (!EQ_CRC64(crc, ControlFile->crc)) + if (!EQ_CRC32(crc, ControlFile->crc)) ereport(FATAL, (errmsg("incorrect checksum in control file"))); @@ -3396,11 +3508,11 @@ UpdateControlFile(void) { int fd; - INIT_CRC64(ControlFile->crc); - COMP_CRC64(ControlFile->crc, - (char *) ControlFile + sizeof(crc64), - sizeof(ControlFileData) - sizeof(crc64)); - FIN_CRC64(ControlFile->crc); + INIT_CRC32(ControlFile->crc); + COMP_CRC32(ControlFile->crc, + (char *) ControlFile, + offsetof(ControlFileData, crc)); + FIN_CRC32(ControlFile->crc); fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) @@ -3525,7 +3637,7 @@ BootStrapXLOG(void) bool use_existent; uint64 sysidentifier; struct timeval tv; - crc64 crc; + pg_crc32 crc; /* * Select a hopefully-unique system identifier code for this @@ -3582,16 +3694,17 @@ BootStrapXLOG(void) record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0; record->xl_xid = InvalidTransactionId; + record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint); record->xl_len = sizeof(checkPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_rmid = RM_XLOG_ID; memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint)); - INIT_CRC64(crc); - COMP_CRC64(crc, &checkPoint, sizeof(checkPoint)); - COMP_CRC64(crc, (char *) record + sizeof(crc64), - SizeOfXLogRecord - sizeof(crc64)); - FIN_CRC64(crc); + INIT_CRC32(crc); + COMP_CRC32(crc, &checkPoint, sizeof(checkPoint)); + COMP_CRC32(crc, (char *) record + sizeof(pg_crc32), + SizeOfXLogRecord - sizeof(pg_crc32)); + FIN_CRC32(crc); record->xl_crc = crc; /* Create first XLOG segment file */ @@ -4694,7 +4807,8 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) } return NULL; } - if (record->xl_len != sizeof(CheckPoint)) + if (record->xl_len != sizeof(CheckPoint) || + record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint)) { switch (whichChkpt) { |