summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2021-11-05 10:22:07 -0400
committerRobert Haas <rhaas@postgresql.org>2021-11-05 10:26:18 -0400
commit23a1c6578c87fca0e361c4f5f9a07df5ae1f9858 (patch)
tree511a7d2c6e759b9754fdd37ff2ab5af7859eab2c
parent00a354a13560dc529ac34a303c85c265aaf033b7 (diff)
Introduce 'bbstreamer' abstraction to modularize pg_basebackup.
pg_basebackup knows how to do quite a few things with a backup that it gets from the server, like just write out the files, or compress them first, or even parse the tar format and inject a modified postgresql.auto.conf file into the archive generated by the server. Unforatunely, this makes pg_basebackup.c a very large source file, and also somewhat difficult to enhance, because for example the knowledge that the server is sending us a 'tar' file rather than some other sort of archive is spread all over the place rather than centralized. In an effort to improve this situation, this commit invents a new 'bbstreamer' abstraction. Each archive received from the server is fed to a bbstreamer which may choose to dispose of it or pass it along to some other bbstreamer. Chunks may also be "labelled" according to whether they are part of the payload data of a file in the archive or part of the archive metadata. So, for example, if we want to take a tar file, modify the postgresql.auto.conf file it contains, and the gzip the result and write it out, we can use a bbstreamer_tar_parser to parse the tar file received from the server, a bbstreamer_recovery_injector to modify the contents of postgresql.auto.conf, a bbstreamer_tar_archiver to replace the tar headers for the file modified in the previous step with newly-built ones that are correct for the modified file, and a bbstreamer_gzip_writer to gzip and write the resulting data. Only the objects with "tar" in the name know anything about the tar archive format, and in theory we could re-archive using some other format rather than "tar" if somebody wanted to write the code. These chances do add a substantial amount of code, but I think the result is a lot more maintainable and extensible. pg_basebackup.c itself shrinks by roughly a third, with a lot of the complexity previously contained there moving into the newly-added files. Patch by me. The larger patch series of which this is a part has been reviewed and tested at various times by Andres Freund, Sumanta Mukherjee, Dilip Kumar, Suraj Kharage, Dipesh Pandit, Tushar Ahuja, Mark Dilger, Sergei Kornilov, and Jeevan Ladhe. Discussion: https://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com Discussion: https://postgr.es/m/CA+TgmoZvqk7UuzxsX1xjJRmMGkqoUGYTZLDCH8SmU1xTPr1Xig@mail.gmail.com
-rw-r--r--src/bin/pg_basebackup/Makefile12
-rw-r--r--src/bin/pg_basebackup/bbstreamer.h217
-rw-r--r--src/bin/pg_basebackup/bbstreamer_file.c579
-rw-r--r--src/bin/pg_basebackup/bbstreamer_inject.c250
-rw-r--r--src/bin/pg_basebackup/bbstreamer_tar.c444
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c912
6 files changed, 1687 insertions, 727 deletions
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index fd920fc197e..4f5ac6df1a2 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -35,10 +35,16 @@ OBJS = \
streamutil.o \
walmethods.o
+BBOBJS = \
+ pg_basebackup.o \
+ bbstreamer_file.o \
+ bbstreamer_inject.o \
+ bbstreamer_tar.o
+
all: pg_basebackup pg_receivewal pg_recvlogical
-pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
- $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
+ $(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils
$(CC) $(CFLAGS) pg_receivewal.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@@ -61,7 +67,7 @@ uninstall:
clean distclean maintainer-clean:
rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \
- pg_basebackup.o pg_receivewal.o pg_recvlogical.o \
+ $(BBOBJS) pg_receivewal.o pg_recvlogical.o \
$(OBJS)
rm -rf tmp_check
diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
new file mode 100644
index 00000000000..b24dc848c1b
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer.h
@@ -0,0 +1,217 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer.h
+ *
+ * Each tar archive returned by the server is passed to one or more
+ * bbstreamer objects for further processing. The bbstreamer may do
+ * something simple, like write the archive to a file, perhaps after
+ * compressing it, but it can also do more complicated things, like
+ * annotating the byte stream to indicate which parts of the data
+ * correspond to tar headers or trailing padding, vs. which parts are
+ * payload data. A subsequent bbstreamer may use this information to
+ * make further decisions about how to process the data; for example,
+ * it might choose to modify the archive contents.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef BBSTREAMER_H
+#define BBSTREAMER_H
+
+#include "lib/stringinfo.h"
+#include "pqexpbuffer.h"
+
+struct bbstreamer;
+struct bbstreamer_ops;
+typedef struct bbstreamer bbstreamer;
+typedef struct bbstreamer_ops bbstreamer_ops;
+
+/*
+ * Each chunk of archive data passed to a bbstreamer is classified into one
+ * of these categories. When data is first received from the remote server,
+ * each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will
+ * be of whatever size the remote server chose to send.
+ *
+ * If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all
+ * chunks should be labelled as one of the other types listed here. In
+ * addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and
+ * exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if
+ * that means a zero-length call. There can be any number of
+ * BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There
+ * should exactly BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the
+ * last BBSTREAMER_MEMBER_TRAILER chunk.
+ *
+ * In theory, we could need other classifications here, such as a way of
+ * indicating an archive header, but the "tar" format doesn't need anything
+ * else, so for the time being there's no point.
+ */
+typedef enum
+{
+ BBSTREAMER_UNKNOWN,
+ BBSTREAMER_MEMBER_HEADER,
+ BBSTREAMER_MEMBER_CONTENTS,
+ BBSTREAMER_MEMBER_TRAILER,
+ BBSTREAMER_ARCHIVE_TRAILER
+} bbstreamer_archive_context;
+
+/*
+ * Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER,
+ * BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also
+ * pass a pointer to an instance of this struct. The details are expected
+ * to be present in the archive header and used to fill the struct, after
+ * which all subsequent calls for the same archive member are expected to
+ * pass the same details.
+ */
+typedef struct
+{
+ char pathname[MAXPGPATH];
+ pgoff_t size;
+ mode_t mode;
+ uid_t uid;
+ gid_t gid;
+ bool is_directory;
+ bool is_link;
+ char linktarget[MAXPGPATH];
+} bbstreamer_member;
+
+/*
+ * Generally, each type of bbstreamer will define its own struct, but the
+ * first element should be 'bbstreamer base'. A bbstreamer that does not
+ * require any additional private data could use this structure directly.
+ *
+ * bbs_ops is a pointer to the bbstreamer_ops object which contains the
+ * function pointers appropriate to this type of bbstreamer.
+ *
+ * bbs_next is a pointer to the successor bbstreamer, for those types of
+ * bbstreamer which forward data to a successor. It need not be used and
+ * should be set to NULL when not relevant.
+ *
+ * bbs_buffer is a buffer for accumulating data for temporary storage. Each
+ * type of bbstreamer makes its own decisions about whether and how to use
+ * this buffer.
+ */
+struct bbstreamer
+{
+ const bbstreamer_ops *bbs_ops;
+ bbstreamer *bbs_next;
+ StringInfoData bbs_buffer;
+};
+
+/*
+ * There are three callbacks for a bbstreamer. The 'content' callback is
+ * called repeatedly, as described in the bbstreamer_archive_context comments.
+ * Then, the 'finalize' callback is called once at the end, to give the
+ * bbstreamer a chance to perform cleanup such as closing files. Finally,
+ * because this code is running in a frontend environment where, as of this
+ * writing, there are no memory contexts, the 'free' callback is called to
+ * release memory. These callbacks should always be invoked using the static
+ * inline functions defined below.
+ */
+struct bbstreamer_ops
+{
+ void (*content) (bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+ void (*finalize) (bbstreamer *streamer);
+ void (*free) (bbstreamer *streamer);
+};
+
+/* Send some content to a bbstreamer. */
+static inline void
+bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->content(streamer, member, data, len, context);
+}
+
+/* Finalize a bbstreamer. */
+static inline void
+bbstreamer_finalize(bbstreamer *streamer)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->finalize(streamer);
+}
+
+/* Free a bbstreamer. */
+static inline void
+bbstreamer_free(bbstreamer *streamer)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->free(streamer);
+}
+
+/*
+ * This is a convenience method for use when implementing a bbstreamer; it is
+ * not for use by outside callers. It adds the amount of data specified by
+ * 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data'
+ * accordingly.
+ */
+static inline void
+bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len,
+ int nbytes)
+{
+ Assert(nbytes <= *len);
+
+ appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
+ *len -= nbytes;
+ *data += nbytes;
+}
+
+/*
+ * This is a convenence method for use when implementing a bbstreamer; it is
+ * not for use by outsider callers. It attempts to add enough data to the
+ * bbstreamer's buffer to reach a length of target_bytes and adjusts '*len'
+ * and '*data' accordingly. It returns true if the target length has been
+ * reached and false otherwise.
+ */
+static inline bool
+bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len,
+ int target_bytes)
+{
+ int buflen = streamer->bbs_buffer.len;
+
+ if (buflen >= target_bytes)
+ {
+ /* Target length already reached; nothing to do. */
+ return true;
+ }
+
+ if (buflen + *len < target_bytes)
+ {
+ /* Not enough data to reach target length; buffer all of it. */
+ bbstreamer_buffer_bytes(streamer, data, len, *len);
+ return false;
+ }
+
+ /* Buffer just enough to reach the target length. */
+ bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
+ return true;
+}
+
+/*
+ * Functions for creating bbstreamer objects of various types. See the header
+ * comments for each of these functions for details.
+ */
+extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file);
+extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file,
+ int compresslevel);
+extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
+ const char *(*link_map) (const char *),
+ void (*report_output_file) (const char *));
+
+extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
+extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
+
+extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next,
+ bool is_recovery_guc_supported,
+ PQExpBuffer recoveryconfcontents);
+extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname,
+ char *data, int len);
+
+#endif
diff --git a/src/bin/pg_basebackup/bbstreamer_file.c b/src/bin/pg_basebackup/bbstreamer_file.c
new file mode 100644
index 00000000000..03e1ea25505
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_file.c
@@ -0,0 +1,579 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_file.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_file.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include <unistd.h>
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "common/file_perm.h"
+#include "common/string.h"
+
+typedef struct bbstreamer_plain_writer
+{
+ bbstreamer base;
+ char *pathname;
+ FILE *file;
+ bool should_close_file;
+} bbstreamer_plain_writer;
+
+#ifdef HAVE_LIBZ
+typedef struct bbstreamer_gzip_writer
+{
+ bbstreamer base;
+ char *pathname;
+ gzFile gzfile;
+} bbstreamer_gzip_writer;
+#endif
+
+typedef struct bbstreamer_extractor
+{
+ bbstreamer base;
+ char *basepath;
+ const char *(*link_map) (const char *);
+ void (*report_output_file) (const char *);
+ char filename[MAXPGPATH];
+ FILE *file;
+} bbstreamer_extractor;
+
+static void bbstreamer_plain_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_plain_writer_finalize(bbstreamer *streamer);
+static void bbstreamer_plain_writer_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_plain_writer_ops = {
+ .content = bbstreamer_plain_writer_content,
+ .finalize = bbstreamer_plain_writer_finalize,
+ .free = bbstreamer_plain_writer_free
+};
+
+#ifdef HAVE_LIBZ
+static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
+static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
+static const char *get_gz_error(gzFile gzf);
+
+const bbstreamer_ops bbstreamer_gzip_writer_ops = {
+ .content = bbstreamer_gzip_writer_content,
+ .finalize = bbstreamer_gzip_writer_finalize,
+ .free = bbstreamer_gzip_writer_free
+};
+#endif
+
+static void bbstreamer_extractor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_extractor_finalize(bbstreamer *streamer);
+static void bbstreamer_extractor_free(bbstreamer *streamer);
+static void extract_directory(const char *filename, mode_t mode);
+static void extract_link(const char *filename, const char *linktarget);
+static FILE *create_file_for_extract(const char *filename, mode_t mode);
+
+const bbstreamer_ops bbstreamer_extractor_ops = {
+ .content = bbstreamer_extractor_content,
+ .finalize = bbstreamer_extractor_finalize,
+ .free = bbstreamer_extractor_free
+};
+
+/*
+ * Create a bbstreamer that just writes data to a file.
+ *
+ * The caller must specify a pathname and may specify a file. The pathname is
+ * used for error-reporting purposes either way. If file is NULL, the pathname
+ * also identifies the file to which the data should be written: it is opened
+ * for writing and closed when done. If file is not NULL, the data is written
+ * there.
+ */
+bbstreamer *
+bbstreamer_plain_writer_new(char *pathname, FILE *file)
+{
+ bbstreamer_plain_writer *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_plain_writer));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_plain_writer_ops;
+
+ streamer->pathname = pstrdup(pathname);
+ streamer->file = file;
+
+ if (file == NULL)
+ {
+ streamer->file = fopen(pathname, "wb");
+ if (streamer->file == NULL)
+ {
+ pg_log_error("could not create file \"%s\": %m", pathname);
+ exit(1);
+ }
+ streamer->should_close_file = true;
+ }
+
+ return &streamer->base;
+}
+
+/*
+ * Write archive content to file.
+ */
+static void
+bbstreamer_plain_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member, const char *data,
+ int len, bbstreamer_archive_context context)
+{
+ bbstreamer_plain_writer *mystreamer;
+
+ mystreamer = (bbstreamer_plain_writer *) streamer;
+
+ if (len == 0)
+ return;
+
+ errno = 0;
+ if (fwrite(data, len, 1, mystreamer->file) != 1)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to file \"%s\": %m",
+ mystreamer->pathname);
+ exit(1);
+ }
+}
+
+/*
+ * End-of-archive processing when writing to a plain file consists of closing
+ * the file if we opened it, but not if the caller provided it.
+ */
+static void
+bbstreamer_plain_writer_finalize(bbstreamer *streamer)
+{
+ bbstreamer_plain_writer *mystreamer;
+
+ mystreamer = (bbstreamer_plain_writer *) streamer;
+
+ if (mystreamer->should_close_file && fclose(mystreamer->file) != 0)
+ {
+ pg_log_error("could not close file \"%s\": %m",
+ mystreamer->pathname);
+ exit(1);
+ }
+
+ mystreamer->file = NULL;
+ mystreamer->should_close_file = false;
+}
+
+/*
+ * Free memory associated with this bbstreamer.
+ */
+static void
+bbstreamer_plain_writer_free(bbstreamer *streamer)
+{
+ bbstreamer_plain_writer *mystreamer;
+
+ mystreamer = (bbstreamer_plain_writer *) streamer;
+
+ Assert(!mystreamer->should_close_file);
+ Assert(mystreamer->base.bbs_next == NULL);
+
+ pfree(mystreamer->pathname);
+ pfree(mystreamer);
+}
+
+/*
+ * Create a bbstreamer that just compresses data using gzip, and then writes
+ * it to a file.
+ *
+ * As in the case of bbstreamer_plain_writer_new, pathname is always used
+ * for error reporting purposes; if file is NULL, it is also the opened and
+ * closed so that the data may be written there.
+ */
+bbstreamer *
+bbstreamer_gzip_writer_new(char *pathname, FILE *file, int compresslevel)
+{
+#ifdef HAVE_LIBZ
+ bbstreamer_gzip_writer *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_gzip_writer));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_gzip_writer_ops;
+
+ streamer->pathname = pstrdup(pathname);
+
+ if (file == NULL)
+ {
+ streamer->gzfile = gzopen(pathname, "wb");
+ if (streamer->gzfile == NULL)
+ {
+ pg_log_error("could not create compressed file \"%s\": %m",
+ pathname);
+ exit(1);
+ }
+ }
+ else
+ {
+ int fd = dup(fileno(file));
+
+ if (fd < 0)
+ {
+ pg_log_error("could not duplicate stdout: %m");
+ exit(1);
+ }
+
+ streamer->gzfile = gzdopen(fd, "wb");
+ if (streamer->gzfile == NULL)
+ {
+ pg_log_error("could not open output file: %m");
+ exit(1);
+ }
+ }
+
+ if (gzsetparams(streamer->gzfile, compresslevel,
+ Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ pg_log_error("could not set compression level %d: %s",
+ compresslevel, get_gz_error(streamer->gzfile));
+ exit(1);
+ }
+
+ return &streamer->base;
+#else
+ pg_log_error("this build does not support compression");
+ exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Write archive content to gzip file.
+ */
+static void
+bbstreamer_gzip_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member, const char *data,
+ int len, bbstreamer_archive_context context)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ if (len == 0)
+ return;
+
+ errno = 0;
+ if (gzwrite(mystreamer->gzfile, data, len) != len)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to compressed file \"%s\": %s",
+ mystreamer->pathname, get_gz_error(mystreamer->gzfile));
+ exit(1);
+ }
+}
+
+/*
+ * End-of-archive processing when writing to a gzip file consists of just
+ * calling gzclose.
+ *
+ * It makes no difference whether we opened the file or the caller did it,
+ * because libz provides no way of avoiding a close on the underling file
+ * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
+ * work around this issue, so that the behavior from the caller's viewpoint
+ * is the same as for bbstreamer_plain_writer.
+ */
+static void
+bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ if (gzclose(mystreamer->gzfile) != 0)
+ {
+ pg_log_error("could not close compressed file \"%s\": %s",
+ mystreamer->pathname,
+ get_gz_error(mystreamer->gzfile));
+ exit(1);
+ }
+
+ mystreamer->gzfile = NULL;
+}
+
+/*
+ * Free memory associated with this bbstreamer.
+ */
+static void
+bbstreamer_gzip_writer_free(bbstreamer *streamer)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ Assert(mystreamer->base.bbs_next == NULL);
+ Assert(mystreamer->gzfile == NULL);
+
+ pfree(mystreamer->pathname);
+ pfree(mystreamer);
+}
+
+/*
+ * Helper function for libz error reporting.
+ */
+static const char *
+get_gz_error(gzFile gzf)
+{
+ int errnum;
+ const char *errmsg;
+
+ errmsg = gzerror(gzf, &errnum);
+ if (errnum == Z_ERRNO)
+ return strerror(errno);
+ else
+ return errmsg;
+}
+#endif
+
+/*
+ * Create a bbstreamer that extracts an archive.
+ *
+ * All pathnames in the archive are interpreted relative to basepath.
+ *
+ * Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here
+ * with untyped chunks; we need typed chunks which follow the rules described
+ * in bbstreamer.h. Assuming we have that, we don't need to worry about the
+ * original archive format; it's enough to just look at the member information
+ * provided and write to the corresponding file.
+ *
+ * 'link_map' is a function that will be applied to the target of any
+ * symbolic link, and which should return a replacement pathname to be used
+ * in its place. If NULL, the symbolic link target is used without
+ * modification.
+ *
+ * 'report_output_file' is a function that will be called each time we open a
+ * new output file. The pathname to that file is passed as an argument. If
+ * NULL, the call is skipped.
+ */
+bbstreamer *
+bbstreamer_extractor_new(const char *basepath,
+ const char *(*link_map) (const char *),
+ void (*report_output_file) (const char *))
+{
+ bbstreamer_extractor *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_extractor));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_extractor_ops;
+ streamer->basepath = pstrdup(basepath);
+ streamer->link_map = link_map;
+ streamer->report_output_file = report_output_file;
+
+ return &streamer->base;
+}
+
+/*
+ * Extract archive contents to the filesystem.
+ */
+static void
+bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+ int fnamelen;
+
+ Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
+ Assert(context != BBSTREAMER_UNKNOWN);
+
+ switch (context)
+ {
+ case BBSTREAMER_MEMBER_HEADER:
+ Assert(mystreamer->file == NULL);
+
+ /* Prepend basepath. */
+ snprintf(mystreamer->filename, sizeof(mystreamer->filename),
+ "%s/%s", mystreamer->basepath, member->pathname);
+
+ /* Remove any trailing slash. */
+ fnamelen = strlen(mystreamer->filename);
+ if (mystreamer->filename[fnamelen - 1] == '/')
+ mystreamer->filename[fnamelen - 1] = '\0';
+
+ /* Dispatch based on file type. */
+ if (member->is_directory)
+ extract_directory(mystreamer->filename, member->mode);
+ else if (member->is_link)
+ {
+ const char *linktarget = member->linktarget;
+
+ if (mystreamer->link_map)
+ linktarget = mystreamer->link_map(linktarget);
+ extract_link(mystreamer->filename, linktarget);
+ }
+ else
+ mystreamer->file =
+ create_file_for_extract(mystreamer->filename,
+ member->mode);
+
+ /* Report output file change. */
+ if (mystreamer->report_output_file)
+ mystreamer->report_output_file(mystreamer->filename);
+ break;
+
+ case BBSTREAMER_MEMBER_CONTENTS:
+ if (mystreamer->file == NULL)
+ break;
+
+ errno = 0;
+ if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_log_error("could not write to file \"%s\": %m",
+ mystreamer->filename);
+ exit(1);
+ }
+ break;
+
+ case BBSTREAMER_MEMBER_TRAILER:
+ if (mystreamer->file == NULL)
+ break;
+ fclose(mystreamer->file);
+ mystreamer->file = NULL;
+ break;
+
+ case BBSTREAMER_ARCHIVE_TRAILER:
+ break;
+
+ default:
+ /* Shouldn't happen. */
+ pg_log_error("unexpected state while extracting archive");
+ exit(1);
+ }
+}
+
+/*
+ * Create a directory.
+ */
+static void
+extract_directory(const char *filename, mode_t mode)
+{
+ if (mkdir(filename, pg_dir_create_mode) != 0)
+ {
+ /*
+ * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will
+ * have been created by the wal receiver process. Also, when the WAL
+ * directory location was specified, pg_wal (or pg_xlog) has already
+ * been created as a symbolic link before starting the actual backup.
+ * So just ignore creation failures on related directories.
+ */
+ if (!((pg_str_endswith(filename, "/pg_wal") ||
+ pg_str_endswith(filename, "/pg_xlog") ||
+ pg_str_endswith(filename, "/archive_status")) &&
+ errno == EEXIST))
+ {
+ pg_log_error("could not create directory \"%s\": %m",
+ filename);
+ exit(1);
+ }
+ }
+
+#ifndef WIN32
+ if (chmod(filename, mode))
+ {
+ pg_log_error("could not set permissions on directory \"%s\": %m",
+ filename);
+ exit(1);
+ }
+#endif
+}
+
+/*
+ * Create a symbolic link.
+ *
+ * It's most likely a link in pg_tblspc directory, to the location of a
+ * tablespace. Apply any tablespace mapping given on the command line
+ * (--tablespace-mapping). (We blindly apply the mapping without checking that
+ * the link really is inside pg_tblspc. We don't expect there to be other
+ * symlinks in a data directory, but if there are, you can call it an
+ * undocumented feature that you can map them too.)
+ */
+static void
+extract_link(const char *filename, const char *linktarget)
+{
+ if (symlink(linktarget, filename) != 0)
+ {
+ pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+ filename, linktarget);
+ exit(1);
+ }
+}
+
+/*
+ * Create a regular file.
+ *
+ * Return the resulting handle so we can write the content to the file.
+ */
+static FILE *
+create_file_for_extract(const char *filename, mode_t mode)
+{
+ FILE *file;
+
+ file = fopen(filename, "wb");
+ if (file == NULL)
+ {
+ pg_log_error("could not create file \"%s\": %m", filename);
+ exit(1);
+ }
+
+#ifndef WIN32
+ if (chmod(filename, mode))
+ {
+ pg_log_error("could not set permissions on file \"%s\": %m",
+ filename);
+ exit(1);
+ }
+#endif
+
+ return file;
+}
+
+/*
+ * End-of-stream processing for extracting an archive.
+ *
+ * There's nothing to do here but sanity checking.
+ */
+static void
+bbstreamer_extractor_finalize(bbstreamer *streamer)
+{
+ bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+
+ Assert(mystreamer->file == NULL);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_extractor_free(bbstreamer *streamer)
+{
+ bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+
+ pfree(mystreamer->basepath);
+ pfree(mystreamer);
+}
diff --git a/src/bin/pg_basebackup/bbstreamer_inject.c b/src/bin/pg_basebackup/bbstreamer_inject.c
new file mode 100644
index 00000000000..4d15251fdc7
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_inject.c
@@ -0,0 +1,250 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_inject.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_inject.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "bbstreamer.h"
+#include "common/file_perm.h"
+#include "common/logging.h"
+
+typedef struct bbstreamer_recovery_injector
+{
+ bbstreamer base;
+ bool skip_file;
+ bool is_recovery_guc_supported;
+ bool is_postgresql_auto_conf;
+ bool found_postgresql_auto_conf;
+ PQExpBuffer recoveryconfcontents;
+ bbstreamer_member member;
+} bbstreamer_recovery_injector;
+
+static void bbstreamer_recovery_injector_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer);
+static void bbstreamer_recovery_injector_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_recovery_injector_ops = {
+ .content = bbstreamer_recovery_injector_content,
+ .finalize = bbstreamer_recovery_injector_finalize,
+ .free = bbstreamer_recovery_injector_free
+};
+
+/*
+ * Create a bbstreamer that can edit recoverydata into an archive stream.
+ *
+ * The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as
+ * per the conventions described in bbstreamer.h; the chunks forwarded to
+ * the next bbstreamer will be similarly typed, but the
+ * BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've
+ * edited the archive stream.
+ *
+ * Our goal is to do one of the following three things with the content passed
+ * via recoveryconfcontents: (1) if is_recovery_guc_supported is false, then
+ * put the content into recovery.conf, replacing any existing archive member
+ * by that name; (2) if is_recovery_guc_supported is true and
+ * postgresql.auto.conf exists in the archive, then append the content
+ * provided to the existing file; and (3) if is_recovery_guc_supported is
+ * true but postgresql.auto.conf does not exist in the archive, then create
+ * it with the specified content.
+ *
+ * In addition, if is_recovery_guc_supported is true, then we create a
+ * zero-length standby.signal file, dropping any file with that name from
+ * the archive.
+ */
+extern bbstreamer *
+bbstreamer_recovery_injector_new(bbstreamer *next,
+ bool is_recovery_guc_supported,
+ PQExpBuffer recoveryconfcontents)
+{
+ bbstreamer_recovery_injector *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_recovery_injector));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_recovery_injector_ops;
+ streamer->base.bbs_next = next;
+ streamer->is_recovery_guc_supported = is_recovery_guc_supported;
+ streamer->recoveryconfcontents = recoveryconfcontents;
+
+ return &streamer->base;
+}
+
+/*
+ * Handle each chunk of tar content while injecting recovery configuration.
+ */
+static void
+bbstreamer_recovery_injector_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_recovery_injector *mystreamer;
+
+ mystreamer = (bbstreamer_recovery_injector *) streamer;
+ Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
+
+ switch (context)
+ {
+ case BBSTREAMER_MEMBER_HEADER:
+ /* Must copy provided data so we have the option to modify it. */
+ memcpy(&mystreamer->member, member, sizeof(bbstreamer_member));
+
+ /*
+ * On v12+, skip standby.signal and edit postgresql.auto.conf; on
+ * older versions, skip recovery.conf.
+ */
+ if (mystreamer->is_recovery_guc_supported)
+ {
+ mystreamer->skip_file =
+ (strcmp(member->pathname, "standby.signal") == 0);
+ mystreamer->is_postgresql_auto_conf =
+ (strcmp(member->pathname, "postgresql.auto.conf") == 0);
+ if (mystreamer->is_postgresql_auto_conf)
+ {
+ /* Remember we saw it so we don't add it again. */
+ mystreamer->found_postgresql_auto_conf = true;
+
+ /* Increment length by data to be injected. */
+ mystreamer->member.size +=
+ mystreamer->recoveryconfcontents->len;
+
+ /*
+ * Zap data and len because the archive header is no
+ * longer valid; some subsequent bbstreamer must
+ * regenerate it if it's necessary.
+ */
+ data = NULL;
+ len = 0;
+ }
+ }
+ else
+ mystreamer->skip_file =
+ (strcmp(member->pathname, "recovery.conf") == 0);
+
+ /* Do not forward if the file is to be skipped. */
+ if (mystreamer->skip_file)
+ return;
+ break;
+
+ case BBSTREAMER_MEMBER_CONTENTS:
+ /* Do not forward if the file is to be skipped. */
+ if (mystreamer->skip_file)
+ return;
+ break;
+
+ case BBSTREAMER_MEMBER_TRAILER:
+ /* Do not forward it the file is to be skipped. */
+ if (mystreamer->skip_file)
+ return;
+
+ /* Append provided content to whatever we already sent. */
+ if (mystreamer->is_postgresql_auto_conf)
+ bbstreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len,
+ BBSTREAMER_MEMBER_CONTENTS);
+ break;
+
+ case BBSTREAMER_ARCHIVE_TRAILER:
+ if (mystreamer->is_recovery_guc_supported)
+ {
+ /*
+ * If we didn't already find (and thus modify)
+ * postgresql.auto.conf, inject it as an additional archive
+ * member now.
+ */
+ if (!mystreamer->found_postgresql_auto_conf)
+ bbstreamer_inject_file(mystreamer->base.bbs_next,
+ "postgresql.auto.conf",
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len);
+
+ /* Inject empty standby.signal file. */
+ bbstreamer_inject_file(mystreamer->base.bbs_next,
+ "standby.signal", "", 0);
+ }
+ else
+ {
+ /* Inject recovery.conf file with specified contents. */
+ bbstreamer_inject_file(mystreamer->base.bbs_next,
+ "recovery.conf",
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len);
+ }
+
+ /* Nothing to do here. */
+ break;
+
+ default:
+ /* Shouldn't happen. */
+ pg_log_error("unexpected state while injecting recovery settings");
+ exit(1);
+ }
+
+ bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member,
+ data, len, context);
+}
+
+/*
+ * End-of-stream processing for this bbstreamer.
+ */
+static void
+bbstreamer_recovery_injector_finalize(bbstreamer *streamer)
+{
+ bbstreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with this bbstreamer.
+ */
+static void
+bbstreamer_recovery_injector_free(bbstreamer *streamer)
+{
+ bbstreamer_free(streamer->bbs_next);
+ pfree(streamer);
+}
+
+/*
+ * Inject a member into the archive with specified contents.
+ */
+void
+bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data,
+ int len)
+{
+ bbstreamer_member member;
+
+ strlcpy(member.pathname, pathname, MAXPGPATH);
+ member.size = len;
+ member.mode = pg_file_create_mode;
+ member.is_directory = false;
+ member.is_link = false;
+ member.linktarget[0] = '\0';
+
+ /*
+ * There seems to be no principled argument for these values, but they are
+ * what PostgreSQL has historically used.
+ */
+ member.uid = 04000;
+ member.gid = 02000;
+
+ /*
+ * We don't know here how to generate valid member headers and trailers
+ * for the archiving format in use, so if those are needed, some successor
+ * bbstreamer will have to generate them using the data from 'member'.
+ */
+ bbstreamer_content(streamer, &member, NULL, 0,
+ BBSTREAMER_MEMBER_HEADER);
+ bbstreamer_content(streamer, &member, data, len,
+ BBSTREAMER_MEMBER_CONTENTS);
+ bbstreamer_content(streamer, &member, NULL, 0,
+ BBSTREAMER_MEMBER_TRAILER);
+}
diff --git a/src/bin/pg_basebackup/bbstreamer_tar.c b/src/bin/pg_basebackup/bbstreamer_tar.c
new file mode 100644
index 00000000000..5a9f587dca1
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_tar.c
@@ -0,0 +1,444 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_tar.c
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_tar.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <time.h>
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "pgtar.h"
+
+typedef struct bbstreamer_tar_parser
+{
+ bbstreamer base;
+ bbstreamer_archive_context next_context;
+ bbstreamer_member member;
+ size_t file_bytes_sent;
+ size_t pad_bytes_expected;
+} bbstreamer_tar_parser;
+
+typedef struct bbstreamer_tar_archiver
+{
+ bbstreamer base;
+ bool rearchive_member;
+} bbstreamer_tar_archiver;
+
+static void bbstreamer_tar_parser_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_tar_parser_finalize(bbstreamer *streamer);
+static void bbstreamer_tar_parser_free(bbstreamer *streamer);
+static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer);
+
+const bbstreamer_ops bbstreamer_tar_parser_ops = {
+ .content = bbstreamer_tar_parser_content,
+ .finalize = bbstreamer_tar_parser_finalize,
+ .free = bbstreamer_tar_parser_free
+};
+
+static void bbstreamer_tar_archiver_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer);
+static void bbstreamer_tar_archiver_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_tar_archiver_ops = {
+ .content = bbstreamer_tar_archiver_content,
+ .finalize = bbstreamer_tar_archiver_finalize,
+ .free = bbstreamer_tar_archiver_free
+};
+
+/*
+ * Create a bbstreamer that can parse a stream of content as tar data.
+ *
+ * The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer
+ * specified by 'next' will receive a series of typed chunks, as per the
+ * conventions described in bbstreamer.h.
+ */
+extern bbstreamer *
+bbstreamer_tar_parser_new(bbstreamer *next)
+{
+ bbstreamer_tar_parser *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_tar_parser));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_tar_parser_ops;
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ streamer->next_context = BBSTREAMER_MEMBER_HEADER;
+
+ return &streamer->base;
+}
+
+/*
+ * Parse unknown content as tar data.
+ */
+static void
+bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
+ size_t nbytes;
+
+ /* Expect unparsed input. */
+ Assert(member == NULL);
+ Assert(context == BBSTREAMER_UNKNOWN);
+
+ while (len > 0)
+ {
+ switch (mystreamer->next_context)
+ {
+ case BBSTREAMER_MEMBER_HEADER:
+
+ /*
+ * If we're expecting an archive member header, accumulate a
+ * full block of data before doing anything further.
+ */
+ if (!bbstreamer_buffer_until(streamer, &data, &len,
+ TAR_BLOCK_SIZE))
+ return;
+
+ /*
+ * Now we can process the header and get ready to process the
+ * file contents; however, we might find out that what we
+ * thought was the next file header is actually the start of
+ * the archive trailer. Switch modes accordingly.
+ */
+ if (bbstreamer_tar_header(mystreamer))
+ {
+ if (mystreamer->member.size == 0)
+ {
+ /* No content; trailer is zero-length. */
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ BBSTREAMER_MEMBER_TRAILER);
+
+ /* Expect next header. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ }
+ else
+ {
+ /* Expect contents. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS;
+ }
+ mystreamer->base.bbs_buffer.len = 0;
+ mystreamer->file_bytes_sent = 0;
+ }
+ else
+ mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER;
+ break;
+
+ case BBSTREAMER_MEMBER_CONTENTS:
+
+ /*
+ * Send as much content as we have, but not more than the
+ * remaining file length.
+ */
+ Assert(mystreamer->file_bytes_sent < mystreamer->member.size);
+ nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
+ nbytes = Min(nbytes, len);
+ Assert(nbytes > 0);
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, nbytes,
+ BBSTREAMER_MEMBER_CONTENTS);
+ mystreamer->file_bytes_sent += nbytes;
+ data += nbytes;
+ len -= nbytes;
+
+ /*
+ * If we've not yet sent the whole file, then there's more
+ * content to come; otherwise, it's time to expect the file
+ * trailer.
+ */
+ Assert(mystreamer->file_bytes_sent <= mystreamer->member.size);
+ if (mystreamer->file_bytes_sent == mystreamer->member.size)
+ {
+ if (mystreamer->pad_bytes_expected == 0)
+ {
+ /* Trailer is zero-length. */
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ BBSTREAMER_MEMBER_TRAILER);
+
+ /* Expect next header. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ }
+ else
+ {
+ /* Trailer is not zero-length. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER;
+ }
+ mystreamer->base.bbs_buffer.len = 0;
+ }
+ break;
+
+ case BBSTREAMER_MEMBER_TRAILER:
+
+ /*
+ * If we're expecting an archive member trailer, accumulate
+ * the expected number of padding bytes before sending
+ * anything onward.
+ */
+ if (!bbstreamer_buffer_until(streamer, &data, &len,
+ mystreamer->pad_bytes_expected))
+ return;
+
+ /* OK, now we can send it. */
+ bbstreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, mystreamer->pad_bytes_expected,
+ BBSTREAMER_MEMBER_TRAILER);
+
+ /* Expect next file header. */
+ mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ mystreamer->base.bbs_buffer.len = 0;
+ break;
+
+ case BBSTREAMER_ARCHIVE_TRAILER:
+
+ /*
+ * We've seen an end-of-archive indicator, so anything more is
+ * buffered and sent as part of the archive trailer. But we
+ * don't expect more than 2 blocks.
+ */
+ bbstreamer_buffer_bytes(streamer, &data, &len, len);
+ if (len > 2 * TAR_BLOCK_SIZE)
+ {
+ pg_log_error("tar file trailer exceeds 2 blocks");
+ exit(1);
+ }
+ return;
+
+ default:
+ /* Shouldn't happen. */
+ pg_log_error("unexpected state while parsing tar archive");
+ exit(1);
+ }
+ }
+}
+
+/*
+ * Parse a file header within a tar stream.
+ *
+ * The return value is true if we found a file header and passed it on to the
+ * next bbstreamer; it is false if we have reached the archive trailer.
+ */
+static bool
+bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
+{
+ bool has_nonzero_byte = false;
+ int i;
+ bbstreamer_member *member = &mystreamer->member;
+ char *buffer = mystreamer->base.bbs_buffer.data;
+
+ Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);
+
+ /* Check whether we've got a block of all zero bytes. */
+ for (i = 0; i < TAR_BLOCK_SIZE; ++i)
+ {
+ if (buffer[i] != '\0')
+ {
+ has_nonzero_byte = true;
+ break;
+ }
+ }
+
+ /*
+ * If the entire block was zeros, this is the end of the archive, not the
+ * start of the next file.
+ */
+ if (!has_nonzero_byte)
+ return false;
+
+ /*
+ * Parse key fields out of the header.
+ *
+ * FIXME: It's terrible that we use hard-coded values here instead of some
+ * more principled approach. It's been like this for a long time, but we
+ * ought to do better.
+ */
+ strlcpy(member->pathname, &buffer[0], MAXPGPATH);
+ if (member->pathname[0] == '\0')
+ {
+ pg_log_error("tar member has empty name");
+ exit(1);
+ }
+ member->size = read_tar_number(&buffer[124], 12);
+ member->mode = read_tar_number(&buffer[100], 8);
+ member->uid = read_tar_number(&buffer[108], 8);
+ member->gid = read_tar_number(&buffer[116], 8);
+ member->is_directory = (buffer[156] == '5');
+ member->is_link = (buffer[156] == '2');
+ if (member->is_link)
+ strlcpy(member->linktarget, &buffer[157], 100);
+
+ /* Compute number of padding bytes. */
+ mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);
+
+ /* Forward the entire header to the next bbstreamer. */
+ bbstreamer_content(mystreamer->base.bbs_next, member,
+ buffer, TAR_BLOCK_SIZE,
+ BBSTREAMER_MEMBER_HEADER);
+
+ return true;
+}
+
+/*
+ * End-of-stream processing for a tar parser.
+ */
+static void
+bbstreamer_tar_parser_finalize(bbstreamer *streamer)
+{
+ bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
+
+ if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER &&
+ (mystreamer->next_context != BBSTREAMER_MEMBER_HEADER ||
+ mystreamer->base.bbs_buffer.len > 0))
+ {
+ pg_log_error("COPY stream ended before last file was finished");
+ exit(1);
+ }
+
+ /* Send the archive trailer, even if empty. */
+ bbstreamer_content(streamer->bbs_next, NULL,
+ streamer->bbs_buffer.data, streamer->bbs_buffer.len,
+ BBSTREAMER_ARCHIVE_TRAILER);
+
+ /* Now finalize successor. */
+ bbstreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with a tar parser.
+ */
+static void
+bbstreamer_tar_parser_free(bbstreamer *streamer)
+{
+ pfree(streamer->bbs_buffer.data);
+ bbstreamer_free(streamer->bbs_next);
+}
+
+/*
+ * Create an bbstreamer that can generate a tar archive.
+ *
+ * This is intended to be usable either for generating a brand-new tar archive
+ * or for modifying one on the fly. The input should be a series of typed
+ * chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for
+ * bbstreamer_tar_parser_content.
+ */
+extern bbstreamer *
+bbstreamer_tar_archiver_new(bbstreamer *next)
+{
+ bbstreamer_tar_archiver *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_tar_archiver));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_tar_archiver_ops;
+ streamer->base.bbs_next = next;
+
+ return &streamer->base;
+}
+
+/*
+ * Fix up the stream of input chunks to create a valid tar file.
+ *
+ * If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
+ * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is
+ * passed through without change. Any other size is a fatal error (and
+ * indicates a bug).
+ *
+ * Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the
+ * corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from
+ * scratch. Specifically, we construct a block of zero bytes sufficient to
+ * pad out to a block boundary, as required by the tar format. Other
+ * BBSTREAMER_MEMBER_TRAILER chunks are passed through without change.
+ *
+ * Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change.
+ *
+ * The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two
+ * blocks of zero bytes. Not all tar programs require this, but apparently
+ * some do. The server does not supply this trailer. If no archive trailer is
+ * present, one will be added by bbstreamer_tar_parser_finalize.
+ */
+static void
+bbstreamer_tar_archiver_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer;
+ char buffer[2 * TAR_BLOCK_SIZE];
+
+ Assert(context != BBSTREAMER_UNKNOWN);
+
+ if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
+ {
+ Assert(len == 0);
+
+ /* Replace zero-length tar header with a newly constructed one. */
+ tarCreateHeader(buffer, member->pathname, NULL,
+ member->size, member->mode, member->uid, member->gid,
+ time(NULL));
+ data = buffer;
+ len = TAR_BLOCK_SIZE;
+
+ /* Also make a note to replace padding, in case size changed. */
+ mystreamer->rearchive_member = true;
+ }
+ else if (context == BBSTREAMER_MEMBER_TRAILER &&
+ mystreamer->rearchive_member)
+ {
+ int pad_bytes = tarPaddingBytesRequired(member->size);
+
+ /* Also replace padding, if we regenerated the header. */
+ memset(buffer, 0, pad_bytes);
+ data = buffer;
+ len = pad_bytes;
+
+ /* Don't do this agian unless we replace another header. */
+ mystreamer->rearchive_member = false;
+ }
+ else if (context == BBSTREAMER_ARCHIVE_TRAILER)
+ {
+ /* Trailer should always be two blocks of zero bytes. */
+ memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
+ data = buffer;
+ len = 2 * TAR_BLOCK_SIZE;
+ }
+
+ bbstreamer_content(streamer->bbs_next, member, data, len, context);
+}
+
+/*
+ * End-of-stream processing for a tar archiver.
+ */
+static void
+bbstreamer_tar_archiver_finalize(bbstreamer *streamer)
+{
+ bbstreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with a tar archiver.
+ */
+static void
+bbstreamer_tar_archiver_free(bbstreamer *streamer)
+{
+ bbstreamer_free(streamer->bbs_next);
+ pfree(streamer);
+}
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index cdea3711b7b..169afa5645d 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -28,18 +28,13 @@
#endif
#include "access/xlog_internal.h"
+#include "bbstreamer.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
#include "common/logging.h"
-#include "common/string.h"
#include "fe_utils/option_utils.h"
#include "fe_utils/recovery_gen.h"
-#include "fe_utils/string_utils.h"
#include "getopt_long.h"
-#include "libpq-fe.h"
-#include "pgtar.h"
-#include "pgtime.h"
-#include "pqexpbuffer.h"
#include "receivelog.h"
#include "replication/basebackup.h"
#include "streamutil.h"
@@ -62,34 +57,9 @@ typedef struct TablespaceList
typedef struct WriteTarState
{
int tablespacenum;
- char filename[MAXPGPATH];
- FILE *tarfile;
- char tarhdr[TAR_BLOCK_SIZE];
- bool basetablespace;
- bool in_tarhdr;
- bool skip_file;
- bool is_recovery_guc_supported;
- bool is_postgresql_auto_conf;
- bool found_postgresql_auto_conf;
- int file_padding_len;
- size_t tarhdrsz;
- pgoff_t filesz;
-#ifdef HAVE_LIBZ
- gzFile ztarfile;
-#endif
+ bbstreamer *streamer;
} WriteTarState;
-typedef struct UnpackTarState
-{
- int tablespacenum;
- char current_path[MAXPGPATH];
- char filename[MAXPGPATH];
- const char *mapped_tblspc_path;
- pgoff_t current_len_left;
- int current_padding;
- FILE *file;
-} UnpackTarState;
-
typedef struct WriteManifestState
{
char filename[MAXPGPATH];
@@ -161,10 +131,11 @@ static bool found_existing_xlogdir = false;
static bool made_tablespace_dirs = false;
static bool found_tablespace_dirs = false;
-/* Progress counters */
+/* Progress indicators */
static uint64 totalsize_kb;
static uint64 totaldone;
static int tablespacecount;
+static const char *progress_filename;
/* Pipe to communicate with background wal receiver process */
#ifndef WIN32
@@ -190,14 +161,15 @@ static PQExpBuffer recoveryconfcontents = NULL;
/* Function headers */
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
-static void progress_report(int tablespacenum, const char *filename, bool force,
- bool finished);
-
-static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void progress_update_filename(const char *filename);
+static void progress_report(int tablespacenum, bool force, bool finished);
+
+static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
+ bbstreamer **manifest_inject_streamer_p,
+ bool is_recovery_guc_supported);
+static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
+ bool tablespacenum);
static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
-static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
- void *callback_data);
static void ReceiveBackupManifest(PGconn *conn);
static void ReceiveBackupManifestChunk(size_t r, char *copybuf,
void *callback_data);
@@ -360,21 +332,6 @@ tablespace_list_append(const char *arg)
}
-#ifdef HAVE_LIBZ
-static const char *
-get_gz_error(gzFile gzf)
-{
- int errnum;
- const char *errmsg;
-
- errmsg = gzerror(gzf, &errnum);
- if (errnum == Z_ERRNO)
- return strerror(errno);
- else
- return errmsg;
-}
-#endif
-
static void
usage(void)
{
@@ -766,6 +723,14 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
}
}
+/*
+ * Callback to update our notion of the current filename.
+ */
+static void
+progress_update_filename(const char *filename)
+{
+ progress_filename = filename;
+}
/*
* Print a progress report based on the global variables. If verbose output
@@ -778,8 +743,7 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
* is moved to the next line.
*/
static void
-progress_report(int tablespacenum, const char *filename,
- bool force, bool finished)
+progress_report(int tablespacenum, bool force, bool finished)
{
int percent;
char totaldone_str[32];
@@ -814,7 +778,7 @@ progress_report(int tablespacenum, const char *filename,
#define VERBOSE_FILENAME_LENGTH 35
if (verbose)
{
- if (!filename)
+ if (!progress_filename)
/*
* No filename given, so clear the status line (used for last
@@ -830,7 +794,7 @@ progress_report(int tablespacenum, const char *filename,
VERBOSE_FILENAME_LENGTH + 5, "");
else
{
- bool truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH);
+ bool truncate = (strlen(progress_filename) > VERBOSE_FILENAME_LENGTH);
fprintf(stderr,
ngettext("%*s/%s kB (%d%%), %d/%d tablespace (%s%-*.*s)",
@@ -844,7 +808,7 @@ progress_report(int tablespacenum, const char *filename,
truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
/* Truncate filename at beginning if it's too long */
- truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename);
+ truncate ? progress_filename + strlen(progress_filename) - VERBOSE_FILENAME_LENGTH + 3 : progress_filename);
}
}
else
@@ -990,257 +954,170 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
}
/*
- * Write a piece of tar data
+ * Figure out what to do with an archive received from the server based on
+ * the options selected by the user. We may just write the results directly
+ * to a file, or we might compress first, or we might extract the tar file
+ * and write each member separately. This function doesn't do any of that
+ * directly, but it works out what kind of bbstreamer we need to create so
+ * that the right stuff happens when, down the road, we actually receive
+ * the data.
*/
-static void
-writeTarData(WriteTarState *state, char *buf, int r)
+static bbstreamer *
+CreateBackupStreamer(char *archive_name, char *spclocation,
+ bbstreamer **manifest_inject_streamer_p,
+ bool is_recovery_guc_supported)
{
-#ifdef HAVE_LIBZ
- if (state->ztarfile != NULL)
- {
- errno = 0;
- if (gzwrite(state->ztarfile, buf, r) != r)
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- pg_log_error("could not write to compressed file \"%s\": %s",
- state->filename, get_gz_error(state->ztarfile));
- exit(1);
- }
- }
- else
-#endif
- {
- errno = 0;
- if (fwrite(buf, r, 1, state->tarfile) != 1)
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- pg_log_error("could not write to file \"%s\": %m",
- state->filename);
- exit(1);
- }
- }
-}
+ bbstreamer *streamer;
+ bbstreamer *manifest_inject_streamer = NULL;
+ bool inject_manifest;
+ bool must_parse_archive;
-/*
- * Receive a tar format file from the connection to the server, and write
- * the data from this file directly into a tar file. If compression is
- * enabled, the data will be compressed while written to the file.
- *
- * The file will be named base.tar[.gz] if it's for the main data directory
- * or <tablespaceoid>.tar[.gz] if it's for another tablespace.
- *
- * No attempt to inspect or validate the contents of the file is done.
- */
-static void
-ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
-{
- char zerobuf[TAR_BLOCK_SIZE * 2];
- WriteTarState state;
-
- memset(&state, 0, sizeof(state));
- state.tablespacenum = rownum;
- state.basetablespace = PQgetisnull(res, rownum, 0);
- state.in_tarhdr = true;
+ /*
+ * Normally, we emit the backup manifest as a separate file, but when
+ * we're writing a tarfile to stdout, we don't have that option, so
+ * include it in the one tarfile we've got.
+ */
+ inject_manifest = (format == 't' && strcmp(basedir, "-") == 0 && manifest);
- /* recovery.conf is integrated into postgresql.conf in 12 and newer */
- if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC)
- state.is_recovery_guc_supported = true;
+ /*
+ * We have to parse the archive if (1) we're suppose to extract it, or if
+ * (2) we need to inject backup_manifest or recovery configuration into it.
+ */
+ must_parse_archive = (format == 'p' || inject_manifest ||
+ (spclocation == NULL && writerecoveryconf));
- if (state.basetablespace)
+ if (format == 'p')
{
+ const char *directory;
+
/*
- * Base tablespaces
+ * In plain format, we must extract the archive. The data for the main
+ * tablespace will be written to the base directory, and the data for
+ * other tablespaces will be written to the directory where they're
+ * located on the server, after applying any user-specified tablespace
+ * mappings.
*/
- if (strcmp(basedir, "-") == 0)
- {
-#ifdef WIN32
- _setmode(fileno(stdout), _O_BINARY);
-#endif
-
-#ifdef HAVE_LIBZ
- if (compresslevel != 0)
- {
- int fd = dup(fileno(stdout));
-
- if (fd < 0)
- {
- pg_log_error("could not duplicate stdout: %m");
- exit(1);
- }
-
- state.ztarfile = gzdopen(fd, "wb");
- if (state.ztarfile == NULL)
- {
- pg_log_error("could not open output file: %m");
- exit(1);
- }
-
- if (gzsetparams(state.ztarfile, compresslevel,
- Z_DEFAULT_STRATEGY) != Z_OK)
- {
- pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(state.ztarfile));
- exit(1);
- }
- }
- else
-#endif
- state.tarfile = stdout;
- strcpy(state.filename, "-");
- }
- else
- {
-#ifdef HAVE_LIBZ
- if (compresslevel != 0)
- {
- snprintf(state.filename, sizeof(state.filename),
- "%s/base.tar.gz", basedir);
- state.ztarfile = gzopen(state.filename, "wb");
- if (gzsetparams(state.ztarfile, compresslevel,
- Z_DEFAULT_STRATEGY) != Z_OK)
- {
- pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(state.ztarfile));
- exit(1);
- }
- }
- else
-#endif
- {
- snprintf(state.filename, sizeof(state.filename),
- "%s/base.tar", basedir);
- state.tarfile = fopen(state.filename, "wb");
- }
- }
+ directory = spclocation == NULL ? basedir
+ : get_tablespace_mapping(spclocation);
+ streamer = bbstreamer_extractor_new(directory,
+ get_tablespace_mapping,
+ progress_update_filename);
}
else
{
+ FILE *archive_file;
+ char archive_filename[MAXPGPATH];
+
/*
- * Specific tablespace
+ * In tar format, we just write the archive without extracting it.
+ * Normally, we write it to the archive name provided by the caller,
+ * but when the base directory is "-" that means we need to write
+ * to standard output.
*/
-#ifdef HAVE_LIBZ
- if (compresslevel != 0)
+ if (strcmp(basedir, "-") == 0)
{
- snprintf(state.filename, sizeof(state.filename),
- "%s/%s.tar.gz",
- basedir, PQgetvalue(res, rownum, 0));
- state.ztarfile = gzopen(state.filename, "wb");
- if (gzsetparams(state.ztarfile, compresslevel,
- Z_DEFAULT_STRATEGY) != Z_OK)
- {
- pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(state.ztarfile));
- exit(1);
- }
+ snprintf(archive_filename, sizeof(archive_filename), "-");
+ archive_file = stdout;
}
else
-#endif
{
- snprintf(state.filename, sizeof(state.filename), "%s/%s.tar",
- basedir, PQgetvalue(res, rownum, 0));
- state.tarfile = fopen(state.filename, "wb");
+ snprintf(archive_filename, sizeof(archive_filename),
+ "%s/%s", basedir, archive_name);
+ archive_file = NULL;
}
- }
#ifdef HAVE_LIBZ
- if (compresslevel != 0)
- {
- if (!state.ztarfile)
+ if (compresslevel != 0)
{
- /* Compression is in use */
- pg_log_error("could not create compressed file \"%s\": %s",
- state.filename, get_gz_error(state.ztarfile));
- exit(1);
+ strlcat(archive_filename, ".gz", sizeof(archive_filename));
+ streamer = bbstreamer_gzip_writer_new(archive_filename,
+ archive_file,
+ compresslevel);
}
- }
- else
+ else
#endif
- {
- /* Either no zlib support, or zlib support but compresslevel = 0 */
- if (!state.tarfile)
- {
- pg_log_error("could not create file \"%s\": %m", state.filename);
- exit(1);
- }
- }
+ streamer = bbstreamer_plain_writer_new(archive_filename,
+ archive_file);
- ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+
+ /*
+ * If we need to parse the archive for whatever reason, then we'll
+ * also need to re-archive, because, if the output format is tar, the
+ * only point of parsing the archive is to be able to inject stuff
+ * into it.
+ */
+ if (must_parse_archive)
+ streamer = bbstreamer_tar_archiver_new(streamer);
+ progress_filename = archive_filename;
+ }
/*
- * End of copy data. If requested, and this is the base tablespace, write
- * configuration file into the tarfile. When done, close the file (but not
- * stdout).
- *
- * Also, write two completely empty blocks at the end of the tar file, as
- * required by some tar programs.
+ * If we're supposed to inject the backup manifest into the results,
+ * it should be done here, so that the file content can be injected
+ * directly, without worrying about the details of the tar format.
*/
+ if (inject_manifest)
+ manifest_inject_streamer = streamer;
- MemSet(zerobuf, 0, sizeof(zerobuf));
-
- if (state.basetablespace && writerecoveryconf)
+ /*
+ * If this is the main tablespace and we're supposed to write
+ * recovery information, arrange to do that.
+ */
+ if (spclocation == NULL && writerecoveryconf)
{
- char header[TAR_BLOCK_SIZE];
+ Assert(must_parse_archive);
+ streamer = bbstreamer_recovery_injector_new(streamer,
+ is_recovery_guc_supported,
+ recoveryconfcontents);
+ }
- /*
- * If postgresql.auto.conf has not been found in the streamed data,
- * add recovery configuration to postgresql.auto.conf if recovery
- * parameters are GUCs. If the instance connected to is older than
- * 12, create recovery.conf with this data otherwise.
- */
- if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported)
- {
- int padding;
-
- tarCreateHeader(header,
- state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
- NULL,
- recoveryconfcontents->len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
-
- padding = tarPaddingBytesRequired(recoveryconfcontents->len);
-
- writeTarData(&state, header, sizeof(header));
- writeTarData(&state, recoveryconfcontents->data,
- recoveryconfcontents->len);
- if (padding)
- writeTarData(&state, zerobuf, padding);
- }
+ /*
+ * If we're doing anything that involves understanding the contents of
+ * the archive, we'll need to parse it.
+ */
+ if (must_parse_archive)
+ streamer = bbstreamer_tar_parser_new(streamer);
- /*
- * standby.signal is supported only if recovery parameters are GUCs.
- */
- if (state.is_recovery_guc_supported)
- {
- tarCreateHeader(header, "standby.signal", NULL,
- 0, /* zero-length file */
- pg_file_create_mode, 04000, 02000,
- time(NULL));
+ /* Return the results. */
+ *manifest_inject_streamer_p = manifest_inject_streamer;
+ return streamer;
+}
- writeTarData(&state, header, sizeof(header));
+/*
+ * Receive raw tar data from the server, and stream it to the appropriate
+ * location. If we're writing a single tarfile to standard output, also
+ * receive the backup manifest and inject it into that tarfile.
+ */
+static void
+ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
+ bool tablespacenum)
+{
+ WriteTarState state;
+ bbstreamer *manifest_inject_streamer;
+ bool is_recovery_guc_supported;
- /*
- * we don't need to pad out to a multiple of the tar block size
- * here, because the file is zero length, which is a multiple of
- * any block size.
- */
- }
- }
+ /* Pass all COPY data through to the backup streamer. */
+ memset(&state, 0, sizeof(state));
+ is_recovery_guc_supported =
+ PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC;
+ state.streamer = CreateBackupStreamer(archive_name, spclocation,
+ &manifest_inject_streamer,
+ is_recovery_guc_supported);
+ state.tablespacenum = tablespacenum;
+ ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+ progress_filename = NULL;
/*
- * Normally, we emit the backup manifest as a separate file, but when
- * we're writing a tarfile to stdout, we don't have that option, so
- * include it in the one tarfile we've got.
+ * The decision as to whether we need to inject the backup manifest into
+ * the output at this stage is made by CreateBackupStreamer; if that is
+ * needed, manifest_inject_streamer will be non-NULL; otherwise, it will
+ * be NULL.
*/
- if (strcmp(basedir, "-") == 0 && manifest)
+ if (manifest_inject_streamer != NULL)
{
- char header[TAR_BLOCK_SIZE];
PQExpBufferData buf;
+ /* Slurp the entire backup manifest into a buffer. */
initPQExpBuffer(&buf);
ReceiveBackupManifestInMemory(conn, &buf);
if (PQExpBufferDataBroken(buf))
@@ -1248,42 +1125,20 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
pg_log_error("out of memory");
exit(1);
}
- tarCreateHeader(header, "backup_manifest", NULL, buf.len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
- writeTarData(&state, header, sizeof(header));
- writeTarData(&state, buf.data, buf.len);
- termPQExpBuffer(&buf);
- }
- /* 2 * TAR_BLOCK_SIZE bytes empty data at end of file */
- writeTarData(&state, zerobuf, sizeof(zerobuf));
+ /* Inject it into the output tarfile. */
+ bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest",
+ buf.data, buf.len);
-#ifdef HAVE_LIBZ
- if (state.ztarfile != NULL)
- {
- if (gzclose(state.ztarfile) != 0)
- {
- pg_log_error("could not close compressed file \"%s\": %s",
- state.filename, get_gz_error(state.ztarfile));
- exit(1);
- }
- }
- else
-#endif
- {
- if (strcmp(basedir, "-") != 0)
- {
- if (fclose(state.tarfile) != 0)
- {
- pg_log_error("could not close file \"%s\": %m",
- state.filename);
- exit(1);
- }
- }
+ /* Free memory. */
+ termPQExpBuffer(&buf);
}
- progress_report(rownum, state.filename, true, false);
+ /* Cleanup. */
+ bbstreamer_finalize(state.streamer);
+ bbstreamer_free(state.streamer);
+
+ progress_report(tablespacenum, true, false);
/*
* Do not sync the resulting tar file yet, all files are synced once at
@@ -1299,184 +1154,10 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
{
WriteTarState *state = callback_data;
- if (!writerecoveryconf || !state->basetablespace)
- {
- /*
- * When not writing config file, or when not working on the base
- * tablespace, we never have to look for an existing configuration
- * file in the stream.
- */
- writeTarData(state, copybuf, r);
- }
- else
- {
- /*
- * Look for a config file in the existing tar stream. If it's there,
- * we must skip it so we can later overwrite it with our own version
- * of the file.
- *
- * To do this, we have to process the individual files inside the TAR
- * stream. The stream consists of a header and zero or more chunks,
- * each with a length equal to TAR_BLOCK_SIZE. The stream from the
- * server is broken up into smaller pieces, so we have to track the
- * size of the files to find the next header structure.
- */
- int rr = r;
- int pos = 0;
-
- while (rr > 0)
- {
- if (state->in_tarhdr)
- {
- /*
- * We're currently reading a header structure inside the TAR
- * stream, i.e. the file metadata.
- */
- if (state->tarhdrsz < TAR_BLOCK_SIZE)
- {
- /*
- * Copy the header structure into tarhdr in case the
- * header is not aligned properly or it's not returned in
- * whole by the last PQgetCopyData call.
- */
- int hdrleft;
- int bytes2copy;
-
- hdrleft = TAR_BLOCK_SIZE - state->tarhdrsz;
- bytes2copy = (rr > hdrleft ? hdrleft : rr);
-
- memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos,
- bytes2copy);
-
- rr -= bytes2copy;
- pos += bytes2copy;
- state->tarhdrsz += bytes2copy;
- }
- else
- {
- /*
- * We have the complete header structure in tarhdr, look
- * at the file metadata: we may want append recovery info
- * into postgresql.auto.conf and skip standby.signal file
- * if recovery parameters are integrated as GUCs, and
- * recovery.conf otherwise. In both cases we must
- * calculate tar padding.
- */
- if (state->is_recovery_guc_supported)
- {
- state->skip_file =
- (strcmp(&state->tarhdr[0], "standby.signal") == 0);
- state->is_postgresql_auto_conf =
- (strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0);
- }
- else
- state->skip_file =
- (strcmp(&state->tarhdr[0], "recovery.conf") == 0);
-
- state->filesz = read_tar_number(&state->tarhdr[124], 12);
- state->file_padding_len =
- tarPaddingBytesRequired(state->filesz);
-
- if (state->is_recovery_guc_supported &&
- state->is_postgresql_auto_conf &&
- writerecoveryconf)
- {
- /* replace tar header */
- char header[TAR_BLOCK_SIZE];
-
- tarCreateHeader(header, "postgresql.auto.conf", NULL,
- state->filesz + recoveryconfcontents->len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
-
- writeTarData(state, header, sizeof(header));
- }
- else
- {
- /* copy stream with padding */
- state->filesz += state->file_padding_len;
-
- if (!state->skip_file)
- {
- /*
- * If we're not skipping the file, write the tar
- * header unmodified.
- */
- writeTarData(state, state->tarhdr, TAR_BLOCK_SIZE);
- }
- }
-
- /* Next part is the file, not the header */
- state->in_tarhdr = false;
- }
- }
- else
- {
- /*
- * We're processing a file's contents.
- */
- if (state->filesz > 0)
- {
- /*
- * We still have data to read (and possibly write).
- */
- int bytes2write;
-
- bytes2write = (state->filesz > rr ? rr : state->filesz);
-
- if (!state->skip_file)
- writeTarData(state, copybuf + pos, bytes2write);
-
- rr -= bytes2write;
- pos += bytes2write;
- state->filesz -= bytes2write;
- }
- else if (state->is_recovery_guc_supported &&
- state->is_postgresql_auto_conf &&
- writerecoveryconf)
- {
- /* append recovery config to postgresql.auto.conf */
- int padding;
- int tailsize;
-
- tailsize = (TAR_BLOCK_SIZE - state->file_padding_len) + recoveryconfcontents->len;
- padding = tarPaddingBytesRequired(tailsize);
-
- writeTarData(state, recoveryconfcontents->data,
- recoveryconfcontents->len);
-
- if (padding)
- {
- char zerobuf[TAR_BLOCK_SIZE];
-
- MemSet(zerobuf, 0, sizeof(zerobuf));
- writeTarData(state, zerobuf, padding);
- }
+ bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN);
- /* skip original file padding */
- state->is_postgresql_auto_conf = false;
- state->skip_file = true;
- state->filesz += state->file_padding_len;
-
- state->found_postgresql_auto_conf = true;
- }
- else
- {
- /*
- * No more data in the current file, the next piece of
- * data (if any) will be a new file header structure.
- */
- state->in_tarhdr = true;
- state->skip_file = false;
- state->is_postgresql_auto_conf = false;
- state->tarhdrsz = 0;
- state->filesz = 0;
- }
- }
- }
- }
totaldone += r;
- progress_report(state->tablespacenum, state->filename, false, false);
+ progress_report(state->tablespacenum, false, false);
}
@@ -1501,242 +1182,6 @@ get_tablespace_mapping(const char *dir)
return dir;
}
-
-/*
- * Receive a tar format stream from the connection to the server, and unpack
- * the contents of it into a directory. Only files, directories and
- * symlinks are supported, no other kinds of special files.
- *
- * If the data is for the main data directory, it will be restored in the
- * specified directory. If it's for another tablespace, it will be restored
- * in the original or mapped directory.
- */
-static void
-ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
-{
- UnpackTarState state;
- bool basetablespace;
-
- memset(&state, 0, sizeof(state));
- state.tablespacenum = rownum;
-
- basetablespace = PQgetisnull(res, rownum, 0);
- if (basetablespace)
- strlcpy(state.current_path, basedir, sizeof(state.current_path));
- else
- strlcpy(state.current_path,
- get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
- sizeof(state.current_path));
-
- ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state);
-
-
- if (state.file)
- fclose(state.file);
-
- progress_report(rownum, state.filename, true, false);
-
- if (state.file != NULL)
- {
- pg_log_error("COPY stream ended before last file was finished");
- exit(1);
- }
-
- if (basetablespace && writerecoveryconf)
- WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
- /*
- * No data is synced here, everything is done for all tablespaces at the
- * end.
- */
-}
-
-static void
-ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
-{
- UnpackTarState *state = callback_data;
-
- if (state->file == NULL)
- {
-#ifndef WIN32
- int filemode;
-#endif
-
- /*
- * No current file, so this must be the header for a new file
- */
- if (r != TAR_BLOCK_SIZE)
- {
- pg_log_error("invalid tar block header size: %zu", r);
- exit(1);
- }
- totaldone += TAR_BLOCK_SIZE;
-
- state->current_len_left = read_tar_number(&copybuf[124], 12);
-
-#ifndef WIN32
- /* Set permissions on the file */
- filemode = read_tar_number(&copybuf[100], 8);
-#endif
-
- /*
- * All files are padded up to a multiple of TAR_BLOCK_SIZE
- */
- state->current_padding =
- tarPaddingBytesRequired(state->current_len_left);
-
- /*
- * First part of header is zero terminated filename
- */
- snprintf(state->filename, sizeof(state->filename),
- "%s/%s", state->current_path, copybuf);
- if (state->filename[strlen(state->filename) - 1] == '/')
- {
- /*
- * Ends in a slash means directory or symlink to directory
- */
- if (copybuf[156] == '5')
- {
- /*
- * Directory. Remove trailing slash first.
- */
- state->filename[strlen(state->filename) - 1] = '\0';
- if (mkdir(state->filename, pg_dir_create_mode) != 0)
- {
- /*
- * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
- * clusters) will have been created by the wal receiver
- * process. Also, when the WAL directory location was
- * specified, pg_wal (or pg_xlog) has already been created
- * as a symbolic link before starting the actual backup.
- * So just ignore creation failures on related
- * directories.
- */
- if (!((pg_str_endswith(state->filename, "/pg_wal") ||
- pg_str_endswith(state->filename, "/pg_xlog") ||
- pg_str_endswith(state->filename, "/archive_status")) &&
- errno == EEXIST))
- {
- pg_log_error("could not create directory \"%s\": %m",
- state->filename);
- exit(1);
- }
- }
-#ifndef WIN32
- if (chmod(state->filename, (mode_t) filemode))
- {
- pg_log_error("could not set permissions on directory \"%s\": %m",
- state->filename);
- exit(1);
- }
-#endif
- }
- else if (copybuf[156] == '2')
- {
- /*
- * Symbolic link
- *
- * It's most likely a link in pg_tblspc directory, to the
- * location of a tablespace. Apply any tablespace mapping
- * given on the command line (--tablespace-mapping). (We
- * blindly apply the mapping without checking that the link
- * really is inside pg_tblspc. We don't expect there to be
- * other symlinks in a data directory, but if there are, you
- * can call it an undocumented feature that you can map them
- * too.)
- */
- state->filename[strlen(state->filename) - 1] = '\0'; /* Remove trailing slash */
-
- state->mapped_tblspc_path =
- get_tablespace_mapping(&copybuf[157]);
- if (symlink(state->mapped_tblspc_path, state->filename) != 0)
- {
- pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
- state->filename, state->mapped_tblspc_path);
- exit(1);
- }
- }
- else
- {
- pg_log_error("unrecognized link indicator \"%c\"",
- copybuf[156]);
- exit(1);
- }
- return; /* directory or link handled */
- }
-
- /*
- * regular file
- */
- state->file = fopen(state->filename, "wb");
- if (!state->file)
- {
- pg_log_error("could not create file \"%s\": %m", state->filename);
- exit(1);
- }
-
-#ifndef WIN32
- if (chmod(state->filename, (mode_t) filemode))
- {
- pg_log_error("could not set permissions on file \"%s\": %m",
- state->filename);
- exit(1);
- }
-#endif
-
- if (state->current_len_left == 0)
- {
- /*
- * Done with this file, next one will be a new tar header
- */
- fclose(state->file);
- state->file = NULL;
- return;
- }
- } /* new file */
- else
- {
- /*
- * Continuing blocks in existing file
- */
- if (state->current_len_left == 0 && r == state->current_padding)
- {
- /*
- * Received the padding block for this file, ignore it and close
- * the file, then move on to the next tar header.
- */
- fclose(state->file);
- state->file = NULL;
- totaldone += r;
- return;
- }
-
- errno = 0;
- if (fwrite(copybuf, r, 1, state->file) != 1)
- {
- /* if write didn't set errno, assume problem is no disk space */
- if (errno == 0)
- errno = ENOSPC;
- pg_log_error("could not write to file \"%s\": %m", state->filename);
- exit(1);
- }
- totaldone += r;
- progress_report(state->tablespacenum, state->filename, false, false);
-
- state->current_len_left -= r;
- if (state->current_len_left == 0 && state->current_padding == 0)
- {
- /*
- * Received the last block, and there is no padding to be
- * expected. Close the file and move on to the next tar header.
- */
- fclose(state->file);
- state->file = NULL;
- return;
- }
- } /* continuing data in existing file */
-}
-
/*
* Receive the backup manifest file and write it out to a file.
*/
@@ -2035,16 +1480,32 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /*
- * Start receiving chunks
- */
+ /* Receive a tar file for each tablespace in turn */
for (i = 0; i < PQntuples(res); i++)
{
- if (format == 't')
- ReceiveTarFile(conn, res, i);
+ char archive_name[MAXPGPATH];
+ char *spclocation;
+
+ /*
+ * If we write the data out to a tar file, it will be named base.tar
+ * if it's the main data directory or <tablespaceoid>.tar if it's for
+ * another tablespace. CreateBackupStreamer() will arrange to add .gz
+ * to the archive name if pg_basebackup is performing compression.
+ */
+ if (PQgetisnull(res, i, 0))
+ {
+ strlcpy(archive_name, "base.tar", sizeof(archive_name));
+ spclocation = NULL;
+ }
else
- ReceiveAndUnpackTarFile(conn, res, i);
- } /* Loop over all tablespaces */
+ {
+ snprintf(archive_name, sizeof(archive_name),
+ "%s.tar", PQgetvalue(res, i, 0));
+ spclocation = PQgetvalue(res, i, 1);
+ }
+
+ ReceiveTarFile(conn, archive_name, spclocation, i);
+ }
/*
* Now receive backup manifest, if appropriate.
@@ -2060,7 +1521,10 @@ BaseBackup(void)
ReceiveBackupManifest(conn);
if (showprogress)
- progress_report(PQntuples(res), NULL, true, true);
+ {
+ progress_filename = NULL;
+ progress_report(PQntuples(res), true, true);
+ }
PQclear(res);