summaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c49
1 files changed, 40 insertions, 9 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 4a5ba5b4263..a2f15a9a03e 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
static void WalRcvDie(int code, Datum arg);
+static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
@@ -258,7 +259,7 @@ WalReceiverMain(void)
/* Loop until end-of-streaming or error */
for (;;)
{
- XLogRecPtr recptr;
+ unsigned char type;
char *buf;
int len;
@@ -287,17 +288,17 @@ WalReceiverMain(void)
}
/* Wait a while for data to arrive */
- if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
+ if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
- /* Write received WAL records to disk */
- XLogWalRcvWrite(buf, len, recptr);
+ /* Accept the received data, and process it */
+ XLogWalRcvProcessMsg(type, buf, len);
- /* Receive any more WAL records we can without sleeping */
- while(walrcv_receive(0, &recptr, &buf, &len))
- XLogWalRcvWrite(buf, len, recptr);
+ /* Receive any more data we can without sleeping */
+ while(walrcv_receive(0, &type, &buf, &len))
+ XLogWalRcvProcessMsg(type, buf, len);
/*
- * Now that we've written some records, flush them to disk and
+ * If we've written some records, flush them to disk and
* let the startup process know about them.
*/
XLogWalRcvFlush();
@@ -376,6 +377,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
}
/*
+ * Accept the message from XLOG stream, and process it.
+ */
+static void
+XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+{
+ switch (type)
+ {
+ case 'w': /* WAL records */
+ {
+ XLogRecPtr recptr;
+
+ if (len < sizeof(XLogRecPtr))
+ ereport(ERROR,
+ (errmsg("invalid WAL message received from primary")));
+
+ recptr = *((XLogRecPtr *) buf);
+ buf += sizeof(XLogRecPtr);
+ len -= sizeof(XLogRecPtr);
+ XLogWalRcvWrite(buf, len, recptr);
+ break;
+ }
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication message type %d",
+ type)));
+ }
+}
+
+/*
* Write XLOG data to disk.
*/
static void