diff options
author | Michael Paquier <michael@paquier.xyz> | 2021-10-26 09:30:37 +0900 |
---|---|---|
committer | Michael Paquier <michael@paquier.xyz> | 2021-10-26 09:30:37 +0900 |
commit | f61e1dd2cee6b1a1da75c2bb0ca3bc72f18748c1 (patch) | |
tree | d7b636d11bc014ae29eb2ff32b5878b4dbb3b6d6 /src/bin/pg_basebackup/streamutil.c | |
parent | 8781b0ce25e702ba4a4f032d00da7acdef8dbfe1 (diff) |
Allow pg_receivewal to stream from a slot's restart LSN
Prior to this patch, when running pg_receivewal, the streaming start
point would be the current location of the archives if anything is
found in the local directory where WAL segments are written, and
pg_receivewal would fall back to the current WAL flush location if there
are no archives, as of the result of an IDENTIFY_SYSTEM command.
If for some reason the WAL files from pg_receivewal were moved, it is
better to try a restart where we left at, which is the replication
slot's restart_lsn instead of skipping right to the current flush
location, to avoid holes in the WAL backed up. This commit changes
pg_receivewal to use the following sequence of methods to determine the
starting streaming LSN:
- Scan the local archives.
- Use the slot's restart_lsn, if supported by the backend and if a slot
is defined.
- Fallback to the current flush LSN as reported by IDENTIFY_SYSTEM.
To keep compatibility with older server versions, we only attempt to use
READ_REPLICATION_SLOT if the backend version is at least 15, and
fallback to the older behavior of streaming from the current flush
LSN if the command is not supported.
Some TAP tests are added to cover this feature.
Author: Ronan Dunklau
Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath Rupireddy
Discussion: https://postgr.es/m/18708360.4lzOvYHigE@aivenronan
Diffstat (limited to 'src/bin/pg_basebackup/streamutil.c')
-rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index a9bc1ce2149..2a3e0c688fd 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -480,6 +480,103 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, } /* + * Run READ_REPLICATION_SLOT through a given connection and give back to + * caller some result information if requested for this slot: + * - Start LSN position, InvalidXLogRecPtr if unknown. + * - Current timeline ID, 0 if unknown. + * Returns false on failure, and true otherwise. + */ +bool +GetSlotInformation(PGconn *conn, const char *slot_name, + XLogRecPtr *restart_lsn, TimeLineID *restart_tli) +{ + PGresult *res; + PQExpBuffer query; + XLogRecPtr lsn_loc = InvalidXLogRecPtr; + TimeLineID tli_loc = 0; + + if (restart_lsn) + *restart_lsn = lsn_loc; + if (restart_tli) + *restart_tli = tli_loc; + + query = createPQExpBuffer(); + appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name); + res = PQexec(conn, query->data); + destroyPQExpBuffer(query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not send replication command \"%s\": %s", + "READ_REPLICATION_SLOT", PQerrorMessage(conn)); + PQclear(res); + return false; + } + + /* The command should always return precisely one tuple and three fields */ + if (PQntuples(res) != 1 || PQnfields(res) != 3) + { + pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields", + slot_name, PQntuples(res), PQnfields(res), 1, 3); + PQclear(res); + return false; + } + + /* + * When the slot doesn't exist, the command returns a tuple with NULL + * values. This checks only the slot type field. + */ + if (PQgetisnull(res, 0, 0)) + { + pg_log_error("could not find replication slot \"%s\"", slot_name); + PQclear(res); + return false; + } + + /* + * Note that this cannot happen as READ_REPLICATION_SLOT supports only + * physical slots, but play it safe. + */ + if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0) + { + pg_log_error("expected a physical replication slot, got type \"%s\" instead", + PQgetvalue(res, 0, 0)); + PQclear(res); + return false; + } + + /* restart LSN */ + if (!PQgetisnull(res, 0, 1)) + { + uint32 hi, + lo; + + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) + { + pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"", + PQgetvalue(res, 0, 1), slot_name); + PQclear(res); + return false; + } + lsn_loc = ((uint64) hi) << 32 | lo; + } + + /* current TLI */ + if (!PQgetisnull(res, 0, 2)) + tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2)); + + PQclear(res); + + /* Assign results if requested */ + if (restart_lsn) + *restart_lsn = lsn_loc; + if (restart_tli) + *restart_tli = tli_loc; + + return true; +} + +/* * Create a replication slot for the given connection. This function * returns true in case of success. */ |