summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c332
1 files changed, 223 insertions, 109 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2352313b051..27f6354987d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.194 2005/05/31 19:10:28 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.195 2005/06/02 05:55:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -434,6 +434,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
uint32 endLogId, uint32 endLogSeg);
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
+static void SetBkpBlock(BkpBlock *bkpb, Buffer buffer);
static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
static int XLogFileInit(uint32 log, uint32 seg,
@@ -499,8 +500,10 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
- XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];
- crc64 rdata_crc;
+ XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
+ XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
+ XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
+ pg_crc32 rdata_crc;
uint32 len,
write_len;
unsigned i;
@@ -531,8 +534,10 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
/*
* Here we scan the rdata list, determine which buffers must be backed
* up, and compute the CRC values for the data. Note that the record
- * header isn't added into the CRC yet since we don't know the final
- * length or info bits quite yet.
+ * header isn't added into the CRC initially since we don't know the
+ * final length or info bits quite yet. Thus, the CRC will represent
+ * the CRC of the whole record in the order "rdata, then backup blocks,
+ * then record header".
*
* We may have to loop back to here if a race condition is detected
* below. We could prevent the race by doing all this work while
@@ -553,7 +558,7 @@ begin:;
dtbuf_bkp[i] = false;
}
- INIT_CRC64(rdata_crc);
+ INIT_CRC32(rdata_crc);
len = 0;
for (rdt = rdata;;)
{
@@ -561,7 +566,7 @@ begin:;
{
/* Simple data, just include it */
len += rdt->len;
- COMP_CRC64(rdata_crc, rdt->data, rdt->len);
+ COMP_CRC32(rdata_crc, rdt->data, rdt->len);
}
else
{
@@ -576,7 +581,7 @@ begin:;
else if (rdt->data)
{
len += rdt->len;
- COMP_CRC64(rdata_crc, rdt->data, rdt->len);
+ COMP_CRC32(rdata_crc, rdt->data, rdt->len);
}
break;
}
@@ -591,26 +596,14 @@ begin:;
dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer));
if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
{
- crc64 dtcrc;
-
dtbuf_bkp[i] = true;
+ SetBkpBlock(&(dtbuf_xlg[i]), rdt->buffer);
rdt->data = NULL;
- INIT_CRC64(dtcrc);
- COMP_CRC64(dtcrc,
- BufferGetBlock(dtbuf[i]),
- BLCKSZ);
- dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
- dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
- COMP_CRC64(dtcrc,
- (char *) &(dtbuf_xlg[i]) + sizeof(crc64),
- sizeof(BkpBlock) - sizeof(crc64));
- FIN_CRC64(dtcrc);
- dtbuf_xlg[i].crc = dtcrc;
}
else if (rdt->data)
{
len += rdt->len;
- COMP_CRC64(rdata_crc, rdt->data, rdt->len);
+ COMP_CRC32(rdata_crc, rdt->data, rdt->len);
}
break;
}
@@ -626,6 +619,39 @@ begin:;
}
/*
+ * Now add the backup block headers and data into the CRC
+ */
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ {
+ if (dtbuf_bkp[i])
+ {
+ BkpBlock *bkpb = &(dtbuf_xlg[i]);
+ char *page;
+
+ COMP_CRC32(rdata_crc,
+ (char *) bkpb,
+ sizeof(BkpBlock));
+ page = (char *) BufferGetBlock(dtbuf[i]);
+ if (bkpb->hole_length == 0)
+ {
+ COMP_CRC32(rdata_crc,
+ page,
+ BLCKSZ);
+ }
+ else
+ {
+ /* must skip the hole */
+ COMP_CRC32(rdata_crc,
+ page,
+ bkpb->hole_offset);
+ COMP_CRC32(rdata_crc,
+ page + (bkpb->hole_offset + bkpb->hole_length),
+ BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+ }
+ }
+ }
+
+ /*
* NOTE: the test for len == 0 here is somewhat fishy, since in theory
* all of the rmgr data might have been suppressed in favor of backup
* blocks. Currently, all callers of XLogInsert provide at least some
@@ -713,23 +739,49 @@ begin:;
write_len = len;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
+ BkpBlock *bkpb;
+ char *page;
+
if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
continue;
info |= XLR_SET_BKP_BLOCK(i);
- rdt->next = &(dtbuf_rdt[2 * i]);
+ bkpb = &(dtbuf_xlg[i]);
+ page = (char *) BufferGetBlock(dtbuf[i]);
+
+ rdt->next = &(dtbuf_rdt1[i]);
+ rdt = rdt->next;
- dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]);
- dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
+ rdt->data = (char *) bkpb;
+ rdt->len = sizeof(BkpBlock);
write_len += sizeof(BkpBlock);
- rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);
+ rdt->next = &(dtbuf_rdt2[i]);
+ rdt = rdt->next;
- dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]);
- dtbuf_rdt[2 * i + 1].len = BLCKSZ;
- write_len += BLCKSZ;
- dtbuf_rdt[2 * i + 1].next = NULL;
+ if (bkpb->hole_length == 0)
+ {
+ rdt->data = page;
+ rdt->len = BLCKSZ;
+ write_len += BLCKSZ;
+ rdt->next = NULL;
+ }
+ else
+ {
+ /* must skip the hole */
+ rdt->data = page;
+ rdt->len = bkpb->hole_offset;
+ write_len += bkpb->hole_offset;
+
+ rdt->next = &(dtbuf_rdt3[i]);
+ rdt = rdt->next;
+
+ rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
+ rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
+ write_len += rdt->len;
+ rdt->next = NULL;
+ }
}
/*
@@ -752,14 +804,15 @@ begin:;
record->xl_prev = Insert->PrevRecord;
record->xl_xid = GetCurrentTransactionIdIfAny();
+ record->xl_tot_len = SizeOfXLogRecord + write_len;
record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info;
record->xl_rmid = rmid;
- /* Now we can finish computing the main CRC */
- COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
- SizeOfXLogRecord - sizeof(crc64));
- FIN_CRC64(rdata_crc);
+ /* Now we can finish computing the record's CRC */
+ COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
+ SizeOfXLogRecord - sizeof(pg_crc32));
+ FIN_CRC32(rdata_crc);
record->xl_crc = rdata_crc;
/* Compute record's XLOG location */
@@ -885,6 +938,46 @@ begin:;
}
/*
+ * Fill a BkpBlock struct given a buffer containing the page to be saved
+ *
+ * This is nontrivial only because it has to decide whether to apply "hole
+ * compression".
+ */
+static void
+SetBkpBlock(BkpBlock *bkpb, Buffer buffer)
+{
+ PageHeader page;
+ uint16 offset;
+ uint16 length;
+
+ /* Save page identity info */
+ bkpb->node = BufferGetFileNode(buffer);
+ bkpb->block = BufferGetBlockNumber(buffer);
+
+ /* Test whether there is a "hole" containing zeroes in the page */
+ page = (PageHeader) BufferGetBlock(buffer);
+ offset = page->pd_lower;
+ /* Check if pd_lower appears sane at all */
+ if (offset >= SizeOfPageHeaderData && offset < BLCKSZ)
+ {
+ char *spd = (char *) page + offset;
+ char *epd = (char *) page + BLCKSZ;
+ char *pd = spd;
+
+ while (pd < epd && *pd == '\0')
+ pd++;
+
+ length = pd - spd;
+ if (length == 0)
+ offset = 0;
+ }
+ else
+ offset = length = 0;
+ bkpb->hole_offset = offset;
+ bkpb->hole_length = length;
+}
+
+/*
* XLogArchiveNotify
*
* Create an archive notification file
@@ -2276,7 +2369,7 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
continue;
- memcpy((char *) &bkpb, blk, sizeof(BkpBlock));
+ memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock);
reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node);
@@ -2287,7 +2380,21 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
- memcpy((char *) page, blk, BLCKSZ);
+
+ if (bkpb.hole_length == 0)
+ {
+ memcpy((char *) page, blk, BLCKSZ);
+ }
+ else
+ {
+ /* must zero-fill the hole */
+ MemSet((char *) page, 0, BLCKSZ);
+ memcpy((char *) page, blk, bkpb.hole_offset);
+ memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
+ blk + bkpb.hole_offset,
+ BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
@@ -2295,7 +2402,7 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
}
}
- blk += BLCKSZ;
+ blk += BLCKSZ - bkpb.hole_length;
}
}
@@ -2309,53 +2416,61 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
static bool
RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
{
- crc64 crc;
- crc64 cbuf;
+ pg_crc32 crc;
int i;
uint32 len = record->xl_len;
+ BkpBlock bkpb;
char *blk;
- /* Check CRC of rmgr data and record header */
- INIT_CRC64(crc);
- COMP_CRC64(crc, XLogRecGetData(record), len);
- COMP_CRC64(crc, (char *) record + sizeof(crc64),
- SizeOfXLogRecord - sizeof(crc64));
- FIN_CRC64(crc);
+ /* First the rmgr data */
+ INIT_CRC32(crc);
+ COMP_CRC32(crc, XLogRecGetData(record), len);
- if (!EQ_CRC64(record->xl_crc, crc))
- {
- ereport(emode,
- (errmsg("incorrect resource manager data checksum in record at %X/%X",
- recptr.xlogid, recptr.xrecoff)));
- return (false);
- }
-
- /* Check CRCs of backup blocks, if any */
+ /* Add in the backup blocks, if any */
blk = (char *) XLogRecGetData(record) + len;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
+ uint32 blen;
+
if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
continue;
- INIT_CRC64(crc);
- COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ);
- COMP_CRC64(crc, blk + sizeof(crc64),
- sizeof(BkpBlock) - sizeof(crc64));
- FIN_CRC64(crc);
- memcpy((char *) &cbuf, blk, sizeof(crc64)); /* don't assume
- * alignment */
-
- if (!EQ_CRC64(cbuf, crc))
+ memcpy(&bkpb, blk, sizeof(BkpBlock));
+ if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
{
ereport(emode,
- (errmsg("incorrect checksum of backup block %d in record at %X/%X",
- i + 1, recptr.xlogid, recptr.xrecoff)));
- return (false);
+ (errmsg("incorrect hole size in record at %X/%X",
+ recptr.xlogid, recptr.xrecoff)));
+ return false;
}
- blk += sizeof(BkpBlock) + BLCKSZ;
+ blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
+ COMP_CRC32(crc, blk, blen);
+ blk += blen;
+ }
+
+ /* Check that xl_tot_len agrees with our calculation */
+ if (blk != (char *) record + record->xl_tot_len)
+ {
+ ereport(emode,
+ (errmsg("incorrect total length in record at %X/%X",
+ recptr.xlogid, recptr.xrecoff)));
+ return false;
}
- return (true);
+ /* Finally include the record header */
+ COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
+ SizeOfXLogRecord - sizeof(pg_crc32));
+ FIN_CRC32(crc);
+
+ if (!EQ_CRC32(record->xl_crc, crc))
+ {
+ ereport(emode,
+ (errmsg("incorrect resource manager data checksum in record at %X/%X",
+ recptr.xlogid, recptr.xrecoff)));
+ return false;
+ }
+
+ return true;
}
/*
@@ -2382,7 +2497,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
uint32 targetPageOff;
uint32 targetRecOff;
uint32 pageHeaderSize;
- unsigned i;
if (readBuf == NULL)
{
@@ -2518,6 +2632,15 @@ got_record:;
RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
+ if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
+ record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
+ XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+ {
+ ereport(emode,
+ (errmsg("invalid record length at %X/%X",
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ goto next_record_is_invalid;
+ }
if (record->xl_rmid > RM_MAX_ID)
{
ereport(emode,
@@ -2558,24 +2681,13 @@ got_record:;
}
/*
- * Compute total length of record including any appended backup
- * blocks.
- */
- total_len = SizeOfXLogRecord + record->xl_len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
- continue;
- total_len += sizeof(BkpBlock) + BLCKSZ;
- }
-
- /*
* Allocate or enlarge readRecordBuf as needed. To avoid useless
* small increases, round its size to a multiple of BLCKSZ, and make
* sure it's at least 4*BLCKSZ to start with. (That is enough for all
* "normal" records, but very large commit or abort records might need
* more space.)
*/
+ total_len = record->xl_tot_len;
if (total_len > readRecordBufSize)
{
uint32 newSize = total_len;
@@ -2666,15 +2778,15 @@ got_record:;
goto next_record_is_invalid;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
if (BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
- SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
+ MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
{
nextRecord = (XLogRecord *) ((char *) contrecord +
- SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len));
+ MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
}
EndRecPtr.xlogid = readId;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
- pageHeaderSize + SizeOfXLogContRecord +
- MAXALIGN(contrecord->xl_rem_len);
+ pageHeaderSize +
+ MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
ReadRecPtr = *RecPtr;
return record;
}
@@ -3194,11 +3306,11 @@ WriteControlFile(void)
StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
/* Contents are protected with a CRC */
- INIT_CRC64(ControlFile->crc);
- COMP_CRC64(ControlFile->crc,
- (char *) ControlFile + sizeof(crc64),
- sizeof(ControlFileData) - sizeof(crc64));
- FIN_CRC64(ControlFile->crc);
+ INIT_CRC32(ControlFile->crc);
+ COMP_CRC32(ControlFile->crc,
+ (char *) ControlFile,
+ offsetof(ControlFileData, crc));
+ FIN_CRC32(ControlFile->crc);
/*
* We write out BLCKSZ bytes into pg_control, zero-padding the excess
@@ -3247,7 +3359,7 @@ WriteControlFile(void)
static void
ReadControlFile(void)
{
- crc64 crc;
+ pg_crc32 crc;
int fd;
/*
@@ -3281,13 +3393,13 @@ ReadControlFile(void)
ControlFile->pg_control_version, PG_CONTROL_VERSION),
errhint("It looks like you need to initdb.")));
/* Now check the CRC. */
- INIT_CRC64(crc);
- COMP_CRC64(crc,
- (char *) ControlFile + sizeof(crc64),
- sizeof(ControlFileData) - sizeof(crc64));
- FIN_CRC64(crc);
+ INIT_CRC32(crc);
+ COMP_CRC32(crc,
+ (char *) ControlFile,
+ offsetof(ControlFileData, crc));
+ FIN_CRC32(crc);
- if (!EQ_CRC64(crc, ControlFile->crc))
+ if (!EQ_CRC32(crc, ControlFile->crc))
ereport(FATAL,
(errmsg("incorrect checksum in control file")));
@@ -3396,11 +3508,11 @@ UpdateControlFile(void)
{
int fd;
- INIT_CRC64(ControlFile->crc);
- COMP_CRC64(ControlFile->crc,
- (char *) ControlFile + sizeof(crc64),
- sizeof(ControlFileData) - sizeof(crc64));
- FIN_CRC64(ControlFile->crc);
+ INIT_CRC32(ControlFile->crc);
+ COMP_CRC32(ControlFile->crc,
+ (char *) ControlFile,
+ offsetof(ControlFileData, crc));
+ FIN_CRC32(ControlFile->crc);
fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
@@ -3525,7 +3637,7 @@ BootStrapXLOG(void)
bool use_existent;
uint64 sysidentifier;
struct timeval tv;
- crc64 crc;
+ pg_crc32 crc;
/*
* Select a hopefully-unique system identifier code for this
@@ -3582,16 +3694,17 @@ BootStrapXLOG(void)
record->xl_prev.xlogid = 0;
record->xl_prev.xrecoff = 0;
record->xl_xid = InvalidTransactionId;
+ record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
record->xl_len = sizeof(checkPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
- INIT_CRC64(crc);
- COMP_CRC64(crc, &checkPoint, sizeof(checkPoint));
- COMP_CRC64(crc, (char *) record + sizeof(crc64),
- SizeOfXLogRecord - sizeof(crc64));
- FIN_CRC64(crc);
+ INIT_CRC32(crc);
+ COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
+ COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
+ SizeOfXLogRecord - sizeof(pg_crc32));
+ FIN_CRC32(crc);
record->xl_crc = crc;
/* Create first XLOG segment file */
@@ -4694,7 +4807,8 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
}
return NULL;
}
- if (record->xl_len != sizeof(CheckPoint))
+ if (record->xl_len != sizeof(CheckPoint) ||
+ record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
{
switch (whichChkpt)
{