diff options
| -rw-r--r-- | src/backend/commands/subscriptioncmds.c | 8 | ||||
| -rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 119 | ||||
| -rw-r--r-- | src/backend/replication/logical/tablesync.c | 2 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 2 | ||||
| -rw-r--r-- | src/backend/replication/walreceiver.c | 2 | ||||
| -rw-r--r-- | src/include/replication/walreceiver.h | 21 | 
6 files changed, 114 insertions, 40 deletions
| diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b647a81fc86..a400ba0e40c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -759,7 +759,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,  		/* Try to connect to the publisher. */  		must_use_password = !superuser_arg(owner) && opts.passwordrequired; -		wrconn = walrcv_connect(conninfo, true, must_use_password, +		wrconn = walrcv_connect(conninfo, true, true, must_use_password,  								stmt->subname, &err);  		if (!wrconn)  			ereport(ERROR, @@ -910,7 +910,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  	/* Try to connect to the publisher. */  	must_use_password = sub->passwordrequired && !sub->ownersuperuser; -	wrconn = walrcv_connect(sub->conninfo, true, must_use_password, +	wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,  							sub->name, &err);  	if (!wrconn)  		ereport(ERROR, @@ -1537,7 +1537,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,  		/* Try to connect to the publisher. */  		must_use_password = sub->passwordrequired && !sub->ownersuperuser; -		wrconn = walrcv_connect(sub->conninfo, true, must_use_password, +		wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,  								sub->name, &err);  		if (!wrconn)  			ereport(ERROR, @@ -1788,7 +1788,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)  	 */  	load_file("libpqwalreceiver", false); -	wrconn = walrcv_connect(conninfo, true, must_use_password, +	wrconn = walrcv_connect(conninfo, true, true, must_use_password,  							subname, &err);  	if (wrconn == NULL)  	{ diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2439733b55b..9270d7b855b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -48,7 +48,8 @@ struct WalReceiverConn  /* Prototypes for interface functions */  static WalReceiverConn *libpqrcv_connect(const char *conninfo, -										 bool logical, bool must_use_password, +										 bool replication, bool logical, +										 bool must_use_password,  										 const char *appname, char **err);  static void libpqrcv_check_conninfo(const char *conninfo,  									bool must_use_password); @@ -57,6 +58,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,  									char **sender_host, int *sender_port);  static char *libpqrcv_identify_system(WalReceiverConn *conn,  									  TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);  static int	libpqrcv_server_version(WalReceiverConn *conn);  static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,  											 TimeLineID tli, char **filename, @@ -99,6 +101,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {  	.walrcv_send = libpqrcv_send,  	.walrcv_create_slot = libpqrcv_create_slot,  	.walrcv_alter_slot = libpqrcv_alter_slot, +	.walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,  	.walrcv_get_backend_pid = libpqrcv_get_backend_pid,  	.walrcv_exec = libpqrcv_exec,  	.walrcv_disconnect = libpqrcv_disconnect @@ -121,7 +124,11 @@ _PG_init(void)  }  /* - * Establish the connection to the primary server for XLOG streaming + * Establish the connection to the primary server. + * + * This function can be used for both replication and regular connections. + * If it is a replication connection, it could be either logical or physical + * based on input argument 'logical'.   *   * If an error occurs, this function will normally return NULL and set *err   * to a palloc'ed error message. However, if must_use_password is true and @@ -132,8 +139,8 @@ _PG_init(void)   * case.   */  static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, -				 const char *appname, char **err) +libpqrcv_connect(const char *conninfo, bool replication, bool logical, +				 bool must_use_password, const char *appname, char **err)  {  	WalReceiverConn *conn;  	PostgresPollingStatusType status; @@ -156,36 +163,46 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,  	 */  	keys[i] = "dbname";  	vals[i] = conninfo; -	keys[++i] = "replication"; -	vals[i] = logical ? "database" : "true"; -	if (!logical) + +	/* We can not have logical without replication */ +	Assert(replication || !logical); + +	if (replication)  	{ -		/* -		 * The database name is ignored by the server in replication mode, but -		 * specify "replication" for .pgpass lookup. -		 */ -		keys[++i] = "dbname"; -		vals[i] = "replication"; +		keys[++i] = "replication"; +		vals[i] = logical ? "database" : "true"; + +		if (logical) +		{ +			/* Tell the publisher to translate to our encoding */ +			keys[++i] = "client_encoding"; +			vals[i] = GetDatabaseEncodingName(); + +			/* +			 * Force assorted GUC parameters to settings that ensure that the +			 * publisher will output data values in a form that is unambiguous +			 * to the subscriber.  (We don't want to modify the subscriber's +			 * GUC settings, since that might surprise user-defined code +			 * running in the subscriber, such as triggers.)  This should +			 * match what pg_dump does. +			 */ +			keys[++i] = "options"; +			vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3"; +		} +		else +		{ +			/* +			 * The database name is ignored by the server in replication mode, +			 * but specify "replication" for .pgpass lookup. +			 */ +			keys[++i] = "dbname"; +			vals[i] = "replication"; +		}  	} +  	keys[++i] = "fallback_application_name";  	vals[i] = appname; -	if (logical) -	{ -		/* Tell the publisher to translate to our encoding */ -		keys[++i] = "client_encoding"; -		vals[i] = GetDatabaseEncodingName(); -		/* -		 * Force assorted GUC parameters to settings that ensure that the -		 * publisher will output data values in a form that is unambiguous to -		 * the subscriber.  (We don't want to modify the subscriber's GUC -		 * settings, since that might surprise user-defined code running in -		 * the subscriber, such as triggers.)  This should match what pg_dump -		 * does. -		 */ -		keys[++i] = "options"; -		vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3"; -	}  	keys[++i] = NULL;  	vals[i] = NULL; @@ -472,6 +489,50 @@ libpqrcv_server_version(WalReceiverConn *conn)  }  /* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ +	PQconninfoOption *opts; +	char	   *dbname = NULL; +	char	   *err = NULL; + +	opts = PQconninfoParse(connInfo, &err); +	if (opts == NULL) +	{ +		/* The error string is malloc'd, so we must free it explicitly */ +		char	   *errcopy = err ? pstrdup(err) : "out of memory"; + +		PQfreemem(err); +		ereport(ERROR, +				(errcode(ERRCODE_SYNTAX_ERROR), +				 errmsg("invalid connection string syntax: %s", errcopy))); +	} + +	for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) +	{ +		/* +		 * If multiple dbnames are specified, then the last one will be +		 * returned +		 */ +		if (strcmp(opt->keyword, "dbname") == 0 && opt->val && +			*opt->val) +		{ +			if (dbname) +				pfree(dbname); + +			dbname = pstrdup(opt->val); +		} +	} + +	PQconninfoFree(opts); +	return dbname; +} + +/*   * Start streaming WAL data from given streaming options.   *   * Returns true if we switched successfully to copy-both mode. False diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 5acab3f3e23..ee066290881 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)  	 * so that synchronous replication can distinguish them.  	 */  	LogRepWorkerWalRcvConn = -		walrcv_connect(MySubscription->conninfo, true, +		walrcv_connect(MySubscription->conninfo, true, true,  					   must_use_password,  					   slotname, &err);  	if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 32ff4c03364..9dd2446fbfd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4519,7 +4519,7 @@ run_apply_worker()  		!MySubscription->ownersuperuser;  	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, -											must_use_password, +											true, must_use_password,  											MySubscription->name, &err);  	if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index e29a6196a3e..b80447d15f1 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -296,7 +296,7 @@ WalReceiverMain(void)  	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);  	/* Establish the connection to the primary for XLOG streaming */ -	wrconn = walrcv_connect(conninfo, false, false, +	wrconn = walrcv_connect(conninfo, true, false, false,  							cluster_name[0] ? cluster_name : "walreceiver",  							&err);  	if (!wrconn) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f566a99ba16..b906bb5ce83 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -228,8 +228,10 @@ typedef struct WalRcvExecResult  /*   * walrcv_connect_fn   * - * Establish connection to a cluster.  'logical' is true if the - * connection is logical, and false if the connection is physical. + * Establish connection to a cluster.  'replication' is true if the + * connection is a replication connection, and false if it is a + * regular connection.  If it is a replication connection, it could + * be either logical or physical based on input argument 'logical'.   * 'appname' is a name associated to the connection, to use for example   * with fallback_application_name or application_name.  Returns the   * details about the connection established, as defined by @@ -237,6 +239,7 @@ typedef struct WalRcvExecResult   * returned with 'err' including the error generated.   */  typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, +											   bool replication,  											   bool logical,  											   bool must_use_password,  											   const char *appname, @@ -280,6 +283,13 @@ typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,  											TimeLineID *primary_tli);  /* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the database name from the primary_conninfo + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + +/*   * walrcv_server_version_fn   *   * Returns the version number of the cluster connected to. @@ -403,6 +413,7 @@ typedef struct WalReceiverFunctionsType  	walrcv_get_conninfo_fn walrcv_get_conninfo;  	walrcv_get_senderinfo_fn walrcv_get_senderinfo;  	walrcv_identify_system_fn walrcv_identify_system; +	walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;  	walrcv_server_version_fn walrcv_server_version;  	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;  	walrcv_startstreaming_fn walrcv_startstreaming; @@ -418,8 +429,8 @@ typedef struct WalReceiverFunctionsType  extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \ -	WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err) +#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \ +	WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)  #define walrcv_check_conninfo(conninfo, must_use_password) \  	WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)  #define walrcv_get_conninfo(conn) \ @@ -428,6 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;  	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)  #define walrcv_identify_system(conn, primary_tli) \  	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ +	WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)  #define walrcv_server_version(conn) \  	WalReceiverFunctions->walrcv_server_version(conn)  #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ | 
