diff options
Diffstat (limited to 'src/bin/pg_basebackup/streamutil.c')
-rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 177 |
1 files changed, 175 insertions, 2 deletions
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 1100260c05a..2f4bac95508 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -27,6 +27,7 @@ #include "receivelog.h" #include "streamutil.h" +#include "pqexpbuffer.h" #include "common/fe_memutils.h" #include "datatype/timestamp.h" @@ -227,11 +228,183 @@ GetConnection(void) return tmpconn; } +/* + * Run IDENTIFY_SYSTEM through a given connection and give back to caller + * some result information if requested: + * - Start LSN position + * - Current timeline ID + * - System identifier + * - Plugin name + */ +bool +RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, + XLogRecPtr *startpos, char **db_name) +{ + PGresult *res; + uint32 hi, lo; + + /* Check connection existence */ + Assert(conn != NULL); + + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); + return false; + } + if (PQntuples(res) != 1 || PQnfields(res) < 3) + { + fprintf(stderr, + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 3); + return false; + } + + /* Get system identifier */ + if (sysid != NULL) + *sysid = pg_strdup(PQgetvalue(res, 0, 0)); + + /* Get timeline ID to start streaming from */ + if (starttli != NULL) + *starttli = atoi(PQgetvalue(res, 0, 1)); + + /* Get LSN start position if necessary */ + if (startpos != NULL) + { + if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse transaction log location \"%s\"\n"), + progname, PQgetvalue(res, 0, 2)); + return false; + } + *startpos = ((uint64) hi) << 32 | lo; + } + + /* Get database name, only available in 9.4 and newer versions */ + if (db_name != NULL) + { + if (PQnfields(res) < 4) + fprintf(stderr, + _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), + progname, PQntuples(res), PQnfields(res), 1, 4); + + if (PQgetisnull(res, 0, 3)) + *db_name = NULL; + else + *db_name = pg_strdup(PQgetvalue(res, 0, 3)); + } + + PQclear(res); + return true; +} + +/* + * Create a replication slot for the given connection. This function + * returns true in case of success as well as the start position + * obtained after the slot creation. + */ +bool +CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, + XLogRecPtr *startpos, bool is_physical) +{ + PQExpBuffer query; + PGresult *res; + + query = createPQExpBuffer(); + + Assert((is_physical && plugin == NULL) || + (!is_physical && plugin != NULL)); + Assert(slot_name != NULL); + + /* Build query */ + if (is_physical) + appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", + slot_name); + else + appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", + slot_name, plugin); + + res = PQexec(conn, query->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query->data, PQerrorMessage(conn)); + return false; + } + + if (PQntuples(res) != 1 || PQnfields(res) != 4) + { + fprintf(stderr, + _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, slot_name, + PQntuples(res), PQnfields(res), 1, 4); + return false; + } + + /* Get LSN start position if necessary */ + if (startpos != NULL) + { + uint32 hi, lo; + + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse transaction log location \"%s\"\n"), + progname, PQgetvalue(res, 0, 1)); + return false; + } + *startpos = ((uint64) hi) << 32 | lo; + } + + PQclear(res); + return true; +} + +/* + * Drop a replication slot for the given connection. This function + * returns true in case of success. + */ +bool +DropReplicationSlot(PGconn *conn, const char *slot_name) +{ + PQExpBuffer query; + PGresult *res; + + Assert(slot_name != NULL); + + query = createPQExpBuffer(); + + /* Build query */ + appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", + slot_name); + res = PQexec(conn, query->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query->data, PQerrorMessage(conn)); + return false; + } + + if (PQntuples(res) != 0 || PQnfields(res) != 0) + { + fprintf(stderr, + _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, slot_name, + PQntuples(res), PQnfields(res), 0, 0); + return false; + } + + PQclear(res); + return true; +} + /* * Frontend version of GetCurrentTimestamp(), since we are not linked with - * backend code. The protocol always uses integer timestamps, regardless of - * server setting. + * backend code. The replication protocol always uses integer timestamps, + * regardless of the server setting. */ int64 feGetCurrentTimestamp(void) |