summaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c398
1 files changed, 398 insertions, 0 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
new file mode 100644
index 00000000000..0ca30c425f3
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -0,0 +1,398 @@
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ * replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+#include "libpq-fe.h"
+#include "access/xlog_internal.h"
+#include "replication/walprotocol.h"
+#include "utils/datetime.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+
+/* Size of the streaming replication protocol header */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+
+/*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+ int f;
+ char fn[MAXPGPATH];
+
+ XLogFileName(namebuf, timeline, startpoint.xlogid,
+ startpoint.xrecoff / XLOG_SEG_SIZE);
+
+ snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+ f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+ if (f == -1)
+ fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+ progname, namebuf, strerror(errno));
+ return f;
+}
+
+/*
+ * Local version of GetCurrentTimestamp(), since we are not linked with
+ * backend code.
+ */
+static TimestampTz
+localGetCurrentTimestamp(void)
+{
+ TimestampTz result;
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+
+ result = (TimestampTz) tp.tv_sec -
+ ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+#ifdef HAVE_INT64_TIMESTAMP
+ result = (result * USECS_PER_SEC) + tp.tv_usec;
+#else
+ result = result + (tp.tv_usec / 1000000.0);
+#endif
+
+ return result;
+}
+
+/*
+ * Receive a log stream starting at the specified position.
+ *
+ * If sysidentifier is specified, validate that both the system
+ * identifier and the timeline matches the specified ones
+ * (by sending an extra IDENTIFY_SYSTEM command)
+ *
+ * All received segments will be written to the directory
+ * specified by basedir.
+ *
+ * The segment_finish callback will be called after each segment
+ * has been finished, and the stream_continue callback will be
+ * called every time data is received. If either of these callbacks
+ * return true, the streaming will stop and the function
+ * return. As long as they return false, streaming will continue
+ * indefinitely.
+ *
+ * standby_message_timeout controls how often we send a message
+ * back to the master letting it know our progress, in seconds.
+ * This message will only contain the write location, and never
+ * flush or replay.
+ *
+ * Note: The log position *must* be at a log segment start!
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+{
+ char query[128];
+ char current_walfile_name[MAXPGPATH];
+ PGresult *res;
+ char *copybuf = NULL;
+ int walfile = -1;
+ int64 last_status = -1;
+ XLogRecPtr blockpos = InvalidXLogRecPtr;
+
+ if (sysidentifier != NULL)
+ {
+ /* Validate system identifier and timeline hasn't changed */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ PQclear(res);
+ return false;
+ }
+ if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
+ {
+ fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), progname);
+ PQclear(res);
+ return false;
+ }
+ if (timeline != atoi(PQgetvalue(res, 0, 1)))
+ {
+ fprintf(stderr, _("%s: timeline does not match between base backup and streaming connection\n"), progname);
+ PQclear(res);
+ return false;
+ }
+ PQclear(res);
+ }
+
+ /* Initiate the replication stream at specified location */
+ snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+ res = PQexec(conn, query);
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
+ {
+ fprintf(stderr, _("%s: could not start replication: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+
+ /*
+ * Receive the actual xlog data
+ */
+ while (1)
+ {
+ int r;
+ int xlogoff;
+ int bytes_left;
+ int bytes_written;
+ int64 now;
+
+ if (copybuf != NULL)
+ {
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ }
+
+ /*
+ * Check if we should continue streaming, or abort at this point.
+ */
+ if (stream_continue && stream_continue())
+ {
+ if (walfile != -1)
+ {
+ fsync(walfile);
+ close(walfile);
+ }
+ return true;
+ }
+
+ /*
+ * Potentially send a status message to the master
+ */
+ now = localGetCurrentTimestamp();
+ if (standby_message_timeout > 0 &&
+ last_status < now - standby_message_timeout * 1000000)
+ {
+ /* Time to send feedback! */
+ char replybuf[sizeof(StandbyReplyMessage) + 1];
+ StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
+
+ replymsg->write = blockpos;
+ replymsg->flush = InvalidXLogRecPtr;
+ replymsg->apply = InvalidXLogRecPtr;
+ replymsg->sendTime = now;
+ replybuf[0] = 'r';
+
+ if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
+ PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send feedback packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+
+ last_status = now;
+ }
+
+ r = PQgetCopyData(conn, &copybuf, 1);
+ if (r == 0)
+ {
+ /*
+ * In async mode, and no data available. We block on reading but
+ * not more than the specified timeout, so that we can send a
+ * response back to the client.
+ */
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *timeoutptr;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(conn), &input_mask);
+ if (standby_message_timeout)
+ {
+ timeout.tv_sec = last_status + standby_message_timeout - now - 1;
+ if (timeout.tv_sec <= 0)
+ timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+ timeout.tv_usec = 0;
+ timeoutptr = &timeout;
+ }
+ else
+ timeoutptr = NULL;
+
+ r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ if (r == 0 || (r < 0 && errno == EINTR))
+ {
+ /*
+ * Got a timeout or signal. Continue the loop and either
+ * deliver a status packet to the server or just go back into
+ * blocking.
+ */
+ continue;
+ }
+ else if (r < 0)
+ {
+ fprintf(stderr, _("%s: select() failed: %m\n"), progname);
+ return false;
+ }
+ /* Else there is actually data on the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ continue;
+ }
+ if (r == -1)
+ /* End of copy stream */
+ break;
+ if (r == -2)
+ {
+ fprintf(stderr, _("%s: could not read copy data: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ if (r < STREAMING_HEADER_SIZE + 1)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %i\n"),
+ progname, r);
+ return false;
+ }
+ if (copybuf[0] != 'w')
+ {
+ fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ progname, copybuf[0]);
+ return false;
+ }
+
+ /* Extract WAL location for this block */
+ memcpy(&blockpos, copybuf + 1, 8);
+ xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Verify that the initial location in the stream matches where we
+ * think we are.
+ */
+ if (walfile == -1)
+ {
+ /* No file open yet */
+ if (xlogoff != 0)
+ {
+ fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+ progname, xlogoff);
+ return false;
+ }
+ }
+ else
+ {
+ /* More data in existing segment */
+ /* XXX: store seek value don't reseek all the time */
+ if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ {
+ fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
+ progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ return false;
+ }
+ }
+
+ bytes_left = r - STREAMING_HEADER_SIZE;
+ bytes_written = 0;
+
+ while (bytes_left)
+ {
+ int bytes_to_write;
+
+ /*
+ * If crossing a WAL boundary, only write up until we reach
+ * XLOG_SEG_SIZE.
+ */
+ if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ else
+ bytes_to_write = bytes_left;
+
+ if (walfile == -1)
+ {
+ walfile = open_walfile(blockpos, timeline,
+ basedir, current_walfile_name);
+ if (walfile == -1)
+ /* Error logged by open_walfile */
+ return false;
+ }
+
+ if (write(walfile,
+ copybuf + STREAMING_HEADER_SIZE + bytes_written,
+ bytes_to_write) != bytes_to_write)
+ {
+ fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+ progname,
+ bytes_to_write,
+ current_walfile_name,
+ strerror(errno));
+ return false;
+ }
+
+ /* Write was successful, advance our position */
+ bytes_written += bytes_to_write;
+ bytes_left -= bytes_to_write;
+ XLByteAdvance(blockpos, bytes_to_write);
+ xlogoff += bytes_to_write;
+
+ /* Did we reach the end of a WAL segment? */
+ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
+ {
+ fsync(walfile);
+ close(walfile);
+ walfile = -1;
+ xlogoff = 0;
+
+ if (segment_finish != NULL)
+ {
+ /*
+ * Callback when the segment finished, and return if it
+ * told us to.
+ */
+ if (segment_finish(blockpos, timeline))
+ return true;
+ }
+ }
+ }
+ /* No more data left to write, start receiving next copy packet */
+ }
+
+ /*
+ * The only way to get out of the loop is if the server shut down the
+ * replication stream. If it's a controlled shutdown, the server will send
+ * a shutdown message, and we'll return the latest xlog location that has
+ * been streamed.
+ */
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+ return true;
+}