summaryrefslogtreecommitdiff
path: root/src/bin/pg_rewind/local_source.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_rewind/local_source.c')
-rw-r--r--src/bin/pg_rewind/local_source.c76
1 files changed, 66 insertions, 10 deletions
diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c
index 3406fc7037d..58699effcc4 100644
--- a/src/bin/pg_rewind/local_source.c
+++ b/src/bin/pg_rewind/local_source.c
@@ -29,8 +29,10 @@ static void local_traverse_files(rewind_source *source,
process_file_callback_t callback);
static char *local_fetch_file(rewind_source *source, const char *path,
size_t *filesize);
-static void local_fetch_file_range(rewind_source *source, const char *path,
- off_t off, size_t len);
+static void local_queue_fetch_file(rewind_source *source, const char *path,
+ size_t len);
+static void local_queue_fetch_range(rewind_source *source, const char *path,
+ off_t off, size_t len);
static void local_finish_fetch(rewind_source *source);
static void local_destroy(rewind_source *source);
@@ -43,7 +45,8 @@ init_local_source(const char *datadir)
src->common.traverse_files = local_traverse_files;
src->common.fetch_file = local_fetch_file;
- src->common.queue_fetch_range = local_fetch_file_range;
+ src->common.queue_fetch_file = local_queue_fetch_file;
+ src->common.queue_fetch_range = local_queue_fetch_range;
src->common.finish_fetch = local_finish_fetch;
src->common.get_current_wal_insert_lsn = NULL;
src->common.destroy = local_destroy;
@@ -66,11 +69,64 @@ local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
}
/*
+ * Copy a file from source to target.
+ *
+ * 'len' is the expected length of the file.
+ */
+static void
+local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
+{
+ const char *datadir = ((local_source *) source)->datadir;
+ PGAlignedBlock buf;
+ char srcpath[MAXPGPATH];
+ int srcfd;
+ size_t written_len;
+
+ snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
+
+ /* Open source file for reading */
+ srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
+ if (srcfd < 0)
+ pg_fatal("could not open source file \"%s\": %m",
+ srcpath);
+
+ /* Truncate and open the target file for writing */
+ open_target_file(path, true);
+
+ written_len = 0;
+ for (;;)
+ {
+ ssize_t read_len;
+
+ read_len = read(srcfd, buf.data, sizeof(buf));
+
+ if (read_len < 0)
+ pg_fatal("could not read file \"%s\": %m", srcpath);
+ else if (read_len == 0)
+ break; /* EOF reached */
+
+ write_target_range(buf.data, written_len, read_len);
+ written_len += read_len;
+ }
+
+ /*
+ * A local source is not expected to change while we're rewinding, so
+ * check that the size of the file matches our earlier expectation.
+ */
+ if (written_len != len)
+ pg_fatal("size of source file \"%s\" changed concurrently: " UINT64_FORMAT " bytes expected, " UINT64_FORMAT " copied",
+ srcpath, len, written_len);
+
+ if (close(srcfd) != 0)
+ pg_fatal("could not close file \"%s\": %m", srcpath);
+}
+
+/*
* Copy a file from source to target, starting at 'off', for 'len' bytes.
*/
static void
-local_fetch_file_range(rewind_source *source, const char *path, off_t off,
- size_t len)
+local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
+ size_t len)
{
const char *datadir = ((local_source *) source)->datadir;
PGAlignedBlock buf;
@@ -94,14 +150,14 @@ local_fetch_file_range(rewind_source *source, const char *path, off_t off,
while (end - begin > 0)
{
ssize_t readlen;
- size_t len;
+ size_t thislen;
if (end - begin > sizeof(buf))
- len = sizeof(buf);
+ thislen = sizeof(buf);
else
- len = end - begin;
+ thislen = end - begin;
- readlen = read(srcfd, buf.data, len);
+ readlen = read(srcfd, buf.data, thislen);
if (readlen < 0)
pg_fatal("could not read file \"%s\": %m", srcpath);
@@ -120,7 +176,7 @@ static void
local_finish_fetch(rewind_source *source)
{
/*
- * Nothing to do, local_fetch_file_range() copies the ranges immediately.
+ * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
*/
}