diff options
author | Robert Haas <rhaas@postgresql.org> | 2021-11-05 10:08:30 -0400 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2021-11-05 10:08:30 -0400 |
commit | bef47ff85df18bf4a3a9b13bd2a54820e27f3614 (patch) | |
tree | 9b0ff2c1fa76a38a425172a66d9afb2c3550743c /src/backend/replication/basebackup.c | |
parent | bd807be6935929bdefe74d1258ca08048f0aafa3 (diff) |
Introduce 'bbsink' abstraction to modularize base backup code.
The base backup code has accumulated a healthy number of new
features over the years, but it's becoming increasingly difficult
to maintain and further enhance that code because there's no
real separation of concerns. For example, the code that
understands knows the details of how we send data to the client
using the libpq protocol is scattered throughout basebackup.c,
rather than being centralized in one place.
To try to improve this situation, introduce a new 'bbsink' object
which acts as a recipient for archives generated during the base
backup progress and also for the backup manifest. This commit
introduces three types of bbsink: a 'copytblspc' bbsink forwards the
backup to the client using one COPY OUT operation per tablespace and
another for the manifest, a 'progress' bbsink performs command
progress reporting, and a 'throttle' bbsink performs rate-limiting.
The 'progress' and 'throttle' bbsink types also forward the data to a
successor bbsink; at present, the last bbsink in the chain will
always be of type 'copytblspc'. There are plans to add more types
of 'bbsink' in future commits.
This abstraction is a bit leaky in the case of progress reporting,
but this still seems cleaner than what we had before.
Patch by me, reviewed and tested by Andres Freund, Sumanta Mukherjee,
Dilip Kumar, Suraj Kharage, Dipesh Pandit, Tushar Ahuja, Mark Dilger,
and Jeevan Ladhe.
Discussion: https://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com
Discussion: https://postgr.es/m/CA+TgmoZvqk7UuzxsX1xjJRmMGkqoUGYTZLDCH8SmU1xTPr1Xig@mail.gmail.com
Diffstat (limited to 'src/backend/replication/basebackup.c')
-rw-r--r-- | src/backend/replication/basebackup.c | 692 |
1 files changed, 197 insertions, 495 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index b7359f43903..38c82c46196 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -17,13 +17,9 @@ #include <time.h> #include "access/xlog_internal.h" /* for pg_start/stop_backup */ -#include "catalog/pg_type.h" #include "common/file_perm.h" #include "commands/defrem.h" -#include "commands/progress.h" #include "lib/stringinfo.h" -#include "libpq/libpq.h" -#include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/pg_list.h" #include "pgstat.h" @@ -31,6 +27,7 @@ #include "port.h" #include "postmaster/syslogger.h" #include "replication/basebackup.h" +#include "replication/basebackup_sink.h" #include "replication/backup_manifest.h" #include "replication/walsender.h" #include "replication/walsender_private.h" @@ -46,6 +43,16 @@ #include "utils/resowner.h" #include "utils/timestamp.h" +/* + * How much data do we want to send in one CopyData message? Note that + * this may also result in reading the underlying files in chunks of this + * size. + * + * NB: The buffer size is required to be a multiple of the system block + * size, so use that value instead if it's bigger than our preference. + */ +#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ) + typedef struct { const char *label; @@ -59,27 +66,25 @@ typedef struct pg_checksum_type manifest_checksum_type; } basebackup_options; -static int64 sendTablespace(char *path, char *oid, bool sizeonly, +static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly, struct backup_manifest_info *manifest); -static int64 sendDir(const char *path, int basepathlen, bool sizeonly, +static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest, const char *spcoid); -static bool sendFile(const char *readfilename, const char *tarfilename, +static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid, backup_manifest_info *manifest, const char *spcoid); -static void sendFileWithContent(const char *filename, const char *content, +static void sendFileWithContent(bbsink *sink, const char *filename, + const char *content, backup_manifest_info *manifest); -static int64 _tarWriteHeader(const char *filename, const char *linktarget, - struct stat *statbuf, bool sizeonly); +static int64 _tarWriteHeader(bbsink *sink, const char *filename, + const char *linktarget, struct stat *statbuf, + bool sizeonly); +static void _tarWritePadding(bbsink *sink, int len); static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf); -static void send_int8_string(StringInfoData *buf, int64 intval); -static void SendBackupHeader(List *tablespaces); -static void perform_base_backup(basebackup_options *opt); +static void perform_base_backup(basebackup_options *opt, bbsink *sink); static void parse_basebackup_options(List *options, basebackup_options *opt); -static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static int compareWalFileNames(const ListCell *a, const ListCell *b); -static void throttle(size_t increment); -static void update_basebackup_progress(int64 delta); static bool is_checksummed_file(const char *fullpath, const char *filename); static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset, const char *filename, bool partial_read_ok); @@ -90,31 +95,6 @@ static bool backup_started_in_recovery = false; /* Relative path of temporary statistics directory */ static char *statrelpath = NULL; -/* - * Size of each block sent into the tar stream for larger files. - */ -#define TAR_SEND_SIZE 32768 - -/* - * How frequently to throttle, as a fraction of the specified rate-second. - */ -#define THROTTLING_FREQUENCY 8 - -/* The actual number of bytes, transfer of which may cause sleep. */ -static uint64 throttling_sample; - -/* Amount of data already transferred but not yet throttled. */ -static int64 throttling_counter; - -/* The minimum time required to transfer throttling_sample bytes. */ -static TimeOffset elapsed_min_unit; - -/* The last check of the transfer rate. */ -static TimestampTz throttled_last; - -/* The starting XLOG position of the base backup. */ -static XLogRecPtr startptr; - /* Total number of checksum failures during base backup. */ static long long int total_checksum_failures; @@ -122,15 +102,6 @@ static long long int total_checksum_failures; static bool noverify_checksums = false; /* - * Total amount of backup data that will be streamed. - * -1 means that the size is not estimated. - */ -static int64 backup_total = 0; - -/* Amount of backup data already streamed */ -static int64 backup_streamed = 0; - -/* * Definition of one element part of an exclusion list, used for paths part * of checksum validation or base backups. "name" is the name of the file * or path to check for exclusion. If "match_prefix" is true, any items @@ -253,32 +224,22 @@ static const struct exclude_list_item noChecksumFiles[] = { * clobbered by longjmp" from stupider versions of gcc. */ static void -perform_base_backup(basebackup_options *opt) +perform_base_backup(basebackup_options *opt, bbsink *sink) { - TimeLineID starttli; + bbsink_state state; XLogRecPtr endptr; TimeLineID endtli; StringInfo labelfile; StringInfo tblspc_map_file; backup_manifest_info manifest; int datadirpathlen; - List *tablespaces = NIL; - backup_total = 0; - backup_streamed = 0; - pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid); - - /* - * If the estimation of the total backup size is disabled, make the - * backup_total column in the view return NULL by setting the parameter to - * -1. - */ - if (!opt->progress) - { - backup_total = -1; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, - backup_total); - } + /* Initial backup state, insofar as we know it now. */ + state.tablespaces = NIL; + state.tablespace_num = 0; + state.bytes_done = 0; + state.bytes_total = 0; + state.bytes_total_is_valid = false; /* we're going to use a BufFile, so we need a ResourceOwner */ Assert(CurrentResourceOwner == NULL); @@ -295,11 +256,11 @@ perform_base_backup(basebackup_options *opt) total_checksum_failures = 0; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT); - startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli, - labelfile, &tablespaces, - tblspc_map_file); + basebackup_progress_wait_checkpoint(); + state.startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, + &state.starttli, + labelfile, &state.tablespaces, + tblspc_map_file); /* * Once do_pg_start_backup has been called, ensure that any failure causes @@ -312,7 +273,6 @@ perform_base_backup(basebackup_options *opt) { ListCell *lc; tablespaceinfo *ti; - int tblspc_streamed = 0; /* * Calculate the relative path of temporary statistics directory in @@ -329,7 +289,7 @@ perform_base_backup(basebackup_options *opt) /* Add a node for the base directory at the end */ ti = palloc0(sizeof(tablespaceinfo)); ti->size = -1; - tablespaces = lappend(tablespaces, ti); + state.tablespaces = lappend(state.tablespaces, ti); /* * Calculate the total backup size by summing up the size of each @@ -337,100 +297,53 @@ perform_base_backup(basebackup_options *opt) */ if (opt->progress) { - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE); + basebackup_progress_estimate_backup_size(); - foreach(lc, tablespaces) + foreach(lc, state.tablespaces) { tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc); if (tmp->path == NULL) - tmp->size = sendDir(".", 1, true, tablespaces, true, NULL, - NULL); + tmp->size = sendDir(sink, ".", 1, true, state.tablespaces, + true, NULL, NULL); else - tmp->size = sendTablespace(tmp->path, tmp->oid, true, + tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true, NULL); - backup_total += tmp->size; + state.bytes_total += tmp->size; } + state.bytes_total_is_valid = true; } - /* Report that we are now streaming database files as a base backup */ - { - const int index[] = { - PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_BACKUP_TOTAL, - PROGRESS_BASEBACKUP_TBLSPC_TOTAL - }; - const int64 val[] = { - PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP, - backup_total, list_length(tablespaces) - }; - - pgstat_progress_update_multi_param(3, index, val); - } - - /* Send the starting position of the backup */ - SendXlogRecPtrResult(startptr, starttli); - - /* Send tablespace header */ - SendBackupHeader(tablespaces); - - /* Setup and activate network throttling, if client requested it */ - if (opt->maxrate > 0) - { - throttling_sample = - (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY; - - /* - * The minimum amount of time for throttling_sample bytes to be - * transferred. - */ - elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; - - /* Enable throttling. */ - throttling_counter = 0; - - /* The 'real data' starts now (header was ignored). */ - throttled_last = GetCurrentTimestamp(); - } - else - { - /* Disable throttling. */ - throttling_counter = -1; - } + /* notify basebackup sink about start of backup */ + bbsink_begin_backup(sink, &state, SINK_BUFFER_LENGTH); /* Send off our tablespaces one by one */ - foreach(lc, tablespaces) + foreach(lc, state.tablespaces) { tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc); - StringInfoData buf; - - /* Send CopyOutResponse message */ - pq_beginmessage(&buf, 'H'); - pq_sendbyte(&buf, 0); /* overall format */ - pq_sendint16(&buf, 0); /* natts */ - pq_endmessage(&buf); if (ti->path == NULL) { struct stat statbuf; bool sendtblspclinks = true; + bbsink_begin_archive(sink, "base.tar"); + /* In the main tar, include the backup_label first... */ - sendFileWithContent(BACKUP_LABEL_FILE, labelfile->data, + sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data, &manifest); /* Then the tablespace_map file, if required... */ if (opt->sendtblspcmapfile) { - sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data, + sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data, &manifest); sendtblspclinks = false; } /* Then the bulk of the files... */ - sendDir(".", 1, false, tablespaces, sendtblspclinks, - &manifest, NULL); + sendDir(sink, ".", 1, false, state.tablespaces, + sendtblspclinks, &manifest, NULL); /* ... and pg_control after everything else. */ if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0) @@ -438,32 +351,33 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", XLOG_CONTROL_FILE))); - sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, + sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid, &manifest, NULL); } else - sendTablespace(ti->path, ti->oid, false, &manifest); + { + char *archive_name = psprintf("%s.tar", ti->oid); + + bbsink_begin_archive(sink, archive_name); + + sendTablespace(sink, ti->path, ti->oid, false, &manifest); + } /* * If we're including WAL, and this is the main data directory we - * don't terminate the tar stream here. Instead, we will append - * the xlog files below and terminate it then. This is safe since - * the main data directory is always sent *last*. + * don't treat this as the end of the tablespace. Instead, we will + * include the xlog files below and stop afterwards. This is safe + * since the main data directory is always sent *last*. */ if (opt->includewal && ti->path == NULL) { - Assert(lnext(tablespaces, lc) == NULL); + Assert(lnext(state.tablespaces, lc) == NULL); } else - pq_putemptymessage('c'); /* CopyDone */ - - tblspc_streamed++; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED, - tblspc_streamed); + bbsink_end_archive(sink); } - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE); + basebackup_progress_wait_wal_archive(&state); endptr = do_pg_stop_backup(labelfile->data, !opt->nowait, &endtli); } PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false)); @@ -489,8 +403,7 @@ perform_base_backup(basebackup_options *opt) ListCell *lc; TimeLineID tli; - pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE, - PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL); + basebackup_progress_transfer_wal(); /* * I'd rather not worry about timelines here, so scan pg_wal and @@ -501,8 +414,8 @@ perform_base_backup(basebackup_options *opt) * shouldn't be such files, but if there are, there's little harm in * including them. */ - XLByteToSeg(startptr, startsegno, wal_segment_size); - XLogFileName(firstoff, starttli, startsegno, wal_segment_size); + XLByteToSeg(state.startptr, startsegno, wal_segment_size); + XLogFileName(firstoff, state.starttli, startsegno, wal_segment_size); XLByteToPrevSeg(endptr, endsegno, wal_segment_size); XLogFileName(lastoff, endtli, endsegno, wal_segment_size); @@ -528,7 +441,7 @@ perform_base_backup(basebackup_options *opt) * Before we go any further, check that none of the WAL segments we * need were removed. */ - CheckXLogRemoved(startsegno, starttli); + CheckXLogRemoved(startsegno, state.starttli); /* * Sort the WAL filenames. We want to send the files in order from @@ -555,7 +468,7 @@ perform_base_backup(basebackup_options *opt) { char startfname[MAXFNAMELEN]; - XLogFileName(startfname, starttli, startsegno, + XLogFileName(startfname, state.starttli, startsegno, wal_segment_size); ereport(ERROR, (errmsg("could not find WAL file \"%s\"", startfname))); @@ -590,7 +503,6 @@ perform_base_backup(basebackup_options *opt) { char *walFileName = (char *) lfirst(lc); int fd; - char buf[TAR_SEND_SIZE]; size_t cnt; pgoff_t len = 0; @@ -629,22 +541,17 @@ perform_base_backup(basebackup_options *opt) } /* send the WAL file itself */ - _tarWriteHeader(pathbuf, NULL, &statbuf, false); + _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false); - while ((cnt = basebackup_read_file(fd, buf, - Min(sizeof(buf), + while ((cnt = basebackup_read_file(fd, sink->bbs_buffer, + Min(sink->bbs_buffer_length, wal_segment_size - len), len, pathbuf, true)) > 0) { CheckXLogRemoved(segno, tli); - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); - update_basebackup_progress(cnt); + bbsink_archive_contents(sink, cnt); len += cnt; - throttle(cnt); if (len == wal_segment_size) break; @@ -673,7 +580,7 @@ perform_base_backup(basebackup_options *opt) * complete segment. */ StatusFilePath(pathbuf, walFileName, ".done"); - sendFileWithContent(pathbuf, "", &manifest); + sendFileWithContent(sink, pathbuf, "", &manifest); } /* @@ -696,23 +603,23 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf))); - sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid, + sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid, &manifest, NULL); /* unconditionally mark file as archived */ StatusFilePath(pathbuf, fname, ".done"); - sendFileWithContent(pathbuf, "", &manifest); + sendFileWithContent(sink, pathbuf, "", &manifest); } - /* Send CopyDone message for the last tar file */ - pq_putemptymessage('c'); + bbsink_end_archive(sink); } - AddWALInfoToBackupManifest(&manifest, startptr, starttli, endptr, endtli); + AddWALInfoToBackupManifest(&manifest, state.startptr, state.starttli, + endptr, endtli); - SendBackupManifest(&manifest); + SendBackupManifest(&manifest, sink); - SendXlogRecPtrResult(endptr, endtli); + bbsink_end_backup(sink, endptr, endtli); if (total_checksum_failures) { @@ -738,7 +645,7 @@ perform_base_backup(basebackup_options *opt) /* clean up the resource owner we created */ WalSndResourceCleanup(true); - pgstat_progress_end_command(); + basebackup_progress_done(); } /* @@ -943,6 +850,7 @@ void SendBaseBackup(BaseBackupCmd *cmd) { basebackup_options opt; + bbsink *sink; parse_basebackup_options(cmd->options, &opt); @@ -957,158 +865,40 @@ SendBaseBackup(BaseBackupCmd *cmd) set_ps_display(activitymsg); } - perform_base_backup(&opt); -} - -static void -send_int8_string(StringInfoData *buf, int64 intval) -{ - char is[32]; - - sprintf(is, INT64_FORMAT, intval); - pq_sendint32(buf, strlen(is)); - pq_sendbytes(buf, is, strlen(is)); -} - -static void -SendBackupHeader(List *tablespaces) -{ - StringInfoData buf; - ListCell *lc; - - /* Construct and send the directory information */ - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 3); /* 3 fields */ - - /* First field - spcoid */ - pq_sendstring(&buf, "spcoid"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, OIDOID); /* type oid */ - pq_sendint16(&buf, 4); /* typlen */ - pq_sendint32(&buf, 0); /* typmod */ - pq_sendint16(&buf, 0); /* format code */ - - /* Second field - spclocation */ - pq_sendstring(&buf, "spclocation"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, TEXTOID); - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - /* Third field - size */ - pq_sendstring(&buf, "size"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, INT8OID); - pq_sendint16(&buf, 8); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); - - foreach(lc, tablespaces) - { - tablespaceinfo *ti = lfirst(lc); - - /* Send one datarow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint16(&buf, 3); /* number of columns */ - if (ti->path == NULL) - { - pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ - pq_sendint32(&buf, -1); - } - else - { - Size len; - - len = strlen(ti->oid); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->oid, len); - - len = strlen(ti->path); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->path, len); - } - if (ti->size >= 0) - send_int8_string(&buf, ti->size / 1024); - else - pq_sendint32(&buf, -1); /* NULL */ + /* Create a basic basebackup sink. */ + sink = bbsink_copytblspc_new(); - pq_endmessage(&buf); - } + /* Set up network throttling, if client requested it */ + if (opt.maxrate > 0) + sink = bbsink_throttle_new(sink, opt.maxrate); - /* Send a CommandComplete message */ - pq_puttextmessage('C', "SELECT"); -} - -/* - * Send a single resultset containing just a single - * XLogRecPtr record (in text format) - */ -static void -SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) -{ - StringInfoData buf; - char str[MAXFNAMELEN]; - Size len; - - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 2); /* 2 fields */ - - /* Field headers */ - pq_sendstring(&buf, "recptr"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, TEXTOID); /* type oid */ - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - pq_sendstring(&buf, "tli"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ + /* Set up progress reporting. */ + sink = bbsink_progress_new(sink, opt.progress); /* - * int8 may seem like a surprising data type for this, but in theory int4 - * would not be wide enough for this, as TimeLineID is unsigned. + * Perform the base backup, but make sure we clean up the bbsink even if + * an error occurs. */ - pq_sendint32(&buf, INT8OID); /* type oid */ - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); - - /* Data row */ - pq_beginmessage(&buf, 'D'); - pq_sendint16(&buf, 2); /* number of columns */ - - len = snprintf(str, sizeof(str), - "%X/%X", LSN_FORMAT_ARGS(ptr)); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); - - len = snprintf(str, sizeof(str), "%u", tli); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); - - pq_endmessage(&buf); - - /* Send a CommandComplete message */ - pq_puttextmessage('C', "SELECT"); + PG_TRY(); + { + perform_base_backup(&opt, sink); + } + PG_FINALLY(); + { + bbsink_cleanup(sink); + } + PG_END_TRY(); } /* * Inject a file with given name and content in the output tar stream. */ static void -sendFileWithContent(const char *filename, const char *content, +sendFileWithContent(bbsink *sink, const char *filename, const char *content, backup_manifest_info *manifest) { struct stat statbuf; - int pad, + int bytes_done = 0, len; pg_checksum_context checksum_ctx; @@ -1134,25 +924,23 @@ sendFileWithContent(const char *filename, const char *content, statbuf.st_mode = pg_file_create_mode; statbuf.st_size = len; - _tarWriteHeader(filename, NULL, &statbuf, false); - /* Send the contents as a CopyData message */ - pq_putmessage('d', content, len); - update_basebackup_progress(len); + _tarWriteHeader(sink, filename, NULL, &statbuf, false); - /* Pad to a multiple of the tar block size. */ - pad = tarPaddingBytesRequired(len); - if (pad > 0) + if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0) + elog(ERROR, "could not update checksum of file \"%s\"", + filename); + + while (bytes_done < len) { - char buf[TAR_BLOCK_SIZE]; + size_t remaining = len - bytes_done; + size_t nbytes = Min(sink->bbs_buffer_length, remaining); - MemSet(buf, 0, pad); - pq_putmessage('d', buf, pad); - update_basebackup_progress(pad); + memcpy(sink->bbs_buffer, content, nbytes); + bbsink_archive_contents(sink, nbytes); + bytes_done += nbytes; } - if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0) - elog(ERROR, "could not update checksum of file \"%s\"", - filename); + _tarWritePadding(sink, len); AddFileToBackupManifest(manifest, NULL, filename, len, (pg_time_t) statbuf.st_mtime, &checksum_ctx); @@ -1166,7 +954,7 @@ sendFileWithContent(const char *filename, const char *content, * Only used to send auxiliary tablespaces, not PGDATA. */ static int64 -sendTablespace(char *path, char *spcoid, bool sizeonly, +sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly, backup_manifest_info *manifest) { int64 size; @@ -1196,11 +984,11 @@ sendTablespace(char *path, char *spcoid, bool sizeonly, return 0; } - size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf, + size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf, sizeonly); /* Send all the files in the tablespace version directory */ - size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, manifest, + size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest, spcoid); return size; @@ -1219,8 +1007,8 @@ sendTablespace(char *path, char *spcoid, bool sizeonly, * as it will be sent separately in the tablespace_map file. */ static int64 -sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, - bool sendtblspclinks, backup_manifest_info *manifest, +sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly, + List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest, const char *spcoid) { DIR *dir; @@ -1380,8 +1168,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name); convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); excludeFound = true; break; } @@ -1398,8 +1186,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath); convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); continue; } @@ -1412,15 +1200,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { /* If pg_wal is a symlink, write it as a directory anyway */ convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); /* * Also send archive_status directory (by hackishly reusing * statbuf from above ...). */ - size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL, + &statbuf, sizeonly); continue; /* don't recurse into pg_wal */ } @@ -1451,7 +1239,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, pathbuf))); linkpath[rllen] = '\0'; - size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath, &statbuf, sizeonly); #else @@ -1475,7 +1263,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, * Store a directory entry in the tar file so we can get the * permissions right. */ - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf, sizeonly); /* @@ -1507,7 +1295,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, skip_this_dir = true; if (!skip_this_dir) - size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, + size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, manifest, spcoid); } else if (S_ISREG(statbuf.st_mode)) @@ -1515,7 +1303,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sent = false; if (!sizeonly) - sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, + sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf, true, isDbDir ? atooid(lastDir + 1) : InvalidOid, manifest, spcoid); @@ -1592,21 +1380,19 @@ is_checksummed_file(const char *fullpath, const char *filename) * and the file did not exist. */ static bool -sendFile(const char *readfilename, const char *tarfilename, +sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid, backup_manifest_info *manifest, const char *spcoid) { int fd; BlockNumber blkno = 0; bool block_retry = false; - char buf[TAR_SEND_SIZE]; uint16 checksum; int checksum_failures = 0; off_t cnt; int i; pgoff_t len = 0; char *page; - size_t pad; PageHeader phdr; int segmentno = 0; char *segmentpath; @@ -1627,7 +1413,7 @@ sendFile(const char *readfilename, const char *tarfilename, errmsg("could not open file \"%s\": %m", readfilename))); } - _tarWriteHeader(tarfilename, NULL, statbuf, false); + _tarWriteHeader(sink, tarfilename, NULL, statbuf, false); if (!noverify_checksums && DataChecksumsEnabled()) { @@ -1668,9 +1454,11 @@ sendFile(const char *readfilename, const char *tarfilename, */ while (len < statbuf->st_size) { + size_t remaining = statbuf->st_size - len; + /* Try to read some more data. */ - cnt = basebackup_read_file(fd, buf, - Min(sizeof(buf), statbuf->st_size - len), + cnt = basebackup_read_file(fd, sink->bbs_buffer, + Min(sink->bbs_buffer_length, remaining), len, readfilename, true); /* @@ -1687,7 +1475,7 @@ sendFile(const char *readfilename, const char *tarfilename, * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of * BLCKSZ bytes. */ - Assert(TAR_SEND_SIZE % BLCKSZ == 0); + Assert((sink->bbs_buffer_length % BLCKSZ) == 0); if (verify_checksum && (cnt % BLCKSZ != 0)) { @@ -1703,7 +1491,7 @@ sendFile(const char *readfilename, const char *tarfilename, { for (i = 0; i < cnt / BLCKSZ; i++) { - page = buf + BLCKSZ * i; + page = sink->bbs_buffer + BLCKSZ * i; /* * Only check pages which have not been modified since the @@ -1713,7 +1501,7 @@ sendFile(const char *readfilename, const char *tarfilename, * this case. We also skip completely new pages, since they * don't have a checksum yet. */ - if (!PageIsNew(page) && PageGetLSN(page) < startptr) + if (!PageIsNew(page) && PageGetLSN(page) < sink->bbs_state->startptr) { checksum = pg_checksum_page((char *) page, blkno + segmentno * RELSEG_SIZE); phdr = (PageHeader) page; @@ -1735,7 +1523,8 @@ sendFile(const char *readfilename, const char *tarfilename, /* Reread the failed block */ reread_cnt = - basebackup_read_file(fd, buf + BLCKSZ * i, + basebackup_read_file(fd, + sink->bbs_buffer + BLCKSZ * i, BLCKSZ, len + BLCKSZ * i, readfilename, false); @@ -1782,34 +1571,29 @@ sendFile(const char *readfilename, const char *tarfilename, } } - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); - update_basebackup_progress(cnt); + bbsink_archive_contents(sink, cnt); /* Also feed it to the checksum machinery. */ - if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0) + if (pg_checksum_update(&checksum_ctx, + (uint8 *) sink->bbs_buffer, cnt) < 0) elog(ERROR, "could not update checksum of base backup"); len += cnt; - throttle(cnt); } /* If the file was truncated while we were sending it, pad it with zeros */ - if (len < statbuf->st_size) + while (len < statbuf->st_size) { - MemSet(buf, 0, sizeof(buf)); - while (len < statbuf->st_size) - { - cnt = Min(sizeof(buf), statbuf->st_size - len); - pq_putmessage('d', buf, cnt); - if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0) - elog(ERROR, "could not update checksum of base backup"); - update_basebackup_progress(cnt); - len += cnt; - throttle(cnt); - } + size_t remaining = statbuf->st_size - len; + size_t nbytes = Min(sink->bbs_buffer_length, remaining); + + MemSet(sink->bbs_buffer, 0, nbytes); + if (pg_checksum_update(&checksum_ctx, + (uint8 *) sink->bbs_buffer, + nbytes) < 0) + elog(ERROR, "could not update checksum of base backup"); + bbsink_archive_contents(sink, nbytes); + len += nbytes; } /* @@ -1817,13 +1601,7 @@ sendFile(const char *readfilename, const char *tarfilename, * of data is probably not worth throttling, and is not checksummed * because it's not actually part of the file.) */ - pad = tarPaddingBytesRequired(len); - if (pad > 0) - { - MemSet(buf, 0, pad); - pq_putmessage('d', buf, pad); - update_basebackup_progress(pad); - } + _tarWritePadding(sink, len); CloseTransientFile(fd); @@ -1846,18 +1624,28 @@ sendFile(const char *readfilename, const char *tarfilename, return true; } - static int64 -_tarWriteHeader(const char *filename, const char *linktarget, +_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget, struct stat *statbuf, bool sizeonly) { - char h[TAR_BLOCK_SIZE]; enum tarError rc; if (!sizeonly) { - rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size, - statbuf->st_mode, statbuf->st_uid, statbuf->st_gid, + /* + * As of this writing, the smallest supported block size is 1kB, which + * is twice TAR_BLOCK_SIZE. Since the buffer size is required to be a + * multiple of BLCKSZ, it should be safe to assume that the buffer is + * large enough to fit an entire tar block. We double-check by means + * of these assertions. + */ + StaticAssertStmt(TAR_BLOCK_SIZE <= BLCKSZ, + "BLCKSZ too small for tar block"); + Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE); + + rc = tarCreateHeader(sink->bbs_buffer, filename, linktarget, + statbuf->st_size, statbuf->st_mode, + statbuf->st_uid, statbuf->st_gid, statbuf->st_mtime); switch (rc) @@ -1879,134 +1667,48 @@ _tarWriteHeader(const char *filename, const char *linktarget, elog(ERROR, "unrecognized tar error: %d", rc); } - pq_putmessage('d', h, sizeof(h)); - update_basebackup_progress(sizeof(h)); + bbsink_archive_contents(sink, TAR_BLOCK_SIZE); } - return sizeof(h); + return TAR_BLOCK_SIZE; } /* - * If the entry in statbuf is a link, then adjust statbuf to make it look like a - * directory, so that it will be written that way. + * Pad with zero bytes out to a multiple of TAR_BLOCK_SIZE. */ static void -convert_link_to_directory(const char *pathbuf, struct stat *statbuf) +_tarWritePadding(bbsink *sink, int len) { - /* If symlink, write it as a directory anyway */ -#ifndef WIN32 - if (S_ISLNK(statbuf->st_mode)) -#else - if (pgwin32_is_junction(pathbuf)) -#endif - statbuf->st_mode = S_IFDIR | pg_dir_create_mode; -} - -/* - * Increment the network transfer counter by the given number of bytes, - * and sleep if necessary to comply with the requested network transfer - * rate. - */ -static void -throttle(size_t increment) -{ - TimeOffset elapsed_min; - - if (throttling_counter < 0) - return; - - throttling_counter += increment; - if (throttling_counter < throttling_sample) - return; - - /* How much time should have elapsed at minimum? */ - elapsed_min = elapsed_min_unit * - (throttling_counter / throttling_sample); + int pad = tarPaddingBytesRequired(len); /* - * Since the latch could be set repeatedly because of concurrently WAL - * activity, sleep in a loop to ensure enough time has passed. + * As in _tarWriteHeader, it should be safe to assume that the buffer is + * large enough that we don't need to do this in multiple chunks. */ - for (;;) - { - TimeOffset elapsed, - sleep; - int wait_result; - - /* Time elapsed since the last measurement (and possible wake up). */ - elapsed = GetCurrentTimestamp() - throttled_last; - - /* sleep if the transfer is faster than it should be */ - sleep = elapsed_min - elapsed; - if (sleep <= 0) - break; - - ResetLatch(MyLatch); - - /* We're eating a potentially set latch, so check for interrupts */ - CHECK_FOR_INTERRUPTS(); + Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE); + Assert(pad <= TAR_BLOCK_SIZE); - /* - * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be - * the maximum time to sleep. Thus the cast to long is safe. - */ - wait_result = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - (long) (sleep / 1000), - WAIT_EVENT_BASE_BACKUP_THROTTLE); - - if (wait_result & WL_LATCH_SET) - CHECK_FOR_INTERRUPTS(); - - /* Done waiting? */ - if (wait_result & WL_TIMEOUT) - break; + if (pad > 0) + { + MemSet(sink->bbs_buffer, 0, pad); + bbsink_archive_contents(sink, pad); } - - /* - * As we work with integers, only whole multiple of throttling_sample was - * processed. The rest will be done during the next call of this function. - */ - throttling_counter %= throttling_sample; - - /* - * Time interval for the remaining amount and possible next increments - * starts now. - */ - throttled_last = GetCurrentTimestamp(); } /* - * Increment the counter for the amount of data already streamed - * by the given number of bytes, and update the progress report for - * pg_stat_progress_basebackup. + * If the entry in statbuf is a link, then adjust statbuf to make it look like a + * directory, so that it will be written that way. */ static void -update_basebackup_progress(int64 delta) +convert_link_to_directory(const char *pathbuf, struct stat *statbuf) { - const int index[] = { - PROGRESS_BASEBACKUP_BACKUP_STREAMED, - PROGRESS_BASEBACKUP_BACKUP_TOTAL - }; - int64 val[2]; - int nparam = 0; - - backup_streamed += delta; - val[nparam++] = backup_streamed; - - /* - * Avoid overflowing past 100% or the full size. This may make the total - * size number change as we approach the end of the backup (the estimate - * will always be wrong if WAL is included), but that's better than having - * the done column be bigger than the total. - */ - if (backup_total > -1 && backup_streamed > backup_total) - { - backup_total = backup_streamed; - val[nparam++] = backup_total; - } - - pgstat_progress_update_multi_param(nparam, index, val); + /* If symlink, write it as a directory anyway */ +#ifndef WIN32 + if (S_ISLNK(statbuf->st_mode)) +#else + if (pgwin32_is_junction(pathbuf)) +#endif + statbuf->st_mode = S_IFDIR | pg_dir_create_mode; } /* |