| author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2020-11-12 14:52:24 +0200 |
|---|---|---|
| committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2020-11-12 14:52:24 +0200 |
| commit | 9c4f5192f69ed16c99e0d079f0b5faebd7bad212 (patch) | |
| tree | 0f0712e87b1ef10c8133bff75c23beda573909d8 /src/bin/pg_rewind/libpq_source.c | |
| parent | 1b2b19f7584b7c0025aa40862cd38c79d340be7d (diff) | |
Allow pg_rewind to use a standby server as the source system.
Using a hot standby server as the source has not been possible, because
pg_rewind creates a temporary table in the source system to hold the
list of file ranges that need to be fetched. Refactor it to queue up the
file fetch requests in pg_rewind's memory instead, so that the temporary
table is no longer needed.

Also update the logic to compute 'minRecoveryPoint' correctly when the
source is a standby server.
Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
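
The refactoring described in the commit message replaces the server-side 'fetchchunks' temporary table with a client-side queue of (path, offset, length) requests: adjacent ranges of the same file are merged, and the queue is flushed in one batched query when it fills up. Below is a minimal, self-contained sketch of that queuing scheme, not the actual pg_rewind code; `flush_queue`, `min_size`, and the example path are illustrative stand-ins for the real batched fetch.

```c
/* Sketch of an in-memory fetch-request queue with contiguous-range merging. */
#include <stdio.h>
#include <stddef.h>
#include <sys/types.h>

#define MAX_CHUNK_SIZE (1024 * 1024)
#define MAX_CHUNKS_PER_QUERY 1000

typedef struct
{
    const char *path;           /* path relative to the data directory root */
    off_t       offset;
    size_t      length;
} fetch_range_request;

static fetch_range_request queue[MAX_CHUNKS_PER_QUERY];
static int  num_requests = 0;

/* Stand-in for process_queued_fetch_requests(): fetch everything queued. */
static void
flush_queue(void)
{
    printf("flushing %d queued request(s) in one query\n", num_requests);
    num_requests = 0;
}

static size_t
min_size(size_t a, size_t b)
{
    return a < b ? a : b;
}

static void
queue_fetch_range(const char *path, off_t off, size_t len)
{
    /*
     * If this request continues the previous one (same path pointer,
     * adjacent offset), extend the previous entry up to MAX_CHUNK_SIZE
     * instead of queuing a new one.
     */
    if (num_requests > 0)
    {
        fetch_range_request *prev = &queue[num_requests - 1];

        if (prev->path == path &&
            prev->offset + (off_t) prev->length == off &&
            prev->length < MAX_CHUNK_SIZE)
        {
            size_t thislen = min_size(len, MAX_CHUNK_SIZE - prev->length);

            prev->length += thislen;
            off += thislen;
            len -= thislen;
        }
    }

    /* Queue the remainder in MAX_CHUNK_SIZE pieces, flushing when full. */
    while (len > 0)
    {
        size_t thislen = min_size(len, MAX_CHUNK_SIZE);

        if (num_requests == MAX_CHUNKS_PER_QUERY)
            flush_queue();

        queue[num_requests].path = path;
        queue[num_requests].offset = off;
        queue[num_requests].length = thislen;
        num_requests++;
        off += thislen;
        len -= thislen;
    }
}

int
main(void)
{
    const char *path = "base/1/1234";   /* callers reuse the same pointer */

    queue_fetch_range(path, 0, 8192);
    queue_fetch_range(path, 8192, 8192);    /* merged with the first range */
    if (num_requests > 0)
        flush_queue();      /* prints: flushing 1 queued request(s) ... */
    return 0;
}
```

Because nothing is created on the source server, this scheme works even when the source is a read-only hot standby, which is the point of the patch.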
Diffstat (limited to 'src/bin/pg_rewind/libpq_source.c')
| -rw-r--r-- | src/bin/pg_rewind/libpq_source.c | 287 |

1 file changed, 202 insertions(+), 85 deletions(-)
diff --git a/src/bin/pg_rewind/libpq_source.c b/src/bin/pg_rewind/libpq_source.c
index c73e8bf4704..47beba277a4 100644
--- a/src/bin/pg_rewind/libpq_source.c
+++ b/src/bin/pg_rewind/libpq_source.c
@@ -14,30 +14,51 @@
 #include "datapagemap.h"
 #include "file_ops.h"
 #include "filemap.h"
+#include "lib/stringinfo.h"
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 #include "rewind_source.h"
 
 /*
- * Files are fetched max CHUNKSIZE bytes at a time.
- *
- * (This only applies to files that are copied in whole, or for truncated
- * files where we copy the tail. Relation files, where we know the individual
- * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ * Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
+ * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
  */
-#define CHUNKSIZE 1000000
+#define MAX_CHUNK_SIZE (1024 * 1024)
+#define MAX_CHUNKS_PER_QUERY 1000
+
+/* represents a request to fetch a piece of a file from the source */
+typedef struct
+{
+    const char *path;           /* path relative to data directory root */
+    off_t       offset;
+    size_t      length;
+} fetch_range_request;
 
 typedef struct
 {
     rewind_source common;       /* common interface functions */
 
     PGconn     *conn;
-    bool        copy_started;
+
+    /*
+     * Queue of chunks that have been requested with the queue_fetch_range()
+     * function, but have not been fetched from the remote server yet.
+     */
+    int         num_requests;
+    fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
+
+    /* temporary space for process_queued_fetch_requests() */
+    StringInfoData paths;
+    StringInfoData offsets;
+    StringInfoData lengths;
 } libpq_source;
 
 static void init_libpq_conn(PGconn *conn);
 static char *run_simple_query(PGconn *conn, const char *sql);
 static void run_simple_command(PGconn *conn, const char *sql);
+static void appendArrayEscapedString(StringInfo buf, const char *str);
+
+static void process_queued_fetch_requests(libpq_source *src);
 
 /* public interface functions */
 static void libpq_traverse_files(rewind_source *source,
@@ -74,6 +95,10 @@ init_libpq_source(PGconn *conn)
 
     src->conn = conn;
 
+    initStringInfo(&src->paths);
+    initStringInfo(&src->offsets);
+    initStringInfo(&src->lengths);
+
     return &src->common;
 }
 
@@ -91,6 +116,12 @@ init_libpq_conn(PGconn *conn)
     run_simple_command(conn, "SET lock_timeout = 0");
     run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
+    /*
+     * we don't intend to do any updates, put the connection in read-only mode
+     * to keep us honest
+     */
+    run_simple_command(conn, "SET default_transaction_read_only = on");
+
     /* secure search_path */
     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -99,17 +130,6 @@ init_libpq_conn(PGconn *conn)
     PQclear(res);
 
     /*
-     * Check that the server is not in hot standby mode. There is no
-     * fundamental reason that couldn't be made to work, but it doesn't
-     * currently because we use a temporary table. Better to check for it
-     * explicitly than error out, for a better error message.
-     */
-    str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
-    if (strcmp(str, "f") != 0)
-        pg_fatal("source server must not be in recovery mode");
-    pg_free(str);
-
-    /*
      * Also check that full_page_writes is enabled. We can get torn pages if
      * a page is modified while we read it with pg_read_binary_file(), and we
      * rely on full page images to fix them.
@@ -118,6 +138,18 @@ init_libpq_conn(PGconn *conn)
     if (strcmp(str, "on") != 0)
         pg_fatal("full_page_writes must be enabled in the source server");
     pg_free(str);
+
+    /* Prepare a statement we'll use to fetch files */
+    res = PQprepare(conn, "fetch_chunks_stmt",
+                    "SELECT path, begin,\n"
+                    "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+                    "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
+                    3, NULL);
+
+    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+        pg_fatal("could not prepare statement to fetch file contents: %s",
+                 PQresultErrorMessage(res));
+    PQclear(res);
 }
 
 /*
@@ -283,94 +315,125 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
                         size_t len)
 {
     libpq_source *src = (libpq_source *) source;
-    uint64      begin = off;
-    uint64      end = off + len;
 
     /*
-     * On first call, create a temporary table, and start COPYing to it.
-     * We will load it with the list of blocks that we need to fetch.
+     * Does this request happen to be a continuation of the previous chunk? If
+     * so, merge it with the previous one.
+     *
+     * XXX: We use pointer equality to compare the path. That's good enough
+     * for our purposes; the caller always passes the same pointer for the
+     * same filename. If it didn't, we would fail to merge requests, but it
+     * wouldn't affect correctness.
      */
-    if (!src->copy_started)
+    if (src->num_requests > 0)
     {
-        PGresult   *res;
+        fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
 
-        run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+        if (prev->offset + prev->length == off &&
+            prev->length < MAX_CHUNK_SIZE &&
+            prev->path == path)
+        {
+            /*
+             * Extend the previous request to cover as much of this new
+             * request as possible, without exceeding MAX_CHUNK_SIZE.
+             */
+            size_t      thislen;
 
-        res = PQexec(src->conn, "COPY fetchchunks FROM STDIN");
-        if (PQresultStatus(res) != PGRES_COPY_IN)
-            pg_fatal("could not send file list: %s",
-                     PQresultErrorMessage(res));
-        PQclear(res);
+            thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
+            prev->length += thislen;
 
-        src->copy_started = true;
-    }
+            off += thislen;
+            len -= thislen;
 
-    /*
-     * Write the file range to a temporary table in the server.
-     *
-     * The range is sent to the server as a COPY formatted line, to be inserted
-     * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
-     * the temporary table to actually fetch the data.
-     */
+            /*
+             * Fall through to create new requests for any remaining 'len'
+             * that didn't fit in the previous chunk.
+             */
+        }
+    }
 
-    /* Split the range into CHUNKSIZE chunks */
-    while (end - begin > 0)
+    /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
+    while (len > 0)
     {
-        char        linebuf[MAXPGPATH + 23];
-        unsigned int len;
-
-        /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-        if (end - begin > CHUNKSIZE)
-            len = CHUNKSIZE;
-        else
-            len = (unsigned int) (end - begin);
+        int32       thislen;
 
-        snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+        /* if the queue is full, perform all the work queued up so far */
+        if (src->num_requests == MAX_CHUNKS_PER_QUERY)
+            process_queued_fetch_requests(src);
 
-        if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
-            pg_fatal("could not send COPY data: %s",
-                     PQerrorMessage(src->conn));
+        thislen = Min(len, MAX_CHUNK_SIZE);
+        src->request_queue[src->num_requests].path = path;
+        src->request_queue[src->num_requests].offset = off;
+        src->request_queue[src->num_requests].length = thislen;
+        src->num_requests++;
 
-        begin += len;
+        off += thislen;
+        len -= thislen;
     }
 }
 
 /*
- * Receive all the queued chunks and write them to the target data directory.
+ * Fetch all the queued chunks and write them to the target data directory.
  */
 static void
 libpq_finish_fetch(rewind_source *source)
 {
-    libpq_source *src = (libpq_source *) source;
+    process_queued_fetch_requests((libpq_source *) source);
+}
+
+static void
+process_queued_fetch_requests(libpq_source *src)
+{
+    const char *params[3];
     PGresult   *res;
-    const char *sql;
+    int         chunkno;
 
-    if (PQputCopyEnd(src->conn, NULL) != 1)
-        pg_fatal("could not send end-of-COPY: %s",
-                 PQerrorMessage(src->conn));
+    if (src->num_requests == 0)
+        return;
 
-    while ((res = PQgetResult(src->conn)) != NULL)
+    pg_log_debug("getting %d file chunks", src->num_requests);
+
+    /*
+     * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
+     * the same length as parameters: paths, offsets and lengths. Construct
+     * the string representations of them.
+     */
+    resetStringInfo(&src->paths);
+    resetStringInfo(&src->offsets);
+    resetStringInfo(&src->lengths);
+
+    appendStringInfoChar(&src->paths, '{');
+    appendStringInfoChar(&src->offsets, '{');
+    appendStringInfoChar(&src->lengths, '{');
+    for (int i = 0; i < src->num_requests; i++)
     {
-        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-            pg_fatal("unexpected result while sending file list: %s",
-                     PQresultErrorMessage(res));
-        PQclear(res);
+        fetch_range_request *rq = &src->request_queue[i];
+
+        if (i > 0)
+        {
+            appendStringInfoChar(&src->paths, ',');
+            appendStringInfoChar(&src->offsets, ',');
+            appendStringInfoChar(&src->lengths, ',');
+        }
+
+        appendArrayEscapedString(&src->paths, rq->path);
+        appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
+        appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
     }
+    appendStringInfoChar(&src->paths, '}');
+    appendStringInfoChar(&src->offsets, '}');
+    appendStringInfoChar(&src->lengths, '}');
 
     /*
-     * We've now copied the list of file ranges that we need to fetch to the
-     * temporary table. Now, actually fetch all of those ranges.
+     * Execute the prepared statement.
      */
-    sql =
-        "SELECT path, begin,\n"
-        "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-        "FROM fetchchunks\n";
+    params[0] = src->paths.data;
+    params[1] = src->offsets.data;
+    params[2] = src->lengths.data;
 
-    if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+    if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
        pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
-    pg_log_debug("getting file chunks");
-
     if (PQsetSingleRowMode(src->conn) != 1)
         pg_fatal("could not set libpq connection to single row mode");
 
@@ -382,8 +445,10 @@ libpq_finish_fetch(rewind_source *source)
      * chunk    bytea   -- file content
      *----
      */
+    chunkno = 0;
    while ((res = PQgetResult(src->conn)) != NULL)
    {
+        fetch_range_request *rq = &src->request_queue[chunkno];
        char       *filename;
        int         filenamelen;
        int64       chunkoff;
@@ -404,6 +469,9 @@ libpq_finish_fetch(rewind_source *source)
                     PQresultErrorMessage(res));
        }
 
+        if (chunkno > src->num_requests)
+            pg_fatal("received more data chunks than requested");
+
        /* sanity check the result set */
        if (PQnfields(res) != 3 || PQntuples(res) != 1)
            pg_fatal("unexpected result set size while fetching remote files");
@@ -448,31 +516,74 @@ libpq_finish_fetch(rewind_source *source)
         * If a file has been deleted on the source, remove it on the target
         * as well. Note that multiple unlink() calls may happen on the same
         * file if multiple data chunks are associated with it, hence ignore
-        * unconditionally anything missing. If this file is not a relation
-        * data file, then it has been already truncated when creating the
-        * file chunk list at the previous execution of the filemap.
+        * unconditionally anything missing.
         */
        if (PQgetisnull(res, 0, 2))
        {
            pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
                         filename);
            remove_target_file(filename, true);
-            pg_free(filename);
-            PQclear(res);
-            continue;
        }
+        else
+        {
+            pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
+                         filename, chunkoff, chunksize);
+
+            if (strcmp(filename, rq->path) != 0)
+            {
+                pg_fatal("received data for file \"%s\", when requested for \"%s\"",
+                         filename, rq->path);
+            }
+            if (chunkoff != rq->offset)
+                pg_fatal("received data at offset " INT64_FORMAT " of file \"%s\", when requested for offset " INT64_FORMAT,
+                         chunkoff, rq->path, (int64) rq->offset);
 
-        pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
-                     filename, chunkoff, chunksize);
+            /*
+             * We should not receive more data than we requested, or
+             * pg_read_binary_file() messed up. We could receive less,
+             * though, if the file was truncated in the source after we
+             * checked its size. That's OK, there should be a WAL record of
+             * the truncation, which will get replayed when you start the
+             * target system for the first time after pg_rewind has completed.
+             */
+            if (chunksize > rq->length)
+                pg_fatal("received more than requested for file \"%s\"", rq->path);
 
-        open_target_file(filename, false);
+            open_target_file(filename, false);
 
-        write_target_range(chunk, chunkoff, chunksize);
+            write_target_range(chunk, chunkoff, chunksize);
+        }
 
        pg_free(filename);
        PQclear(res);
+        chunkno++;
    }
+    if (chunkno != src->num_requests)
+        pg_fatal("unexpected number of data chunks received");
+
+    src->num_requests = 0;
+}
+
+/*
+ * Escape a string to be used as element in a text array constant
+ */
+static void
+appendArrayEscapedString(StringInfo buf, const char *str)
+{
+    appendStringInfoCharMacro(buf, '\"');
+    while (*str)
+    {
+        char        ch = *str;
+
+        if (ch == '"' || ch == '\\')
+            appendStringInfoCharMacro(buf, '\\');
+
+        appendStringInfoCharMacro(buf, ch);
+
+        str++;
+    }
+    appendStringInfoCharMacro(buf, '\"');
 }
 
 /*
@@ -521,6 +632,12 @@ libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
 static void
 libpq_destroy(rewind_source *source)
 {
-    pfree(source);
+    libpq_source *src = (libpq_source *) source;
+
+    pfree(src->paths.data);
+    pfree(src->offsets.data);
+    pfree(src->lengths.data);
+    pfree(src);
+
     /* NOTE: we don't close the connection here, as it was not opened by us. */
 }
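
For reference, here is a minimal sketch, separate from pg_rewind itself, of the client side of the fetch_chunks_stmt pattern introduced above: prepare the unnest() query once, pass each batch of requests as three parallel array literals, and read the rows back in single-row mode. The connection string and file paths are placeholder assumptions, pg_read_binary_file() requires suitable privileges on the source server, and text result format is used here for brevity, so the bytea chunk arrives hex-encoded (pg_rewind itself requests binary format and byte-swaps the offsets with pg_ntoh64()).

```c
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres");  /* placeholder */
    PGresult   *res;
    const char *params[3];

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        return 1;
    }

    /* Prepare the batched fetch statement once, as the patch does. */
    res = PQprepare(conn, "fetch_chunks_stmt",
                    "SELECT path, begin,"
                    " pg_read_binary_file(path, begin, len, true) AS chunk"
                    " FROM unnest($1::text[], $2::int8[], $3::int4[])"
                    " AS x(path, begin, len)",
                    3, NULL);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        fprintf(stderr, "prepare failed: %s", PQresultErrorMessage(res));
        return 1;
    }
    PQclear(res);

    /* Two ranges of one file, as three parallel array literals. */
    params[0] = "{\"global/pg_control\",\"global/pg_control\"}";
    params[1] = "{0,4096}";
    params[2] = "{4096,4096}";

    /* Send the query and switch to single-row mode, one row per chunk. */
    if (PQsendQueryPrepared(conn, "fetch_chunks_stmt", 3, params,
                            NULL, NULL, 0) != 1 ||
        PQsetSingleRowMode(conn) != 1)
    {
        fprintf(stderr, "could not send query: %s", PQerrorMessage(conn));
        return 1;
    }

    while ((res = PQgetResult(conn)) != NULL)
    {
        if (PQresultStatus(res) == PGRES_SINGLE_TUPLE)
            printf("path=%s offset=%s chunk length (hex text)=%d\n",
                   PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1),
                   PQgetlength(res, 0, 2));
        else if (PQresultStatus(res) != PGRES_TUPLES_OK)
            fprintf(stderr, "fetch failed: %s", PQresultErrorMessage(res));
        PQclear(res);
    }

    PQfinish(conn);
    return 0;
}
```

pg_rewind builds the array literals with appendArrayEscapedString() so that paths containing quotes or backslashes survive the trip through the array syntax; the hard-coded literals above sidestep that concern.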