diff options
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/catalog/pg_publication.c | 15 | ||||
| -rw-r--r-- | src/backend/commands/publicationcmds.c | 156 | ||||
| -rw-r--r-- | src/backend/commands/tablecmds.c | 27 | ||||
| -rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 31 |
4 files changed, 81 insertions, 148 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index e27db27f04a..cd48cb6c469 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1111,6 +1111,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) HeapTuple pubtuple = NULL; HeapTuple rettuple; Oid relid = list_nth_oid(tables, funcctx->call_cntr); + Oid schemaid = get_rel_namespace(relid); Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0}; bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0}; @@ -1122,9 +1123,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) values[0] = ObjectIdGetDatum(relid); - pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP, - ObjectIdGetDatum(relid), - ObjectIdGetDatum(publication->oid)); + /* + * We don't consider row filters or column lists for FOR ALL TABLES or + * FOR TABLES IN SCHEMA publications. + */ + if (!publication->alltables && + !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(publication->oid))) + pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(publication->oid)); if (HeapTupleIsValid(pubtuple)) { diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 15ab5aa99ec..7ffad096aa1 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -66,7 +66,6 @@ typedef struct rf_context Oid parentid; /* relid of the parent relation */ } rf_context; -static List *OpenRelIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); static void LockSchemaList(List *schemalist); @@ -215,44 +214,6 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, } /* - * Check if any of the given relation's schema is a member of the given schema - * list. - */ -static void -CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, - PublicationObjSpecType checkobjtype) -{ - ListCell *lc; - - foreach(lc, rels) - { - PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc); - Relation rel = pub_rel->relation; - Oid relSchemaId = RelationGetNamespace(rel); - - if (list_member_oid(schemaidlist, relSchemaId)) - { - if (checkobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add schema \"%s\" to publication", - get_namespace_name(relSchemaId)), - errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.", - RelationGetRelationName(rel), - get_namespace_name(relSchemaId))); - else if (checkobjtype == PUBLICATIONOBJ_TABLE) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot add relation \"%s.%s\" to publication", - get_namespace_name(relSchemaId), - RelationGetRelationName(rel)), - errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.", - get_namespace_name(relSchemaId))); - } - } -} - -/* * Returns true if any of the columns used in the row filter WHERE expression is * not part of REPLICA IDENTITY, false otherwise. */ @@ -721,7 +682,7 @@ TransformPubWhereClauses(List *tables, const char *queryString, */ static void CheckPubRelationColumnList(List *tables, const char *queryString, - bool pubviaroot) + bool publish_schema, bool pubviaroot) { ListCell *lc; @@ -733,6 +694,24 @@ CheckPubRelationColumnList(List *tables, const char *queryString, continue; /* + * Disallow specifying column list if any schema is in the + * publication. + * + * XXX We could instead just forbid the case when the publication + * tries to publish the table with a column list and a schema for that + * table. However, if we do that then we need a restriction during + * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't + * seem to be a good idea. + */ + if (publish_schema) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use publication column list for relation \"%s.%s\"", + get_namespace_name(RelationGetNamespace(pri->relation)), + RelationGetRelationName(pri->relation)), + errdetail("Column list cannot be specified if any schema is part of the publication or specified in the list.")); + + /* * If the publication doesn't publish changes via the root partitioned * table, the partition's column list will be used. So disallow using * the column list on partitioned table in this case. @@ -858,13 +837,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) List *rels; rels = OpenTableList(relations); - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, - PUBLICATIONOBJ_TABLE); - TransformPubWhereClauses(rels, pstate->p_sourcetext, publish_via_partition_root); CheckPubRelationColumnList(rels, pstate->p_sourcetext, + schemaidlist != NIL, publish_via_partition_root); PublicationAddTables(puboid, rels, true, NULL); @@ -1110,8 +1087,8 @@ InvalidatePublicationRels(List *relids) */ static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, - List *tables, List *schemaidlist, - const char *queryString) + List *tables, const char *queryString, + bool publish_schema) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -1129,19 +1106,12 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (stmt->action == AP_AddObjects) { - List *schemas = NIL; - - /* - * Check if the relation is member of the existing schema in the - * publication or member of the schema list specified. - */ - schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); - CheckObjSchemaNotAlreadyInPublication(rels, schemas, - PUBLICATIONOBJ_TABLE); - TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); - CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + publish_schema |= is_schema_publication(pubid); + + CheckPubRelationColumnList(rels, queryString, publish_schema, + pubform->pubviaroot); PublicationAddTables(pubid, rels, false, stmt); } @@ -1154,12 +1124,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *delrels = NIL; ListCell *oldlc; - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, - PUBLICATIONOBJ_TABLE); - TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); - CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot); + CheckPubRelationColumnList(rels, queryString, publish_schema, + pubform->pubviaroot); /* * To recreate the relation list for the publication, look for @@ -1308,16 +1276,35 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, LockSchemaList(schemaidlist); if (stmt->action == AP_AddObjects) { - List *rels; + ListCell *lc; List *reloids; reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); - rels = OpenRelIdList(reloids); - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, - PUBLICATIONOBJ_TABLES_IN_SCHEMA); + foreach(lc, reloids) + { + HeapTuple coltuple; + + coltuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(lfirst_oid(lc)), + ObjectIdGetDatum(pubform->oid)); + + if (!HeapTupleIsValid(coltuple)) + continue; + + /* + * Disallow adding schema if column list is already part of the + * publication. See CheckPubRelationColumnList. + */ + if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema to the publication"), + errdetail("Schema cannot be added if any table that specifies column list is already part of the publication.")); + + ReleaseSysCache(coltuple); + } - CloseTableList(rels); PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); } else if (stmt->action == AP_DropObjects) @@ -1429,14 +1416,7 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) heap_freetuple(tup); - /* - * Lock the publication so nobody else can do anything with it. This - * prevents concurrent alter to add table(s) that were already going - * to become part of the publication by adding corresponding schema(s) - * via this command and similarly it will prevent the concurrent - * addition of schema(s) for which there is any corresponding table - * being added by this command. - */ + /* Lock the publication so nobody else can do anything with it. */ LockDatabaseObject(PublicationRelationId, pubid, 0, AccessExclusiveLock); @@ -1453,8 +1433,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) errmsg("publication \"%s\" does not exist", stmt->pubname)); - AlterPublicationTables(stmt, tup, relations, schemaidlist, - pstate->p_sourcetext); + AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext, + schemaidlist != NIL); AlterPublicationSchemas(stmt, tup, schemaidlist); } @@ -1570,32 +1550,6 @@ RemovePublicationSchemaById(Oid psoid) } /* - * Open relations specified by a relid list. - * The returned tables are locked in ShareUpdateExclusiveLock mode in order to - * add them to a publication. - */ -static List * -OpenRelIdList(List *relids) -{ - ListCell *lc; - List *rels = NIL; - - foreach(lc, relids) - { - PublicationRelInfo *pub_rel; - Oid relid = lfirst_oid(lc); - Relation rel = table_open(relid, - ShareUpdateExclusiveLock); - - pub_rel = palloc(sizeof(PublicationRelInfo)); - pub_rel->relation = rel; - rels = lappend(rels, pub_rel); - } - - return rels; -} - -/* * Open relations specified by a PublicationTable list. * The returned tables are locked in ShareUpdateExclusiveLock mode in order to * add them to a publication. diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 6c52edd9bee..7d8a75d23c2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -16409,33 +16409,6 @@ AlterTableNamespace(AlterObjectSchemaStmt *stmt, Oid *oldschema) newrv = makeRangeVar(stmt->newschema, RelationGetRelationName(rel), -1); nspOid = RangeVarGetAndCheckCreationNamespace(newrv, NoLock, NULL); - /* - * Check that setting the relation to a different schema won't result in a - * publication having both a schema and the same schema's table, as this - * is not supported. - */ - if (stmt->objectType == OBJECT_TABLE) - { - ListCell *lc; - List *schemaPubids = GetSchemaPublications(nspOid); - List *relPubids = GetRelationPublications(RelationGetRelid(rel)); - - foreach(lc, relPubids) - { - Oid pubid = lfirst_oid(lc); - - if (list_member_oid(schemaPubids, pubid)) - ereport(ERROR, - errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot move table \"%s\" to schema \"%s\"", - RelationGetRelationName(rel), stmt->newschema), - errdetail("The schema \"%s\" and same schema's table \"%s\" cannot be part of the same publication \"%s\".", - stmt->newschema, - RelationGetRelationName(rel), - get_publication_name(pubid, false))); - } - } - /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 92bbffbe7cd..2ecaa5b9074 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -854,6 +854,7 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, MemoryContext oldctx; int idx; bool has_filter = true; + Oid schemaid = get_rel_namespace(entry->publish_as_relid); /* * Find if there are any row filters for this relation. If there are, then @@ -867,26 +868,26 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, * are multiple lists (one for each operation) to which row filters will * be appended. * - * FOR ALL TABLES implies "don't use row filter expression" so it takes - * precedence. + * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row + * filter expression" so it takes precedence. */ foreach(lc, publications) { Publication *pub = lfirst(lc); HeapTuple rftuple = NULL; Datum rfdatum = 0; - bool pub_no_filter = false; + bool pub_no_filter = true; - if (pub->alltables) - { - /* - * If the publication is FOR ALL TABLES then it is treated the - * same as if this table has no row filters (even if for other - * publications it does). - */ - pub_no_filter = true; - } - else + /* + * If the publication is FOR ALL TABLES, or the publication includes a + * FOR TABLES IN SCHEMA where the table belongs to the referred + * schema, then it is treated the same as if there are no row filters + * (even if other publications have a row filter). + */ + if (!pub->alltables && + !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP, + ObjectIdGetDatum(schemaid), + ObjectIdGetDatum(pub->oid))) { /* * Check for the presence of a row filter in this publication. @@ -902,10 +903,6 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, Anum_pg_publication_rel_prqual, &pub_no_filter); } - else - { - pub_no_filter = true; - } } if (pub_no_filter) |
