diff options
Diffstat (limited to 'src/bin/pg_rewind/local_source.c')
-rw-r--r-- | src/bin/pg_rewind/local_source.c | 76 |
1 files changed, 66 insertions, 10 deletions
diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c index 3406fc7037d..58699effcc4 100644 --- a/src/bin/pg_rewind/local_source.c +++ b/src/bin/pg_rewind/local_source.c @@ -29,8 +29,10 @@ static void local_traverse_files(rewind_source *source, process_file_callback_t callback); static char *local_fetch_file(rewind_source *source, const char *path, size_t *filesize); -static void local_fetch_file_range(rewind_source *source, const char *path, - off_t off, size_t len); +static void local_queue_fetch_file(rewind_source *source, const char *path, + size_t len); +static void local_queue_fetch_range(rewind_source *source, const char *path, + off_t off, size_t len); static void local_finish_fetch(rewind_source *source); static void local_destroy(rewind_source *source); @@ -43,7 +45,8 @@ init_local_source(const char *datadir) src->common.traverse_files = local_traverse_files; src->common.fetch_file = local_fetch_file; - src->common.queue_fetch_range = local_fetch_file_range; + src->common.queue_fetch_file = local_queue_fetch_file; + src->common.queue_fetch_range = local_queue_fetch_range; src->common.finish_fetch = local_finish_fetch; src->common.get_current_wal_insert_lsn = NULL; src->common.destroy = local_destroy; @@ -66,11 +69,64 @@ local_fetch_file(rewind_source *source, const char *path, size_t *filesize) } /* + * Copy a file from source to target. + * + * 'len' is the expected length of the file. + */ +static void +local_queue_fetch_file(rewind_source *source, const char *path, size_t len) +{ + const char *datadir = ((local_source *) source)->datadir; + PGAlignedBlock buf; + char srcpath[MAXPGPATH]; + int srcfd; + size_t written_len; + + snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path); + + /* Open source file for reading */ + srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0); + if (srcfd < 0) + pg_fatal("could not open source file \"%s\": %m", + srcpath); + + /* Truncate and open the target file for writing */ + open_target_file(path, true); + + written_len = 0; + for (;;) + { + ssize_t read_len; + + read_len = read(srcfd, buf.data, sizeof(buf)); + + if (read_len < 0) + pg_fatal("could not read file \"%s\": %m", srcpath); + else if (read_len == 0) + break; /* EOF reached */ + + write_target_range(buf.data, written_len, read_len); + written_len += read_len; + } + + /* + * A local source is not expected to change while we're rewinding, so + * check that the size of the file matches our earlier expectation. + */ + if (written_len != len) + pg_fatal("size of source file \"%s\" changed concurrently: " UINT64_FORMAT " bytes expected, " UINT64_FORMAT " copied", + srcpath, len, written_len); + + if (close(srcfd) != 0) + pg_fatal("could not close file \"%s\": %m", srcpath); +} + +/* * Copy a file from source to target, starting at 'off', for 'len' bytes. */ static void -local_fetch_file_range(rewind_source *source, const char *path, off_t off, - size_t len) +local_queue_fetch_range(rewind_source *source, const char *path, off_t off, + size_t len) { const char *datadir = ((local_source *) source)->datadir; PGAlignedBlock buf; @@ -94,14 +150,14 @@ local_fetch_file_range(rewind_source *source, const char *path, off_t off, while (end - begin > 0) { ssize_t readlen; - size_t len; + size_t thislen; if (end - begin > sizeof(buf)) - len = sizeof(buf); + thislen = sizeof(buf); else - len = end - begin; + thislen = end - begin; - readlen = read(srcfd, buf.data, len); + readlen = read(srcfd, buf.data, thislen); if (readlen < 0) pg_fatal("could not read file \"%s\": %m", srcpath); @@ -120,7 +176,7 @@ static void local_finish_fetch(rewind_source *source) { /* - * Nothing to do, local_fetch_file_range() copies the ranges immediately. + * Nothing to do, local_queue_fetch_range() copies the ranges immediately. */ } |