summaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/streamutil.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/streamutil.c')
-rw-r--r--src/bin/pg_basebackup/streamutil.c177
1 files changed, 175 insertions, 2 deletions
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1100260c05a..2f4bac95508 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -27,6 +27,7 @@
#include "receivelog.h"
#include "streamutil.h"
+#include "pqexpbuffer.h"
#include "common/fe_memutils.h"
#include "datatype/timestamp.h"
@@ -227,11 +228,183 @@ GetConnection(void)
return tmpconn;
}
+/*
+ * Run IDENTIFY_SYSTEM through a given connection and give back to caller
+ * some result information if requested:
+ * - Start LSN position
+ * - Current timeline ID
+ * - System identifier
+ * - Plugin name
+ */
+bool
+RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
+ XLogRecPtr *startpos, char **db_name)
+{
+ PGresult *res;
+ uint32 hi, lo;
+
+ /* Check connection existence */
+ Assert(conn != NULL);
+
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+ progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+ return false;
+ }
+ if (PQntuples(res) != 1 || PQnfields(res) < 3)
+ {
+ fprintf(stderr,
+ _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+ progname, PQntuples(res), PQnfields(res), 1, 3);
+ return false;
+ }
+
+ /* Get system identifier */
+ if (sysid != NULL)
+ *sysid = pg_strdup(PQgetvalue(res, 0, 0));
+
+ /* Get timeline ID to start streaming from */
+ if (starttli != NULL)
+ *starttli = atoi(PQgetvalue(res, 0, 1));
+
+ /* Get LSN start position if necessary */
+ if (startpos != NULL)
+ {
+ if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse transaction log location \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 2));
+ return false;
+ }
+ *startpos = ((uint64) hi) << 32 | lo;
+ }
+
+ /* Get database name, only available in 9.4 and newer versions */
+ if (db_name != NULL)
+ {
+ if (PQnfields(res) < 4)
+ fprintf(stderr,
+ _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+ progname, PQntuples(res), PQnfields(res), 1, 4);
+
+ if (PQgetisnull(res, 0, 3))
+ *db_name = NULL;
+ else
+ *db_name = pg_strdup(PQgetvalue(res, 0, 3));
+ }
+
+ PQclear(res);
+ return true;
+}
+
+/*
+ * Create a replication slot for the given connection. This function
+ * returns true in case of success as well as the start position
+ * obtained after the slot creation.
+ */
+bool
+CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
+ XLogRecPtr *startpos, bool is_physical)
+{
+ PQExpBuffer query;
+ PGresult *res;
+
+ query = createPQExpBuffer();
+
+ Assert((is_physical && plugin == NULL) ||
+ (!is_physical && plugin != NULL));
+ Assert(slot_name != NULL);
+
+ /* Build query */
+ if (is_physical)
+ appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
+ slot_name);
+ else
+ appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+ slot_name, plugin);
+
+ res = PQexec(conn, query->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+ progname, query->data, PQerrorMessage(conn));
+ return false;
+ }
+
+ if (PQntuples(res) != 1 || PQnfields(res) != 4)
+ {
+ fprintf(stderr,
+ _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+ progname, slot_name,
+ PQntuples(res), PQnfields(res), 1, 4);
+ return false;
+ }
+
+ /* Get LSN start position if necessary */
+ if (startpos != NULL)
+ {
+ uint32 hi, lo;
+
+ if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse transaction log location \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 1));
+ return false;
+ }
+ *startpos = ((uint64) hi) << 32 | lo;
+ }
+
+ PQclear(res);
+ return true;
+}
+
+/*
+ * Drop a replication slot for the given connection. This function
+ * returns true in case of success.
+ */
+bool
+DropReplicationSlot(PGconn *conn, const char *slot_name)
+{
+ PQExpBuffer query;
+ PGresult *res;
+
+ Assert(slot_name != NULL);
+
+ query = createPQExpBuffer();
+
+ /* Build query */
+ appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
+ slot_name);
+ res = PQexec(conn, query->data);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+ progname, query->data, PQerrorMessage(conn));
+ return false;
+ }
+
+ if (PQntuples(res) != 0 || PQnfields(res) != 0)
+ {
+ fprintf(stderr,
+ _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
+ progname, slot_name,
+ PQntuples(res), PQnfields(res), 0, 0);
+ return false;
+ }
+
+ PQclear(res);
+ return true;
+}
+
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
+ * backend code. The replication protocol always uses integer timestamps,
+ * regardless of the server setting.
*/
int64
feGetCurrentTimestamp(void)