summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/walreceiver.c65
1 files changed, 33 insertions, 32 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index f2694db8733..ecb2c3a6d39 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.10 2010/04/20 22:55:03 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -76,9 +76,15 @@ static uint32 recvOff = 0;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;
-static void ProcessWalRcvInterrupts(void);
-static void EnableWalRcvImmediateExit(void);
-static void DisableWalRcvImmediateExit(void);
+/*
+ * LogstreamResult indicates the byte positions that we have already
+ * written/fsynced.
+ */
+static struct
+{
+ XLogRecPtr Write; /* last byte + 1 written out in the standby */
+ XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
+} LogstreamResult;
/*
* About SIGTERM handling:
@@ -98,6 +104,21 @@ static void DisableWalRcvImmediateExit(void);
*/
static volatile bool WalRcvImmediateInterruptOK = false;
+/* Prototypes for private functions */
+static void ProcessWalRcvInterrupts(void);
+static void EnableWalRcvImmediateExit(void);
+static void DisableWalRcvImmediateExit(void);
+static void WalRcvDie(int code, Datum arg);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
+static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+static void XLogWalRcvFlush(void);
+
+/* Signal handlers */
+static void WalRcvSigHupHandler(SIGNAL_ARGS);
+static void WalRcvShutdownHandler(SIGNAL_ARGS);
+static void WalRcvQuickDieHandler(SIGNAL_ARGS);
+
+
static void
ProcessWalRcvInterrupts(void)
{
@@ -118,47 +139,25 @@ ProcessWalRcvInterrupts(void)
}
static void
-EnableWalRcvImmediateExit()
+EnableWalRcvImmediateExit(void)
{
WalRcvImmediateInterruptOK = true;
ProcessWalRcvInterrupts();
}
static void
-DisableWalRcvImmediateExit()
+DisableWalRcvImmediateExit(void)
{
WalRcvImmediateInterruptOK = false;
ProcessWalRcvInterrupts();
}
-/* Signal handlers */
-static void WalRcvSigHupHandler(SIGNAL_ARGS);
-static void WalRcvShutdownHandler(SIGNAL_ARGS);
-static void WalRcvQuickDieHandler(SIGNAL_ARGS);
-
-/* Prototypes for private functions */
-static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
-static void XLogWalRcvFlush(void);
-
-/*
- * LogstreamResult indicates the byte positions that we have already
- * written/fsynced.
- */
-static struct
-{
- XLogRecPtr Write; /* last byte + 1 written out in the standby */
- XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
-} LogstreamResult;
-
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
-
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
@@ -398,19 +397,21 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
if (len < sizeof(XLogRecPtr))
ereport(ERROR,
- (errmsg("invalid WAL message received from primary")));
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid WAL message received from primary")));
- recptr = *((XLogRecPtr *) buf);
+ memcpy(&recptr, buf, sizeof(XLogRecPtr));
buf += sizeof(XLogRecPtr);
len -= sizeof(XLogRecPtr);
+
XLogWalRcvWrite(buf, len, recptr);
break;
}
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid replication message type %d",
- type)));
+ errmsg_internal("invalid replication message type %d",
+ type)));
}
}