summaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c71
1 files changed, 26 insertions, 45 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45a3b2ef294..2c04df08ed1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -87,8 +87,7 @@ int replication_timeout = 60 * 1000; /* maximum time to send one
* but for walsender to read the XLOG.
*/
static int sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
/*
@@ -977,10 +976,8 @@ XLogRead(char *buf, XLogRecPtr startptr, Size count)
char *p;
XLogRecPtr recptr;
Size nbytes;
- uint32 lastRemovedLog;
- uint32 lastRemovedSeg;
- uint32 log;
- uint32 seg;
+ XLogSegNo lastRemovedSegNo;
+ XLogSegNo segno;
retry:
p = buf;
@@ -995,7 +992,7 @@ retry:
startoff = recptr.xrecoff % XLogSegSize;
- if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
char path[MAXPGPATH];
@@ -1003,8 +1000,8 @@ retry:
if (sendFile >= 0)
close(sendFile);
- XLByteToSeg(recptr, sendId, sendSeg);
- XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+ XLByteToSeg(recptr, sendSegNo);
+ XLogFilePath(path, ThisTimeLineID, sendSegNo);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
@@ -1015,20 +1012,15 @@ retry:
* removed or recycled.
*/
if (errno == ENOENT)
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
- path, sendId, sendSeg)));
+ errmsg("could not open file \"%s\": %m",
+ path)));
}
sendOff = 0;
}
@@ -1039,8 +1031,9 @@ retry:
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- sendId, sendSeg, startoff)));
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ startoff)));
sendOff = startoff;
}
@@ -1052,11 +1045,13 @@ retry:
readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
+ {
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u, "
- "length %lu: %m",
- sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+ errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ sendOff, (unsigned long) segbytes)));
+ }
/* Update state for read */
XLByteAdvance(recptr, readbytes);
@@ -1073,19 +1068,13 @@ retry:
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startptr, log, seg);
- if (log < lastRemovedLog ||
- (log == lastRemovedLog && seg <= lastRemovedSeg))
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, log, seg);
+ XLogGetLastRemoved(&lastRemovedSegNo);
+ XLByteToSeg(startptr, segno);
+ if (segno <= lastRemovedSegNo)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, segno))));
/*
* During recovery, the currently-open WAL file might be replaced with the
@@ -1165,24 +1154,13 @@ XLogSend(char *msgbuf, bool *caughtup)
* SendRqstPtr never points to the middle of a WAL record.
*/
startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log segment
- * in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
-
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
if (endptr.xlogid != startptr.xlogid)
{
/* Don't cross a logfile boundary within one message */
Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xlogid = startptr.xlogid;
- endptr.xrecoff = XLogFileSize;
+ endptr.xrecoff = 0;
}
/* if we went beyond SendRqstPtr, back off */
@@ -1198,7 +1176,10 @@ XLogSend(char *msgbuf, bool *caughtup)
*caughtup = false;
}
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ if (endptr.xrecoff == 0)
+ nbytes = 0x100000000L - (uint64) startptr.xrecoff;
+ else
+ nbytes = endptr.xrecoff - startptr.xrecoff;
Assert(nbytes <= MAX_SEND_SIZE);
/*