summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2015-01-03 20:51:52 +0100
committerAndres Freund <andres@anarazel.de>2015-01-03 20:54:13 +0100
commitf961ad4790bddd540ebc5c3b44f752a3869fa866 (patch)
tree816a9e1a7e5ce43034e2360f26d28956a101f930
parent4967e07a9131fa37b758c45fee3bd35ad7fb7035 (diff)
Prevent WAL files created by pg_basebackup -x/X from being archived again.
WAL (and timeline history) files created by pg_basebackup did not maintain the new base backup's archive status. That's currently not a problem if the new node is used as a standby - but if that node is promoted all still existing files can get archived again. With a high wal_keep_segment settings that can happen a significant time later - which is quite confusing. Change both the backend (for the -x/-X fetch case) and pg_basebackup (for -X stream) itself to always mark WAL/timeline files included in the base backup as .done. That's in line with walreceiver.c doing so. The verbosity of the pg_basebackup changes show pretty clearly that it needs some refactoring, but that'd result in not be backpatchable changes. Backpatch to 9.1 where pg_basebackup was introduced. Discussion: 20141205002854.GE21964@awork2.anarazel.de
-rw-r--r--src/backend/replication/basebackup.c24
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c48
-rw-r--r--src/bin/pg_basebackup/pg_receivexlog.c2
-rw-r--r--src/bin/pg_basebackup/receivelog.c54
-rw-r--r--src/bin/pg_basebackup/receivelog.h3
5 files changed, 115 insertions, 16 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index a00bea6fc51..896ab6917a6 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -400,6 +400,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
}
+ /* send the WAL file itself */
_tarWriteHeader(pathbuf, NULL, &statbuf);
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
@@ -424,7 +425,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
}
/* XLogSegSize is a multiple of 512, so no need for padding */
+
FreeFile(fp);
+
+ /*
+ * Mark file as archived, otherwise files can get archived again
+ * after promotion of a new node. This is in line with
+ * walreceiver.c always doing a XLogArchiveForceDone() after a
+ * complete segment.
+ */
+ StatusFilePath(pathbuf, walFiles[i], ".done");
+ sendFileWithContent(pathbuf, "");
}
/*
@@ -447,6 +458,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
errmsg("could not stat file \"%s\": %m", pathbuf)));
sendFile(pathbuf, pathbuf, &statbuf, false);
+
+ /* unconditionally mark file as archived */
+ StatusFilePath(pathbuf, fname, ".done");
+ sendFileWithContent(pathbuf, "");
}
/* Send CopyDone message for the last tar file */
@@ -881,6 +896,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
}
size += 512; /* Size of the header just added */
+
+ /*
+ * Also send archive_status directory (by hackishly reusing
+ * statbuf from above ...).
+ */
+ if (!sizeonly)
+ _tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
+ size += 512; /* Size of the header just added */
+
continue; /* don't recurse into pg_xlog */
}
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index f13e376d3d3..61d2cb3a73b 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -259,7 +259,7 @@ LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
- true))
+ true, true))
/*
* Any errors will already have been reported in the function process,
@@ -281,6 +281,7 @@ static void
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
{
logstreamer_param *param;
+ char statusdir[MAXPGPATH];
param = xmalloc0(sizeof(logstreamer_param));
param->timeline = timeline;
@@ -314,13 +315,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
/* Error message already written in GetConnection() */
exit(1);
+ snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+
/*
- * Always in plain format, so we can write to basedir/pg_xlog. But the
- * directory entry in the tar file may arrive later, so make sure it's
- * created before we start.
+ * Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
+ * basedir/pg_xlog as the directory entry in the tar file may arrive
+ * later.
*/
- snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
- verify_dir_is_empty_or_create(param->xlogdir);
+ snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+ basedir);
+
+ if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+ {
+ fprintf(stderr,
+ _("%s: could not create directory \"%s\": %s\n"),
+ progname, statusdir, strerror(errno));
+ disconnect_and_exit(1);
+ }
/*
* Start a child process and tell it to start streaming. On Unix, this is
@@ -403,6 +414,23 @@ verify_dir_is_empty_or_create(char *dirname)
}
}
+/*
+ * Returns whether the string `str' has the postfix `end'.
+ */
+static bool
+pg_str_endswith(const char *str, const char *end)
+{
+ size_t slen = strlen(str);
+ size_t elen = strlen(end);
+
+ /* can't be a postfix if longer */
+ if (elen > slen)
+ return false;
+
+ /* compare the end of the strings */
+ str += slen - elen;
+ return strcmp(str, end) == 0;
+}
/*
* Print a progress report based on the global variables. If verbose output
@@ -835,10 +863,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
{
/*
* When streaming WAL, pg_xlog will have been created
- * by the wal receiver process, so just ignore failure
- * on that.
+ * by the wal receiver process. So just ignore creation
+ * failures on related directories.
*/
- if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
+ if (!((pg_str_endswith(filename, "/pg_xlog") ||
+ pg_str_endswith(filename, "/archive_status")) &&
+ errno == EEXIST))
{
fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 6c998735d91..8167aebe9d9 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -321,7 +321,7 @@ StreamLog(void)
progname, startpos.xlogid, startpos.xrecoff, timeline);
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
- stop_streaming, standby_message_timeout, false);
+ stop_streaming, standby_message_timeout, false, true);
PQfinish(conn);
}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 3a46767417e..9cd0942de3a 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -44,6 +44,35 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
static int walfile = -1;
+static bool
+mark_file_as_archived(const char *basedir, const char *fname)
+{
+ int fd;
+ static char tmppath[MAXPGPATH];
+
+ snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
+ basedir, fname);
+
+ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
+ progname, tmppath, strerror(errno));
+ return false;
+ }
+
+ if (fsync(fd) != 0)
+ {
+ fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ progname, tmppath, strerror(errno));
+ return false;
+ }
+
+ close(fd);
+
+ return true;
+}
+
/*
* Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is
@@ -133,7 +162,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* completed writing the whole segment.
*/
static bool
-close_walfile(char *basedir, char *walname, bool segment_complete)
+close_walfile(char *basedir, char *walname, bool segment_complete,
+ bool mark_done)
{
off_t currpos = lseek(walfile, 0, SEEK_CUR);
@@ -184,6 +214,19 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
_("%s: not renaming \"%s\", segment is not complete\n"),
progname, walname);
+ /*
+ * Mark file as archived if requested by the caller - pg_basebackup needs
+ * to do so as files can otherwise get archived again after promotion of a
+ * new node. This is in line with walreceiver.c always doing a
+ * XLogArchiveForceDone() after a complete segment.
+ */
+ if (currpos == XLOG_SEG_SIZE && mark_done)
+ {
+ /* writes error message if failed */
+ if (!mark_file_as_archived(basedir, walname))
+ return false;
+ }
+
return true;
}
@@ -284,7 +327,8 @@ bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
- int standby_message_timeout, bool rename_partial)
+ int standby_message_timeout, bool rename_partial,
+ bool mark_done)
{
char query[128];
char current_walfile_name[MAXPGPATH];
@@ -343,7 +387,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
return false;
}
PQclear(res);
-
/*
* Receive the actual xlog data
*/
@@ -367,7 +410,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (stream_stop && stream_stop(blockpos, timeline, false))
{
if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
- rename_partial))
+ rename_partial, mark_done))
/* Potential error message is written by close_walfile */
goto error;
return true;
@@ -579,7 +622,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, current_walfile_name, false))
+ if (!close_walfile(basedir, current_walfile_name, false,
+ mark_done))
/* Error message written in close_walfile() */
goto error;
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 7176a68beaa..5ebf31d7c24 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -13,4 +13,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
- bool rename_partial);
+ bool rename_partial,
+ bool mark_done);