summaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c480
1 files changed, 384 insertions, 96 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0f54686b699..a0974d71de1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -106,12 +106,29 @@ typedef struct SubOpts
XLogRecPtr lsn;
} SubOpts;
-static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
-static void check_publications_origin(WalReceiverConn *wrconn,
- List *publications, bool copydata,
- bool retain_dead_tuples, char *origin,
- Oid *subrel_local_oids, int subrel_count,
- char *subname);
+/*
+ * PublicationRelKind represents a relation included in a publication.
+ * It stores the schema-qualified relation name (rv) and its kind (relkind).
+ */
+typedef struct PublicationRelKind
+{
+ RangeVar *rv;
+ char relkind;
+} PublicationRelKind;
+
+static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications_origin_tables(WalReceiverConn *wrconn,
+ List *publications, bool copydata,
+ bool retain_dead_tuples,
+ char *origin,
+ Oid *subrel_local_oids,
+ int subrel_count, char *subname);
+static void check_publications_origin_sequences(WalReceiverConn *wrconn,
+ List *publications,
+ bool copydata, char *origin,
+ Oid *subrel_local_oids,
+ int subrel_count,
+ char *subname);
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -736,20 +753,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
+ /*
+ * A replication origin is currently created for all subscriptions,
+ * including those that only contain sequences or are otherwise empty.
+ *
+ * XXX: While this is technically unnecessary, optimizing it would require
+ * additional logic to skip origin creation during DDL operations and
+ * apply workers initialization, and to handle origin creation dynamically
+ * when tables are added to the subscription. It is not clear whether
+ * preventing creation of origins is worth additional complexity.
+ */
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
replorigin_create(originname);
/*
* Connect to remote side to execute requested commands and fetch table
- * info.
+ * and sequence info.
*/
if (opts.connect)
{
char *err;
WalReceiverConn *wrconn;
- List *tables;
- ListCell *lc;
- char table_state;
bool must_use_password;
/* Try to connect to the publisher. */
@@ -764,10 +788,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY();
{
+ bool has_tables = false;
+ List *pubrels;
+ char relation_state;
+
check_publications(wrconn, publications);
- check_publications_origin(wrconn, publications, opts.copy_data,
- opts.retaindeadtuples, opts.origin,
- NULL, 0, stmt->subname);
+ check_publications_origin_tables(wrconn, publications,
+ opts.copy_data,
+ opts.retaindeadtuples, opts.origin,
+ NULL, 0, stmt->subname);
+ check_publications_origin_sequences(wrconn, publications,
+ opts.copy_data, opts.origin,
+ NULL, 0, stmt->subname);
if (opts.retaindeadtuples)
check_pub_dead_tuple_retention(wrconn);
@@ -776,25 +808,28 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* Set sync state based on if we were asked to do data copy or
* not.
*/
- table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
- * Get the table list from publisher and build local table status
- * info.
+ * Build local relation status info. Relations are for both tables
+ * and sequences from the publisher.
*/
- tables = fetch_table_list(wrconn, publications);
- foreach(lc, tables)
+ pubrels = fetch_relation_list(wrconn, publications);
+
+ foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
{
- RangeVar *rv = (RangeVar *) lfirst(lc);
Oid relid;
+ char relkind;
+ RangeVar *rv = pubrelinfo->rv;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ relkind = get_rel_relkind(relid);
/* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
+ CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
rv->schemaname, rv->relname);
-
- AddSubscriptionRelState(subid, relid, table_state,
+ has_tables |= (relkind != RELKIND_SEQUENCE);
+ AddSubscriptionRelState(subid, relid, relation_state,
InvalidXLogRecPtr, true);
}
@@ -802,6 +837,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
* export it.
+ *
+ * XXX: Similar to origins, it is not clear whether preventing the
+ * slot creation for empty and sequence-only subscriptions is
+ * worth additional complexity.
*/
if (opts.create_slot)
{
@@ -825,7 +864,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
* PUBLICATION to work.
*/
- if (opts.twophase && !opts.copy_data && tables != NIL)
+ if (opts.twophase && !opts.copy_data && has_tables)
twophase_enabled = true;
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -879,21 +918,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
List *validate_publications)
{
char *err;
- List *pubrel_names;
+ List *pubrels = NIL;
+ Oid *pubrel_local_oids;
List *subrel_states;
+ List *sub_remove_rels = NIL;
Oid *subrel_local_oids;
- Oid *pubrel_local_oids;
+ Oid *subseq_local_oids;
+ int subrel_count;
ListCell *lc;
int off;
- int remove_rel_len;
- int subrel_count;
+ int tbl_count = 0;
+ int seq_count = 0;
Relation rel = NULL;
typedef struct SubRemoveRels
{
Oid relid;
char state;
} SubRemoveRels;
- SubRemoveRels *sub_remove_rels;
+
WalReceiverConn *wrconn;
bool must_use_password;
@@ -915,71 +957,84 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
if (validate_publications)
check_publications(wrconn, validate_publications);
- /* Get the table list from publisher. */
- pubrel_names = fetch_table_list(wrconn, sub->publications);
+ /* Get the relation list from publisher. */
+ pubrels = fetch_relation_list(wrconn, sub->publications);
- /* Get local table list. */
- subrel_states = GetSubscriptionRelations(sub->oid, false);
+ /* Get local relation list. */
+ subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
subrel_count = list_length(subrel_states);
/*
- * Build qsorted array of local table oids for faster lookup. This can
- * potentially contain all tables in the database so speed of lookup
- * is important.
+ * Build qsorted arrays of local table oids and sequence oids for
+ * faster lookup. This can potentially contain all tables and
+ * sequences in the database so speed of lookup is important.
+ *
+ * We do not yet know the exact count of tables and sequences, so we
+ * allocate separate arrays for table OIDs and sequence OIDs based on
+ * the total number of relations (subrel_count).
*/
subrel_local_oids = palloc(subrel_count * sizeof(Oid));
- off = 0;
+ subseq_local_oids = palloc(subrel_count * sizeof(Oid));
foreach(lc, subrel_states)
{
SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
- subrel_local_oids[off++] = relstate->relid;
+ if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
+ subseq_local_oids[seq_count++] = relstate->relid;
+ else
+ subrel_local_oids[tbl_count++] = relstate->relid;
}
- qsort(subrel_local_oids, subrel_count,
- sizeof(Oid), oid_cmp);
- check_publications_origin(wrconn, sub->publications, copy_data,
- sub->retaindeadtuples, sub->origin,
- subrel_local_oids, subrel_count, sub->name);
+ qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
+ check_publications_origin_tables(wrconn, sub->publications, copy_data,
+ sub->retaindeadtuples, sub->origin,
+ subrel_local_oids, tbl_count,
+ sub->name);
- /*
- * Rels that we want to remove from subscription and drop any slots
- * and origins corresponding to them.
- */
- sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
+ qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
+ check_publications_origin_sequences(wrconn, sub->publications,
+ copy_data, sub->origin,
+ subseq_local_oids, seq_count,
+ sub->name);
/*
- * Walk over the remote tables and try to match them to locally known
- * tables. If the table is not known locally create a new state for
- * it.
+ * Walk over the remote relations and try to match them to locally
+ * known relations. If the relation is not known locally create a new
+ * state for it.
*
- * Also builds array of local oids of remote tables for the next step.
+ * Also builds array of local oids of remote relations for the next
+ * step.
*/
off = 0;
- pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+ pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
- foreach(lc, pubrel_names)
+ foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
{
- RangeVar *rv = (RangeVar *) lfirst(lc);
+ RangeVar *rv = pubrelinfo->rv;
Oid relid;
+ char relkind;
relid = RangeVarGetRelid(rv, AccessShareLock, false);
+ relkind = get_rel_relkind(relid);
/* Check for supported relkind. */
- CheckSubscriptionRelkind(get_rel_relkind(relid),
+ CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
rv->schemaname, rv->relname);
pubrel_local_oids[off++] = relid;
if (!bsearch(&relid, subrel_local_oids,
- subrel_count, sizeof(Oid), oid_cmp))
+ tbl_count, sizeof(Oid), oid_cmp) &&
+ !bsearch(&relid, subseq_local_oids,
+ seq_count, sizeof(Oid), oid_cmp))
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
InvalidXLogRecPtr, true);
ereport(DEBUG1,
- (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
- rv->schemaname, rv->relname, sub->name)));
+ errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
+ relkind == RELKIND_SEQUENCE ? "sequence" : "table",
+ rv->schemaname, rv->relname, sub->name));
}
}
@@ -987,19 +1042,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* Next remove state for tables we should not care about anymore using
* the data we collected above
*/
- qsort(pubrel_local_oids, list_length(pubrel_names),
- sizeof(Oid), oid_cmp);
+ qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
- remove_rel_len = 0;
- for (off = 0; off < subrel_count; off++)
+ for (off = 0; off < tbl_count; off++)
{
Oid relid = subrel_local_oids[off];
if (!bsearch(&relid, pubrel_local_oids,
- list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ list_length(pubrels), sizeof(Oid), oid_cmp))
{
char state;
XLogRecPtr statelsn;
+ SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
/*
* Lock pg_subscription_rel with AccessExclusiveLock to
@@ -1021,11 +1075,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Last known rel state. */
state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
- sub_remove_rels[remove_rel_len].relid = relid;
- sub_remove_rels[remove_rel_len++].state = state;
-
RemoveSubscriptionRel(sub->oid, relid);
+ remove_rel->relid = relid;
+ remove_rel->state = state;
+
+ sub_remove_rels = lappend(sub_remove_rels, remove_rel);
+
logicalrep_worker_stop(sub->oid, relid);
/*
@@ -1064,10 +1120,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* to be at the end because otherwise if there is an error while doing
* the database operations we won't be able to rollback dropped slots.
*/
- for (off = 0; off < remove_rel_len; off++)
+ foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
{
- if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
- sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ if (rel->state != SUBREL_STATE_READY &&
+ rel->state != SUBREL_STATE_SYNCDONE)
{
char syncslotname[NAMEDATALEN] = {0};
@@ -1081,11 +1137,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* dropped slots and fail. For these reasons, we allow
* missing_ok = true for the drop.
*/
- ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+ ReplicationSlotNameForTablesync(sub->oid, rel->relid,
syncslotname, sizeof(syncslotname));
ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
}
}
+
+ /*
+ * Next remove state for sequences we should not care about anymore
+ * using the data we collected above
+ */
+ for (off = 0; off < seq_count; off++)
+ {
+ Oid relid = subseq_local_oids[off];
+
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrels), sizeof(Oid), oid_cmp))
+ {
+ /*
+ * This locking ensures that the state of rels won't change
+ * till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ ereport(DEBUG1,
+ errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name));
+ }
+ }
}
PG_FINALLY();
{
@@ -1098,6 +1182,58 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
+ * Marks all sequences with INIT state.
+ */
+static void
+AlterSubscription_refresh_seq(Subscription *sub)
+{
+ char *err = NULL;
+ WalReceiverConn *wrconn;
+ bool must_use_password;
+
+ /* Load the library providing us libpq calls. */
+ load_file("libpqwalreceiver", false);
+
+ /* Try to connect to the publisher. */
+ must_use_password = sub->passwordrequired && !sub->ownersuperuser;
+ wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
+ sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("subscription \"%s\" could not connect to the publisher: %s",
+ sub->name, err));
+
+ PG_TRY();
+ {
+ List *subrel_states;
+
+ check_publications_origin_sequences(wrconn, sub->publications, true,
+ sub->origin, NULL, 0, sub->name);
+
+ /* Get local sequence list. */
+ subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
+ foreach_ptr(SubscriptionRelState, subrel, subrel_states)
+ {
+ Oid relid = subrel->relid;
+
+ UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
+ InvalidXLogRecPtr, false);
+ ereport(DEBUG1,
+ errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name));
+ }
+ }
+ PG_FINALLY();
+ {
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+}
+
+/*
* Common checks for altering failover, two_phase, and retain_dead_tuples
* options.
*/
@@ -1733,6 +1869,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
+ case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES:
+ {
+ if (!sub->enabled)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("%s is not allowed for disabled subscriptions",
+ "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
+
+ AlterSubscription_refresh_seq(sub);
+
+ break;
+ }
+
case ALTER_SUBSCRIPTION_SKIP:
{
parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
@@ -1824,9 +1973,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (retain_dead_tuples)
check_pub_dead_tuple_retention(wrconn);
- check_publications_origin(wrconn, sub->publications, false,
- retain_dead_tuples, origin, NULL, 0,
- sub->name);
+ check_publications_origin_tables(wrconn, sub->publications, false,
+ retain_dead_tuples, origin, NULL, 0,
+ sub->name);
if (update_failover || update_two_phase)
walrcv_alter_slot(wrconn, sub->slotname,
@@ -2008,7 +2157,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* the apply and tablesync workers and they can't restart because of
* exclusive lock on the subscription.
*/
- rstates = GetSubscriptionRelations(subid, true);
+ rstates = GetSubscriptionRelations(subid, true, false, true);
foreach(lc, rstates)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
@@ -2341,10 +2490,10 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* - See comments atop worker.c for more details.
*/
static void
-check_publications_origin(WalReceiverConn *wrconn, List *publications,
- bool copydata, bool retain_dead_tuples,
- char *origin, Oid *subrel_local_oids,
- int subrel_count, char *subname)
+check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
+ bool copydata, bool retain_dead_tuples,
+ char *origin, Oid *subrel_local_oids,
+ int subrel_count, char *subname)
{
WalRcvExecResult *res;
StringInfoData cmd;
@@ -2421,7 +2570,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));
- /* Process tables. */
+ /* Process publications. */
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
@@ -2483,6 +2632,114 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
}
/*
+ * This function is similar to check_publications_origin_tables and serves
+ * same purpose for sequences.
+ */
+static void
+check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications,
+ bool copydata, char *origin,
+ Oid *subrel_local_oids, int subrel_count,
+ char *subname)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[1] = {TEXTOID};
+ List *publist = NIL;
+
+ /*
+ * Enable sequence synchronization checks only when origin is 'none' , to
+ * ensure that sequence data from other origins is not inadvertently
+ * copied.
+ */
+ if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
+ return;
+
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd,
+ "SELECT DISTINCT P.pubname AS pubname\n"
+ "FROM pg_publication P,\n"
+ " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
+ " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
+ " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+ "WHERE C.oid = GPS.relid AND P.pubname IN (");
+
+ GetPublicationsStr(publications, &cmd, true);
+ appendStringInfoString(&cmd, ")\n");
+
+ /*
+ * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
+ * subrel_local_oids contains the list of relations that are already
+ * present on the subscriber. This check should be skipped as these will
+ * not be re-synced.
+ */
+ for (int i = 0; i < subrel_count; i++)
+ {
+ Oid relid = subrel_local_oids[i];
+ char *schemaname = get_namespace_name(get_rel_namespace(relid));
+ char *seqname = get_rel_name(relid);
+
+ appendStringInfo(&cmd,
+ "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+ schemaname, seqname);
+ }
+
+ res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive list of replicated sequences from the publisher: %s",
+ res->err)));
+
+ /* Process publications. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *pubname;
+ bool isnull;
+
+ pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ publist = list_append_unique(publist, makeString(pubname));
+ }
+
+ /*
+ * Log a warning if the publisher has subscribed to the same sequence from
+ * some other publisher. We cannot know the origin of sequences data
+ * during the initial sync.
+ */
+ if (publist)
+ {
+ StringInfo pubnames = makeStringInfo();
+ StringInfo err_msg = makeStringInfo();
+ StringInfo err_hint = makeStringInfo();
+
+ /* Prepare the list of publication(s) for warning message. */
+ GetPublicationsStr(publist, pubnames, false);
+
+ appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
+ subname);
+ appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins."));
+
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_internal("%s", err_msg->data),
+ errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
+ "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
+ list_length(publist), pubnames->data),
+ errhint_internal("%s", err_hint->data));
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
* Determine whether the retain_dead_tuples can be enabled based on the
* publisher's status.
*
@@ -2594,8 +2851,23 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
}
/*
- * Get the list of tables which belong to specified publications on the
- * publisher connection.
+ * Return true iff 'rv' is a member of the list.
+ */
+static bool
+list_member_rangevar(const List *list, RangeVar *rv)
+{
+ foreach_ptr(PublicationRelKind, relinfo, list)
+ {
+ if (equal(relinfo->rv, rv))
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Get the list of tables and sequences which belong to specified publications
+ * on the publisher connection.
*
* Note that we don't support the case where the column list is different for
* the same table in different publications to avoid sending unwanted column
@@ -2603,15 +2875,16 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
* list and row filter are specified for different publications.
*/
static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+fetch_relation_list(WalReceiverConn *wrconn, List *publications)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
- Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
- List *tablelist = NIL;
+ Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
+ List *relationlist = NIL;
int server_version = walrcv_server_version(wrconn);
bool check_columnlist = (server_version >= 150000);
+ int column_count = check_columnlist ? 4 : 3;
StringInfo pub_names = makeStringInfo();
initStringInfo(&cmd);
@@ -2619,10 +2892,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
/* Build the pub_names comma-separated string. */
GetPublicationsStr(publications, pub_names, true);
- /* Get the list of tables from the publisher. */
+ /* Get the list of relations from the publisher */
if (server_version >= 160000)
{
- tableRow[2] = INT2VECTOROID;
+ tableRow[3] = INT2VECTOROID;
/*
* From version 16, we allowed passing multiple publications to the
@@ -2637,19 +2910,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
* to worry if different publications have specified them in a
* different order. See pub_collist_validate.
*/
- appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
- " FROM pg_class c\n"
+ appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
+ " FROM pg_class c\n"
" JOIN pg_namespace n ON n.oid = c.relnamespace\n"
" JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
" FROM pg_publication\n"
" WHERE pubname IN ( %s )) AS gpt\n"
" ON gpt.relid = c.oid\n",
pub_names->data);
+
+ /* From version 19, inclusion of sequences in the target is supported */
+ if (server_version >= 190000)
+ appendStringInfo(&cmd,
+ "UNION ALL\n"
+ " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
+ " FROM pg_catalog.pg_publication_sequences s\n"
+ " WHERE s.pubname IN ( %s )",
+ pub_names->data);
}
else
{
- tableRow[2] = NAMEARRAYOID;
- appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
+ tableRow[3] = NAMEARRAYOID;
+ appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
/* Get column lists for each relation if the publisher supports it */
if (check_columnlist)
@@ -2662,7 +2944,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
destroyStringInfo(pub_names);
- res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
+ res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
@@ -2678,22 +2960,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
char *nspname;
char *relname;
bool isnull;
- RangeVar *rv;
+ char relkind;
+ PublicationRelKind *relinfo = palloc_object(PublicationRelKind);
nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
+ relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
- rv = makeRangeVar(nspname, relname, -1);
+ relinfo->rv = makeRangeVar(nspname, relname, -1);
+ relinfo->relkind = relkind;
- if (check_columnlist && list_member(tablelist, rv))
+ if (relkind != RELKIND_SEQUENCE &&
+ check_columnlist &&
+ list_member_rangevar(relationlist, relinfo->rv))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
nspname, relname));
else
- tablelist = lappend(tablelist, rv);
+ relationlist = lappend(relationlist, relinfo);
ExecClearTuple(slot);
}
@@ -2701,7 +2989,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
walrcv_clear_result(res);
- return tablelist;
+ return relationlist;
}
/*