diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2010-02-03 09:47:19 +0000 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2010-02-03 09:47:19 +0000 |
commit | 808969d0e7ac2d2fdbd915c6a6ac9ec68b6f63f9 (patch) | |
tree | 0215738e25ef02df0eedee365a6a38ca57f7df37 /src/backend/replication/walreceiver.c | |
parent | 47c5b8f5588da67a95dca8cb14b2bc1b7f291e15 (diff) |
Add a message type header to the CopyData messages sent from primary
to standby in streaming replication. While we only have one message type
at the moment, adding a message type header makes this easier to extend.
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 49 |
1 files changed, 40 insertions, 9 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 4a5ba5b4263..a2f15a9a03e 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.3 2010/02/03 09:47:19 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -135,6 +135,7 @@ static void WalRcvQuickDieHandler(SIGNAL_ARGS); /* Prototypes for private functions */ static void WalRcvDie(int code, Datum arg); +static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); @@ -258,7 +259,7 @@ WalReceiverMain(void) /* Loop until end-of-streaming or error */ for (;;) { - XLogRecPtr recptr; + unsigned char type; char *buf; int len; @@ -287,17 +288,17 @@ WalReceiverMain(void) } /* Wait a while for data to arrive */ - if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len)) + if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { - /* Write received WAL records to disk */ - XLogWalRcvWrite(buf, len, recptr); + /* Accept the received data, and process it */ + XLogWalRcvProcessMsg(type, buf, len); - /* Receive any more WAL records we can without sleeping */ - while(walrcv_receive(0, &recptr, &buf, &len)) - XLogWalRcvWrite(buf, len, recptr); + /* Receive any more data we can without sleeping */ + while(walrcv_receive(0, &type, &buf, &len)) + XLogWalRcvProcessMsg(type, buf, len); /* - * Now that we've written some records, flush them to disk and + * If we've written some records, flush them to disk and * let the startup process know about them. */ XLogWalRcvFlush(); @@ -376,6 +377,36 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) } /* + * Accept the message from XLOG stream, and process it. + */ +static void +XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) +{ + switch (type) + { + case 'w': /* WAL records */ + { + XLogRecPtr recptr; + + if (len < sizeof(XLogRecPtr)) + ereport(ERROR, + (errmsg("invalid WAL message received from primary"))); + + recptr = *((XLogRecPtr *) buf); + buf += sizeof(XLogRecPtr); + len -= sizeof(XLogRecPtr); + XLogWalRcvWrite(buf, len, recptr); + break; + } + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid replication message type %d", + type))); + } +} + +/* * Write XLOG data to disk. */ static void |