summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_publication.c15
-rw-r--r--src/backend/commands/publicationcmds.c156
-rw-r--r--src/backend/commands/tablecmds.c27
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c31
4 files changed, 81 insertions, 148 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index e27db27f04a..cd48cb6c469 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1111,6 +1111,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
HeapTuple pubtuple = NULL;
HeapTuple rettuple;
Oid relid = list_nth_oid(tables, funcctx->call_cntr);
+ Oid schemaid = get_rel_namespace(relid);
Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
@@ -1122,9 +1123,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
values[0] = ObjectIdGetDatum(relid);
- pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
- ObjectIdGetDatum(relid),
- ObjectIdGetDatum(publication->oid));
+ /*
+ * We don't consider row filters or column lists for FOR ALL TABLES or
+ * FOR TABLES IN SCHEMA publications.
+ */
+ if (!publication->alltables &&
+ !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
+ ObjectIdGetDatum(schemaid),
+ ObjectIdGetDatum(publication->oid)))
+ pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(publication->oid));
if (HeapTupleIsValid(pubtuple))
{
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 15ab5aa99ec..7ffad096aa1 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -66,7 +66,6 @@ typedef struct rf_context
Oid parentid; /* relid of the parent relation */
} rf_context;
-static List *OpenRelIdList(List *relids);
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
static void LockSchemaList(List *schemalist);
@@ -215,44 +214,6 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
}
/*
- * Check if any of the given relation's schema is a member of the given schema
- * list.
- */
-static void
-CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
- PublicationObjSpecType checkobjtype)
-{
- ListCell *lc;
-
- foreach(lc, rels)
- {
- PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
- Relation rel = pub_rel->relation;
- Oid relSchemaId = RelationGetNamespace(rel);
-
- if (list_member_oid(schemaidlist, relSchemaId))
- {
- if (checkobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot add schema \"%s\" to publication",
- get_namespace_name(relSchemaId)),
- errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.",
- RelationGetRelationName(rel),
- get_namespace_name(relSchemaId)));
- else if (checkobjtype == PUBLICATIONOBJ_TABLE)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot add relation \"%s.%s\" to publication",
- get_namespace_name(relSchemaId),
- RelationGetRelationName(rel)),
- errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.",
- get_namespace_name(relSchemaId)));
- }
- }
-}
-
-/*
* Returns true if any of the columns used in the row filter WHERE expression is
* not part of REPLICA IDENTITY, false otherwise.
*/
@@ -721,7 +682,7 @@ TransformPubWhereClauses(List *tables, const char *queryString,
*/
static void
CheckPubRelationColumnList(List *tables, const char *queryString,
- bool pubviaroot)
+ bool publish_schema, bool pubviaroot)
{
ListCell *lc;
@@ -733,6 +694,24 @@ CheckPubRelationColumnList(List *tables, const char *queryString,
continue;
/*
+ * Disallow specifying column list if any schema is in the
+ * publication.
+ *
+ * XXX We could instead just forbid the case when the publication
+ * tries to publish the table with a column list and a schema for that
+ * table. However, if we do that then we need a restriction during
+ * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
+ * seem to be a good idea.
+ */
+ if (publish_schema)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot use publication column list for relation \"%s.%s\"",
+ get_namespace_name(RelationGetNamespace(pri->relation)),
+ RelationGetRelationName(pri->relation)),
+ errdetail("Column list cannot be specified if any schema is part of the publication or specified in the list."));
+
+ /*
* If the publication doesn't publish changes via the root partitioned
* table, the partition's column list will be used. So disallow using
* the column list on partitioned table in this case.
@@ -858,13 +837,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
List *rels;
rels = OpenTableList(relations);
- CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
- PUBLICATIONOBJ_TABLE);
-
TransformPubWhereClauses(rels, pstate->p_sourcetext,
publish_via_partition_root);
CheckPubRelationColumnList(rels, pstate->p_sourcetext,
+ schemaidlist != NIL,
publish_via_partition_root);
PublicationAddTables(puboid, rels, true, NULL);
@@ -1110,8 +1087,8 @@ InvalidatePublicationRels(List *relids)
*/
static void
AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
- List *tables, List *schemaidlist,
- const char *queryString)
+ List *tables, const char *queryString,
+ bool publish_schema)
{
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
@@ -1129,19 +1106,12 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
if (stmt->action == AP_AddObjects)
{
- List *schemas = NIL;
-
- /*
- * Check if the relation is member of the existing schema in the
- * publication or member of the schema list specified.
- */
- schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
- CheckObjSchemaNotAlreadyInPublication(rels, schemas,
- PUBLICATIONOBJ_TABLE);
-
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
- CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
+ publish_schema |= is_schema_publication(pubid);
+
+ CheckPubRelationColumnList(rels, queryString, publish_schema,
+ pubform->pubviaroot);
PublicationAddTables(pubid, rels, false, stmt);
}
@@ -1154,12 +1124,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
List *delrels = NIL;
ListCell *oldlc;
- CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
- PUBLICATIONOBJ_TABLE);
-
TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
- CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
+ CheckPubRelationColumnList(rels, queryString, publish_schema,
+ pubform->pubviaroot);
/*
* To recreate the relation list for the publication, look for
@@ -1308,16 +1276,35 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
LockSchemaList(schemaidlist);
if (stmt->action == AP_AddObjects)
{
- List *rels;
+ ListCell *lc;
List *reloids;
reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
- rels = OpenRelIdList(reloids);
- CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
- PUBLICATIONOBJ_TABLES_IN_SCHEMA);
+ foreach(lc, reloids)
+ {
+ HeapTuple coltuple;
+
+ coltuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(lfirst_oid(lc)),
+ ObjectIdGetDatum(pubform->oid));
+
+ if (!HeapTupleIsValid(coltuple))
+ continue;
+
+ /*
+ * Disallow adding schema if column list is already part of the
+ * publication. See CheckPubRelationColumnList.
+ */
+ if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add schema to the publication"),
+ errdetail("Schema cannot be added if any table that specifies column list is already part of the publication."));
+
+ ReleaseSysCache(coltuple);
+ }
- CloseTableList(rels);
PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
}
else if (stmt->action == AP_DropObjects)
@@ -1429,14 +1416,7 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
heap_freetuple(tup);
- /*
- * Lock the publication so nobody else can do anything with it. This
- * prevents concurrent alter to add table(s) that were already going
- * to become part of the publication by adding corresponding schema(s)
- * via this command and similarly it will prevent the concurrent
- * addition of schema(s) for which there is any corresponding table
- * being added by this command.
- */
+ /* Lock the publication so nobody else can do anything with it. */
LockDatabaseObject(PublicationRelationId, pubid, 0,
AccessExclusiveLock);
@@ -1453,8 +1433,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
errmsg("publication \"%s\" does not exist",
stmt->pubname));
- AlterPublicationTables(stmt, tup, relations, schemaidlist,
- pstate->p_sourcetext);
+ AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
+ schemaidlist != NIL);
AlterPublicationSchemas(stmt, tup, schemaidlist);
}
@@ -1570,32 +1550,6 @@ RemovePublicationSchemaById(Oid psoid)
}
/*
- * Open relations specified by a relid list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
- * add them to a publication.
- */
-static List *
-OpenRelIdList(List *relids)
-{
- ListCell *lc;
- List *rels = NIL;
-
- foreach(lc, relids)
- {
- PublicationRelInfo *pub_rel;
- Oid relid = lfirst_oid(lc);
- Relation rel = table_open(relid,
- ShareUpdateExclusiveLock);
-
- pub_rel = palloc(sizeof(PublicationRelInfo));
- pub_rel->relation = rel;
- rels = lappend(rels, pub_rel);
- }
-
- return rels;
-}
-
-/*
* Open relations specified by a PublicationTable list.
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
* add them to a publication.
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 6c52edd9bee..7d8a75d23c2 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -16409,33 +16409,6 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema)
newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1);
nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL);
- /*
- * Check that setting the relation to a different schema won't result in a
- * publication having both a schema and the same schema's table, as this
- * is not supported.
- */
- if (stmt->objectType == OBJECT_TABLE)
- {
- ListCell *lc;
- List *schemaPubids = GetSchemaPublications(nspOid);
- List *relPubids = GetRelationPublications(RelationGetRelid(rel));
-
- foreach(lc, relPubids)
- {
- Oid pubid = lfirst_oid(lc);
-
- if (list_member_oid(schemaPubids, pubid))
- ereport(ERROR,
- errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot move table \"%s\" to schema \"%s\"",
- RelationGetRelationName(rel), stmt->newschema),
- errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".",
- stmt->newschema,
- RelationGetRelationName(rel),
- get_publication_name(pubid, false)));
- }
- }
-
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 92bbffbe7cd..2ecaa5b9074 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -854,6 +854,7 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
MemoryContext oldctx;
int idx;
bool has_filter = true;
+ Oid schemaid = get_rel_namespace(entry->publish_as_relid);
/*
* Find if there are any row filters for this relation. If there are, then
@@ -867,26 +868,26 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
* are multiple lists (one for each operation) to which row filters will
* be appended.
*
- * FOR ALL TABLES implies "don't use row filter expression" so it takes
- * precedence.
+ * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row
+ * filter expression" so it takes precedence.
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
HeapTuple rftuple = NULL;
Datum rfdatum = 0;
- bool pub_no_filter = false;
+ bool pub_no_filter = true;
- if (pub->alltables)
- {
- /*
- * If the publication is FOR ALL TABLES then it is treated the
- * same as if this table has no row filters (even if for other
- * publications it does).
- */
- pub_no_filter = true;
- }
- else
+ /*
+ * If the publication is FOR ALL TABLES, or the publication includes a
+ * FOR TABLES IN SCHEMA where the table belongs to the referred
+ * schema, then it is treated the same as if there are no row filters
+ * (even if other publications have a row filter).
+ */
+ if (!pub->alltables &&
+ !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
+ ObjectIdGetDatum(schemaid),
+ ObjectIdGetDatum(pub->oid)))
{
/*
* Check for the presence of a row filter in this publication.
@@ -902,10 +903,6 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
Anum_pg_publication_rel_prqual,
&pub_no_filter);
}
- else
- {
- pub_no_filter = true;
- }
}
if (pub_no_filter)