diff options
author | Robert Haas <rhaas@postgresql.org> | 2022-01-18 13:47:26 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2022-01-18 13:47:49 -0500 |
commit | cc333f32336f5146b75190f57ef587dff225f565 (patch) | |
tree | 8bcc3bf1b50cf4cbfe597a519b37e7aa8fcc5bde /src/bin/pg_basebackup/pg_basebackup.c | |
parent | 3414099c338bf619c00dd82d96f29a9668a3bf07 (diff) |
Modify pg_basebackup to use a new COPY subprotocol for base backups.
In the new approach, all files across all tablespaces are sent in a
single COPY OUT operation. The CopyData messages are no longer raw
archive content; rather, each message is prefixed with a type byte
that describes its purpose, e.g. 'n' signifies the start of a new
archive and 'd' signifies archive or manifest data. This protocol
is significantly more extensible than the old approach, since we can
later create more message types, though not without concern for
backward compatibility.
The new protocol sends a few things to the client that the old one
did not. First, it sends the name of each archive explicitly, instead
of letting the client compute it. This is intended to make it easier
to write future patches that might send archives in a format other
that tar (e.g. cpio, pax, tar.gz). Second, it sends explicit progress
messages rather than allowing the client to assume that progress is
defined by the number of bytes received. This will help with future
features where the server compresses the data, or sends it someplace
directly rather than transmitting it to the client.
The old protocol is still supported for compatibility with previous
releases. The new protocol is selected by means of a new
TARGET option to the BASE_BACKUP command. Currently, the
only supported target is 'client'. Support for additional
targets will be added in a later commit.
Patch by me. The patch set of which this is a part has had review
and/or testing from Jeevan Ladhe, Tushar Ahuja, Suraj Kharage,
Dipesh Pandit, and Mark Dilger.
Discussion: http://postgr.es/m/CA+TgmoaYZbz0=Yk797aOJwkGJC-LK3iXn+wzzMx7KdwNpZhS5g@mail.gmail.com
Diffstat (limited to 'src/bin/pg_basebackup/pg_basebackup.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 410 |
1 files changed, 376 insertions, 34 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index aa43fc09241..2a58be638a5 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -54,6 +54,16 @@ typedef struct TablespaceList TablespaceListCell *tail; } TablespaceList; +typedef struct ArchiveStreamState +{ + int tablespacenum; + bbstreamer *streamer; + bbstreamer *manifest_inject_streamer; + PQExpBuffer manifest_buffer; + char manifest_filename[MAXPGPATH]; + FILE *manifest_file; +} ArchiveStreamState; + typedef struct WriteTarState { int tablespacenum; @@ -174,6 +184,13 @@ static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, bbstreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported, bool expect_unterminated_tarfile); +static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, + void *callback_data); +static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); +static char *GetCopyDataString(size_t r, char *copybuf, size_t *cursor); +static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor); +static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor); +static void ReportCopyDataParseError(size_t r, char *copybuf); static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, bool tablespacenum); static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data); @@ -1098,6 +1115,317 @@ CreateBackupStreamer(char *archive_name, char *spclocation, } /* + * Receive all of the archives the server wants to send - and the backup + * manifest if present - as a single COPY stream. + */ +static void +ReceiveArchiveStream(PGconn *conn) +{ + ArchiveStreamState state; + + /* Set up initial state. */ + memset(&state, 0, sizeof(state)); + state.tablespacenum = -1; + + /* All the real work happens in ReceiveArchiveStreamChunk. */ + ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state); + + /* If we wrote the backup manifest to a file, close the file. */ + if (state.manifest_file !=NULL) + { + fclose(state.manifest_file); + state.manifest_file = NULL; + } + + /* + * If we buffered the backup manifest in order to inject it into the + * output tarfile, do that now. + */ + if (state.manifest_inject_streamer != NULL && + state.manifest_buffer != NULL) + { + bbstreamer_inject_file(state.manifest_inject_streamer, + "backup_manifest", + state.manifest_buffer->data, + state.manifest_buffer->len); + destroyPQExpBuffer(state.manifest_buffer); + state.manifest_buffer = NULL; + } + + /* If there's still an archive in progress, end processing. */ + if (state.streamer != NULL) + { + bbstreamer_finalize(state.streamer); + bbstreamer_free(state.streamer); + state.streamer = NULL; + } +} + +/* + * Receive one chunk of data sent by the server as part of a single COPY + * stream that includes all archives and the manifest. + */ +static void +ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) +{ + ArchiveStreamState *state = callback_data; + size_t cursor = 0; + + /* Each CopyData message begins with a type byte. */ + switch (GetCopyDataByte(r, copybuf, &cursor)) + { + case 'n': + { + /* New archive. */ + char *archive_name; + char *spclocation; + + /* + * We force a progress report at the end of each tablespace. A + * new tablespace starts when the previous one ends, except in + * the case of the very first one. + */ + if (++state->tablespacenum > 0) + progress_report(state->tablespacenum, true, false); + + /* Sanity check. */ + if (state->manifest_buffer != NULL || + state->manifest_file !=NULL) + { + pg_log_error("archives should precede manifest"); + exit(1); + } + + /* Parse the rest of the CopyData message. */ + archive_name = GetCopyDataString(r, copybuf, &cursor); + spclocation = GetCopyDataString(r, copybuf, &cursor); + GetCopyDataEnd(r, copybuf, cursor); + + /* + * Basic sanity checks on the archive name: it shouldn't be + * empty, it shouldn't start with a dot, and it shouldn't + * contain a path separator. + */ + if (archive_name[0] == '\0' || archive_name[0] == '.' || + strchr(archive_name, '/') != NULL || + strchr(archive_name, '\\') != NULL) + { + pg_log_error("invalid archive name: \"%s\"", + archive_name); + exit(1); + } + + /* + * An empty spclocation is treated as NULL. We expect this + * case to occur for the data directory itself, but not for + * any archives that correspond to tablespaces. + */ + if (spclocation[0] == '\0') + spclocation = NULL; + + /* End processing of any prior archive. */ + if (state->streamer != NULL) + { + bbstreamer_finalize(state->streamer); + bbstreamer_free(state->streamer); + state->streamer = NULL; + } + + /* + * Create an appropriate backup streamer. We know that + * recovery GUCs are supported, because this protocol can only + * be used on v15+. + */ + state->streamer = + CreateBackupStreamer(archive_name, + spclocation, + &state->manifest_inject_streamer, + true, false); + break; + } + + case 'd': + { + /* Archive or manifest data. */ + if (state->manifest_buffer != NULL) + { + /* Manifest data, buffer in memory. */ + appendPQExpBuffer(state->manifest_buffer, copybuf + 1, + r - 1); + } + else if (state->manifest_file !=NULL) + { + /* Manifest data, write to disk. */ + if (fwrite(copybuf + 1, r - 1, 1, + state->manifest_file) != 1) + { + /* + * If fwrite() didn't set errno, assume that the + * problem is that we're out of disk space. + */ + if (errno == 0) + errno = ENOSPC; + pg_log_error("could not write to file \"%s\": %m", + state->manifest_filename); + exit(1); + } + } + else if (state->streamer != NULL) + { + /* Archive data. */ + bbstreamer_content(state->streamer, NULL, copybuf + 1, + r - 1, BBSTREAMER_UNKNOWN); + } + else + { + pg_log_error("unexpected payload data"); + exit(1); + } + break; + } + + case 'p': + { + /* + * Progress report. + * + * The remainder of the message is expected to be an 8-byte + * count of bytes completed. + */ + totaldone = GetCopyDataUInt64(r, copybuf, &cursor); + GetCopyDataEnd(r, copybuf, cursor); + + /* + * The server shouldn't send progres report messages too + * often, so we force an update each time we receive one. + */ + progress_report(state->tablespacenum, true, false); + break; + } + + case 'm': + { + /* + * Manifest data will be sent next. This message is not + * expected to have any further payload data. + */ + GetCopyDataEnd(r, copybuf, cursor); + + /* + * If we're supposed inject the manifest into the archive, we + * prepare to buffer it in memory; otherwise, we prepare to + * write it to a temporary file. + */ + if (state->manifest_inject_streamer != NULL) + state->manifest_buffer = createPQExpBuffer(); + else + { + snprintf(state->manifest_filename, + sizeof(state->manifest_filename), + "%s/backup_manifest.tmp", basedir); + state->manifest_file = + fopen(state->manifest_filename, "wb"); + if (state->manifest_file == NULL) + { + pg_log_error("could not create file \"%s\": %m", + state->manifest_filename); + exit(1); + } + } + break; + } + + default: + ReportCopyDataParseError(r, copybuf); + break; + } +} + +/* + * Get a single byte from a CopyData message. + * + * Bail out if none remain. + */ +static char +GetCopyDataByte(size_t r, char *copybuf, size_t *cursor) +{ + if (*cursor >= r) + ReportCopyDataParseError(r, copybuf); + + return copybuf[(*cursor)++]; +} + +/* + * Get a NUL-terminated string from a CopyData message. + * + * Bail out if the terminating NUL cannot be found. + */ +static char * +GetCopyDataString(size_t r, char *copybuf, size_t *cursor) +{ + size_t startpos = *cursor; + size_t endpos = startpos; + + while (1) + { + if (endpos >= r) + ReportCopyDataParseError(r, copybuf); + if (copybuf[endpos] == '\0') + break; + ++endpos; + } + + *cursor = endpos + 1; + return ©buf[startpos]; +} + +/* + * Get an unsigned 64-bit integer from a CopyData message. + * + * Bail out if there are not at least 8 bytes remaining. + */ +static uint64 +GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor) +{ + uint64 result; + + if (*cursor + sizeof(uint64) > r) + ReportCopyDataParseError(r, copybuf); + memcpy(&result, ©buf[*cursor], sizeof(uint64)); + *cursor += sizeof(uint64); + return pg_ntoh64(result); +} + +/* + * Bail out if we didn't parse the whole message. + */ +static void +GetCopyDataEnd(size_t r, char *copybuf, size_t cursor) +{ + if (r != cursor) + ReportCopyDataParseError(r, copybuf); +} + +/* + * Report failure to parse a CopyData message from the server. Then exit. + * + * As a debugging aid, we try to give some hint about what kind of message + * provoked the failure. Perhaps this is not detailed enough, but it's not + * clear that it's worth expending any more code on what shoud be a + * can't-happen case. + */ +static void +ReportCopyDataParseError(size_t r, char *copybuf) +{ + if (r == 0) + pg_log_error("empty COPY message"); + else + pg_log_error("malformed COPY message of type %d, length %zu", + copybuf[0], r); + exit(1); +} + +/* * Receive raw tar data from the server, and stream it to the appropriate * location. If we're writing a single tarfile to standard output, also * receive the backup manifest and inject it into that tarfile. @@ -1376,6 +1704,10 @@ BaseBackup(void) "MANIFEST_CHECKSUMS", manifest_checksums); } + if (serverMajor >= 1500) + AppendStringCommandOption(&buf, use_new_option_syntax, + "TARGET", "client"); + if (verbose) pg_log_info("initiating base backup, waiting for checkpoint to complete"); @@ -1498,46 +1830,56 @@ BaseBackup(void) StartLogStreamer(xlogstart, starttli, sysidentifier); } - /* Receive a tar file for each tablespace in turn */ - for (i = 0; i < PQntuples(res); i++) + if (serverMajor >= 1500) { - char archive_name[MAXPGPATH]; - char *spclocation; - - /* - * If we write the data out to a tar file, it will be named base.tar - * if it's the main data directory or <tablespaceoid>.tar if it's for - * another tablespace. CreateBackupStreamer() will arrange to add .gz - * to the archive name if pg_basebackup is performing compression. - */ - if (PQgetisnull(res, i, 0)) - { - strlcpy(archive_name, "base.tar", sizeof(archive_name)); - spclocation = NULL; - } - else + /* Receive a single tar stream with everything. */ + ReceiveArchiveStream(conn); + } + else + { + /* Receive a tar file for each tablespace in turn */ + for (i = 0; i < PQntuples(res); i++) { - snprintf(archive_name, sizeof(archive_name), - "%s.tar", PQgetvalue(res, i, 0)); - spclocation = PQgetvalue(res, i, 1); + char archive_name[MAXPGPATH]; + char *spclocation; + + /* + * If we write the data out to a tar file, it will be named + * base.tar if it's the main data directory or <tablespaceoid>.tar + * if it's for another tablespace. CreateBackupStreamer() will + * arrange to add .gz to the archive name if pg_basebackup is + * performing compression. + */ + if (PQgetisnull(res, i, 0)) + { + strlcpy(archive_name, "base.tar", sizeof(archive_name)); + spclocation = NULL; + } + else + { + snprintf(archive_name, sizeof(archive_name), + "%s.tar", PQgetvalue(res, i, 0)); + spclocation = PQgetvalue(res, i, 1); + } + + ReceiveTarFile(conn, archive_name, spclocation, i); } - ReceiveTarFile(conn, archive_name, spclocation, i); + /* + * Now receive backup manifest, if appropriate. + * + * If we're writing a tarfile to stdout, ReceiveTarFile will have + * already processed the backup manifest and included it in the output + * tarfile. Such a configuration doesn't allow for writing multiple + * files. + * + * If we're talking to an older server, it won't send a backup + * manifest, so don't try to receive one. + */ + if (!writing_to_stdout && manifest) + ReceiveBackupManifest(conn); } - /* - * Now receive backup manifest, if appropriate. - * - * If we're writing a tarfile to stdout, ReceiveTarFile will have already - * processed the backup manifest and included it in the output tarfile. - * Such a configuration doesn't allow for writing multiple files. - * - * If we're talking to an older server, it won't send a backup manifest, - * so don't try to receive one. - */ - if (!writing_to_stdout && manifest) - ReceiveBackupManifest(conn); - if (showprogress) { progress_filename = NULL; |