diff options
| -rw-r--r-- | doc/src/sgml/catalogs.sgml | 29 | ||||
| -rw-r--r-- | doc/src/sgml/ref/alter_subscription.sgml | 42 | ||||
| -rw-r--r-- | src/backend/catalog/pg_subscription.c | 53 | ||||
| -rw-r--r-- | src/backend/commands/subscriptioncmds.c | 480 | ||||
| -rw-r--r-- | src/backend/executor/execReplication.c | 28 | ||||
| -rw-r--r-- | src/backend/parser/gram.y | 9 | ||||
| -rw-r--r-- | src/backend/replication/logical/proto.c | 3 | ||||
| -rw-r--r-- | src/backend/replication/logical/relation.c | 12 | ||||
| -rw-r--r-- | src/backend/replication/logical/syncutils.c | 5 | ||||
| -rw-r--r-- | src/backend/replication/logical/tablesync.c | 2 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 2 | ||||
| -rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 6 | ||||
| -rw-r--r-- | src/bin/psql/tab-complete.in.c | 10 | ||||
| -rw-r--r-- | src/include/catalog/pg_subscription_rel.h | 3 | ||||
| -rw-r--r-- | src/include/executor/executor.h | 4 | ||||
| -rw-r--r-- | src/include/nodes/parsenodes.h | 1 | ||||
| -rw-r--r-- | src/test/subscription/meson.build | 1 | ||||
| -rw-r--r-- | src/test/subscription/t/036_sequences.pl | 55 | ||||
| -rw-r--r-- | src/tools/pgindent/typedefs.list | 1 |
19 files changed, 607 insertions, 139 deletions
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 9b3aae8603b..6c8a0f173c9 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8199,16 +8199,19 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l </indexterm> <para> - The catalog <structname>pg_subscription_rel</structname> contains the - state for each replicated relation in each subscription. This is a - many-to-many mapping. + The catalog <structname>pg_subscription_rel</structname> stores the + state of each replicated table and sequence for each subscription. This + is a many-to-many mapping. </para> <para> - This catalog only contains tables known to the subscription after running - either <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link> or - <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... REFRESH - PUBLICATION</command></link>. + This catalog contains tables and sequences known to the subscription + after running: + <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>, + <link linkend="sql-altersubscription-params-refresh-publication"> + <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command></link>, or + <link linkend="sql-altersubscription-params-refresh-sequences"> + <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>. </para> <table> @@ -8242,7 +8245,7 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>) </para> <para> - Reference to relation + Reference to table or sequence </para></entry> </row> @@ -8251,12 +8254,20 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <structfield>srsubstate</structfield> <type>char</type> </para> <para> - State code: + State code for the table or sequence. + </para> + <para> + State codes for tables: <literal>i</literal> = initialize, <literal>d</literal> = data is being copied, <literal>f</literal> = finished table copy, <literal>s</literal> = synchronized, <literal>r</literal> = ready (normal replication) + </para> + <para> + State codes for sequences: + <literal>i</literal> = initialize, + <literal>r</literal> = ready </para></entry> </row> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 12f72ba3167..8ab3b7fbd37 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICA ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUBLICATION [ WITH ( <replaceable class="parameter">refresh_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH SEQUENCES ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) @@ -139,9 +140,10 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <term><literal>refresh</literal> (<type>boolean</type>)</term> <listitem> <para> - When false, the command will not try to refresh table information. - <literal>REFRESH PUBLICATION</literal> should then be executed separately. - The default is <literal>true</literal>. + When <literal>false</literal>, the command will not try to refresh + table and sequence information. <literal>REFRESH PUBLICATION</literal> + should then be executed separately. The default is + <literal>true</literal>. </para> </listitem> </varlistentry> @@ -158,7 +160,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <term><literal>REFRESH PUBLICATION</literal></term> <listitem> <para> - Fetch missing table information from publisher. This will start + Fetch missing table and sequence information from the publisher. This will start replication of tables that were added to the subscribed-to publications since <link linkend="sql-createsubscription"> <command>CREATE SUBSCRIPTION</command></link> or @@ -166,6 +168,12 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </para> <para> + The system catalog <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link> + is updated to record all tables and sequences known to the subscription, + that are still part of the publication. + </para> + + <para> <replaceable>refresh_option</replaceable> specifies additional options for the refresh operation. The supported options are: @@ -174,15 +182,20 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <term><literal>copy_data</literal> (<type>boolean</type>)</term> <listitem> <para> - Specifies whether to copy pre-existing data in the publications - that are being subscribed to when the replication starts. - The default is <literal>true</literal>. + Specifies whether to copy pre-existing data for tables and synchronize + sequences in the publications that are being subscribed to when the replication + starts. The default is <literal>true</literal>. </para> <para> Previously subscribed tables are not copied, even if a table's row filter <literal>WHERE</literal> clause has since been modified. </para> <para> + Previously subscribed sequences are not re-synchronized. To do that, + use <link linkend="sql-altersubscription-params-refresh-sequences"> + <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>. + </para> + <para> See <xref linkend="sql-createsubscription-notes"/> for details of how <literal>copy_data = true</literal> can interact with the <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link> @@ -200,6 +213,21 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </listitem> </varlistentry> + <varlistentry id="sql-altersubscription-params-refresh-sequences"> + <term><literal>REFRESH SEQUENCES</literal></term> + <listitem> + <para> + Re-synchronize sequence data with the publisher. Unlike + <link linkend="sql-altersubscription-params-refresh-publication"> + <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command></link> which + only has the ability to synchronize newly added sequences, + <literal>REFRESH SEQUENCES</literal> will re-synchronize the sequence + data for all currently subscribed sequences. It does not add or remove + sequences from the subscription to match the publication. + </para> + </listitem> + </varlistentry> + <varlistentry id="sql-altersubscription-params-enable"> <term><literal>ENABLE</literal></term> <listitem> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index e06587b0265..15b233a37d8 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -284,7 +284,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, ObjectIdGetDatum(relid), ObjectIdGetDatum(subid)); if (HeapTupleIsValid(tup)) - elog(ERROR, "subscription table %u in subscription %u already exists", + elog(ERROR, "subscription relation %u in subscription %u already exists", relid, subid); /* Form the tuple. */ @@ -478,9 +478,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid) * synchronization is in progress unless the caller updates the * corresponding subscription as well. This is to ensure that we don't * leave tablesync slots or origins in the system when the - * corresponding table is dropped. + * corresponding table is dropped. For sequences, however, it's ok to + * drop them since no separate slots or origins are created during + * synchronization. */ - if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY) + if (!OidIsValid(subid) && + subrel->srsubstate != SUBREL_STATE_READY && + get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -517,7 +521,8 @@ HasSubscriptionTables(Oid subid) Relation rel; ScanKeyData skey[1]; SysScanDesc scan; - bool has_subrels; + HeapTuple tup; + bool has_subtables = false; rel = table_open(SubscriptionRelRelationId, AccessShareLock); @@ -529,14 +534,27 @@ HasSubscriptionTables(Oid subid) scan = systable_beginscan(rel, InvalidOid, false, NULL, 1, skey); - /* If even a single tuple exists then the subscription has tables. */ - has_subrels = HeapTupleIsValid(systable_getnext(scan)); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + char relkind; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + relkind = get_rel_relkind(subrel->srrelid); + + if (relkind == RELKIND_RELATION || + relkind == RELKIND_PARTITIONED_TABLE) + { + has_subtables = true; + break; + } + } /* Cleanup */ systable_endscan(scan); table_close(rel, AccessShareLock); - return has_subrels; + return has_subtables; } /* @@ -547,7 +565,8 @@ HasSubscriptionTables(Oid subid) * returned list is palloc'ed in the current memory context. */ List * -GetSubscriptionRelations(Oid subid, bool not_ready) +GetSubscriptionRelations(Oid subid, bool tables, bool sequences, + bool not_ready) { List *res = NIL; Relation rel; @@ -556,6 +575,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready) ScanKeyData skey[2]; SysScanDesc scan; + /* One or both of 'tables' and 'sequences' must be true. */ + Assert(tables || sequences); + rel = table_open(SubscriptionRelRelationId, AccessShareLock); ScanKeyInit(&skey[nkeys++], @@ -578,9 +600,24 @@ GetSubscriptionRelations(Oid subid, bool not_ready) SubscriptionRelState *relstate; Datum d; bool isnull; + char relkind; subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + /* Relation is either a sequence or a table */ + relkind = get_rel_relkind(subrel->srrelid); + Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION || + relkind == RELKIND_PARTITIONED_TABLE); + + /* Skip sequences if they were not requested */ + if ((relkind == RELKIND_SEQUENCE) && !sequences) + continue; + + /* Skip tables if they were not requested */ + if ((relkind == RELKIND_RELATION || + relkind == RELKIND_PARTITIONED_TABLE) && !tables) + continue; + relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); relstate->relid = subrel->srrelid; relstate->state = subrel->srsubstate; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0f54686b699..a0974d71de1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -106,12 +106,29 @@ typedef struct SubOpts XLogRecPtr lsn; } SubOpts; -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); -static void check_publications_origin(WalReceiverConn *wrconn, - List *publications, bool copydata, - bool retain_dead_tuples, char *origin, - Oid *subrel_local_oids, int subrel_count, - char *subname); +/* + * PublicationRelKind represents a relation included in a publication. + * It stores the schema-qualified relation name (rv) and its kind (relkind). + */ +typedef struct PublicationRelKind +{ + RangeVar *rv; + char relkind; +} PublicationRelKind; + +static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications); +static void check_publications_origin_tables(WalReceiverConn *wrconn, + List *publications, bool copydata, + bool retain_dead_tuples, + char *origin, + Oid *subrel_local_oids, + int subrel_count, char *subname); +static void check_publications_origin_sequences(WalReceiverConn *wrconn, + List *publications, + bool copydata, char *origin, + Oid *subrel_local_oids, + int subrel_count, + char *subname); static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); @@ -736,20 +753,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + /* + * A replication origin is currently created for all subscriptions, + * including those that only contain sequences or are otherwise empty. + * + * XXX: While this is technically unnecessary, optimizing it would require + * additional logic to skip origin creation during DDL operations and + * apply workers initialization, and to handle origin creation dynamically + * when tables are added to the subscription. It is not clear whether + * preventing creation of origins is worth additional complexity. + */ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); /* * Connect to remote side to execute requested commands and fetch table - * info. + * and sequence info. */ if (opts.connect) { char *err; WalReceiverConn *wrconn; - List *tables; - ListCell *lc; - char table_state; bool must_use_password; /* Try to connect to the publisher. */ @@ -764,10 +788,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool has_tables = false; + List *pubrels; + char relation_state; + check_publications(wrconn, publications); - check_publications_origin(wrconn, publications, opts.copy_data, - opts.retaindeadtuples, opts.origin, - NULL, 0, stmt->subname); + check_publications_origin_tables(wrconn, publications, + opts.copy_data, + opts.retaindeadtuples, opts.origin, + NULL, 0, stmt->subname); + check_publications_origin_sequences(wrconn, publications, + opts.copy_data, opts.origin, + NULL, 0, stmt->subname); if (opts.retaindeadtuples) check_pub_dead_tuple_retention(wrconn); @@ -776,25 +808,28 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table list from publisher and build local table status - * info. + * Build local relation status info. Relations are for both tables + * and sequences from the publisher. */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + pubrels = fetch_relation_list(wrconn, publications); + + foreach_ptr(PublicationRelKind, pubrelinfo, pubrels) { - RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; + char relkind; + RangeVar *rv = pubrelinfo->rv; relid = RangeVarGetRelid(rv, AccessShareLock, false); + relkind = get_rel_relkind(relid); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), + CheckSubscriptionRelkind(relkind, pubrelinfo->relkind, rv->schemaname, rv->relname); - - AddSubscriptionRelState(subid, relid, table_state, + has_tables |= (relkind != RELKIND_SEQUENCE); + AddSubscriptionRelState(subid, relid, relation_state, InvalidXLogRecPtr, true); } @@ -802,6 +837,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to * export it. + * + * XXX: Similar to origins, it is not clear whether preventing the + * slot creation for empty and sequence-only subscriptions is + * worth additional complexity. */ if (opts.create_slot) { @@ -825,7 +864,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && tables != NIL) + if (opts.twophase && !opts.copy_data && has_tables) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -879,21 +918,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications) { char *err; - List *pubrel_names; + List *pubrels = NIL; + Oid *pubrel_local_oids; List *subrel_states; + List *sub_remove_rels = NIL; Oid *subrel_local_oids; - Oid *pubrel_local_oids; + Oid *subseq_local_oids; + int subrel_count; ListCell *lc; int off; - int remove_rel_len; - int subrel_count; + int tbl_count = 0; + int seq_count = 0; Relation rel = NULL; typedef struct SubRemoveRels { Oid relid; char state; } SubRemoveRels; - SubRemoveRels *sub_remove_rels; + WalReceiverConn *wrconn; bool must_use_password; @@ -915,71 +957,84 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); - /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + /* Get the relation list from publisher. */ + pubrels = fetch_relation_list(wrconn, sub->publications); - /* Get local table list. */ - subrel_states = GetSubscriptionRelations(sub->oid, false); + /* Get local relation list. */ + subrel_states = GetSubscriptionRelations(sub->oid, true, true, false); subrel_count = list_length(subrel_states); /* - * Build qsorted array of local table oids for faster lookup. This can - * potentially contain all tables in the database so speed of lookup - * is important. + * Build qsorted arrays of local table oids and sequence oids for + * faster lookup. This can potentially contain all tables and + * sequences in the database so speed of lookup is important. + * + * We do not yet know the exact count of tables and sequences, so we + * allocate separate arrays for table OIDs and sequence OIDs based on + * the total number of relations (subrel_count). */ subrel_local_oids = palloc(subrel_count * sizeof(Oid)); - off = 0; + subseq_local_oids = palloc(subrel_count * sizeof(Oid)); foreach(lc, subrel_states) { SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); - subrel_local_oids[off++] = relstate->relid; + if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE) + subseq_local_oids[seq_count++] = relstate->relid; + else + subrel_local_oids[tbl_count++] = relstate->relid; } - qsort(subrel_local_oids, subrel_count, - sizeof(Oid), oid_cmp); - check_publications_origin(wrconn, sub->publications, copy_data, - sub->retaindeadtuples, sub->origin, - subrel_local_oids, subrel_count, sub->name); + qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp); + check_publications_origin_tables(wrconn, sub->publications, copy_data, + sub->retaindeadtuples, sub->origin, + subrel_local_oids, tbl_count, + sub->name); - /* - * Rels that we want to remove from subscription and drop any slots - * and origins corresponding to them. - */ - sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); + qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp); + check_publications_origin_sequences(wrconn, sub->publications, + copy_data, sub->origin, + subseq_local_oids, seq_count, + sub->name); /* - * Walk over the remote tables and try to match them to locally known - * tables. If the table is not known locally create a new state for - * it. + * Walk over the remote relations and try to match them to locally + * known relations. If the relation is not known locally create a new + * state for it. * - * Also builds array of local oids of remote tables for the next step. + * Also builds array of local oids of remote relations for the next + * step. */ off = 0; - pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid)); - foreach(lc, pubrel_names) + foreach_ptr(PublicationRelKind, pubrelinfo, pubrels) { - RangeVar *rv = (RangeVar *) lfirst(lc); + RangeVar *rv = pubrelinfo->rv; Oid relid; + char relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); + relkind = get_rel_relkind(relid); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), + CheckSubscriptionRelkind(relkind, pubrelinfo->relkind, rv->schemaname, rv->relname); pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, - subrel_count, sizeof(Oid), oid_cmp)) + tbl_count, sizeof(Oid), oid_cmp) && + !bsearch(&relid, subseq_local_oids, + seq_count, sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, InvalidXLogRecPtr, true); ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", - rv->schemaname, rv->relname, sub->name))); + errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", + rv->schemaname, rv->relname, sub->name)); } } @@ -987,19 +1042,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Next remove state for tables we should not care about anymore using * the data we collected above */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp); - remove_rel_len = 0; - for (off = 0; off < subrel_count; off++) + for (off = 0; off < tbl_count; off++) { Oid relid = subrel_local_oids[off]; if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + list_length(pubrels), sizeof(Oid), oid_cmp)) { char state; XLogRecPtr statelsn; + SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels)); /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -1021,11 +1075,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Last known rel state. */ state = GetSubscriptionRelState(sub->oid, relid, &statelsn); - sub_remove_rels[remove_rel_len].relid = relid; - sub_remove_rels[remove_rel_len++].state = state; - RemoveSubscriptionRel(sub->oid, relid); + remove_rel->relid = relid; + remove_rel->state = state; + + sub_remove_rels = lappend(sub_remove_rels, remove_rel); + logicalrep_worker_stop(sub->oid, relid); /* @@ -1064,10 +1120,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * to be at the end because otherwise if there is an error while doing * the database operations we won't be able to rollback dropped slots. */ - for (off = 0; off < remove_rel_len; off++) + foreach_ptr(SubRemoveRels, rel, sub_remove_rels) { - if (sub_remove_rels[off].state != SUBREL_STATE_READY && - sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE) + if (rel->state != SUBREL_STATE_READY && + rel->state != SUBREL_STATE_SYNCDONE) { char syncslotname[NAMEDATALEN] = {0}; @@ -1081,11 +1137,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * dropped slots and fail. For these reasons, we allow * missing_ok = true for the drop. */ - ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, + ReplicationSlotNameForTablesync(sub->oid, rel->relid, syncslotname, sizeof(syncslotname)); ReplicationSlotDropAtPubNode(wrconn, syncslotname, true); } } + + /* + * Next remove state for sequences we should not care about anymore + * using the data we collected above + */ + for (off = 0; off < seq_count; off++) + { + Oid relid = subseq_local_oids[off]; + + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrels), sizeof(Oid), oid_cmp)) + { + /* + * This locking ensures that the state of rels won't change + * till we are done with this refresh operation. + */ + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + + RemoveSubscriptionRel(sub->oid, relid); + + ereport(DEBUG1, + errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); + } + } } PG_FINALLY(); { @@ -1098,6 +1182,58 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* + * Marks all sequences with INIT state. + */ +static void +AlterSubscription_refresh_seq(Subscription *sub) +{ + char *err = NULL; + WalReceiverConn *wrconn; + bool must_use_password; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + must_use_password = sub->passwordrequired && !sub->ownersuperuser; + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, + sub->name, &err); + if (!wrconn) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("subscription \"%s\" could not connect to the publisher: %s", + sub->name, err)); + + PG_TRY(); + { + List *subrel_states; + + check_publications_origin_sequences(wrconn, sub->publications, true, + sub->origin, NULL, 0, sub->name); + + /* Get local sequence list. */ + subrel_states = GetSubscriptionRelations(sub->oid, false, true, false); + foreach_ptr(SubscriptionRelState, subrel, subrel_states) + { + Oid relid = subrel->relid; + + UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT, + InvalidXLogRecPtr, false); + ereport(DEBUG1, + errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name)); + } + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); +} + +/* * Common checks for altering failover, two_phase, and retain_dead_tuples * options. */ @@ -1733,6 +1869,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES: + { + if (!sub->enabled) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("%s is not allowed for disabled subscriptions", + "ALTER SUBSCRIPTION ... REFRESH SEQUENCES")); + + AlterSubscription_refresh_seq(sub); + + break; + } + case ALTER_SUBSCRIPTION_SKIP: { parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts); @@ -1824,9 +1973,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (retain_dead_tuples) check_pub_dead_tuple_retention(wrconn); - check_publications_origin(wrconn, sub->publications, false, - retain_dead_tuples, origin, NULL, 0, - sub->name); + check_publications_origin_tables(wrconn, sub->publications, false, + retain_dead_tuples, origin, NULL, 0, + sub->name); if (update_failover || update_two_phase) walrcv_alter_slot(wrconn, sub->slotname, @@ -2008,7 +2157,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. */ - rstates = GetSubscriptionRelations(subid, true); + rstates = GetSubscriptionRelations(subid, true, false, true); foreach(lc, rstates) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -2341,10 +2490,10 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * - See comments atop worker.c for more details. */ static void -check_publications_origin(WalReceiverConn *wrconn, List *publications, - bool copydata, bool retain_dead_tuples, - char *origin, Oid *subrel_local_oids, - int subrel_count, char *subname) +check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, + bool copydata, bool retain_dead_tuples, + char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname) { WalRcvExecResult *res; StringInfoData cmd; @@ -2421,7 +2570,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, errmsg("could not receive list of replicated tables from the publisher: %s", res->err))); - /* Process tables. */ + /* Process publications. */ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { @@ -2483,6 +2632,114 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, } /* + * This function is similar to check_publications_origin_tables and serves + * same purpose for sequences. + */ +static void +check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, + bool copydata, char *origin, + Oid *subrel_local_oids, int subrel_count, + char *subname) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[1] = {TEXTOID}; + List *publist = NIL; + + /* + * Enable sequence synchronization checks only when origin is 'none' , to + * ensure that sequence data from other origins is not inadvertently + * copied. + */ + if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT P.pubname AS pubname\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_sequences(P.pubname) GPS\n" + " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPS.relid AND P.pubname IN ("); + + GetPublicationsStr(publications, &cmd, true); + appendStringInfoString(&cmd, ")\n"); + + /* + * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION, + * subrel_local_oids contains the list of relations that are already + * present on the subscriber. This check should be skipped as these will + * not be re-synced. + */ + for (int i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *seqname = get_rel_name(relid); + + appendStringInfo(&cmd, + "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, seqname); + } + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated sequences from the publisher: %s", + res->err))); + + /* Process publications. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + publist = list_append_unique(publist, makeString(pubname)); + } + + /* + * Log a warning if the publisher has subscribed to the same sequence from + * some other publisher. We cannot know the origin of sequences data + * during the initial sync. + */ + if (publist) + { + StringInfo pubnames = makeStringInfo(); + StringInfo err_msg = makeStringInfo(); + StringInfo err_hint = makeStringInfo(); + + /* Prepare the list of publication(s) for warning message. */ + GetPublicationsStr(publist, pubnames, false); + + appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), + subname); + appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins.")); + + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_internal("%s", err_msg->data), + errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.", + "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.", + list_length(publist), pubnames->data), + errhint_internal("%s", err_hint->data)); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* * Determine whether the retain_dead_tuples can be enabled based on the * publisher's status. * @@ -2594,8 +2851,23 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, } /* - * Get the list of tables which belong to specified publications on the - * publisher connection. + * Return true iff 'rv' is a member of the list. + */ +static bool +list_member_rangevar(const List *list, RangeVar *rv) +{ + foreach_ptr(PublicationRelKind, relinfo, list) + { + if (equal(relinfo->rv, rv)) + return true; + } + + return false; +} + +/* + * Get the list of tables and sequences which belong to specified publications + * on the publisher connection. * * Note that we don't support the case where the column list is different for * the same table in different publications to avoid sending unwanted column @@ -2603,15 +2875,16 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, * list and row filter are specified for different publications. */ static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_relation_list(WalReceiverConn *wrconn, List *publications) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid}; - List *tablelist = NIL; + Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid}; + List *relationlist = NIL; int server_version = walrcv_server_version(wrconn); bool check_columnlist = (server_version >= 150000); + int column_count = check_columnlist ? 4 : 3; StringInfo pub_names = makeStringInfo(); initStringInfo(&cmd); @@ -2619,10 +2892,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) /* Build the pub_names comma-separated string. */ GetPublicationsStr(publications, pub_names, true); - /* Get the list of tables from the publisher. */ + /* Get the list of relations from the publisher */ if (server_version >= 160000) { - tableRow[2] = INT2VECTOROID; + tableRow[3] = INT2VECTOROID; /* * From version 16, we allowed passing multiple publications to the @@ -2637,19 +2910,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) * to worry if different publications have specified them in a * different order. See pub_collist_validate. */ - appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n" - " FROM pg_class c\n" + appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n" + " FROM pg_class c\n" " JOIN pg_namespace n ON n.oid = c.relnamespace\n" " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n" " FROM pg_publication\n" " WHERE pubname IN ( %s )) AS gpt\n" " ON gpt.relid = c.oid\n", pub_names->data); + + /* From version 19, inclusion of sequences in the target is supported */ + if (server_version >= 190000) + appendStringInfo(&cmd, + "UNION ALL\n" + " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n" + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ( %s )", + pub_names->data); } else { - tableRow[2] = NAMEARRAYOID; - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); + tableRow[3] = NAMEARRAYOID; + appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n"); /* Get column lists for each relation if the publisher supports it */ if (check_columnlist) @@ -2662,7 +2944,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) destroyStringInfo(pub_names); - res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, column_count, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -2678,22 +2960,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) char *nspname; char *relname; bool isnull; - RangeVar *rv; + char relkind; + PublicationRelKind *relinfo = palloc_object(PublicationRelKind); nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); - rv = makeRangeVar(nspname, relname, -1); + relinfo->rv = makeRangeVar(nspname, relname, -1); + relinfo->relkind = relkind; - if (check_columnlist && list_member(tablelist, rv)) + if (relkind != RELKIND_SEQUENCE && + check_columnlist && + list_member_rangevar(relationlist, relinfo->rv)) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use different column lists for table \"%s.%s\" in different publications", nspname, relname)); else - tablelist = lappend(tablelist, rv); + relationlist = lappend(relationlist, relinfo); ExecClearTuple(slot); } @@ -2701,7 +2989,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) walrcv_clear_result(res); - return tablelist; + return relationlist; } /* diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index b409d4ecbf5..def32774c90 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -1112,18 +1112,36 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) /* - * Check if we support writing into specific relkind. + * Check if we support writing into specific relkind of local relation and check + * if it aligns with the relkind of the relation on the publisher. * * The nspname and relname are only needed for error reporting. */ void -CheckSubscriptionRelkind(char relkind, const char *nspname, - const char *relname) +CheckSubscriptionRelkind(char localrelkind, char remoterelkind, + const char *nspname, const char *relname) { - if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + if (localrelkind != RELKIND_RELATION && + localrelkind != RELKIND_PARTITIONED_TABLE && + localrelkind != RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", nspname, relname), - errdetail_relkind_not_supported(relkind))); + errdetail_relkind_not_supported(localrelkind))); + + /* + * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated + * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match + * exactly on both publisher and subscriber. + */ + if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) || + (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + /* translator: 3rd and 4th %s are "sequence" or "table" */ + errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"", + nspname, relname, + remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table", + localrelkind == RELKIND_SEQUENCE ? "sequence" : "table")); } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index dc0c2886674..a4b29c822e8 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10992,6 +10992,15 @@ AlterSubscriptionStmt: n->options = $6; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name REFRESH SEQUENCES + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_REFRESH_SEQUENCES; + n->subname = $3; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 2436a263dc2..ed62888764c 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -708,6 +708,9 @@ logicalrep_read_rel(StringInfo in) /* Read the replica identity. */ rel->replident = pq_getmsgbyte(in); + /* relkind is not sent */ + rel->relkind = 0; + /* Get attribute description */ logicalrep_read_attrs(in, rel); diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index f59046ad620..745fd3bab64 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -196,6 +196,17 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) entry->remoterel.atttyps[i] = remoterel->atttyps[i]; } entry->remoterel.replident = remoterel->replident; + + /* + * XXX The walsender currently does not transmit the relkind of the remote + * relation when replicating changes. Since we support replicating only + * table changes at present, we default to initializing relkind as + * RELKIND_RELATION. This is needed in CheckSubscriptionRelkind() to check + * if the publisher and subscriber relation kinds are compatible. + */ + entry->remoterel.relkind = + (remoterel->relkind == 0) ? RELKIND_RELATION : remoterel->relkind; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); MemoryContextSwitchTo(oldctx); } @@ -425,6 +436,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) /* Check for supported relkind. */ CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, + remoterel->relkind, remoterel->nspname, remoterel->relname); /* diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index 1bb3ca01db0..e452a1e78d4 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -149,8 +149,9 @@ FetchRelationStates(bool *started_tx) *started_tx = true; } - /* Fetch tables and sequences that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true); + /* Fetch tables that are in non-ready state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, false, + true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 2ba12517e93..40e1ed3c20e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -840,7 +840,7 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, /* * We don't support the case where the column list is different for * the same table when combining publications. See comments atop - * fetch_table_list. So there should be only one row returned. + * fetch_relation_list. So there should be only one row returned. * Although we already checked this when creating the subscription, we * still need to check here in case the column list was changed after * creating the subscription and before the sync worker is started. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ec65a385f2d..5df5a4612b6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3368,6 +3368,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * at CREATE/ALTER SUBSCRIPTION would be insufficient. */ CheckSubscriptionRelkind(partrel->rd_rel->relkind, + relmapentry->remoterel.relkind, get_namespace_name(RelationGetNamespace(partrel)), RelationGetRelationName(partrel)); @@ -3564,6 +3565,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, /* Check that new partition also has supported relkind. */ CheckSubscriptionRelkind(partrel_new->rd_rel->relkind, + relmapentry->remoterel.relkind, get_namespace_name(RelationGetNamespace(partrel_new)), RelationGetRelationName(partrel_new)); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 378b61fbd18..942e1abdb58 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1137,9 +1137,9 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, * * Note that we don't support the case where the column list is different * for the same table when combining publications. See comments atop - * fetch_table_list. But one can later change the publication so we still - * need to check all the given publication-table mappings and report an - * error if any publications have a different column list. + * fetch_relation_list. But one can later change the publication so we + * still need to check all the given publication-table mappings and report + * an error if any publications have a different column list. */ foreach(lc, publications) { diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index ad37f9f6ed0..fa08059671b 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2319,11 +2319,11 @@ match_previous_words(int pattern_id, /* ALTER SUBSCRIPTION <name> */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny)) COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO", - "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (", - "ADD PUBLICATION", "DROP PUBLICATION"); - /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */ - else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION")) - COMPLETE_WITH("WITH ("); + "RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES", + "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION"); + /* ALTER SUBSCRIPTION <name> REFRESH */ + else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH")) + COMPLETE_WITH("PUBLICATION", "SEQUENCES"); /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION", "WITH", "(")) COMPLETE_WITH("copy_data"); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 61b63c6bb7a..9f88498ecd3 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -90,7 +90,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern bool HasSubscriptionTables(Oid subid); -extern List *GetSubscriptionRelations(Oid subid, bool not_ready); +extern List *GetSubscriptionRelations(Oid subid, bool tables, bool sequences, + bool not_ready); extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 3248e78cd28..0ba86c2ad72 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -784,8 +784,8 @@ extern void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, TupleTableSlot *searchslot); extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd); -extern void CheckSubscriptionRelkind(char relkind, const char *nspname, - const char *relname); +extern void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, + const char *nspname, const char *relname); /* * prototypes from functions in nodeModifyTable.c diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4e445fe0cd7..ecbddd12e1b 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4362,6 +4362,7 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, + ALTER_SUBSCRIPTION_REFRESH_SEQUENCES, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_SKIP, } AlterSubscriptionType; diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 20b4e523d93..85d10a89994 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -45,6 +45,7 @@ tests += { 't/033_run_as_table_owner.pl', 't/034_temporal.pl', 't/035_conflicts.pl', + 't/036_sequences.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl new file mode 100644 index 00000000000..557fc91c017 --- /dev/null +++ b/src/test/subscription/t/036_sequences.pl @@ -0,0 +1,55 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +# This tests that sequences are registered to be synced to the subscriber +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); + +# Avoid checkpoint during the test, otherwise, extra values will be fetched for +# the sequences which will cause the test to fail randomly. +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Setup structure on the publisher +my $ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; +); +$node_publisher->safe_psql('postgres', $ddl); + +# Setup the same structure on the subscriber +$node_subscriber->safe_psql('postgres', $ddl); + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- generate a number of values using the sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Setup logical replication pub/sub +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION regress_seq_pub FOR ALL SEQUENCES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub" +); + +# Confirm sequences can be listed in pg_subscription_rel +my $result = $node_subscriber->safe_psql('postgres', + "SELECT relname, srsubstate FROM pg_class, pg_subscription_rel WHERE oid = srrelid" +); +is($result, 'regress_s1|i', "Sequence can be in pg_subscription_rel catalog"); + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c13f92988ee..43fe3bcd593 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2363,6 +2363,7 @@ PublicationObjSpec PublicationObjSpecType PublicationPartOpt PublicationRelInfo +PublicationRelKind PublicationSchemaInfo PublicationTable PublishGencolsType |
