diff options
Diffstat (limited to 'src/backend/commands')
| -rw-r--r-- | src/backend/commands/subscriptioncmds.c | 480 |
1 files changed, 384 insertions, 96 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0f54686b699..a0974d71de1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -106,12 +106,29 @@ typedef struct SubOpts XLogRecPtr lsn; } SubOpts; -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); -static void check_publications_origin(WalReceiverConn *wrconn, - List *publications, bool copydata, - bool retain_dead_tuples, char *origin, - Oid *subrel_local_oids, int subrel_count, - char *subname); +/* + * PublicationRelKind represents a relation included in a publication. + * It stores the schema-qualified relation name (rv) and its kind (relkind). + */ +typedef struct PublicationRelKind +{ + RangeVar *rv; + char relkind; +} PublicationRelKind; + +static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications); +static void check_publications_origin_tables(WalReceiverConn *wrconn, + List *publications, bool copydata, + bool retain_dead_tuples, + char *origin, + Oid *subrel_local_oids, + int subrel_count, char *subname); +static void check_publications_origin_sequences(WalReceiverConn *wrconn, + List *publications, + bool copydata, char *origin, + Oid *subrel_local_oids, + int subrel_count, + char *subname); static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); @@ -736,20 +753,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + /* + * A replication origin is currently created for all subscriptions, + * including those that only contain sequences or are otherwise empty. + * + * XXX: While this is technically unnecessary, optimizing it would require + * additional logic to skip origin creation during DDL operations and + * apply workers initialization, and to handle origin creation dynamically + * when tables are added to the subscription. It is not clear whether + * preventing creation of origins is worth additional complexity. + */ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); /* * Connect to remote side to execute requested commands and fetch table - * info. + * and sequence info. */ if (opts.connect) { char *err; WalReceiverConn *wrconn; - List *tables; - ListCell *lc; - char table_state; bool must_use_password; /* Try to connect to the publisher. */ @@ -764,10 +788,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool has_tables = false; + List *pubrels; + char relation_state; + check_publications(wrconn, publications); - check_publications_origin(wrconn, publications, opts.copy_data, - opts.retaindeadtuples, opts.origin, - NULL, 0, stmt->subname); + check_publications_origin_tables(wrconn, publications, + opts.copy_data, + opts.retaindeadtuples, opts.origin, + NULL, 0, stmt->subname); + check_publications_origin_sequences(wrconn, publications, + opts.copy_data, opts.origin, + NULL, 0, stmt->subname); if (opts.retaindeadtuples) check_pub_dead_tuple_retention(wrconn); @@ -776,25 +808,28 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table list from publisher and build local table status - * info. + * Build local relation status info. Relations are for both tables + * and sequences from the publisher. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + pubrels = fetch_relation_list(wrconn, publications); + + foreach_ptr(PublicationRelKind, pubrelinfo, pubrels) { - RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; + char relkind; + RangeVar *rv = pubrelinfo->rv; relid = RangeVarGetRelid(rv, AccessShareLock, false); + relkind = get_rel_relkind(relid); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), + CheckSubscriptionRelkind(relkind, pubrelinfo->relkind, rv->schemaname, rv->relname); - - AddSubscriptionRelState(subid, relid, table_state, + has_tables |= (relkind != RELKIND_SEQUENCE); + AddSubscriptionRelState(subid, relid, relation_state, InvalidXLogRecPtr, true); } @@ -802,6 +837,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to * export it. + * + * XXX: Similar to origins, it is not clear whether preventing the + * slot creation for empty and sequence-only subscriptions is + * worth additional complexity. */ if (opts.create_slot) { @@ -825,7 +864,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && !opts.copy_data && has_tables) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -879,21 +918,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications) { char *err; - List *pubrel_names; + List *pubrels = NIL; + Oid *pubrel_local_oids; List *subrel_states; + List *sub_remove_rels = NIL; Oid *subrel_local_oids; - Oid *pubrel_local_oids; + Oid *subseq_local_oids; + int subrel_count; ListCell *lc; int off; - int remove_rel_len; - int subrel_count; + int tbl_count = 0; + int seq_count = 0; Relation rel = NULL; typedef struct SubRemoveRels { Oid relid; char state; } SubRemoveRels; - SubRemoveRels *sub_remove_rels; + WalReceiverConn *wrconn; bool must_use_password; @@ -915,71 +957,84 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); - /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + /* Get the relation list from publisher. */ + pubrels = fetch_relation_list(wrconn, sub->publications); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid, false); + /* Get local relation list. */ + subrel_states = GetSubscriptionRelations(sub->oid, true, true, false); subrel_count = list_length(subrel_states); /* - * Build qsorted array of local table oids for faster lookup. This can - * potentially contain all tables in the database so speed of lookup - * is important. + * Build qsorted arrays of local table oids and sequence oids for + * faster lookup. This can potentially contain all tables and + * sequences in the database so speed of lookup is important. + * + * We do not yet know the exact count of tables and sequences, so we + * allocate separate arrays for table OIDs and sequence OIDs based on + * the total number of relations (subrel_count). */ subrel_local_oids = palloc(subrel_count * sizeof(Oid)); - off = 0; + subseq_local_oids = palloc(subrel_count * sizeof(Oid)); foreach(lc, subrel_states) { SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); - subrel_local_oids[off++] = relstate->relid; + if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE) + subseq_local_oids[seq_count++] = relstate->relid; + else + subrel_local_oids[tbl_count++] = relstate->relid; } - qsort(subrel_local_oids, subrel_count, - sizeof(Oid), oid_cmp); - check_publications_origin(wrconn, sub->publications, copy_data, - sub->retaindeadtuples, sub->origin, - subrel_local_oids, subrel_count, sub->name); + qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp); + check_publications_origin_tables(wrconn, sub->publications, copy_data, + sub->retaindeadtuples, sub->origin, + subrel_local_oids, tbl_count, + sub->name); - /* - * Rels that we want to remove from subscription and drop any slots - * and origins corresponding to them. - */ - sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); + qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp); + check_publications_origin_sequences(wrconn, sub->publications, + copy_data, sub->origin, + subseq_local_oids, seq_count, + sub->name); /* - * Walk over the remote tables and try to match them to locally known - * tables. If the table is not known locally create a new state for - * it. + * Walk over the remote relations and try to match them to locally + * known relations. If the relation is not known locally create a new + * state for it. * - * Also builds array of local oids of remote tables for the next step. + * Also builds array of local oids of remote relations for the next + * step. */ off = 0; - pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid)); - foreach(lc, pubrel_names) + foreach_ptr(PublicationRelKind, pubrelinfo, pubrels) { - RangeVar *rv = (RangeVar *) lfirst(lc); + RangeVar *rv = pubrelinfo->rv; Oid relid; + char relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); + relkind = get_rel_relkind(relid); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), + CheckSubscriptionRelkind(relkind, pubrelinfo->relkind, rv->schemaname, rv->relname); pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, - subrel_count, sizeof(Oid), oid_cmp)) + tbl_count, sizeof(Oid), oid_cmp) && + !bsearch(&relid, subseq_local_oids, + seq_count, sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr, true); ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + rv->schemaname, rv->relname, sub->name)); } } @@ -987,19 +1042,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Next remove state for tables we should not care about anymore using * the data we collected above */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp); - remove_rel_len = 0; - for (off = 0; off < subrel_count; off++) + for (off = 0; off < tbl_count; off++) { Oid relid = subrel_local_oids[off]; if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + list_length(pubrels), sizeof(Oid), oid_cmp)) { char state; XLogRecPtr statelsn; + SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels)); /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -1021,11 +1075,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Last known rel state. */ state = GetSubscriptionRelState(sub->oid, relid, &statelsn); - sub_remove_rels[remove_rel_len].relid = relid; - sub_remove_rels[remove_rel_len++].state = state; - RemoveSubscriptionRel(sub->oid, relid); + remove_rel->relid = relid; + remove_rel->state = state; + + sub_remove_rels = lappend(sub_remove_rels, remove_rel); + logicalrep_worker_stop(sub->oid, relid); /* @@ -1064,10 +1120,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * to be at the end because otherwise if there is an error while doing * the database operations we won't be able to rollback dropped slots. */ - for (off = 0; off < remove_rel_len; off++) + foreach_ptr(SubRemoveRels, rel, sub_remove_rels) { - if (sub_remove_rels[off].state != SUBREL_STATE_READY && - sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + if (rel->state != SUBREL_STATE_READY && + rel->state != SUBREL_STATE_SYNCDONE) { char syncslotname[NAMEDATALEN] = {0}; @@ -1081,11 +1137,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * dropped slots and fail. For these reasons, we allow * missing_ok = true for the drop. */ - ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, + ReplicationSlotNameForTablesync(sub->oid, rel->relid, syncslotname, sizeof(syncslotname)); ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } } + + /* + * Next remove state for sequences we should not care about anymore + * using the data we collected above + */ + for (off = 0; off < seq_count; off++) + { + Oid relid = subseq_local_oids[off]; + + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrels), sizeof(Oid), oid_cmp)) + { + /* + * This locking ensures that the state of rels won't change + * till we are done with this refresh operation. + */ + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + + RemoveSubscriptionRel(sub->oid, relid); + + ereport(DEBUG1, + errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); + } + } } PG_FINALLY(); { @@ -1098,6 +1182,58 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* + * Marks all sequences with INIT state. + */ +static void +AlterSubscription_refresh_seq(Subscription *sub) +{ + char *err = NULL; + WalReceiverConn *wrconn; + bool must_use_password; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + must_use_password = sub->passwordrequired && !sub->ownersuperuser; + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, + sub->name, &err); + if (!wrconn) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("subscription \"%s\" could not connect to the publisher: %s", + sub->name, err)); + + PG_TRY(); + { + List *subrel_states; + + check_publications_origin_sequences(wrconn, sub->publications, true, + sub->origin, NULL, 0, sub->name); + + /* Get local sequence list. */ + subrel_states = GetSubscriptionRelations(sub->oid, false, true, false); + foreach_ptr(SubscriptionRelState, subrel, subrel_states) + { + Oid relid = subrel->relid; + + UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, + InvalidXLogRecPtr, false); + ereport(DEBUG1, + errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); + } + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); +} + +/* * Common checks for altering failover, two_phase, and retain_dead_tuples * options. */ @@ -1733,6 +1869,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES: + { + if (!sub->enabled) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("%s is not allowed for disabled subscriptions", + "ALTER SUBSCRIPTION ... REFRESH SEQUENCES")); + + AlterSubscription_refresh_seq(sub); + + break; + } + case ALTER_SUBSCRIPTION_SKIP: { parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts); @@ -1824,9 +1973,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (retain_dead_tuples) check_pub_dead_tuple_retention(wrconn); - check_publications_origin(wrconn, sub->publications, false, - retain_dead_tuples, origin, NULL, 0, - sub->name); + check_publications_origin_tables(wrconn, sub->publications, false, + retain_dead_tuples, origin, NULL, 0, + sub->name); if (update_failover || update_two_phase) walrcv_alter_slot(wrconn, sub->slotname, @@ -2008,7 +2157,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. */ - rstates = GetSubscriptionRelations(subid, true); + rstates = GetSubscriptionRelations(subid, true, false, true); foreach(lc, rstates) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -2341,10 +2490,10 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * - See comments atop worker.c for more details. */ static void -check_publications_origin(WalReceiverConn *wrconn, List *publications, - bool copydata, bool retain_dead_tuples, - char *origin, Oid *subrel_local_oids, - int subrel_count, char *subname) +check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, + bool copydata, bool retain_dead_tuples, + char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname) { WalRcvExecResult *res; StringInfoData cmd; @@ -2421,7 +2570,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, errmsg("could not receive list of replicated tables from the publisher: %s", res->err))); - /* Process tables. */ + /* Process publications. */ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { @@ -2483,6 +2632,114 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, } /* + * This function is similar to check_publications_origin_tables and serves + * same purpose for sequences. + */ +static void +check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, + bool copydata, char *origin, + Oid *subrel_local_oids, int subrel_count, + char *subname) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {TEXTOID}; + List *publist = NIL; + + /* + * Enable sequence synchronization checks only when origin is 'none' , to + * ensure that sequence data from other origins is not inadvertently + * copied. + */ + if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT P.pubname AS pubname\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_sequences(P.pubname) GPS\n" + " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPS.relid AND P.pubname IN ("); + + GetPublicationsStr(publications, &cmd, true); + appendStringInfoString(&cmd, ")\n"); + + /* + * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION, + * subrel_local_oids contains the list of relations that are already + * present on the subscriber. This check should be skipped as these will + * not be re-synced. + */ + for (int i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *seqname = get_rel_name(relid); + + appendStringInfo(&cmd, + "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, seqname); + } + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated sequences from the publisher: %s", + res->err))); + + /* Process publications. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + publist = list_append_unique(publist, makeString(pubname)); + } + + /* + * Log a warning if the publisher has subscribed to the same sequence from + * some other publisher. We cannot know the origin of sequences data + * during the initial sync. + */ + if (publist) + { + StringInfo pubnames = makeStringInfo(); + StringInfo err_msg = makeStringInfo(); + StringInfo err_hint = makeStringInfo(); + + /* Prepare the list of publication(s) for warning message. */ + GetPublicationsStr(publist, pubnames, false); + + appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), + subname); + appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins.")); + + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_internal("%s", err_msg->data), + errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.", + "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.", + list_length(publist), pubnames->data), + errhint_internal("%s", err_hint->data)); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* * Determine whether the retain_dead_tuples can be enabled based on the * publisher's status. * @@ -2594,8 +2851,23 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, } /* - * Get the list of tables which belong to specified publications on the - * publisher connection. + * Return true iff 'rv' is a member of the list. + */ +static bool +list_member_rangevar(const List *list, RangeVar *rv) +{ + foreach_ptr(PublicationRelKind, relinfo, list) + { + if (equal(relinfo->rv, rv)) + return true; + } + + return false; +} + +/* + * Get the list of tables and sequences which belong to specified publications + * on the publisher connection. * * Note that we don't support the case where the column list is different for * the same table in different publications to avoid sending unwanted column @@ -2603,15 +2875,16 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, * list and row filter are specified for different publications. */ static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_relation_list(WalReceiverConn *wrconn, List *publications) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid}; - List *tablelist = NIL; + Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid}; + List *relationlist = NIL; int server_version = walrcv_server_version(wrconn); bool check_columnlist = (server_version >= 150000); + int column_count = check_columnlist ? 4 : 3; StringInfo pub_names = makeStringInfo(); initStringInfo(&cmd); @@ -2619,10 +2892,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) /* Build the pub_names comma-separated string. */ GetPublicationsStr(publications, pub_names, true); - /* Get the list of tables from the publisher. */ + /* Get the list of relations from the publisher */ if (server_version >= 160000) { - tableRow[2] = INT2VECTOROID; + tableRow[3] = INT2VECTOROID; /* * From version 16, we allowed passing multiple publications to the @@ -2637,19 +2910,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) * to worry if different publications have specified them in a * different order. See pub_collist_validate. */ - appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n" - " FROM pg_class c\n" + appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n" + " FROM pg_class c\n" " JOIN pg_namespace n ON n.oid = c.relnamespace\n" " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n" " FROM pg_publication\n" " WHERE pubname IN ( %s )) AS gpt\n" " ON gpt.relid = c.oid\n", pub_names->data); + + /* From version 19, inclusion of sequences in the target is supported */ + if (server_version >= 190000) + appendStringInfo(&cmd, + "UNION ALL\n" + " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n" + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ( %s )", + pub_names->data); } else { - tableRow[2] = NAMEARRAYOID; - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); + tableRow[3] = NAMEARRAYOID; + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n"); /* Get column lists for each relation if the publisher supports it */ if (check_columnlist) @@ -2662,7 +2944,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) destroyStringInfo(pub_names); - res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, column_count, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -2678,22 +2960,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) char *nspname; char *relname; bool isnull; - RangeVar *rv; + char relkind; + PublicationRelKind *relinfo = palloc_object(PublicationRelKind); nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); - rv = makeRangeVar(nspname, relname, -1); + relinfo->rv = makeRangeVar(nspname, relname, -1); + relinfo->relkind = relkind; - if (check_columnlist && list_member(tablelist, rv)) + if (relkind != RELKIND_SEQUENCE && + check_columnlist && + list_member_rangevar(relationlist, relinfo->rv)) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use different column lists for table \"%s.%s\" in different publications", nspname, relname)); else - tablelist = lappend(tablelist, rv); + relationlist = lappend(relationlist, relinfo); ExecClearTuple(slot); } @@ -2701,7 +2989,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) walrcv_clear_result(res); - return tablelist; + return relationlist; } /* |
