diff options
| author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2013-01-17 20:23:00 +0200 |
|---|---|---|
| committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2013-01-17 20:23:00 +0200 |
| commit | 0b6329130e8e4576e97ff763f0e773347e1a88af (patch) | |
| tree | 7902ba1fa99ac8124232122ce16231cff0b0e21e /src/backend/replication/walsender.c | |
| parent | 8ae35e91807508872cabd3b0e8db35fc78e194ac (diff) | |
Make pg_receivexlog and pg_basebackup -X stream work across timeline switches.
This mirrors the changes done earlier to the server in standby mode. When
receivelog reaches the end of a timeline, as reported by the server, it
fetches the timeline history file of the next timeline, and restarts
streaming from the new timeline by issuing a new START_STREAMING command.
When pg_receivexlog crosses a timeline, it leaves the .partial suffix on the
last segment on the old timeline. This helps you to tell apart a partial
segment left in the directory because of a timeline switch, and a completed
segment. If you just follow a single server, it won't make a difference, but
it can be significant in more complicated scenarios where new WAL is still
generated on the old timeline.
This includes two small changes to the streaming replication protocol:
First, when you reach the end of timeline while streaming, the server now
sends the TLI of the next timeline in the server's history to the client.
pg_receivexlog uses that as the next timeline, so that it doesn't need to
parse the timeline history file like a standby server does. Second, when
BASE_BACKUP command sends the begin and end WAL positions, it now also sends
the timeline IDs corresponding the positions.
Diffstat (limited to 'src/backend/replication/walsender.c')
| -rw-r--r-- | src/backend/replication/walsender.c | 50 |
1 files changed, 44 insertions, 6 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ad7d1c911b3..ba138e73da3 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -117,6 +117,7 @@ static uint32 sendOff = 0; * history forked off from that timeline at sendTimeLineValidUpto. */ static TimeLineID sendTimeLine = 0; +static TimeLineID sendTimeLineNextTLI = 0; static bool sendTimeLineIsHistoric = false; static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; @@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd) * requested start location is on that timeline. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); - switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory); + switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, + &sendTimeLineNextTLI); list_free_deep(timeLineHistory); /* @@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd) streamingDoneSending = streamingDoneReceiving = false; /* If there is nothing to stream, don't even enter COPY mode */ - if (!sendTimeLineIsHistoric || - cmd->startpoint < sendTimeLineValidUpto) + if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) { /* * When we first start replication the standby will be behind the primary. @@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd) if (walsender_ready_to_stop) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); + + Assert(streamingDoneSending && streamingDoneReceiving); + } + + /* + * Copy is finished now. Send a single-row result set indicating the next + * timeline. + */ + if (sendTimeLineIsHistoric) + { + char str[11]; + snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI); + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 1, 2); /* 1 field */ + + /* Field header */ + pq_sendstring(&buf, "next_tli"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + /* + * int8 may seem like a surprising data type for this, but in theory + * int4 would not be wide enough for this, as TimeLineID is unsigned. + */ + pq_sendint(&buf, INT8OID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + /* Data row */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 1, 2); /* number of columns */ + pq_sendint(&buf, strlen(str), 4); /* length */ + pq_sendbytes(&buf, str, strlen(str)); + pq_endmessage(&buf); } - /* Get out of COPY mode (CommandComplete). */ - EndCommand("COPY 0", DestRemote); + /* Send CommandComplete message */ + pq_puttextmessage('C', "START_STREAMING"); } /* @@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup) List *history; history = readTimeLineHistory(ThisTimeLineID); - sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); Assert(sentPtr <= sendTimeLineValidUpto); + Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); /* the current send pointer should be <= the switchpoint */ |
