summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/catalogs.sgml29
-rw-r--r--doc/src/sgml/ref/alter_subscription.sgml42
-rw-r--r--src/backend/catalog/pg_subscription.c53
-rw-r--r--src/backend/commands/subscriptioncmds.c480
-rw-r--r--src/backend/executor/execReplication.c28
-rw-r--r--src/backend/parser/gram.y9
-rw-r--r--src/backend/replication/logical/proto.c3
-rw-r--r--src/backend/replication/logical/relation.c12
-rw-r--r--src/backend/replication/logical/syncutils.c5
-rw-r--r--src/backend/replication/logical/tablesync.c2
-rw-r--r--src/backend/replication/logical/worker.c2
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c6
-rw-r--r--src/bin/psql/tab-complete.in.c10
-rw-r--r--src/include/catalog/pg_subscription_rel.h3
-rw-r--r--src/include/executor/executor.h4
-rw-r--r--src/include/nodes/parsenodes.h1
-rw-r--r--src/test/subscription/meson.build1
-rw-r--r--src/test/subscription/t/036_sequences.pl55
-rw-r--r--src/tools/pgindent/typedefs.list1
19 files changed, 607 insertions, 139 deletions
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 9b3aae8603b..6c8a0f173c9 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8199,16 +8199,19 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
</indexterm>
<para>
- The catalog <structname>pg_subscription_rel</structname> contains the
- state for each replicated relation in each subscription. This is a
- many-to-many mapping.
+ The catalog <structname>pg_subscription_rel</structname> stores the
+ state of each replicated table and sequence for each subscription. This
+ is a many-to-many mapping.
</para>
<para>
- This catalog only contains tables known to the subscription after running
- either <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link> or
- <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... REFRESH
- PUBLICATION</command></link>.
+ This catalog contains tables and sequences known to the subscription
+ after running:
+ <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>,
+ <link linkend="sql-altersubscription-params-refresh-publication">
+ <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command></link>, or
+ <link linkend="sql-altersubscription-params-refresh-sequences">
+ <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>.
</para>
<table>
@@ -8242,7 +8245,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
(references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>)
</para>
<para>
- Reference to relation
+ Reference to table or sequence
</para></entry>
</row>
@@ -8251,12 +8254,20 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<structfield>srsubstate</structfield> <type>char</type>
</para>
<para>
- State code:
+ State code for the table or sequence.
+ </para>
+ <para>
+ State codes for tables:
<literal>i</literal> = initialize,
<literal>d</literal> = data is being copied,
<literal>f</literal> = finished table copy,
<literal>s</literal> = synchronized,
<literal>r</literal> = ready (normal replication)
+ </para>
+ <para>
+ State codes for sequences:
+ <literal>i</literal> = initialize,
+ <literal>r</literal> = ready
</para></entry>
</row>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 12f72ba3167..8ab3b7fbd37 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICA
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUBLICATION [ WITH ( <replaceable class="parameter">refresh_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH SEQUENCES
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
@@ -139,9 +140,10 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<term><literal>refresh</literal> (<type>boolean</type>)</term>
<listitem>
<para>
- When false, the command will not try to refresh table information.
- <literal>REFRESH PUBLICATION</literal> should then be executed separately.
- The default is <literal>true</literal>.
+ When <literal>false</literal>, the command will not try to refresh
+ table and sequence information. <literal>REFRESH PUBLICATION</literal>
+ should then be executed separately. The default is
+ <literal>true</literal>.
</para>
</listitem>
</varlistentry>
@@ -158,7 +160,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<term><literal>REFRESH PUBLICATION</literal></term>
<listitem>
<para>
- Fetch missing table information from publisher. This will start
+ Fetch missing table and sequence information from the publisher. This will start
replication of tables that were added to the subscribed-to publications
since <link linkend="sql-createsubscription">
<command>CREATE SUBSCRIPTION</command></link> or
@@ -166,6 +168,12 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</para>
<para>
+ The system catalog <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>
+ is updated to record all tables and sequences known to the subscription,
+ that are still part of the publication.
+ </para>
+
+ <para>
<replaceable>refresh_option</replaceable> specifies additional options for the
refresh operation. The supported options are:
@@ -174,15 +182,20 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<term><literal>copy_data</literal> (<type>boolean</type>)</term>
<listitem>
<para>
- Specifies whether to copy pre-existing data in the publications
- that are being subscribed to when the replication starts.
- The default is <literal>true</literal>.
+ Specifies whether to copy pre-existing data for tables and synchronize
+ sequences in the publications that are being subscribed to when the replication
+ starts. The default is <literal>true</literal>.
</para>
<para>
Previously subscribed tables are not copied, even if a table's row
filter <literal>WHERE</literal> clause has since been modified.
</para>
<para>
+ Previously subscribed sequences are not re-synchronized. To do that,
+ use <link linkend="sql-altersubscription-params-refresh-sequences">
+ <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>.
+ </para>
+ <para>
See <xref linkend="sql-createsubscription-notes"/> for details of
how <literal>copy_data = true</literal> can interact with the
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>
@@ -200,6 +213,21 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</listitem>
</varlistentry>
+ <varlistentry id="sql-altersubscription-params-refresh-sequences">
+ <term><literal>REFRESH SEQUENCES</literal></term>
+ <listitem>
+ <para>
+ Re-synchronize sequence data with the publisher. Unlike
+ <link linkend="sql-altersubscription-params-refresh-publication">
+ <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command></link> which
+ only has the ability to synchronize newly added sequences,
+ <literal>REFRESH SEQUENCES</literal> will re-synchronize the sequence
+ data for all currently subscribed sequences. It does not add or remove
+ sequences from the subscription to match the publication.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="sql-altersubscription-params-enable">
<term><literal>ENABLE</literal></term>
<listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index e06587b0265..15b233a37d8 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -284,7 +284,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
if (HeapTupleIsValid(tup))
- elog(ERROR, "subscription table %u in subscription %u already exists",
+ elog(ERROR, "subscription relation %u in subscription %u already exists",
relid, subid);
/* Form the tuple. */
@@ -478,9 +478,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
* synchronization is in progress unless the caller updates the
* corresponding subscription as well. This is to ensure that we don't
* leave tablesync slots or origins in the system when the
- * corresponding table is dropped.
+ * corresponding table is dropped. For sequences, however, it's ok to
+ * drop them since no separate slots or origins are created during
+ * synchronization.
*/
- if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
+ if (!OidIsValid(subid) &&
+ subrel->srsubstate != SUBREL_STATE_READY &&
+ get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -517,7 +521,8 @@ HasSubscriptionTables(Oid subid)
Relation rel;
ScanKeyData skey[1];
SysScanDesc scan;
- bool has_subrels;
+ HeapTuple tup;
+ bool has_subtables = false;
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
@@ -529,14 +534,27 @@ HasSubscriptionTables(Oid subid)
scan = systable_beginscan(rel, InvalidOid, false,
NULL, 1, skey);
- /* If even a single tuple exists then the subscription has tables. */
- has_subrels = HeapTupleIsValid(systable_getnext(scan));
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_subscription_rel subrel;
+ char relkind;
+
+ subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+ relkind = get_rel_relkind(subrel->srrelid);
+
+ if (relkind == RELKIND_RELATION ||
+ relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ has_subtables = true;
+ break;
+ }
+ }
/* Cleanup */
systable_endscan(scan);
table_close(rel, AccessShareLock);
- return has_subrels;
+ return has_subtables;
}
/*
@@ -547,7 +565,8 @@ HasSubscriptionTables(Oid subid)
* returned list is palloc'ed in the current memory context.
*/
List *
-GetSubscriptionRelations(Oid subid, bool not_ready)
+GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
+ bool not_ready)
{
List *res = NIL;
Relation rel;
@@ -556,6 +575,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
ScanKeyData skey[2];
SysScanDesc scan;
+ /* One or both of 'tables' and 'sequences' must be true. */
+ Assert(tables || sequences);
+
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
ScanKeyInit(&skey[nkeys++],
@@ -578,9 +600,24 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
SubscriptionRelState *relstate;
Datum d;
bool isnull;
+ char relkind;
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+ /* Relation is either a sequence or a table */
+ relkind = get_rel_relkind(subrel->srrelid);
+ Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
+ relkind == RELKIND_PARTITIONED_TABLE);
+
+ /* Skip sequences if they were not requested */
+ if ((relkind == RELKIND_SEQUENCE) && !sequences)
+ continue;
+
+ /* Skip tables if they were not requested */
+ if ((relkind == RELKIND_RELATION ||
+ relkind == RELKIND_PARTITIONED_TABLE) && !tables)
+ continue;
+
relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
relstate->relid = subrel->srrelid;
relstate->state = subrel->srsubstate;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0f54686b699..a0974d71de1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -106,12 +106,29 @@ typedef struct SubOpts
XLogRecPtr lsn;
} SubOpts;
-static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
-static void check_publications_origin(WalReceiverConn *wrconn,
- List *publications, bool copydata,
- bool retain_dead_tuples, char *origin,
- Oid *subrel_local_oids, int subrel_count,
- char *subname);
+/*
+ * PublicationRelKind represents a relation included in a publication.
+ * It stores the schema-qualified relation name (rv) and its kind (relkind).
+ */
+typedef struct PublicationRelKind
+{
+ RangeVar *rv;
+ char relkind;
+} PublicationRelKind;
+
+static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications_origin_tables(WalReceiverConn *wrconn,
+ List *publications, bool copydata,
+ bool retain_dead_tuples,
+ char *origin,
+ Oid *subrel_local_oids,
+ int subrel_count, char *subname);
+static void check_publications_origin_sequences(WalReceiverConn *wrconn,
+ List *publications,
+ bool copydata, char *origin,
+ Oid *subrel_local_oids,
+ int subrel_count,
+ char *subname);
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -736,20 +753,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
+ /*
+ * A replication origin is currently created for all subscriptions,
+ * including those that only contain sequences or are otherwise empty.
+ *
+ * XXX: While this is technically unnecessary, optimizing it would require
+ * additional logic to skip origin creation during DDL operations and
+ * apply workers initialization, and to handle origin creation dynamically
+ * when tables are added to the subscription. It is not clear whether
+ * preventing creation of origins is worth additional complexity.
+ */
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
/*
* Connect to remote side to execute requested commands and fetch table
- * info.
+ * and sequence info.
*/
if (opts.connect)
{
char *err;
WalReceiverConn *wrconn;
- List *tables;
- ListCell *lc;
- char table_state;
bool must_use_password;
/* Try to connect to the publisher. */
@@ -764,10 +788,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY();
{
+ bool has_tables = false;
+ List *pubrels;
+ char relation_state;
+
check_publications(wrconn, publications);
- check_publications_origin(wrconn, publications, opts.copy_data,
- opts.retaindeadtuples, opts.origin,
- NULL, 0, stmt->subname);
+ check_publications_origin_tables(wrconn, publications,
+ opts.copy_data,
+ opts.retaindeadtuples, opts.origin,
+ NULL, 0, stmt->subname);
+ check_publications_origin_sequences(wrconn, publications,
+ opts.copy_data, opts.origin,
+ NULL, 0, stmt->subname);
if (opts.retaindeadtuples)
check_pub_dead_tuple_retention(wrconn);
@@ -776,25 +808,28 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* Set sync state based on if we were asked to do data copy or
* not.
*/
- table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
- * Get the table list from publisher and build local table status
- * info.
+ * Build local relation status info. Relations are for both tables
+ * and sequences from the publisher.
*/
- tables = fetch_table_list(wrconn, publications);
- foreach(lc, tables)
+ pubrels = fetch_relation_list(wrconn, publications);
+
+ foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
{
- RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
+ char relkind;
+ RangeVar *rv = pubrelinfo->rv;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ relkind = get_rel_relkind(relid);
/* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
+ CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
rv->schemaname, rv->relname);
-
- AddSubscriptionRelState(subid, relid, table_state,
+ has_tables |= (relkind != RELKIND_SEQUENCE);
+ AddSubscriptionRelState(subid, relid, relation_state,
InvalidXLogRecPtr, true);
}
@@ -802,6 +837,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
* export it.
+ *
+ * XXX: Similar to origins, it is not clear whether preventing the
+ * slot creation for empty and sequence-only subscriptions is
+ * worth additional complexity.
*/
if (opts.create_slot)
{
@@ -825,7 +864,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
* PUBLICATION to work.
*/
- if (opts.twophase && !opts.copy_data && tables != NIL)
+ if (opts.twophase && !opts.copy_data && has_tables)
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -879,21 +918,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
List *validate_publications)
{
char *err;
- List *pubrel_names;
+ List *pubrels = NIL;
+ Oid *pubrel_local_oids;
List *subrel_states;
+ List *sub_remove_rels = NIL;
Oid *subrel_local_oids;
- Oid *pubrel_local_oids;
+ Oid *subseq_local_oids;
+ int subrel_count;
ListCell *lc;
int off;
- int remove_rel_len;
- int subrel_count;
+ int tbl_count = 0;
+ int seq_count = 0;
Relation rel = NULL;
typedef struct SubRemoveRels
{
Oid relid;
char state;
} SubRemoveRels;
- SubRemoveRels *sub_remove_rels;
+
WalReceiverConn *wrconn;
bool must_use_password;
@@ -915,71 +957,84 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
if (validate_publications)
check_publications(wrconn, validate_publications);
- /* Get the table list from publisher. */
- pubrel_names = fetch_table_list(wrconn, sub->publications);
+ /* Get the relation list from publisher. */
+ pubrels = fetch_relation_list(wrconn, sub->publications);
- /* Get local table list. */
- subrel_states = GetSubscriptionRelations(sub->oid, false);
+ /* Get local relation list. */
+ subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
subrel_count = list_length(subrel_states);
/*
- * Build qsorted array of local table oids for faster lookup. This can
- * potentially contain all tables in the database so speed of lookup
- * is important.
+ * Build qsorted arrays of local table oids and sequence oids for
+ * faster lookup. This can potentially contain all tables and
+ * sequences in the database so speed of lookup is important.
+ *
+ * We do not yet know the exact count of tables and sequences, so we
+ * allocate separate arrays for table OIDs and sequence OIDs based on
+ * the total number of relations (subrel_count).
*/
subrel_local_oids = palloc(subrel_count * sizeof(Oid));
- off = 0;
+ subseq_local_oids = palloc(subrel_count * sizeof(Oid));
foreach(lc, subrel_states)
{
SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
- subrel_local_oids[off++] = relstate->relid;
+ if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
+ subseq_local_oids[seq_count++] = relstate->relid;
+ else
+ subrel_local_oids[tbl_count++] = relstate->relid;
}
- qsort(subrel_local_oids, subrel_count,
- sizeof(Oid), oid_cmp);
- check_publications_origin(wrconn, sub->publications, copy_data,
- sub->retaindeadtuples, sub->origin,
- subrel_local_oids, subrel_count, sub->name);
+ qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
+ check_publications_origin_tables(wrconn, sub->publications, copy_data,
+ sub->retaindeadtuples, sub->origin,
+ subrel_local_oids, tbl_count,
+ sub->name);
- /*
- * Rels that we want to remove from subscription and drop any slots
- * and origins corresponding to them.
- */
- sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
+ qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
+ check_publications_origin_sequences(wrconn, sub->publications,
+ copy_data, sub->origin,
+ subseq_local_oids, seq_count,
+ sub->name);
/*
- * Walk over the remote tables and try to match them to locally known
- * tables. If the table is not known locally create a new state for
- * it.
+ * Walk over the remote relations and try to match them to locally
+ * known relations. If the relation is not known locally create a new
+ * state for it.
*
- * Also builds array of local oids of remote tables for the next step.
+ * Also builds array of local oids of remote relations for the next
+ * step.
*/
off = 0;
- pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+ pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
- foreach(lc, pubrel_names)
+ foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
{
- RangeVar *rv = (RangeVar *) lfirst(lc);
+ RangeVar *rv = pubrelinfo->rv;
Oid relid;
+ char relkind;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ relkind = get_rel_relkind(relid);
/* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
+ CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
rv->schemaname, rv->relname);
pubrel_local_oids[off++] = relid;
if (!bsearch(&relid, subrel_local_oids,
- subrel_count, sizeof(Oid), oid_cmp))
+ tbl_count, sizeof(Oid), oid_cmp) &&
+ !bsearch(&relid, subseq_local_oids,
+ seq_count, sizeof(Oid), oid_cmp))
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr, true);
ereport(DEBUG1,
- (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
- rv->schemaname, rv->relname, sub->name)));
+ errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
+ relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+ rv->schemaname, rv->relname, sub->name));
}
}
@@ -987,19 +1042,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* Next remove state for tables we should not care about anymore using
* the data we collected above
*/
- qsort(pubrel_local_oids, list_length(pubrel_names),
- sizeof(Oid), oid_cmp);
+ qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
- remove_rel_len = 0;
- for (off = 0; off < subrel_count; off++)
+ for (off = 0; off < tbl_count; off++)
{
Oid relid = subrel_local_oids[off];
if (!bsearch(&relid, pubrel_local_oids,
- list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ list_length(pubrels), sizeof(Oid), oid_cmp))
{
char state;
XLogRecPtr statelsn;
+ SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
/*
* Lock pg_subscription_rel with AccessExclusiveLock to
@@ -1021,11 +1075,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Last known rel state. */
state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
- sub_remove_rels[remove_rel_len].relid = relid;
- sub_remove_rels[remove_rel_len++].state = state;
-
RemoveSubscriptionRel(sub->oid, relid);
+ remove_rel->relid = relid;
+ remove_rel->state = state;
+
+ sub_remove_rels = lappend(sub_remove_rels, remove_rel);
+
logicalrep_worker_stop(sub->oid, relid);
/*
@@ -1064,10 +1120,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* to be at the end because otherwise if there is an error while doing
* the database operations we won't be able to rollback dropped slots.
*/
- for (off = 0; off < remove_rel_len; off++)
+ foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
{
- if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
- sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ if (rel->state != SUBREL_STATE_READY &&
+ rel->state != SUBREL_STATE_SYNCDONE)
{
char syncslotname[NAMEDATALEN] = {0};
@@ -1081,11 +1137,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* dropped slots and fail. For these reasons, we allow
* missing_ok = true for the drop.
*/
- ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+ ReplicationSlotNameForTablesync(sub->oid, rel->relid,
syncslotname, sizeof(syncslotname));
ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
}
}
+
+ /*
+ * Next remove state for sequences we should not care about anymore
+ * using the data we collected above
+ */
+ for (off = 0; off < seq_count; off++)
+ {
+ Oid relid = subseq_local_oids[off];
+
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrels), sizeof(Oid), oid_cmp))
+ {
+ /*
+ * This locking ensures that the state of rels won't change
+ * till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ ereport(DEBUG1,
+ errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name));
+ }
+ }
}
PG_FINALLY();
{
@@ -1098,6 +1182,58 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
+ * Marks all sequences with INIT state.
+ */
+static void
+AlterSubscription_refresh_seq(Subscription *sub)
+{
+ char *err = NULL;
+ WalReceiverConn *wrconn;
+ bool must_use_password;
+
+ /* Load the library providing us libpq calls. */
+ load_file("libpqwalreceiver", false);
+
+ /* Try to connect to the publisher. */
+ must_use_password = sub->passwordrequired && !sub->ownersuperuser;
+ wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
+ sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("subscription \"%s\" could not connect to the publisher: %s",
+ sub->name, err));
+
+ PG_TRY();
+ {
+ List *subrel_states;
+
+ check_publications_origin_sequences(wrconn, sub->publications, true,
+ sub->origin, NULL, 0, sub->name);
+
+ /* Get local sequence list. */
+ subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
+ foreach_ptr(SubscriptionRelState, subrel, subrel_states)
+ {
+ Oid relid = subrel->relid;
+
+ UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
+ InvalidXLogRecPtr, false);
+ ereport(DEBUG1,
+ errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name));
+ }
+ }
+ PG_FINALLY();
+ {
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+}
+
+/*
* Common checks for altering failover, two_phase, and retain_dead_tuples
* options.
*/
@@ -1733,6 +1869,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
+ case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
+ {
+ if (!sub->enabled)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("%s is not allowed for disabled subscriptions",
+ "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
+
+ AlterSubscription_refresh_seq(sub);
+
+ break;
+ }
+
case ALTER_SUBSCRIPTION_SKIP:
{
parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
@@ -1824,9 +1973,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (retain_dead_tuples)
check_pub_dead_tuple_retention(wrconn);
- check_publications_origin(wrconn, sub->publications, false,
- retain_dead_tuples, origin, NULL, 0,
- sub->name);
+ check_publications_origin_tables(wrconn, sub->publications, false,
+ retain_dead_tuples, origin, NULL, 0,
+ sub->name);
if (update_failover || update_two_phase)
walrcv_alter_slot(wrconn, sub->slotname,
@@ -2008,7 +2157,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* the apply and tablesync workers and they can't restart because of
* exclusive lock on the subscription.
*/
- rstates = GetSubscriptionRelations(subid, true);
+ rstates = GetSubscriptionRelations(subid, true, false, true);
foreach(lc, rstates)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -2341,10 +2490,10 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* - See comments atop worker.c for more details.
*/
static void
-check_publications_origin(WalReceiverConn *wrconn, List *publications,
- bool copydata, bool retain_dead_tuples,
- char *origin, Oid *subrel_local_oids,
- int subrel_count, char *subname)
+check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
+ bool copydata, bool retain_dead_tuples,
+ char *origin, Oid *subrel_local_oids,
+ int subrel_count, char *subname)
{
WalRcvExecResult *res;
StringInfoData cmd;
@@ -2421,7 +2570,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));
- /* Process tables. */
+ /* Process publications. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
@@ -2483,6 +2632,114 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
}
/*
+ * This function is similar to check_publications_origin_tables and serves
+ * same purpose for sequences.
+ */
+static void
+check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
+ bool copydata, char *origin,
+ Oid *subrel_local_oids, int subrel_count,
+ char *subname)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[1] = {TEXTOID};
+ List *publist = NIL;
+
+ /*
+ * Enable sequence synchronization checks only when origin is 'none' , to
+ * ensure that sequence data from other origins is not inadvertently
+ * copied.
+ */
+ if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
+ return;
+
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd,
+ "SELECT DISTINCT P.pubname AS pubname\n"
+ "FROM pg_publication P,\n"
+ " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
+ " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
+ " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+ "WHERE C.oid = GPS.relid AND P.pubname IN (");
+
+ GetPublicationsStr(publications, &cmd, true);
+ appendStringInfoString(&cmd, ")\n");
+
+ /*
+ * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
+ * subrel_local_oids contains the list of relations that are already
+ * present on the subscriber. This check should be skipped as these will
+ * not be re-synced.
+ */
+ for (int i = 0; i < subrel_count; i++)
+ {
+ Oid relid = subrel_local_oids[i];
+ char *schemaname = get_namespace_name(get_rel_namespace(relid));
+ char *seqname = get_rel_name(relid);
+
+ appendStringInfo(&cmd,
+ "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+ schemaname, seqname);
+ }
+
+ res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive list of replicated sequences from the publisher: %s",
+ res->err)));
+
+ /* Process publications. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *pubname;
+ bool isnull;
+
+ pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ publist = list_append_unique(publist, makeString(pubname));
+ }
+
+ /*
+ * Log a warning if the publisher has subscribed to the same sequence from
+ * some other publisher. We cannot know the origin of sequences data
+ * during the initial sync.
+ */
+ if (publist)
+ {
+ StringInfo pubnames = makeStringInfo();
+ StringInfo err_msg = makeStringInfo();
+ StringInfo err_hint = makeStringInfo();
+
+ /* Prepare the list of publication(s) for warning message. */
+ GetPublicationsStr(publist, pubnames, false);
+
+ appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
+ subname);
+ appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins."));
+
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_internal("%s", err_msg->data),
+ errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
+ "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
+ list_length(publist), pubnames->data),
+ errhint_internal("%s", err_hint->data));
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
* Determine whether the retain_dead_tuples can be enabled based on the
* publisher's status.
*
@@ -2594,8 +2851,23 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
}
/*
- * Get the list of tables which belong to specified publications on the
- * publisher connection.
+ * Return true iff 'rv' is a member of the list.
+ */
+static bool
+list_member_rangevar(const List *list, RangeVar *rv)
+{
+ foreach_ptr(PublicationRelKind, relinfo, list)
+ {
+ if (equal(relinfo->rv, rv))
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Get the list of tables and sequences which belong to specified publications
+ * on the publisher connection.
*
* Note that we don't support the case where the column list is different for
* the same table in different publications to avoid sending unwanted column
@@ -2603,15 +2875,16 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
* list and row filter are specified for different publications.
*/
static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+fetch_relation_list(WalReceiverConn *wrconn, List *publications)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
- Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
- List *tablelist = NIL;
+ Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
+ List *relationlist = NIL;
int server_version = walrcv_server_version(wrconn);
bool check_columnlist = (server_version >= 150000);
+ int column_count = check_columnlist ? 4 : 3;
StringInfo pub_names = makeStringInfo();
initStringInfo(&cmd);
@@ -2619,10 +2892,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
/* Build the pub_names comma-separated string. */
GetPublicationsStr(publications, pub_names, true);
- /* Get the list of tables from the publisher. */
+ /* Get the list of relations from the publisher */
if (server_version >= 160000)
{
- tableRow[2] = INT2VECTOROID;
+ tableRow[3] = INT2VECTOROID;
/*
* From version 16, we allowed passing multiple publications to the
@@ -2637,19 +2910,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
* to worry if different publications have specified them in a
* different order. See pub_collist_validate.
*/
- appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
- " FROM pg_class c\n"
+ appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
+ " FROM pg_class c\n"
" JOIN pg_namespace n ON n.oid = c.relnamespace\n"
" JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
" FROM pg_publication\n"
" WHERE pubname IN ( %s )) AS gpt\n"
" ON gpt.relid = c.oid\n",
pub_names->data);
+
+ /* From version 19, inclusion of sequences in the target is supported */
+ if (server_version >= 190000)
+ appendStringInfo(&cmd,
+ "UNION ALL\n"
+ " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
+ " FROM pg_catalog.pg_publication_sequences s\n"
+ " WHERE s.pubname IN ( %s )",
+ pub_names->data);
}
else
{
- tableRow[2] = NAMEARRAYOID;
- appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
+ tableRow[3] = NAMEARRAYOID;
+ appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
/* Get column lists for each relation if the publisher supports it */
if (check_columnlist)
@@ -2662,7 +2944,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
destroyStringInfo(pub_names);
- res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
+ res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
@@ -2678,22 +2960,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
char *nspname;
char *relname;
bool isnull;
- RangeVar *rv;
+ char relkind;
+ PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
+ relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
- rv = makeRangeVar(nspname, relname, -1);
+ relinfo->rv = makeRangeVar(nspname, relname, -1);
+ relinfo->relkind = relkind;
- if (check_columnlist && list_member(tablelist, rv))
+ if (relkind != RELKIND_SEQUENCE &&
+ check_columnlist &&
+ list_member_rangevar(relationlist, relinfo->rv))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
nspname, relname));
else
- tablelist = lappend(tablelist, rv);
+ relationlist = lappend(relationlist, relinfo);
ExecClearTuple(slot);
}
@@ -2701,7 +2989,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
walrcv_clear_result(res);
- return tablelist;
+ return relationlist;
}
/*
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b409d4ecbf5..def32774c90 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -1112,18 +1112,36 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
/*
- * Check if we support writing into specific relkind.
+ * Check if we support writing into specific relkind of local relation and check
+ * if it aligns with the relkind of the relation on the publisher.
*
* The nspname and relname are only needed for error reporting.
*/
void
-CheckSubscriptionRelkind(char relkind, const char *nspname,
- const char *relname)
+CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
+ const char *nspname, const char *relname)
{
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+ if (localrelkind != RELKIND_RELATION &&
+ localrelkind != RELKIND_PARTITIONED_TABLE &&
+ localrelkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
nspname, relname),
- errdetail_relkind_not_supported(relkind)));
+ errdetail_relkind_not_supported(localrelkind)));
+
+ /*
+ * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
+ * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
+ * exactly on both publisher and subscriber.
+ */
+ if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
+ (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ /* translator: 3rd and 4th %s are "sequence" or "table" */
+ errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
+ nspname, relname,
+ remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
+ localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index dc0c2886674..a4b29c822e8 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10992,6 +10992,15 @@ AlterSubscriptionStmt:
n->options = $6;
$$ = (Node *) n;
}
+ | ALTER SUBSCRIPTION name REFRESH SEQUENCES
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+
+ n->kind = ALTER_SUBSCRIPTION_REFRESH_SEQUENCES;
+ n->subname = $3;
+ $$ = (Node *) n;
+ }
| ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
{
AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 2436a263dc2..ed62888764c 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -708,6 +708,9 @@ logicalrep_read_rel(StringInfo in)
/* Read the replica identity. */
rel->replident = pq_getmsgbyte(in);
+ /* relkind is not sent */
+ rel->relkind = 0;
+
/* Get attribute description */
logicalrep_read_attrs(in, rel);
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f59046ad620..745fd3bab64 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -196,6 +196,17 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
}
entry->remoterel.replident = remoterel->replident;
+
+ /*
+ * XXX The walsender currently does not transmit the relkind of the remote
+ * relation when replicating changes. Since we support replicating only
+ * table changes at present, we default to initializing relkind as
+ * RELKIND_RELATION. This is needed in CheckSubscriptionRelkind() to check
+ * if the publisher and subscriber relation kinds are compatible.
+ */
+ entry->remoterel.relkind =
+ (remoterel->relkind == 0) ? RELKIND_RELATION : remoterel->relkind;
+
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
MemoryContextSwitchTo(oldctx);
}
@@ -425,6 +436,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
/* Check for supported relkind. */
CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
+ remoterel->relkind,
remoterel->nspname, remoterel->relname);
/*
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index 1bb3ca01db0..e452a1e78d4 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -149,8 +149,9 @@ FetchRelationStates(bool *started_tx)
*started_tx = true;
}
- /* Fetch tables and sequences that are in non-ready state. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true);
+ /* Fetch tables that are in non-ready state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
+ true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2ba12517e93..40e1ed3c20e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -840,7 +840,7 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
/*
* We don't support the case where the column list is different for
* the same table when combining publications. See comments atop
- * fetch_table_list. So there should be only one row returned.
+ * fetch_relation_list. So there should be only one row returned.
* Although we already checked this when creating the subscription, we
* still need to check here in case the column list was changed after
* creating the subscription and before the sync worker is started.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ec65a385f2d..5df5a4612b6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3368,6 +3368,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* at CREATE/ALTER SUBSCRIPTION would be insufficient.
*/
CheckSubscriptionRelkind(partrel->rd_rel->relkind,
+ relmapentry->remoterel.relkind,
get_namespace_name(RelationGetNamespace(partrel)),
RelationGetRelationName(partrel));
@@ -3564,6 +3565,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Check that new partition also has supported relkind. */
CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
+ relmapentry->remoterel.relkind,
get_namespace_name(RelationGetNamespace(partrel_new)),
RelationGetRelationName(partrel_new));
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 378b61fbd18..942e1abdb58 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1137,9 +1137,9 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
*
* Note that we don't support the case where the column list is different
* for the same table when combining publications. See comments atop
- * fetch_table_list. But one can later change the publication so we still
- * need to check all the given publication-table mappings and report an
- * error if any publications have a different column list.
+ * fetch_relation_list. But one can later change the publication so we
+ * still need to check all the given publication-table mappings and report
+ * an error if any publications have a different column list.
*/
foreach(lc, publications)
{
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index ad37f9f6ed0..fa08059671b 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2319,11 +2319,11 @@ match_previous_words(int pattern_id,
/* ALTER SUBSCRIPTION <name> */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
- "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (",
- "ADD PUBLICATION", "DROP PUBLICATION");
- /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
- else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION"))
- COMPLETE_WITH("WITH (");
+ "RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES",
+ "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
+ /* ALTER SUBSCRIPTION <name> REFRESH */
+ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH"))
+ COMPLETE_WITH("PUBLICATION", "SEQUENCES");
/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH", "PUBLICATION", "WITH", "("))
COMPLETE_WITH("copy_data");
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 61b63c6bb7a..9f88498ecd3 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -90,7 +90,8 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
extern bool HasSubscriptionTables(Oid subid);
-extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
+extern List *GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
+ bool not_ready);
extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 3248e78cd28..0ba86c2ad72 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -784,8 +784,8 @@ extern void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
TupleTableSlot *searchslot);
extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
-extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
- const char *relname);
+extern void CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
+ const char *nspname, const char *relname);
/*
* prototypes from functions in nodeModifyTable.c
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4e445fe0cd7..ecbddd12e1b 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4362,6 +4362,7 @@ typedef enum AlterSubscriptionType
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
ALTER_SUBSCRIPTION_DROP_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH_PUBLICATION,
+ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES,
ALTER_SUBSCRIPTION_ENABLED,
ALTER_SUBSCRIPTION_SKIP,
} AlterSubscriptionType;
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 20b4e523d93..85d10a89994 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -45,6 +45,7 @@ tests += {
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
't/035_conflicts.pl',
+ 't/036_sequences.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
new file mode 100644
index 00000000000..557fc91c017
--- /dev/null
+++ b/src/test/subscription/t/036_sequences.pl
@@ -0,0 +1,55 @@
+
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# This tests that sequences are registered to be synced to the subscriber
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+
+# Avoid checkpoint during the test, otherwise, extra values will be fetched for
+# the sequences which will cause the test to fail randomly.
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Initialize subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Setup structure on the publisher
+my $ddl = qq(
+ CREATE TABLE regress_seq_test (v BIGINT);
+ CREATE SEQUENCE regress_s1;
+);
+$node_publisher->safe_psql('postgres', $ddl);
+
+# Setup the same structure on the subscriber
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Insert initial test data
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ -- generate a number of values using the sequence
+ INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100);
+));
+
+# Setup logical replication pub/sub
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION regress_seq_pub FOR ALL SEQUENCES");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub"
+);
+
+# Confirm sequences can be listed in pg_subscription_rel
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT relname, srsubstate FROM pg_class, pg_subscription_rel WHERE oid = srrelid"
+);
+is($result, 'regress_s1|i', "Sequence can be in pg_subscription_rel catalog");
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c13f92988ee..43fe3bcd593 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2363,6 +2363,7 @@ PublicationObjSpec
PublicationObjSpecType
PublicationPartOpt
PublicationRelInfo
+PublicationRelKind
PublicationSchemaInfo
PublicationTable
PublishGencolsType