diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
| -rw-r--r-- | src/backend/commands/subscriptioncmds.c | 480 | 
1 files changed, 384 insertions, 96 deletions
| diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0f54686b699..a0974d71de1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -106,12 +106,29 @@ typedef struct SubOpts  	XLogRecPtr	lsn;  } SubOpts; -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); -static void check_publications_origin(WalReceiverConn *wrconn, -									  List *publications, bool copydata, -									  bool retain_dead_tuples, char *origin, -									  Oid *subrel_local_oids, int subrel_count, -									  char *subname); +/* + * PublicationRelKind represents a relation included in a publication. + * It stores the schema-qualified relation name (rv) and its kind (relkind). + */ +typedef struct PublicationRelKind +{ +	RangeVar   *rv; +	char		relkind; +} PublicationRelKind; + +static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications); +static void check_publications_origin_tables(WalReceiverConn *wrconn, +											 List *publications, bool copydata, +											 bool retain_dead_tuples, +											 char *origin, +											 Oid *subrel_local_oids, +											 int subrel_count, char *subname); +static void check_publications_origin_sequences(WalReceiverConn *wrconn, +												List *publications, +												bool copydata, char *origin, +												Oid *subrel_local_oids, +												int subrel_count, +												char *subname);  static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);  static void check_duplicates_in_publist(List *publist, Datum *datums);  static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); @@ -736,20 +753,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,  	recordDependencyOnOwner(SubscriptionRelationId, subid, owner); +	/* +	 * A replication origin is currently created for all subscriptions, +	 * including those that only contain sequences or are otherwise empty. +	 * +	 * XXX: While this is technically unnecessary, optimizing it would require +	 * additional logic to skip origin creation during DDL operations and +	 * apply workers initialization, and to handle origin creation dynamically +	 * when tables are added to the subscription. It is not clear whether +	 * preventing creation of origins is worth additional complexity. +	 */  	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));  	replorigin_create(originname);  	/*  	 * Connect to remote side to execute requested commands and fetch table -	 * info. +	 * and sequence info.  	 */  	if (opts.connect)  	{  		char	   *err;  		WalReceiverConn *wrconn; -		List	   *tables; -		ListCell   *lc; -		char		table_state;  		bool		must_use_password;  		/* Try to connect to the publisher. */ @@ -764,10 +788,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,  		PG_TRY();  		{ +			bool		has_tables = false; +			List	   *pubrels; +			char		relation_state; +  			check_publications(wrconn, publications); -			check_publications_origin(wrconn, publications, opts.copy_data, -									  opts.retaindeadtuples, opts.origin, -									  NULL, 0, stmt->subname); +			check_publications_origin_tables(wrconn, publications, +											 opts.copy_data, +											 opts.retaindeadtuples, opts.origin, +											 NULL, 0, stmt->subname); +			check_publications_origin_sequences(wrconn, publications, +												opts.copy_data, opts.origin, +												NULL, 0, stmt->subname);  			if (opts.retaindeadtuples)  				check_pub_dead_tuple_retention(wrconn); @@ -776,25 +808,28 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,  			 * Set sync state based on if we were asked to do data copy or  			 * not.  			 */ -			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; +			relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;  			/* -			 * Get the table list from publisher and build local table status -			 * info. +			 * Build local relation status info. Relations are for both tables +			 * and sequences from the publisher.  			 */ -			tables = fetch_table_list(wrconn, publications); -			foreach(lc, tables) +			pubrels = fetch_relation_list(wrconn, publications); + +			foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)  			{ -				RangeVar   *rv = (RangeVar *) lfirst(lc);  				Oid			relid; +				char		relkind; +				RangeVar   *rv = pubrelinfo->rv;  				relid = RangeVarGetRelid(rv, AccessShareLock, false); +				relkind = get_rel_relkind(relid);  				/* Check for supported relkind. */ -				CheckSubscriptionRelkind(get_rel_relkind(relid), +				CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,  										 rv->schemaname, rv->relname); - -				AddSubscriptionRelState(subid, relid, table_state, +				has_tables |= (relkind != RELKIND_SEQUENCE); +				AddSubscriptionRelState(subid, relid, relation_state,  										InvalidXLogRecPtr, true);  			} @@ -802,6 +837,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,  			 * If requested, create permanent slot for the subscription. We  			 * won't use the initial snapshot for anything, so no need to  			 * export it. +			 * +			 * XXX: Similar to origins, it is not clear whether preventing the +			 * slot creation for empty and sequence-only subscriptions is +			 * worth additional complexity.  			 */  			if (opts.create_slot)  			{ @@ -825,7 +864,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,  				 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH  				 * PUBLICATION to work.  				 */ -				if (opts.twophase && !opts.copy_data && tables != NIL) +				if (opts.twophase && !opts.copy_data && has_tables)  					twophase_enabled = true;  				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -879,21 +918,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  						  List *validate_publications)  {  	char	   *err; -	List	   *pubrel_names; +	List	   *pubrels = NIL; +	Oid		   *pubrel_local_oids;  	List	   *subrel_states; +	List	   *sub_remove_rels = NIL;  	Oid		   *subrel_local_oids; -	Oid		   *pubrel_local_oids; +	Oid		   *subseq_local_oids; +	int			subrel_count;  	ListCell   *lc;  	int			off; -	int			remove_rel_len; -	int			subrel_count; +	int			tbl_count = 0; +	int			seq_count = 0;  	Relation	rel = NULL;  	typedef struct SubRemoveRels  	{  		Oid			relid;  		char		state;  	} SubRemoveRels; -	SubRemoveRels *sub_remove_rels; +  	WalReceiverConn *wrconn;  	bool		must_use_password; @@ -915,71 +957,84 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  		if (validate_publications)  			check_publications(wrconn, validate_publications); -		/* Get the table list from publisher. */ -		pubrel_names = fetch_table_list(wrconn, sub->publications); +		/* Get the relation list from publisher. */ +		pubrels = fetch_relation_list(wrconn, sub->publications); -		/* Get local table list. */ -		subrel_states = GetSubscriptionRelations(sub->oid, false); +		/* Get local relation list. */ +		subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);  		subrel_count = list_length(subrel_states);  		/* -		 * Build qsorted array of local table oids for faster lookup. This can -		 * potentially contain all tables in the database so speed of lookup -		 * is important. +		 * Build qsorted arrays of local table oids and sequence oids for +		 * faster lookup. This can potentially contain all tables and +		 * sequences in the database so speed of lookup is important. +		 * +		 * We do not yet know the exact count of tables and sequences, so we +		 * allocate separate arrays for table OIDs and sequence OIDs based on +		 * the total number of relations (subrel_count).  		 */  		subrel_local_oids = palloc(subrel_count * sizeof(Oid)); -		off = 0; +		subseq_local_oids = palloc(subrel_count * sizeof(Oid));  		foreach(lc, subrel_states)  		{  			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); -			subrel_local_oids[off++] = relstate->relid; +			if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE) +				subseq_local_oids[seq_count++] = relstate->relid; +			else +				subrel_local_oids[tbl_count++] = relstate->relid;  		} -		qsort(subrel_local_oids, subrel_count, -			  sizeof(Oid), oid_cmp); -		check_publications_origin(wrconn, sub->publications, copy_data, -								  sub->retaindeadtuples, sub->origin, -								  subrel_local_oids, subrel_count, sub->name); +		qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp); +		check_publications_origin_tables(wrconn, sub->publications, copy_data, +										 sub->retaindeadtuples, sub->origin, +										 subrel_local_oids, tbl_count, +										 sub->name); -		/* -		 * Rels that we want to remove from subscription and drop any slots -		 * and origins corresponding to them. -		 */ -		sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); +		qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp); +		check_publications_origin_sequences(wrconn, sub->publications, +											copy_data, sub->origin, +											subseq_local_oids, seq_count, +											sub->name);  		/* -		 * Walk over the remote tables and try to match them to locally known -		 * tables. If the table is not known locally create a new state for -		 * it. +		 * Walk over the remote relations and try to match them to locally +		 * known relations. If the relation is not known locally create a new +		 * state for it.  		 * -		 * Also builds array of local oids of remote tables for the next step. +		 * Also builds array of local oids of remote relations for the next +		 * step.  		 */  		off = 0; -		pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); +		pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid)); -		foreach(lc, pubrel_names) +		foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)  		{ -			RangeVar   *rv = (RangeVar *) lfirst(lc); +			RangeVar   *rv = pubrelinfo->rv;  			Oid			relid; +			char		relkind;  			relid = RangeVarGetRelid(rv, AccessShareLock, false); +			relkind = get_rel_relkind(relid);  			/* Check for supported relkind. */ -			CheckSubscriptionRelkind(get_rel_relkind(relid), +			CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,  									 rv->schemaname, rv->relname);  			pubrel_local_oids[off++] = relid;  			if (!bsearch(&relid, subrel_local_oids, -						 subrel_count, sizeof(Oid), oid_cmp)) +						 tbl_count, sizeof(Oid), oid_cmp) && +				!bsearch(&relid, subseq_local_oids, +						 seq_count, sizeof(Oid), oid_cmp))  			{  				AddSubscriptionRelState(sub->oid, relid,  										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,  										InvalidXLogRecPtr, true);  				ereport(DEBUG1, -						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", -										 rv->schemaname, rv->relname, sub->name))); +						errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", +										relkind == RELKIND_SEQUENCE ? "sequence" : "table", +										rv->schemaname, rv->relname, sub->name));  			}  		} @@ -987,19 +1042,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  		 * Next remove state for tables we should not care about anymore using  		 * the data we collected above  		 */ -		qsort(pubrel_local_oids, list_length(pubrel_names), -			  sizeof(Oid), oid_cmp); +		qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp); -		remove_rel_len = 0; -		for (off = 0; off < subrel_count; off++) +		for (off = 0; off < tbl_count; off++)  		{  			Oid			relid = subrel_local_oids[off];  			if (!bsearch(&relid, pubrel_local_oids, -						 list_length(pubrel_names), sizeof(Oid), oid_cmp)) +						 list_length(pubrels), sizeof(Oid), oid_cmp))  			{  				char		state;  				XLogRecPtr	statelsn; +				SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));  				/*  				 * Lock pg_subscription_rel with AccessExclusiveLock to @@ -1021,11 +1075,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  				/* Last known rel state. */  				state = GetSubscriptionRelState(sub->oid, relid, &statelsn); -				sub_remove_rels[remove_rel_len].relid = relid; -				sub_remove_rels[remove_rel_len++].state = state; -  				RemoveSubscriptionRel(sub->oid, relid); +				remove_rel->relid = relid; +				remove_rel->state = state; + +				sub_remove_rels = lappend(sub_remove_rels, remove_rel); +  				logicalrep_worker_stop(sub->oid, relid);  				/* @@ -1064,10 +1120,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  		 * to be at the end because otherwise if there is an error while doing  		 * the database operations we won't be able to rollback dropped slots.  		 */ -		for (off = 0; off < remove_rel_len; off++) +		foreach_ptr(SubRemoveRels, rel, sub_remove_rels)  		{ -			if (sub_remove_rels[off].state != SUBREL_STATE_READY && -				sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) +			if (rel->state != SUBREL_STATE_READY && +				rel->state != SUBREL_STATE_SYNCDONE)  			{  				char		syncslotname[NAMEDATALEN] = {0}; @@ -1081,11 +1137,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  				 * dropped slots and fail. For these reasons, we allow  				 * missing_ok = true for the drop.  				 */ -				ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, +				ReplicationSlotNameForTablesync(sub->oid, rel->relid,  												syncslotname, sizeof(syncslotname));  				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);  			}  		} + +		/* +		 * Next remove state for sequences we should not care about anymore +		 * using the data we collected above +		 */ +		for (off = 0; off < seq_count; off++) +		{ +			Oid			relid = subseq_local_oids[off]; + +			if (!bsearch(&relid, pubrel_local_oids, +						 list_length(pubrels), sizeof(Oid), oid_cmp)) +			{ +				/* +				 * This locking ensures that the state of rels won't change +				 * till we are done with this refresh operation. +				 */ +				if (!rel) +					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + +				RemoveSubscriptionRel(sub->oid, relid); + +				ereport(DEBUG1, +						errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"", +										get_namespace_name(get_rel_namespace(relid)), +										get_rel_name(relid), +										sub->name)); +			} +		}  	}  	PG_FINALLY();  	{ @@ -1098,6 +1182,58 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,  }  /* + * Marks all sequences with INIT state. + */ +static void +AlterSubscription_refresh_seq(Subscription *sub) +{ +	char	   *err = NULL; +	WalReceiverConn *wrconn; +	bool		must_use_password; + +	/* Load the library providing us libpq calls. */ +	load_file("libpqwalreceiver", false); + +	/* Try to connect to the publisher. */ +	must_use_password = sub->passwordrequired && !sub->ownersuperuser; +	wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, +							sub->name, &err); +	if (!wrconn) +		ereport(ERROR, +				errcode(ERRCODE_CONNECTION_FAILURE), +				errmsg("subscription \"%s\" could not connect to the publisher: %s", +					   sub->name, err)); + +	PG_TRY(); +	{ +		List	   *subrel_states; + +		check_publications_origin_sequences(wrconn, sub->publications, true, +											sub->origin, NULL, 0, sub->name); + +		/* Get local sequence list. */ +		subrel_states = GetSubscriptionRelations(sub->oid, false, true, false); +		foreach_ptr(SubscriptionRelState, subrel, subrel_states) +		{ +			Oid			relid = subrel->relid; + +			UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, +									   InvalidXLogRecPtr, false); +			ereport(DEBUG1, +					errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", +									get_namespace_name(get_rel_namespace(relid)), +									get_rel_name(relid), +									sub->name)); +		} +	} +	PG_FINALLY(); +	{ +		walrcv_disconnect(wrconn); +	} +	PG_END_TRY(); +} + +/*   * Common checks for altering failover, two_phase, and retain_dead_tuples   * options.   */ @@ -1733,6 +1869,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,  				break;  			} +		case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES: +			{ +				if (!sub->enabled) +					ereport(ERROR, +							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), +							errmsg("%s is not allowed for disabled subscriptions", +								   "ALTER SUBSCRIPTION ... REFRESH SEQUENCES")); + +				AlterSubscription_refresh_seq(sub); + +				break; +			} +  		case ALTER_SUBSCRIPTION_SKIP:  			{  				parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts); @@ -1824,9 +1973,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,  			if (retain_dead_tuples)  				check_pub_dead_tuple_retention(wrconn); -			check_publications_origin(wrconn, sub->publications, false, -									  retain_dead_tuples, origin, NULL, 0, -									  sub->name); +			check_publications_origin_tables(wrconn, sub->publications, false, +											 retain_dead_tuples, origin, NULL, 0, +											 sub->name);  			if (update_failover || update_two_phase)  				walrcv_alter_slot(wrconn, sub->slotname, @@ -2008,7 +2157,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)  	 * the apply and tablesync workers and they can't restart because of  	 * exclusive lock on the subscription.  	 */ -	rstates = GetSubscriptionRelations(subid, true); +	rstates = GetSubscriptionRelations(subid, true, false, true);  	foreach(lc, rstates)  	{  		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -2341,10 +2490,10 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)   *    - See comments atop worker.c for more details.   */  static void -check_publications_origin(WalReceiverConn *wrconn, List *publications, -						  bool copydata, bool retain_dead_tuples, -						  char *origin, Oid *subrel_local_oids, -						  int subrel_count, char *subname) +check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, +								 bool copydata, bool retain_dead_tuples, +								 char *origin, Oid *subrel_local_oids, +								 int subrel_count, char *subname)  {  	WalRcvExecResult *res;  	StringInfoData cmd; @@ -2421,7 +2570,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,  				 errmsg("could not receive list of replicated tables from the publisher: %s",  						res->err))); -	/* Process tables. */ +	/* Process publications. */  	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);  	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))  	{ @@ -2483,6 +2632,114 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,  }  /* + * This function is similar to check_publications_origin_tables and serves + * same purpose for sequences. + */ +static void +check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, +									bool copydata, char *origin, +									Oid *subrel_local_oids, int subrel_count, +									char *subname) +{ +	WalRcvExecResult *res; +	StringInfoData cmd; +	TupleTableSlot *slot; +	Oid			tableRow[1] = {TEXTOID}; +	List	   *publist = NIL; + +	/* +	 * Enable sequence synchronization checks only when origin is 'none' , to +	 * ensure that sequence data from other origins is not inadvertently +	 * copied. +	 */ +	if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0) +		return; + +	initStringInfo(&cmd); +	appendStringInfoString(&cmd, +						   "SELECT DISTINCT P.pubname AS pubname\n" +						   "FROM pg_publication P,\n" +						   "     LATERAL pg_get_publication_sequences(P.pubname) GPS\n" +						   "     JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n" +						   "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" +						   "WHERE C.oid = GPS.relid AND P.pubname IN ("); + +	GetPublicationsStr(publications, &cmd, true); +	appendStringInfoString(&cmd, ")\n"); + +	/* +	 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION, +	 * subrel_local_oids contains the list of relations that are already +	 * present on the subscriber. This check should be skipped as these will +	 * not be re-synced. +	 */ +	for (int i = 0; i < subrel_count; i++) +	{ +		Oid			relid = subrel_local_oids[i]; +		char	   *schemaname = get_namespace_name(get_rel_namespace(relid)); +		char	   *seqname = get_rel_name(relid); + +		appendStringInfo(&cmd, +						 "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", +						 schemaname, seqname); +	} + +	res = walrcv_exec(wrconn, cmd.data, 1, tableRow); +	pfree(cmd.data); + +	if (res->status != WALRCV_OK_TUPLES) +		ereport(ERROR, +				(errcode(ERRCODE_CONNECTION_FAILURE), +				 errmsg("could not receive list of replicated sequences from the publisher: %s", +						res->err))); + +	/* Process publications. */ +	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); +	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) +	{ +		char	   *pubname; +		bool		isnull; + +		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); +		Assert(!isnull); + +		ExecClearTuple(slot); +		publist = list_append_unique(publist, makeString(pubname)); +	} + +	/* +	 * Log a warning if the publisher has subscribed to the same sequence from +	 * some other publisher. We cannot know the origin of sequences data +	 * during the initial sync. +	 */ +	if (publist) +	{ +		StringInfo	pubnames = makeStringInfo(); +		StringInfo	err_msg = makeStringInfo(); +		StringInfo	err_hint = makeStringInfo(); + +		/* Prepare the list of publication(s) for warning message. */ +		GetPublicationsStr(publist, pubnames, false); + +		appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), +						 subname); +		appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins.")); + +		ereport(WARNING, +				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), +				errmsg_internal("%s", err_msg->data), +				errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.", +								 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.", +								 list_length(publist), pubnames->data), +				errhint_internal("%s", err_hint->data)); +	} + +	ExecDropSingleTupleTableSlot(slot); + +	walrcv_clear_result(res); +} + +/*   * Determine whether the retain_dead_tuples can be enabled based on the   * publisher's status.   * @@ -2594,8 +2851,23 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,  }  /* - * Get the list of tables which belong to specified publications on the - * publisher connection. + * Return true iff 'rv' is a member of the list. + */ +static bool +list_member_rangevar(const List *list, RangeVar *rv) +{ +	foreach_ptr(PublicationRelKind, relinfo, list) +	{ +		if (equal(relinfo->rv, rv)) +			return true; +	} + +	return false; +} + +/* + * Get the list of tables and sequences which belong to specified publications + * on the publisher connection.   *   * Note that we don't support the case where the column list is different for   * the same table in different publications to avoid sending unwanted column @@ -2603,15 +2875,16 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,   * list and row filter are specified for different publications.   */  static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_relation_list(WalReceiverConn *wrconn, List *publications)  {  	WalRcvExecResult *res;  	StringInfoData cmd;  	TupleTableSlot *slot; -	Oid			tableRow[3] = {TEXTOID, TEXTOID, InvalidOid}; -	List	   *tablelist = NIL; +	Oid			tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid}; +	List	   *relationlist = NIL;  	int			server_version = walrcv_server_version(wrconn);  	bool		check_columnlist = (server_version >= 150000); +	int			column_count = check_columnlist ? 4 : 3;  	StringInfo	pub_names = makeStringInfo();  	initStringInfo(&cmd); @@ -2619,10 +2892,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)  	/* Build the pub_names comma-separated string. */  	GetPublicationsStr(publications, pub_names, true); -	/* Get the list of tables from the publisher. */ +	/* Get the list of relations from the publisher */  	if (server_version >= 160000)  	{ -		tableRow[2] = INT2VECTOROID; +		tableRow[3] = INT2VECTOROID;  		/*  		 * From version 16, we allowed passing multiple publications to the @@ -2637,19 +2910,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)  		 * to worry if different publications have specified them in a  		 * different order. See pub_collist_validate.  		 */ -		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n" -						 "       FROM pg_class c\n" +		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n" +						 "   FROM pg_class c\n"  						 "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"  						 "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"  						 "                FROM pg_publication\n"  						 "                WHERE pubname IN ( %s )) AS gpt\n"  						 "             ON gpt.relid = c.oid\n",  						 pub_names->data); + +		/* From version 19, inclusion of sequences in the target is supported */ +		if (server_version >= 190000) +			appendStringInfo(&cmd, +							 "UNION ALL\n" +							 "  SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n" +							 "  FROM pg_catalog.pg_publication_sequences s\n" +							 "  WHERE s.pubname IN ( %s )", +							 pub_names->data);  	}  	else  	{ -		tableRow[2] = NAMEARRAYOID; -		appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); +		tableRow[3] = NAMEARRAYOID; +		appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");  		/* Get column lists for each relation if the publisher supports it */  		if (check_columnlist) @@ -2662,7 +2944,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)  	destroyStringInfo(pub_names); -	res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow); +	res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);  	pfree(cmd.data);  	if (res->status != WALRCV_OK_TUPLES) @@ -2678,22 +2960,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)  		char	   *nspname;  		char	   *relname;  		bool		isnull; -		RangeVar   *rv; +		char		relkind; +		PublicationRelKind *relinfo = palloc_object(PublicationRelKind);  		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));  		Assert(!isnull);  		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));  		Assert(!isnull); +		relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); +		Assert(!isnull); -		rv = makeRangeVar(nspname, relname, -1); +		relinfo->rv = makeRangeVar(nspname, relname, -1); +		relinfo->relkind = relkind; -		if (check_columnlist && list_member(tablelist, rv)) +		if (relkind != RELKIND_SEQUENCE && +			check_columnlist && +			list_member_rangevar(relationlist, relinfo->rv))  			ereport(ERROR,  					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),  					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",  						   nspname, relname));  		else -			tablelist = lappend(tablelist, rv); +			relationlist = lappend(relationlist, relinfo);  		ExecClearTuple(slot);  	} @@ -2701,7 +2989,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)  	walrcv_clear_result(res); -	return tablelist; +	return relationlist;  }  /* | 
