diff options
| -rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 21 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/pg_receivexlog.c | 38 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/pg_recvlogical.c | 116 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 177 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/streamutil.h | 11 | 
5 files changed, 222 insertions, 141 deletions
| diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 8b9acea9f08..0ebda9ae9e0 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -1569,8 +1569,8 @@ BaseBackup(void)  {  	PGresult   *res;  	char	   *sysidentifier; -	uint32		latesttli; -	uint32		starttli; +	TimeLineID	latesttli; +	TimeLineID	starttli;  	char	   *basebkp;  	char		escaped_label[MAXPGPATH];  	char	   *maxrate_clause = NULL; @@ -1624,23 +1624,8 @@ BaseBackup(void)  	/*  	 * Run IDENTIFY_SYSTEM so we can get the timeline  	 */ -	res = PQexec(conn, "IDENTIFY_SYSTEM"); -	if (PQresultStatus(res) != PGRES_TUPLES_OK) -	{ -		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), -				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); +	if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))  		disconnect_and_exit(1); -	} -	if (PQntuples(res) != 1 || PQnfields(res) < 3) -	{ -		fprintf(stderr, -				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), -				progname, PQntuples(res), PQnfields(res), 1, 3); -		disconnect_and_exit(1); -	} -	sysidentifier = pg_strdup(PQgetvalue(res, 0, 0)); -	latesttli = atoi(PQgetvalue(res, 0, 1)); -	PQclear(res);  	/*  	 * Start the actual backup diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index a8b9ad3c05f..171cf431f57 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -253,13 +253,8 @@ FindStreamingStart(uint32 *tli)  static void  StreamLog(void)  { -	PGresult   *res; -	XLogRecPtr	startpos; -	uint32		starttli; -	XLogRecPtr	serverpos; -	uint32		servertli; -	uint32		hi, -				lo; +	XLogRecPtr	startpos, serverpos; +	TimeLineID	starttli, servertli;  	/*  	 * Connect in replication mode to the server @@ -280,33 +275,12 @@ StreamLog(void)  	}  	/* -	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog -	 * position. +	 * Identify server, obtaining start LSN position and current timeline ID +	 * at the same time, necessary if not valid data can be found in the +	 * existing output directory.  	 */ -	res = PQexec(conn, "IDENTIFY_SYSTEM"); -	if (PQresultStatus(res) != PGRES_TUPLES_OK) -	{ -		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), -				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); -		disconnect_and_exit(1); -	} -	if (PQntuples(res) != 1 || PQnfields(res) < 3) -	{ -		fprintf(stderr, -				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), -				progname, PQntuples(res), PQnfields(res), 1, 3); +	if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))  		disconnect_and_exit(1); -	} -	servertli = atoi(PQgetvalue(res, 0, 1)); -	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) -	{ -		fprintf(stderr, -				_("%s: could not parse transaction log location \"%s\"\n"), -				progname, PQgetvalue(res, 0, 2)); -		disconnect_and_exit(1); -	} -	serverpos = ((uint64) hi) << 32 | lo; -	PQclear(res);  	/*  	 * Figure out where to start streaming. diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index a88ffacc06d..c48ceccf901 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -596,7 +596,6 @@ sighup_handler(int signum)  int  main(int argc, char **argv)  { -	PGresult   *res;  	static struct option long_options[] = {  /* general options */  		{"file", required_argument, NULL, 'f'}, @@ -628,6 +627,7 @@ main(int argc, char **argv)  	int			option_index;  	uint32		hi,  				lo; +	char	   *db_name;  	progname = get_progname(argv[0]);  	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical")); @@ -834,124 +834,62 @@ main(int argc, char **argv)  #endif  	/* -	 * don't really need this but it actually helps to get more precise error -	 * messages about authentication, required GUCs and such without starting -	 * to loop around connection attempts lateron. +	 * Obtain a connection to server. This is not really necessary but it +	 * helps to get more precise error messages about authentification, +	 * required GUC parameters and such.  	 */ -	{ -		conn = GetConnection(); -		if (!conn) -			/* Error message already written in GetConnection() */ -			exit(1); +	conn = GetConnection(); +	if (!conn) +		/* Error message already written in GetConnection() */ +		exit(1); -		/* -		 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog -		 * position. -		 */ -		res = PQexec(conn, "IDENTIFY_SYSTEM"); -		if (PQresultStatus(res) != PGRES_TUPLES_OK) -		{ -			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), -					progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); -			disconnect_and_exit(1); -		} +	/* +	 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific +	 * replication connection. +	 */ +	if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) +		disconnect_and_exit(1); -		if (PQntuples(res) != 1 || PQnfields(res) < 4) -		{ -			fprintf(stderr, -					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), -					progname, PQntuples(res), PQnfields(res), 1, 4); -			disconnect_and_exit(1); -		} -		PQclear(res); +	if (db_name == NULL) +	{ +		fprintf(stderr, +				_("%s: failed to establish database specific replication connection\n"), +				progname); +		disconnect_and_exit(1);  	} - -	/* -	 * drop a replication slot -	 */ +	/* Drop a replication slot. */  	if (do_drop_slot)  	{ -		char		query[256]; -  		if (verbose)  			fprintf(stderr,  					_("%s: dropping replication slot \"%s\"\n"),  					progname, replication_slot); -		snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"", -				 replication_slot); -		res = PQexec(conn, query); -		if (PQresultStatus(res) != PGRES_COMMAND_OK) -		{ -			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), -					progname, query, PQerrorMessage(conn)); +		if (!DropReplicationSlot(conn, replication_slot))  			disconnect_and_exit(1); -		} - -		if (PQntuples(res) != 0 || PQnfields(res) != 0) -		{ -			fprintf(stderr, -					_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), -					progname, replication_slot, PQntuples(res), PQnfields(res), 0, 0); -			disconnect_and_exit(1); -		} - -		PQclear(res); -		disconnect_and_exit(0);  	} -	/* -	 * create a replication slot -	 */ +	/* Create a replication slot. */  	if (do_create_slot)  	{ -		char		query[256]; -  		if (verbose)  			fprintf(stderr,  					_("%s: creating replication slot \"%s\"\n"),  					progname, replication_slot); -		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", -				 replication_slot, plugin); - -		res = PQexec(conn, query); -		if (PQresultStatus(res) != PGRES_TUPLES_OK) -		{ -			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), -					progname, query, PQerrorMessage(conn)); +		if (!CreateReplicationSlot(conn, replication_slot, plugin, +								   &startpos, false))  			disconnect_and_exit(1); -		} - -		if (PQntuples(res) != 1 || PQnfields(res) != 4) -		{ -			fprintf(stderr, -					_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), -					progname, replication_slot, PQntuples(res), PQnfields(res), 1, 4); -			disconnect_and_exit(1); -		} - -		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) -		{ -			fprintf(stderr, -					_("%s: could not parse transaction log location \"%s\"\n"), -					progname, PQgetvalue(res, 0, 1)); -			disconnect_and_exit(1); -		} -		startpos = ((uint64) hi) << 32 | lo; - -		replication_slot = strdup(PQgetvalue(res, 0, 0)); -		PQclear(res);  	} -  	if (!do_start_slot)  		disconnect_and_exit(0); +	/* Stream loop */  	while (true)  	{ -		StreamLog(); +		StreamLogicalLog();  		if (time_to_abort)  		{  			/* diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 1100260c05a..2f4bac95508 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -27,6 +27,7 @@  #include "receivelog.h"  #include "streamutil.h" +#include "pqexpbuffer.h"  #include "common/fe_memutils.h"  #include "datatype/timestamp.h" @@ -227,11 +228,183 @@ GetConnection(void)  	return tmpconn;  } +/* + * Run IDENTIFY_SYSTEM through a given connection and give back to caller + * some result information if requested: + * - Start LSN position + * - Current timeline ID + * - System identifier + * - Plugin name + */ +bool +RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, +				  XLogRecPtr *startpos, char **db_name) +{ +	PGresult   *res; +	uint32		hi, lo; + +	/* Check connection existence */ +	Assert(conn != NULL); + +	res = PQexec(conn, "IDENTIFY_SYSTEM"); +	if (PQresultStatus(res) != PGRES_TUPLES_OK) +	{ +		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), +				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn)); +		return false; +	} +	if (PQntuples(res) != 1 || PQnfields(res) < 3) +	{ +		fprintf(stderr, +				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), +				progname, PQntuples(res), PQnfields(res), 1, 3); +		return false; +	} + +	/* Get system identifier */ +	if (sysid != NULL) +		*sysid = pg_strdup(PQgetvalue(res, 0, 0)); + +	/* Get timeline ID to start streaming from */ +	if (starttli != NULL) +		*starttli = atoi(PQgetvalue(res, 0, 1)); + +	/* Get LSN start position if necessary */ +	if (startpos != NULL) +	{ +		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) +		{ +			fprintf(stderr, +					_("%s: could not parse transaction log location \"%s\"\n"), +					progname, PQgetvalue(res, 0, 2)); +			return false; +		} +		*startpos = ((uint64) hi) << 32 | lo; +	} + +	/* Get database name, only available in 9.4 and newer versions */ +	if  (db_name != NULL) +	{ +		if (PQnfields(res) < 4) +			fprintf(stderr, +					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"), +					progname, PQntuples(res), PQnfields(res), 1, 4); + +		if (PQgetisnull(res, 0, 3)) +			*db_name =  NULL; +		else +			*db_name = pg_strdup(PQgetvalue(res, 0, 3)); +	} + +	PQclear(res); +	return true; +} + +/* + * Create a replication slot for the given connection. This function + * returns true in case of success as well as the start position + * obtained after the slot creation. + */ +bool +CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, +					  XLogRecPtr *startpos, bool is_physical) +{ +	PQExpBuffer query; +	PGresult   *res; + +	query = createPQExpBuffer(); + +	Assert((is_physical && plugin == NULL) || +		   (!is_physical && plugin != NULL)); +	Assert(slot_name != NULL); + +	/* Build query */ +	if (is_physical) +		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", +						  slot_name); +	else +		appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", +						  slot_name, plugin); + +	res = PQexec(conn, query->data); +	if (PQresultStatus(res) != PGRES_TUPLES_OK) +	{ +		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), +				progname, query->data, PQerrorMessage(conn)); +		return false; +	} + +	if (PQntuples(res) != 1 || PQnfields(res) != 4) +	{ +		fprintf(stderr, +				_("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), +				progname, slot_name, +				PQntuples(res), PQnfields(res), 1, 4); +		return false; +	} + +	/* Get LSN start position if necessary */ +	if (startpos != NULL) +	{ +		uint32		hi, lo; + +		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) +		{ +			fprintf(stderr, +					_("%s: could not parse transaction log location \"%s\"\n"), +					progname, PQgetvalue(res, 0, 1)); +			return false; +		} +		*startpos = ((uint64) hi) << 32 | lo; +	} + +	PQclear(res); +	return true; +} + +/* + * Drop a replication slot for the given connection. This function + * returns true in case of success. + */ +bool +DropReplicationSlot(PGconn *conn, const char *slot_name) +{ +	PQExpBuffer query; +	PGresult   *res; + +	Assert(slot_name != NULL); + +	query = createPQExpBuffer(); + +	/* Build query */ +	appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", +					  slot_name); +	res = PQexec(conn, query->data); +	if (PQresultStatus(res) != PGRES_COMMAND_OK) +	{ +		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), +				progname, query->data, PQerrorMessage(conn)); +		return false; +	} + +	if (PQntuples(res) != 0 || PQnfields(res) != 0) +	{ +		fprintf(stderr, +				_("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"), +				progname, slot_name, +				PQntuples(res), PQnfields(res), 0, 0); +		return false; +	} + +	PQclear(res); +	return true; +} +  /*   * Frontend version of GetCurrentTimestamp(), since we are not linked with - * backend code. The protocol always uses integer timestamps, regardless of - * server setting. + * backend code. The replication protocol always uses integer timestamps, + * regardless of the server setting.   */  int64  feGetCurrentTimestamp(void) diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 8c6691f9c8c..ac66145c359 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -14,6 +14,8 @@  #include "libpq-fe.h" +#include "access/xlogdefs.h" +  extern const char *progname;  extern char *connection_string;  extern char *dbhost; @@ -28,6 +30,15 @@ extern PGconn *conn;  extern PGconn *GetConnection(void); +/* Replication commands */ +extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name, +								  const char *plugin, XLogRecPtr *startpos, +								  bool is_physical); +extern bool DropReplicationSlot(PGconn *conn, const char *slot_name); +extern bool RunIdentifySystem(PGconn *conn, char **sysid, +							  TimeLineID *starttli, +							  XLogRecPtr *startpos, +							  char **db_name);  extern int64 feGetCurrentTimestamp(void);  extern void feTimestampDifference(int64 start_time, int64 stop_time,  					  long *secs, int *microsecs); | 
