diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 61 | ||||
| -rw-r--r-- | src/backend/replication/repl_gram.y | 35 | ||||
| -rw-r--r-- | src/backend/replication/walsender.c | 40 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 40 | 
4 files changed, 124 insertions, 52 deletions
| diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 19ea159af4f..5c6e56a5b24 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -862,6 +862,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,  	PGresult   *res;  	StringInfoData cmd;  	char	   *snapshot; +	int			use_new_options_syntax; + +	use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);  	initStringInfo(&cmd); @@ -872,26 +875,58 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,  	if (conn->logical)  	{ -		appendStringInfoString(&cmd, " LOGICAL pgoutput"); +		appendStringInfoString(&cmd, " LOGICAL pgoutput "); +		if (use_new_options_syntax) +			appendStringInfoChar(&cmd, '(');  		if (two_phase) -			appendStringInfoString(&cmd, " TWO_PHASE"); +		{ +			appendStringInfoString(&cmd, "TWO_PHASE"); +			if (use_new_options_syntax) +				appendStringInfoString(&cmd, ", "); +			else +				appendStringInfoChar(&cmd, ' '); +		} -		switch (snapshot_action) +		if (use_new_options_syntax)  		{ -			case CRS_EXPORT_SNAPSHOT: -				appendStringInfoString(&cmd, " EXPORT_SNAPSHOT"); -				break; -			case CRS_NOEXPORT_SNAPSHOT: -				appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT"); -				break; -			case CRS_USE_SNAPSHOT: -				appendStringInfoString(&cmd, " USE_SNAPSHOT"); -				break; +			switch (snapshot_action) +			{ +				case CRS_EXPORT_SNAPSHOT: +					appendStringInfoString(&cmd, "SNAPSHOT 'export'"); +					break; +				case CRS_NOEXPORT_SNAPSHOT: +					appendStringInfoString(&cmd, "SNAPSHOT 'nothing'"); +					break; +				case CRS_USE_SNAPSHOT: +					appendStringInfoString(&cmd, "SNAPSHOT 'use'"); +					break; +			}  		} +		else +		{ +			switch (snapshot_action) +			{ +				case CRS_EXPORT_SNAPSHOT: +					appendStringInfoString(&cmd, "EXPORT_SNAPSHOT"); +					break; +				case CRS_NOEXPORT_SNAPSHOT: +					appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT"); +					break; +				case CRS_USE_SNAPSHOT: +					appendStringInfoString(&cmd, "USE_SNAPSHOT"); +					break; +			} +		} + +		if (use_new_options_syntax) +			appendStringInfoChar(&cmd, ')');  	}  	else  	{ -		appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); +		if (use_new_options_syntax) +			appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)"); +		else +			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");  	}  	res = libpqrcv_PQexec(conn->streamConn, cmd.data); diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 3b59d62ed86..126380e2df7 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -103,8 +103,8 @@ static SQLCmd *make_sqlcmd(void);  %type <node>	plugin_opt_arg  %type <str>		opt_slot var_name ident_or_keyword  %type <boolval>	opt_temporary -%type <list>	create_slot_opt_list -%type <defelt>	create_slot_opt +%type <list>	create_slot_options create_slot_legacy_opt_list +%type <defelt>	create_slot_legacy_opt  %% @@ -243,8 +243,8 @@ base_backup_legacy_opt:  			;  create_replication_slot: -			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ -			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list +			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL [options] */ +			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_options  				{  					CreateReplicationSlotCmd *cmd;  					cmd = makeNode(CreateReplicationSlotCmd); @@ -254,8 +254,8 @@ create_replication_slot:  					cmd->options = $5;  					$$ = (Node *) cmd;  				} -			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ -			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list +			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin [options] */ +			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_options  				{  					CreateReplicationSlotCmd *cmd;  					cmd = makeNode(CreateReplicationSlotCmd); @@ -268,28 +268,33 @@ create_replication_slot:  				}  			; -create_slot_opt_list: -			create_slot_opt_list create_slot_opt +create_slot_options: +			'(' generic_option_list ')'			{ $$ = $2; } +			| create_slot_legacy_opt_list		{ $$ = $1; } +			; + +create_slot_legacy_opt_list: +			create_slot_legacy_opt_list create_slot_legacy_opt  				{ $$ = lappend($1, $2); }  			| /* EMPTY */  				{ $$ = NIL; }  			; -create_slot_opt: +create_slot_legacy_opt:  			K_EXPORT_SNAPSHOT  				{ -				  $$ = makeDefElem("export_snapshot", -								   (Node *)makeInteger(true), -1); +				  $$ = makeDefElem("snapshot", +								   (Node *)makeString("export"), -1);  				}  			| K_NOEXPORT_SNAPSHOT  				{ -				  $$ = makeDefElem("export_snapshot", -								   (Node *)makeInteger(false), -1); +				  $$ = makeDefElem("snapshot", +								   (Node *)makeString("nothing"), -1);  				}  			| K_USE_SNAPSHOT  				{ -				  $$ = makeDefElem("use_snapshot", -								   (Node *)makeInteger(true), -1); +				  $$ = makeDefElem("snapshot", +								   (Node *)makeString("use"), -1);  				}  			| K_RESERVE_WAL  				{ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3ca2a11389d..b811a5c0ef2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -872,26 +872,30 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,  	{  		DefElem    *defel = (DefElem *) lfirst(lc); -		if (strcmp(defel->defname, "export_snapshot") == 0) +		if (strcmp(defel->defname, "snapshot") == 0)  		{ +			char	   *action; +  			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)  				ereport(ERROR,  						(errcode(ERRCODE_SYNTAX_ERROR),  						 errmsg("conflicting or redundant options"))); +			action = defGetString(defel);  			snapshot_action_given = true; -			*snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT : -				CRS_NOEXPORT_SNAPSHOT; -		} -		else if (strcmp(defel->defname, "use_snapshot") == 0) -		{ -			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) + +			if (strcmp(action, "export") == 0) +				*snapshot_action = CRS_EXPORT_SNAPSHOT; +			else if (strcmp(action, "nothing") == 0) +				*snapshot_action = CRS_NOEXPORT_SNAPSHOT; +			else if (strcmp(action, "use") == 0) +				*snapshot_action = CRS_USE_SNAPSHOT; +			else  				ereport(ERROR, -						(errcode(ERRCODE_SYNTAX_ERROR), -						 errmsg("conflicting or redundant options"))); +						(errcode(ERRCODE_INVALID_PARAMETER_VALUE), +						 errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"", +								defel->defname, action))); -			snapshot_action_given = true; -			*snapshot_action = CRS_USE_SNAPSHOT;  		}  		else if (strcmp(defel->defname, "reserve_wal") == 0)  		{ @@ -901,7 +905,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,  						 errmsg("conflicting or redundant options")));  			reserve_wal_given = true; -			*reserve_wal = true; +			*reserve_wal = defGetBoolean(defel);  		}  		else if (strcmp(defel->defname, "two_phase") == 0)  		{ @@ -910,7 +914,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,  						(errcode(ERRCODE_SYNTAX_ERROR),  						 errmsg("conflicting or redundant options")));  			two_phase_given = true; -			*two_phase = true; +			*two_phase = defGetBoolean(defel);  		}  		else  			elog(ERROR, "unrecognized option: %s", defel->defname); @@ -980,7 +984,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)  				ereport(ERROR,  				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */  						(errmsg("%s must not be called inside a transaction", -								"CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT"))); +								"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));  			need_full_snapshot = true;  		} @@ -990,25 +994,25 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)  				ereport(ERROR,  				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */  						(errmsg("%s must be called inside a transaction", -								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); +								"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));  			if (XactIsoLevel != XACT_REPEATABLE_READ)  				ereport(ERROR,  				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */  						(errmsg("%s must be called in REPEATABLE READ isolation mode transaction", -								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); +								"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));  			if (FirstSnapshotSet)  				ereport(ERROR,  				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */  						(errmsg("%s must be called before any query", -								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); +								"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));  			if (IsSubTransaction())  				ereport(ERROR,  				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */  						(errmsg("%s must not be called in a subtransaction", -								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); +								"CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));  			need_full_snapshot = true;  		} diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index d782b81adc6..37237cd5d95 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -490,6 +490,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,  {  	PQExpBuffer query;  	PGresult   *res; +	bool		use_new_option_syntax = (PQserverVersion(conn) >= 150000);  	query = createPQExpBuffer(); @@ -498,27 +499,54 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,  	Assert(!(two_phase && is_physical));  	Assert(slot_name != NULL); -	/* Build query */ +	/* Build base portion of query */  	appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);  	if (is_temporary)  		appendPQExpBufferStr(query, " TEMPORARY");  	if (is_physical) -	{  		appendPQExpBufferStr(query, " PHYSICAL"); +	else +		appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin); + +	/* Add any requested options */ +	if (use_new_option_syntax) +		appendPQExpBufferStr(query, " ("); +	if (is_physical) +	{  		if (reserve_wal) -			appendPQExpBufferStr(query, " RESERVE_WAL"); +			AppendPlainCommandOption(query, use_new_option_syntax, +									 "RESERVE_WAL");  	}  	else  	{ -		appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);  		if (two_phase && PQserverVersion(conn) >= 150000) -			appendPQExpBufferStr(query, " TWO_PHASE"); +			AppendPlainCommandOption(query, use_new_option_syntax, +									 "TWO_PHASE");  		if (PQserverVersion(conn) >= 100000) +		{  			/* pg_recvlogical doesn't use an exported snapshot, so suppress */ -			appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT"); +			if (use_new_option_syntax) +				AppendStringCommandOption(query, use_new_option_syntax, +										   "SNAPSHOT", "nothing"); +			else +				AppendPlainCommandOption(query, use_new_option_syntax, +										 "NOEXPORT_SNAPSHOT"); +		} +	} +	if (use_new_option_syntax) +	{ +		/* Suppress option list if it would be empty, otherwise terminate */ +		if (query->data[query->len - 1] == '(') +		{ +			query->len -= 2; +			query->data[query->len] = '\0'; +		} +		else +			appendPQExpBufferChar(query, ')');  	} +	/* Now run the query */  	res = PQexec(conn, query->data);  	if (PQresultStatus(res) != PGRES_TUPLES_OK)  	{ | 
