diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_receivexlog.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_receivexlog.c | 465 |
1 files changed, 465 insertions, 0 deletions
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c new file mode 100644 index 00000000000..ba533d35978 --- /dev/null +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -0,0 +1,465 @@ +/*------------------------------------------------------------------------- + * + * pg_receivexlog.c - receive streaming transaction log data and write it + * to a local file. + * + * Author: Magnus Hagander <magnus@hagander.net> + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/pg_receivexlog.c + *------------------------------------------------------------------------- + */ + +/* + * We have to use postgres.h not postgres_fe.h here, because there's so much + * backend-only stuff in the XLOG include files we need. But we need a + * frontend-ish environment otherwise. Hence this ugly hack. + */ +#define FRONTEND 1 +#include "postgres.h" +#include "libpq-fe.h" +#include "libpq/pqsignal.h" +#include "access/xlog_internal.h" + +#include "receivelog.h" +#include "streamutil.h" + +#include <dirent.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include "getopt_long.h" + +/* Global options */ +char *basedir = NULL; +int verbose = 0; +int standby_message_timeout = 10; /* 10 sec = default */ +volatile bool time_to_abort = false; + + +static void usage(void); +static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline); +static void StreamLog(); +static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); + +static void +usage(void) +{ + printf(_("%s receives PostgreSQL streaming transaction logs\n\n"), + progname); + printf(_("Usage:\n")); + printf(_(" %s [OPTION]...\n"), progname); + printf(_("\nOptions controlling the output:\n")); + printf(_(" -D, --dir=directory receive xlog files into this directory\n")); + printf(_("\nGeneral options:\n")); + printf(_(" -v, --verbose output verbose messages\n")); + printf(_(" -?, --help show this help, then exit\n")); + printf(_(" -V, --version output version information, then exit\n")); + printf(_("\nConnection options:\n")); + printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n")); + printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); + printf(_(" -p, --port=PORT database server port number\n")); + printf(_(" -U, --username=NAME connect as specified database user\n")); + printf(_(" -w, --no-password never prompt for password\n")); + printf(_(" -W, --password force password prompt (should happen automatically)\n")); + printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n")); +} + +static bool +segment_callback(XLogRecPtr segendpos, uint32 timeline) +{ + char fn[MAXPGPATH]; + struct stat statbuf; + + if (verbose) + fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), + progname, segendpos.xlogid, segendpos.xrecoff, timeline); + + /* + * Check if there is a partial file for the name we just finished, and if + * there is, remove it under the assumption that we have now got all the + * data we need. + */ + segendpos.xrecoff /= XLOG_SEG_SIZE; + PrevLogSeg(segendpos.xlogid, segendpos.xrecoff); + snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial", + basedir, timeline, + segendpos.xlogid, + segendpos.xrecoff); + if (stat(fn, &statbuf) == 0) + { + /* File existed, get rid of it */ + if (verbose) + fprintf(stderr, _("%s: removing file \"%s\"\n"), + progname, fn); + unlink(fn); + } + + /* + * Never abort from this - we handle all aborting in continue_streaming() + */ + return false; +} + +static bool +continue_streaming(void) +{ + if (time_to_abort) + { + fprintf(stderr, _("%s: received interrupt signal, exiting.\n"), + progname); + return true; + } + return false; +} + +/* + * Determine starting location for streaming, based on: + * 1. If there are existing xlog segments, start at the end of the last one + * 2. If the last one is a partial segment, rename it and start over, since + * we don't sync after every write. + * 3. If no existing xlog exists, start from the beginning of the current + * WAL segment. + */ +static XLogRecPtr +FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) +{ + DIR *dir; + struct dirent *dirent; + int i; + bool b; + uint32 high_log = 0; + uint32 high_seg = 0; + bool partial = false; + + dir = opendir(basedir); + if (dir == NULL) + { + fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), + progname, basedir, strerror(errno)); + disconnect_and_exit(1); + } + + while ((dirent = readdir(dir)) != NULL) + { + char fullpath[MAXPGPATH]; + struct stat statbuf; + uint32 tli, + log, + seg; + + if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, "..")) + continue; + + /* xlog files are always 24 characters */ + if (strlen(dirent->d_name) != 24) + continue; + + /* Filenames are always made out of 0-9 and A-F */ + b = false; + for (i = 0; i < 24; i++) + { + if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') && + !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F')) + { + b = true; + break; + } + } + if (b) + continue; + + /* + * Looks like an xlog file. Parse its position. + */ + if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3) + { + fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"), + progname, dirent->d_name); + disconnect_and_exit(1); + } + + /* Ignore any files that are for another timeline */ + if (tli != currenttimeline) + continue; + + /* Check if this is a completed segment or not */ + snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name); + if (stat(fullpath, &statbuf) != 0) + { + fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"), + progname, fullpath, strerror(errno)); + disconnect_and_exit(1); + } + + if (statbuf.st_size == 16 * 1024 * 1024) + { + /* Completed segment */ + if (log > high_log || + (log == high_log && seg > high_seg)) + { + high_log = log; + high_seg = seg; + continue; + } + } + else + { + /* + * This is a partial file. Rename it out of the way. + */ + char newfn[MAXPGPATH]; + + fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"), + progname, dirent->d_name, dirent->d_name); + + snprintf(newfn, sizeof(newfn), "%s/%s.partial", + basedir, dirent->d_name); + + if (stat(newfn, &statbuf) == 0) + { + /* + * XXX: perhaps we should only error out if the existing file + * is larger? + */ + fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"), + progname, newfn); + disconnect_and_exit(1); + } + if (rename(fullpath, newfn) != 0) + { + fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"), + progname, fullpath, newfn, strerror(errno)); + disconnect_and_exit(1); + } + + /* Don't continue looking for more, we assume this is the last */ + partial = true; + break; + } + } + + closedir(dir); + + if (high_log > 0 || high_seg > 0) + { + XLogRecPtr high_ptr; + + if (!partial) + { + /* + * If the segment was partial, the pointer is already at the right + * location since we want to re-transmit that segment. If it was + * not, we need to move it to the next segment, since we are + * tracking the last one that was complete. + */ + NextLogSeg(high_log, high_seg); + } + + high_ptr.xlogid = high_log; + high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE; + + return high_ptr; + } + else + return currentpos; +} + +/* + * Start the log streaming + */ +static void +StreamLog(void) +{ + PGresult *res; + uint32 timeline; + XLogRecPtr startpos; + + /* + * Connect in replication mode to the server + */ + conn = GetConnection(); + + /* + * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog + * position. + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not identify system: %s\n"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: could not identify system, got %i rows\n"), + progname, PQntuples(res)); + disconnect_and_exit(1); + } + timeline = atoi(PQgetvalue(res, 0, 1)); + if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"), + progname, PQgetvalue(res, 0, 2)); + disconnect_and_exit(1); + } + PQclear(res); + + /* + * Figure out where to start streaming. + */ + startpos = FindStreamingStart(startpos, timeline); + + /* + * Always start streaming at the beginning of a segment + */ + startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE; + + /* + * Start the replication + */ + if (verbose) + fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), + progname, startpos.xlogid, startpos.xrecoff, timeline); + + ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, + segment_callback, continue_streaming, + standby_message_timeout); +} + +/* + * When sigint is called, just tell the system to exit at the next possible + * moment. + */ +static void +sigint_handler(int signum) +{ + time_to_abort = true; +} + +int +main(int argc, char **argv) +{ + static struct option long_options[] = { + {"help", no_argument, NULL, '?'}, + {"version", no_argument, NULL, 'V'}, + {"dir", required_argument, NULL, 'D'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"username", required_argument, NULL, 'U'}, + {"no-password", no_argument, NULL, 'w'}, + {"password", no_argument, NULL, 'W'}, + {"statusint", required_argument, NULL, 's'}, + {"verbose", no_argument, NULL, 'v'}, + {NULL, 0, NULL, 0} + }; + int c; + + int option_index; + + progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog")); + + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(); + exit(0); + } + else if (strcmp(argv[1], "-V") == 0 + || strcmp(argv[1], "--version") == 0) + { + puts("pg_receivexlog (PostgreSQL) " PG_VERSION); + exit(0); + } + } + + while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv", + long_options, &option_index)) != -1) + { + switch (c) + { + case 'D': + basedir = xstrdup(optarg); + break; + case 'h': + dbhost = xstrdup(optarg); + break; + case 'p': + if (atoi(optarg) <= 0) + { + fprintf(stderr, _("%s: invalid port number \"%s\"\n"), + progname, optarg); + exit(1); + } + dbport = xstrdup(optarg); + break; + case 'U': + dbuser = xstrdup(optarg); + break; + case 'w': + dbgetpassword = -1; + break; + case 'W': + dbgetpassword = 1; + break; + case 's': + standby_message_timeout = atoi(optarg); + if (standby_message_timeout < 0) + { + fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; + case 'v': + verbose++; + break; + default: + + /* + * getopt_long already emitted a complaint + */ + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + } + + /* + * Any non-option arguments? + */ + if (optind < argc) + { + fprintf(stderr, + _("%s: too many command-line arguments (first is \"%s\")\n"), + progname, argv[optind]); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + /* + * Required arguments + */ + if (basedir == NULL) + { + fprintf(stderr, _("%s: no target directory specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + +#ifndef WIN32 + pqsignal(SIGINT, sigint_handler); +#endif + + StreamLog(); + + exit(0); +} |