diff options
| author | Amit Kapila <akapila@postgresql.org> | 2022-02-22 07:54:12 +0530 |
|---|---|---|
| committer | Amit Kapila <akapila@postgresql.org> | 2022-02-22 08:11:50 +0530 |
| commit | 52e4f0cd472d39d07732b99559989ea3b615be78 (patch) | |
| tree | e40cc7b7690f82c7cfb945fd55afdf55e9bc944f /src/backend | |
| parent | ebf6c5249b7db525e59563fb149642665c88f747 (diff) | |
Allow specifying row filters for logical replication of tables.
This feature adds row filtering for publication tables. When a publication
is defined or modified, an optional WHERE clause can be specified. Rows
that don't satisfy this WHERE clause will be filtered out. This allows a
set of tables to be partially replicated. The row filter is per table. A
new row filter can be added simply by specifying a WHERE clause after the
table name. The WHERE clause must be enclosed by parentheses.
The row filter WHERE clause for a table added to a publication that
publishes UPDATE and/or DELETE operations must contain only columns that
are covered by REPLICA IDENTITY. The row filter WHERE clause for a table
added to a publication that publishes INSERT can use any column. If the
row filter evaluates to NULL, it is regarded as "false". The WHERE clause
only allows simple expressions that don't have user-defined functions,
user-defined operators, user-defined types, user-defined collations,
non-immutable built-in functions, or references to system columns. These
restrictions could be addressed in the future.
If you choose to do the initial table synchronization, only data that
satisfies the row filters is copied to the subscriber. If the subscription
has several publications in which a table has been published with
different WHERE clauses, rows that satisfy ANY of the expressions will be
copied. If a subscriber is a pre-15 version, the initial table
synchronization won't use row filters even if they are defined in the
publisher.
The row filters are applied before publishing the changes. If the
subscription has several publications in which the same table has been
published with different filters (for the same publish operation), those
expressions get OR'ed together so that rows satisfying any of the
expressions will be replicated.
This means all the other filters become redundant if (a) one of the
publications have no filter at all, (b) one of the publications was
created using FOR ALL TABLES, (c) one of the publications was created
using FOR ALL TABLES IN SCHEMA and the table belongs to that same schema.
If your publication contains a partitioned table, the publication
parameter publish_via_partition_root determines if it uses the partition's
row filter (if the parameter is false, the default) or the root
partitioned table's row filter.
Psql commands \dRp+ and \d <table-name> will display any row filters.
Author: Hou Zhijie, Euler Taveira, Peter Smith, Ajin Cherian
Reviewed-by: Greg Nancarrow, Haiying Tang, Amit Kapila, Tomas Vondra, Dilip Kumar, Vignesh C, Alvaro Herrera, Andres Freund, Wei Wang
Discussion: https://www.postgresql.org/message-id/flat/CAHE3wggb715X%2BmK_DitLXF25B%3DjE6xyNCH4YOwM860JR7HarGQ%40mail.gmail.com
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/catalog/pg_publication.c | 59 | ||||
| -rw-r--r-- | src/backend/commands/publicationcmds.c | 583 | ||||
| -rw-r--r-- | src/backend/executor/execReplication.c | 39 | ||||
| -rw-r--r-- | src/backend/nodes/copyfuncs.c | 1 | ||||
| -rw-r--r-- | src/backend/nodes/equalfuncs.c | 1 | ||||
| -rw-r--r-- | src/backend/parser/gram.y | 38 | ||||
| -rw-r--r-- | src/backend/replication/logical/proto.c | 36 | ||||
| -rw-r--r-- | src/backend/replication/logical/tablesync.c | 142 | ||||
| -rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 833 | ||||
| -rw-r--r-- | src/backend/utils/cache/relcache.c | 98 |
10 files changed, 1621 insertions, 209 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index e14ca2f5630..25998fbb39b 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -276,17 +276,56 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, } /* + * Returns the relid of the topmost ancestor that is published via this + * publication if any, otherwise returns InvalidOid. + * + * Note that the list of ancestors should be ordered such that the topmost + * ancestor is at the end of the list. + */ +Oid +GetTopMostAncestorInPublication(Oid puboid, List *ancestors) +{ + ListCell *lc; + Oid topmost_relid = InvalidOid; + + /* + * Find the "topmost" ancestor that is in this publication. + */ + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + List *apubids = GetRelationPublications(ancestor); + List *aschemaPubids = NIL; + + if (list_member_oid(apubids, puboid)) + topmost_relid = ancestor; + else + { + aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); + if (list_member_oid(aschemaPubids, puboid)) + topmost_relid = ancestor; + } + + list_free(apubids); + list_free(aschemaPubids); + } + + return topmost_relid; +} + +/* * Insert new publication / relation mapping. */ ObjectAddress -publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, +publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists) { Relation rel; HeapTuple tup; Datum values[Natts_pg_publication_rel]; bool nulls[Natts_pg_publication_rel]; - Oid relid = RelationGetRelid(targetrel->relation); + Relation targetrel = pri->relation; + Oid relid = RelationGetRelid(targetrel); Oid pubreloid; Publication *pub = GetPublication(pubid); ObjectAddress myself, @@ -311,10 +350,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("relation \"%s\" is already member of publication \"%s\"", - RelationGetRelationName(targetrel->relation), pub->name))); + RelationGetRelationName(targetrel), pub->name))); } - check_publication_add_relation(targetrel->relation); + check_publication_add_relation(targetrel); /* Form a tuple. */ memset(values, 0, sizeof(values)); @@ -328,6 +367,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + /* Add qualifications, if available */ + if (pri->whereClause != NULL) + values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause)); + else + nulls[Anum_pg_publication_rel_prqual - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -345,6 +390,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ObjectAddressSet(referenced, RelationRelationId, relid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + /* Add dependency on the objects mentioned in the qualifications */ + if (pri->whereClause) + recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid, + DEPENDENCY_NORMAL, DEPENDENCY_NORMAL, + false); + /* Close the table. */ table_close(rel, RowExclusiveLock); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 0e4bb97fb73..16b8661a1b7 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -26,6 +26,7 @@ #include "catalog/partition.h" #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" @@ -36,6 +37,10 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/nodeFuncs.h" +#include "parser/parse_clause.h" +#include "parser/parse_collate.h" +#include "parser/parse_relation.h" #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" @@ -48,6 +53,19 @@ #include "utils/syscache.h" #include "utils/varlena.h" +/* + * Information used to validate the columns in the row filter expression. See + * contain_invalid_rfcolumn_walker for details. + */ +typedef struct rf_context +{ + Bitmapset *bms_replident; /* bitset of replica identity columns */ + bool pubviaroot; /* true if we are validating the parent + * relation's row filter */ + Oid relid; /* relid of the relation */ + Oid parentid; /* relid of the parent relation */ +} rf_context; + static List *OpenRelIdList(List *relids); static List *OpenTableList(List *tables); static void CloseTableList(List *rels); @@ -235,6 +253,362 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, } /* + * Returns true if any of the columns used in the row filter WHERE expression is + * not part of REPLICA IDENTITY, false otherwise. + */ +static bool +contain_invalid_rfcolumn_walker(Node *node, rf_context *context) +{ + if (node == NULL) + return false; + + if (IsA(node, Var)) + { + Var *var = (Var *) node; + AttrNumber attnum = var->varattno; + + /* + * If pubviaroot is true, we are validating the row filter of the + * parent table, but the bitmap contains the replica identity + * information of the child table. So, get the column number of the + * child table as parent and child column order could be different. + */ + if (context->pubviaroot) + { + char *colname = get_attname(context->parentid, attnum, false); + + attnum = get_attnum(context->relid, colname); + } + + if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, + context->bms_replident)) + return true; + } + + return expression_tree_walker(node, contain_invalid_rfcolumn_walker, + (void *) context); +} + +/* + * Check if all columns referenced in the filter expression are part of the + * REPLICA IDENTITY index or not. + * + * Returns true if any invalid column is found. + */ +bool +contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors, + bool pubviaroot) +{ + HeapTuple rftuple; + Oid relid = RelationGetRelid(relation); + Oid publish_as_relid = RelationGetRelid(relation); + bool result = false; + Datum rfdatum; + bool rfisnull; + + /* + * FULL means all columns are in the REPLICA IDENTITY, so all columns are + * allowed in the row filter and we can skip the validation. + */ + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + return false; + + /* + * For a partition, if pubviaroot is true, find the topmost ancestor that + * is published via this publication as we need to use its row filter + * expression to filter the partition's changes. + * + * Note that even though the row filter used is for an ancestor, the + * REPLICA IDENTITY used will be for the actual child table. + */ + if (pubviaroot && relation->rd_rel->relispartition) + { + publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors); + + if (!OidIsValid(publish_as_relid)) + publish_as_relid = relid; + } + + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pubid)); + + if (!HeapTupleIsValid(rftuple)) + return false; + + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prqual, + &rfisnull); + + if (!rfisnull) + { + rf_context context = {0}; + Node *rfnode; + Bitmapset *bms = NULL; + + context.pubviaroot = pubviaroot; + context.parentid = publish_as_relid; + context.relid = relid; + + /* Remember columns that are part of the REPLICA IDENTITY */ + bms = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + context.bms_replident = bms; + rfnode = stringToNode(TextDatumGetCString(rfdatum)); + result = contain_invalid_rfcolumn_walker(rfnode, &context); + + bms_free(bms); + pfree(rfnode); + } + + ReleaseSysCache(rftuple); + + return result; +} + +/* check_functions_in_node callback */ +static bool +contain_mutable_or_user_functions_checker(Oid func_id, void *context) +{ + return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE || + func_id >= FirstNormalObjectId); +} + +/* + * Check if the node contains any unallowed object. See + * check_simple_rowfilter_expr_walker. + * + * Returns the error detail message in errdetail_msg for unallowed expressions. + */ +static void +expr_allowed_in_node(Node *node, ParseState *pstate, char **errdetail_msg) +{ + if (IsA(node, List)) + { + /* + * OK, we don't need to perform other expr checks for List nodes + * because those are undefined for List. + */ + return; + } + + if (exprType(node) >= FirstNormalObjectId) + *errdetail_msg = _("User-defined types are not allowed."); + else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker, + (void *) pstate)) + *errdetail_msg = _("User-defined or built-in mutable functions are not allowed."); + else if (exprCollation(node) >= FirstNormalObjectId || + exprInputCollation(node) >= FirstNormalObjectId) + *errdetail_msg = _("User-defined collations are not allowed."); +} + +/* + * The row filter walker checks if the row filter expression is a "simple + * expression". + * + * It allows only simple or compound expressions such as: + * - (Var Op Const) + * - (Var Op Var) + * - (Var Op Const) AND/OR (Var Op Const) + * - etc + * (where Var is a column of the table this filter belongs to) + * + * The simple expression has the following restrictions: + * - User-defined operators are not allowed; + * - User-defined functions are not allowed; + * - User-defined types are not allowed; + * - User-defined collations are not allowed; + * - Non-immutable built-in functions are not allowed; + * - System columns are not allowed. + * + * NOTES + * + * We don't allow user-defined functions/operators/types/collations because + * (a) if a user drops a user-defined object used in a row filter expression or + * if there is any other error while using it, the logical decoding + * infrastructure won't be able to recover from such an error even if the + * object is recreated again because a historic snapshot is used to evaluate + * the row filter; + * (b) a user-defined function can be used to access tables that could have + * unpleasant results because a historic snapshot is used. That's why only + * immutable built-in functions are allowed in row filter expressions. + * + * We don't allow system columns because currently, we don't have that + * information in the tuple passed to downstream. Also, as we don't replicate + * those to subscribers, there doesn't seem to be a need for a filter on those + * columns. + * + * We can allow other node types after more analysis and testing. + */ +static bool +check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate) +{ + char *errdetail_msg = NULL; + + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_Var: + /* System columns are not allowed. */ + if (((Var *) node)->varattno < InvalidAttrNumber) + errdetail_msg = _("System columns are not allowed."); + break; + case T_OpExpr: + case T_DistinctExpr: + case T_NullIfExpr: + /* OK, except user-defined operators are not allowed. */ + if (((OpExpr *) node)->opno >= FirstNormalObjectId) + errdetail_msg = _("User-defined operators are not allowed."); + break; + case T_ScalarArrayOpExpr: + /* OK, except user-defined operators are not allowed. */ + if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId) + errdetail_msg = _("User-defined operators are not allowed."); + + /* + * We don't need to check the hashfuncid and negfuncid of + * ScalarArrayOpExpr as those functions are only built for a + * subquery. + */ + break; + case T_RowCompareExpr: + { + ListCell *opid; + + /* OK, except user-defined operators are not allowed. */ + foreach(opid, ((RowCompareExpr *) node)->opnos) + { + if (lfirst_oid(opid) >= FirstNormalObjectId) + { + errdetail_msg = _("User-defined operators are not allowed."); + break; + } + } + } + break; + case T_Const: + case T_FuncExpr: + case T_BoolExpr: + case T_RelabelType: + case T_CollateExpr: + case T_CaseExpr: + case T_CaseTestExpr: + case T_ArrayExpr: + case T_RowExpr: + case T_CoalesceExpr: + case T_MinMaxExpr: + case T_XmlExpr: + case T_NullTest: + case T_BooleanTest: + case T_List: + /* OK, supported */ + break; + default: + errdetail_msg = _("Expressions only allow columns, constants, built-in operators, built-in data types, built-in collations and immutable built-in functions."); + break; + } + + /* + * For all the supported nodes, check the types, functions, and collations + * used in the nodes. + */ + if (!errdetail_msg) + expr_allowed_in_node(node, pstate, &errdetail_msg); + + if (errdetail_msg) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("invalid publication WHERE expression"), + errdetail("%s", errdetail_msg), + parser_errposition(pstate, exprLocation(node)))); + + return expression_tree_walker(node, check_simple_rowfilter_expr_walker, + (void *) pstate); +} + +/* + * Check if the row filter expression is a "simple expression". + * + * See check_simple_rowfilter_expr_walker for details. + */ +static bool +check_simple_rowfilter_expr(Node *node, ParseState *pstate) +{ + return check_simple_rowfilter_expr_walker(node, pstate); +} + +/* + * Transform the publication WHERE expression for all the relations in the list, + * ensuring it is coerced to boolean and necessary collation information is + * added if required, and add a new nsitem/RTE for the associated relation to + * the ParseState's namespace list. + * + * Also check the publication row filter expression and throw an error if + * anything not permitted or unexpected is encountered. + */ +static void +TransformPubWhereClauses(List *tables, const char *queryString, + bool pubviaroot) +{ + ListCell *lc; + + foreach(lc, tables) + { + ParseNamespaceItem *nsitem; + Node *whereclause = NULL; + ParseState *pstate; + PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); + + if (pri->whereClause == NULL) + continue; + + /* + * If the publication doesn't publish changes via the root partitioned + * table, the partition's row filter will be used. So disallow using + * WHERE clause on partitioned table in this case. + */ + if (!pubviaroot && + pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot use publication WHERE clause for relation \"%s\"", + RelationGetRelationName(pri->relation)), + errdetail("WHERE clause cannot be used for a partitioned table when %s is false.", + "publish_via_partition_root"))); + + pstate = make_parsestate(NULL); + pstate->p_sourcetext = queryString; + + nsitem = addRangeTableEntryForRelation(pstate, pri->relation, + AccessShareLock, NULL, + false, false); + + addNSItemToQuery(pstate, nsitem, false, true, true); + + whereclause = transformWhereClause(pstate, + copyObject(pri->whereClause), + EXPR_KIND_WHERE, + "PUBLICATION WHERE"); + + /* Fix up collation information */ + assign_expr_collations(pstate, whereclause); + + /* + * We allow only simple expressions in row filters. See + * check_simple_rowfilter_expr_walker. + */ + check_simple_rowfilter_expr(whereclause, pstate); + + free_parsestate(pstate); + + pri->whereClause = whereclause; + } +} + +/* * Create new publication. */ ObjectAddress @@ -346,6 +720,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) rels = OpenTableList(relations); CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLE); + + TransformPubWhereClauses(rels, pstate->p_sourcetext, + publish_via_partition_root); + PublicationAddTables(puboid, rels, true, NULL); CloseTableList(rels); } @@ -392,6 +770,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, bool publish_via_partition_root; ObjectAddress obj; Form_pg_publication pubform; + List *root_relids = NIL; + ListCell *lc; parse_publication_options(pstate, stmt->options, @@ -399,6 +779,65 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, &publish_via_partition_root_given, &publish_via_partition_root); + pubform = (Form_pg_publication) GETSTRUCT(tup); + + /* + * If the publication doesn't publish changes via the root partitioned + * table, the partition's row filter will be used. So disallow using WHERE + * clause on partitioned table in this case. + */ + if (!pubform->puballtables && publish_via_partition_root_given && + !publish_via_partition_root) + { + /* + * Lock the publication so nobody else can do anything with it. This + * prevents concurrent alter to add partitioned table(s) with WHERE + * clause(s) which we don't allow when not publishing via root. + */ + LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + AccessShareLock); + + root_relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ROOT); + + foreach(lc, root_relids) + { + HeapTuple rftuple; + Oid relid = lfirst_oid(lc); + + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubform->oid)); + + if (HeapTupleIsValid(rftuple) && + !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL)) + { + HeapTuple tuple; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); + if (HeapTupleIsValid(tuple)) + { + Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple); + + if (relform->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot set %s for publication \"%s\"", + "publish_via_partition_root = false", + stmt->pubname), + errdetail("The publication contains a WHERE clause for a partitioned table \"%s\" " + "which is not allowed when %s is false.", + NameStr(relform->relname), + "publish_via_partition_root"))); + + ReleaseSysCache(tuple); + } + + ReleaseSysCache(rftuple); + } + } + } + /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -450,8 +889,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + if (root_relids == NIL) + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + else + { + /* + * We already got tables explicitly mentioned in the publication. + * Now get all partitions for the partitioned table in the list. + */ + foreach(lc, root_relids) + relids = GetPubPartitionOptionRelations(relids, + PUBLICATION_PART_ALL, + lfirst_oid(lc)); + } + schemarelids = GetAllSchemaPublicationRelations(pubform->oid, PUBLICATION_PART_ALL); relids = list_concat_unique_oid(relids, schemarelids); @@ -492,7 +944,8 @@ InvalidatePublicationRels(List *relids) */ static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, - List *tables, List *schemaidlist) + List *tables, List *schemaidlist, + const char *queryString) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); @@ -519,6 +972,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); CheckObjSchemaNotAlreadyInPublication(rels, schemas, PUBLICATIONOBJ_TABLE); + + TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + PublicationAddTables(pubid, rels, false, stmt); } else if (stmt->action == AP_DropObjects) @@ -533,37 +989,76 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, PUBLICATIONOBJ_TABLE); - /* Calculate which relations to drop. */ + TransformPubWhereClauses(rels, queryString, pubform->pubviaroot); + + /* + * To recreate the relation list for the publication, look for + * existing relations that do not need to be dropped. + */ foreach(oldlc, oldrelids) { Oid oldrelid = lfirst_oid(oldlc); ListCell *newlc; + PublicationRelInfo *oldrel; bool found = false; + HeapTuple rftuple; + bool rfisnull = true; + Node *oldrelwhereclause = NULL; + + /* look up the cache for the old relmap */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(oldrelid), + ObjectIdGetDatum(pubid)); + + if (HeapTupleIsValid(rftuple)) + { + Datum whereClauseDatum; + + whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prqual, + &rfisnull); + if (!rfisnull) + oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum)); + + ReleaseSysCache(rftuple); + } foreach(newlc, rels) { PublicationRelInfo *newpubrel; newpubrel = (PublicationRelInfo *) lfirst(newlc); + + /* + * Check if any of the new set of relations matches with the + * existing relations in the publication. Additionally, if the + * relation has an associated WHERE clause, check the WHERE + * expressions also match. Drop the rest. + */ if (RelationGetRelid(newpubrel->relation) == oldrelid) { - found = true; - break; + if (equal(oldrelwhereclause, newpubrel->whereClause)) + { + found = true; + break; + } } } - /* Not yet in the list, open it and add to the list */ - if (!found) - { - Relation oldrel; - PublicationRelInfo *pubrel; - /* Wrap relation into PublicationRelInfo */ - oldrel = table_open(oldrelid, ShareUpdateExclusiveLock); + if (oldrelwhereclause) + pfree(oldrelwhereclause); - pubrel = palloc(sizeof(PublicationRelInfo)); - pubrel->relation = oldrel; - - delrels = lappend(delrels, pubrel); + /* + * Add the non-matched relations to a list so that they can be + * dropped. + */ + if (!found) + { + oldrel = palloc(sizeof(PublicationRelInfo)); + oldrel->whereClause = NULL; + oldrel->relation = table_open(oldrelid, + ShareUpdateExclusiveLock); + delrels = lappend(delrels, oldrel); } } @@ -720,12 +1215,15 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) { List *relations = NIL; List *schemaidlist = NIL; + Oid pubid = pubform->oid; ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, &schemaidlist); CheckAlterPublication(stmt, tup, relations, schemaidlist); + heap_freetuple(tup); + /* * Lock the publication so nobody else can do anything with it. This * prevents concurrent alter to add table(s) that were already going @@ -734,22 +1232,24 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) * addition of schema(s) for which there is any corresponding table * being added by this command. */ - LockDatabaseObject(PublicationRelationId, pubform->oid, 0, + LockDatabaseObject(PublicationRelationId, pubid, 0, AccessExclusiveLock); /* * It is possible that by the time we acquire the lock on publication, * concurrent DDL has removed it. We can test this by checking the - * existence of publication. + * existence of publication. We get the tuple again to avoid the risk + * of any publication option getting changed. */ - if (!SearchSysCacheExists1(PUBLICATIONOID, - ObjectIdGetDatum(pubform->oid))) + tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); + if (!HeapTupleIsValid(tup)) ereport(ERROR, errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("publication \"%s\" does not exist", stmt->pubname)); - AlterPublicationTables(stmt, tup, relations, schemaidlist); + AlterPublicationTables(stmt, tup, relations, schemaidlist, + pstate->p_sourcetext); AlterPublicationSchemas(stmt, tup, schemaidlist); } @@ -901,6 +1401,7 @@ OpenTableList(List *tables) List *relids = NIL; List *rels = NIL; ListCell *lc; + List *relids_with_rf = NIL; /* * Open, share-lock, and check all the explicitly-specified relations @@ -928,15 +1429,26 @@ OpenTableList(List *tables) */ if (list_member_oid(relids, myrelid)) { + /* Disallow duplicate tables if there are any with row filters. */ + if (t->whereClause || list_member_oid(relids_with_rf, myrelid)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant WHERE clauses for table \"%s\"", + RelationGetRelationName(rel)))); + table_close(rel, ShareUpdateExclusiveLock); continue; } pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->whereClause = t->whereClause; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); + if (t->whereClause) + relids_with_rf = lappend_oid(relids_with_rf, myrelid); + /* * Add children of this rel, if requested, so that they too are added * to the publication. A partitioned table can't have any inheritance @@ -963,19 +1475,39 @@ OpenTableList(List *tables) * tables. */ if (list_member_oid(relids, childrelid)) + { + /* + * We don't allow to specify row filter for both parent + * and child table at the same time as it is not very + * clear which one should be given preference. + */ + if (childrelid != myrelid && + (t->whereClause || list_member_oid(relids_with_rf, childrelid))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("conflicting or redundant WHERE clauses for table \"%s\"", + RelationGetRelationName(rel)))); + continue; + } /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + /* child inherits WHERE clause from parent */ + pub_rel->whereClause = t->whereClause; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); + + if (t->whereClause) + relids_with_rf = lappend_oid(relids_with_rf, childrelid); } } } list_free(relids); + list_free(relids_with_rf); return rels; } @@ -995,6 +1527,8 @@ CloseTableList(List *rels) pub_rel = (PublicationRelInfo *) lfirst(lc); table_close(pub_rel->relation, NoLock); } + + list_free_deep(rels); } /* @@ -1090,6 +1624,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) RelationGetRelationName(rel)))); } + if (pubrel->whereClause) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot use a WHERE clause when removing a table from a publication"))); + ObjectAddressSet(obj, PublicationRelRelationId, prid); performDeletion(&obj, DROP_CASCADE, 0); } diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 313c87398b2..de106d767d1 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -567,15 +567,43 @@ ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, void CheckCmdReplicaIdentity(Relation rel, CmdType cmd) { - PublicationActions *pubactions; + PublicationDesc pubdesc; /* We only need to do checks for UPDATE and DELETE. */ if (cmd != CMD_UPDATE && cmd != CMD_DELETE) return; + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + return; + + /* + * It is only safe to execute UPDATE/DELETE when all columns, referenced + * in the row filters from publications which the relation is in, are + * valid - i.e. when all referenced columns are part of REPLICA IDENTITY + * or the table does not publish UPDATEs or DELETEs. + * + * XXX We could optimize it by first checking whether any of the + * publications have a row filter for this relation. If not and relation + * has replica identity then we can avoid building the descriptor but as + * this happens only one time it doesn't seem worth the additional + * complexity. + */ + RelationBuildPublicationDesc(rel, &pubdesc); + if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot update table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot delete from table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Column used in the publication WHERE expression is not part of the replica identity."))); + /* If relation has replica identity we are always good. */ - if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || - OidIsValid(RelationGetReplicaIndex(rel))) + if (OidIsValid(RelationGetReplicaIndex(rel))) return; /* @@ -583,14 +611,13 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) * * Check if the table publishes UPDATES or DELETES. */ - pubactions = GetRelationPublicationActions(rel); - if (cmd == CMD_UPDATE && pubactions->pubupdate) + if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates", RelationGetRelationName(rel)), errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE."))); - else if (cmd == CMD_DELETE && pubactions->pubdelete) + else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes", diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index bc0d90b4b1b..d4f8455a2bd 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4849,6 +4849,7 @@ _copyPublicationTable(const PublicationTable *from) PublicationTable *newnode = makeNode(PublicationTable); COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(whereClause); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 2e7122ad2f6..f1002afe7a0 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2321,6 +2321,7 @@ static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(whereClause); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 92f93cfc72d..a03b33b53bd 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9751,12 +9751,13 @@ CreatePublicationStmt: * relation_expr here. */ PublicationObjSpec: - TABLE relation_expr + TABLE relation_expr OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; + $$->pubtable->whereClause = $3; } | ALL TABLES IN_P SCHEMA ColId { @@ -9771,28 +9772,45 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA; $$->location = @5; } - | ColId + | ColId OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - $$->name = $1; + if ($2) + { + /* + * The OptWhereClause must be stored here but it is + * valid only for tables. For non-table objects, an + * error will be thrown later via + * preprocess_pubobj_list(). + */ + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->whereClause = $2; + } + else + { + $$->name = $1; + } $$->location = @1; } - | ColId indirection + | ColId indirection OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->whereClause = $3; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr + | extended_relation_expr OptWhereClause { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; + $$->pubtable->whereClause = $2; } | CURRENT_SCHEMA { @@ -17448,7 +17466,8 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) errcode(ERRCODE_SYNTAX_ERROR), errmsg("invalid table name at or near"), parser_errposition(pubobj->location)); - else if (pubobj->name) + + if (pubobj->name) { /* convert it to PublicationTable */ PublicationTable *pubtable = makeNode(PublicationTable); @@ -17462,6 +17481,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA || pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA) { + /* WHERE clause is not allowed on a schema object */ + if (pubobj->pubtable && pubobj->pubtable->whereClause) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("WHERE clause not allowed for schema"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 953942692ce..c9b0eeefd7e 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -31,8 +31,8 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); - + TupleTableSlot *slot, + bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + TupleTableSlot *newslot, bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newslot, binary); } /* @@ -442,7 +442,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + TupleTableSlot *oldslot, TupleTableSlot *newslot, + bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -457,17 +458,17 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); - if (oldtuple != NULL) + if (oldslot != NULL) { if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldslot, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newslot, binary); } /* @@ -516,7 +517,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, */ void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, bool binary) + TupleTableSlot *oldslot, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -536,7 +537,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldslot, binary); } /* @@ -749,11 +750,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, + bool binary) { TupleDesc desc; - Datum values[MaxTupleAttributeNumber]; - bool isnull[MaxTupleAttributeNumber]; + Datum *values; + bool *isnull; int i; uint16 nliveatts = 0; @@ -767,11 +769,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar } pq_sendint16(out, nliveatts); - /* try to allocate enough memory from the get-go */ - enlargeStringInfo(out, tuple->t_len + - nliveatts * (1 + 4)); - - heap_deform_tuple(tuple, desc, values, isnull); + slot_getallattrs(slot); + values = slot->tts_values; + isnull = slot->tts_isnull; /* Write the values */ for (i = 0; i < desc->natts; i++) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e596b69d466..1659964571c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -690,19 +690,23 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. This function also returns the relation + * qualifications to be used in the COPY command. */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid qualRow[] = {TEXTOID}; bool isnull; int natt; + ListCell *lc; + bool first; lrel->nspname = nspname; lrel->relname = relname; @@ -798,6 +802,98 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->natts = natt; walrcv_clear_result(res); + + /* + * Get relation's row filter expressions. DISTINCT avoids the same + * expression of a table in multiple publications from being included + * multiple times in the final expression. + * + * We need to copy the row even if it matches just one of the + * publications, so we later combine all the quals with OR. + * + * For initial synchronization, row filtering can be ignored in following + * cases: + * + * 1) one of the subscribed publications for the table hasn't specified + * any row filter + * + * 2) one of the subscribed publications has puballtables set to true + * + * 3) one of the subscribed publications is declared as ALL TABLES IN + * SCHEMA that includes this relation + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + StringInfoData pub_names; + + /* Build the pubname list. */ + initStringInfo(&pub_names); + first = true; + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&pub_names, ", "); + + appendStringInfoString(&pub_names, quote_literal_cstr(pubname)); + } + + /* Check for row filters. */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)" + " FROM pg_publication p" + " LEFT OUTER JOIN pg_publication_rel pr" + " ON (p.oid = pr.prpubid AND pr.prrelid = %u)," + " LATERAL pg_get_publication_tables(p.pubname) gpt" + " WHERE gpt.relid = %u" + " AND p.pubname IN ( %s )", + lrel->remoteid, + lrel->remoteid, + pub_names.data); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + /* + * Multiple row filter expressions for the same table will be combined + * by COPY using OR. If any of the filter expressions for this table + * are null, it means the whole table will be copied. In this case it + * is not necessary to construct a unified row filter expression at + * all. + */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); + + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + else + { + /* Ignore filters and cleanup as necessary. */ + if (*qual) + { + list_free_deep(*qual); + *qual = NIL; + } + break; + } + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + } + pfree(cmd.data); } @@ -811,6 +907,7 @@ copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; + List *qual = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyFromState cstate; @@ -819,7 +916,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, &qual); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -830,14 +927,18 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); - if (lrel.relkind == RELKIND_RELATION) + + /* Regular table with no row filter */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL) appendStringInfo(&cmd, "COPY %s TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); else { /* - * For non-tables, we need to do COPY (SELECT ...), but we can't just - * do SELECT * because we need to not copy generated columns. + * For non-tables and tables with row filters, we need to do COPY + * (SELECT ...), but we can't just do SELECT * because we need to not + * copy generated columns. For tables with any row filters, build a + * SELECT query with OR'ed row filters for COPY. */ appendStringInfoString(&cmd, "COPY (SELECT "); for (int i = 0; i < lrel.natts; i++) @@ -846,8 +947,33 @@ copy_table(Relation rel) if (i < lrel.natts - 1) appendStringInfoString(&cmd, ", "); } - appendStringInfo(&cmd, " FROM %s) TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + + appendStringInfoString(&cmd, " FROM "); + + /* + * For regular tables, make sure we don't copy data from a child that + * inherits the named table as those will be copied separately. + */ + if (lrel.relkind == RELKIND_RELATION) + appendStringInfoString(&cmd, "ONLY "); + + appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname)); + /* list of OR'ed filters */ + if (qual != NIL) + { + ListCell *lc; + char *q = strVal(linitial(qual)); + + appendStringInfo(&cmd, " WHERE %s", q); + for_each_from(lc, qual, 1) + { + q = strVal(lfirst(lc)); + appendStringInfo(&cmd, " OR %s", q); + } + list_free_deep(qual); + } + + appendStringInfoString(&cmd, ") TO STDOUT"); } res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 4162bb8de7b..ea57a0477f0 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,12 +15,17 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel.h" #include "commands/defrem.h" +#include "executor/executor.h" #include "fmgr.h" +#include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -86,6 +91,19 @@ static void send_repl_origin(LogicalDecodingContext *ctx, bool send_origin); /* + * Only 3 publication actions are used for row filtering ("insert", "update", + * "delete"). See RelationSyncEntry.exprstate[]. + */ +enum RowFilterPubAction +{ + PUBACTION_INSERT, + PUBACTION_UPDATE, + PUBACTION_DELETE +}; + +#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1) + +/* * Entry in the map used to remember which relation schemas we sent. * * The schema_sent flag determines if the current schema record for the @@ -117,6 +135,21 @@ typedef struct RelationSyncEntry PublicationActions pubactions; /* + * ExprState array for row filter. Different publication actions don't + * allow multiple expressions to always be combined into one, because + * updates or deletes restrict the column in expression to be part of the + * replica identity index whereas inserts do not have this restriction, so + * there is one ExprState per publication action. + */ + ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS]; + EState *estate; /* executor state used for row filter */ + MemoryContext cache_expr_cxt; /* private context for exprstate and + * estate, if any */ + + TupleTableSlot *new_slot; /* slot for storing new tuple */ + TupleTableSlot *old_slot; /* slot for storing old tuple */ + + /* * OID of the relation to publish changes as. For a partition, this may * be set to one of its ancestors whose schema will be used when * replicating changes, if publish_via_partition_root is set for the @@ -130,7 +163,7 @@ typedef struct RelationSyncEntry * same as 'relid' or if unnecessary due to partition and the ancestor * having identical TupleDesc. */ - TupleConversionMap *map; + AttrMap *attrmap; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -138,7 +171,8 @@ static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); -static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, + Relation relation); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -146,6 +180,20 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); +static void init_tuple_slot(PGOutputData *data, Relation relation, + RelationSyncEntry *entry); + +/* row filter routines */ +static EState *create_estate_for_relation(Relation rel); +static void pgoutput_row_filter_init(PGOutputData *data, + List *publications, + RelationSyncEntry *entry); +static bool pgoutput_row_filter_exec_expr(ExprState *state, + ExprContext *econtext); +static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, + TupleTableSlot **new_slot_ptr, + RelationSyncEntry *entry, + ReorderBufferChangeType *action); /* * Specify output plugin callbacks @@ -303,6 +351,10 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, "logical replication output context", ALLOCSET_DEFAULT_SIZES); + data->cachectx = AllocSetContextCreate(ctx->context, + "logical replication cache context", + ALLOCSET_DEFAULT_SIZES); + ctx->output_plugin_private = data; /* This plugin uses binary protocol. */ @@ -543,37 +595,14 @@ maybe_send_schema(LogicalDecodingContext *ctx, return; /* - * Nope, so send the schema. If the changes will be published using an - * ancestor's schema, not the relation's own, send that ancestor's schema - * before sending relation's own (XXX - maybe sending only the former - * suffices?). This is also a good place to set the map that will be used - * to convert the relation's tuples into the ancestor's format, if needed. + * Send the schema. If the changes will be published using an ancestor's + * schema, not the relation's own, send that ancestor's schema before + * sending relation's own (XXX - maybe sending only the former suffices?). */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - TupleDesc indesc = RelationGetDescr(relation); - TupleDesc outdesc = RelationGetDescr(ancestor); - MemoryContext oldctx; - - /* Map must live as long as the session does. */ - oldctx = MemoryContextSwitchTo(CacheMemoryContext); - /* - * Make copies of the TupleDescs that will live as long as the map - * does before putting into the map. - */ - indesc = CreateTupleDescCopy(indesc); - outdesc = CreateTupleDescCopy(outdesc); - relentry->map = convert_tuples_by_name(indesc, outdesc); - if (relentry->map == NULL) - { - /* Map not necessary, so free the TupleDescs too. */ - FreeTupleDesc(indesc); - FreeTupleDesc(outdesc); - } - - MemoryContextSwitchTo(oldctx); send_relation_and_attrs(ancestor, xid, ctx); RelationClose(ancestor); } @@ -625,6 +654,484 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } /* + * Executor state preparation for evaluation of row filter expressions for the + * specified relation. + */ +static EState * +create_estate_for_relation(Relation rel) +{ + EState *estate; + RangeTblEntry *rte; + + estate = CreateExecutorState(); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->rellockmode = AccessShareLock; + ExecInitRangeTable(estate, list_make1(rte)); + + estate->es_output_cid = GetCurrentCommandId(false); + + return estate; +} + +/* + * Evaluates row filter. + * + * If the row filter evaluates to NULL, it is taken as false i.e. the change + * isn't replicated. + */ +static bool +pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) +{ + Datum ret; + bool isnull; + + Assert(state != NULL); + + ret = ExecEvalExprSwitchContext(state, econtext, &isnull); + + elog(DEBUG3, "row filter evaluates to %s (isnull: %s)", + isnull ? "false" : DatumGetBool(ret) ? "true" : "false", + isnull ? "true" : "false"); + + if (isnull) + return false; + + return DatumGetBool(ret); +} + +/* + * Initialize the row filter. + */ +static void +pgoutput_row_filter_init(PGOutputData *data, List *publications, + RelationSyncEntry *entry) +{ + ListCell *lc; + List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */ + bool no_filter[] = {false, false, false}; /* One per pubaction */ + MemoryContext oldctx; + int idx; + bool has_filter = true; + + /* + * Find if there are any row filters for this relation. If there are, then + * prepare the necessary ExprState and cache it in entry->exprstate. To + * build an expression state, we need to ensure the following: + * + * All the given publication-table mappings must be checked. + * + * Multiple publications might have multiple row filters for this + * relation. Since row filter usage depends on the DML operation, there + * are multiple lists (one for each operation) to which row filters will + * be appended. + * + * FOR ALL TABLES implies "don't use row filter expression" so it takes + * precedence. + */ + foreach(lc, publications) + { + Publication *pub = lfirst(lc); + HeapTuple rftuple = NULL; + Datum rfdatum = 0; + bool pub_no_filter = false; + + if (pub->alltables) + { + /* + * If the publication is FOR ALL TABLES then it is treated the + * same as if this table has no row filters (even if for other + * publications it does). + */ + pub_no_filter = true; + } + else + { + /* + * Check for the presence of a row filter in this publication. + */ + rftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(entry->publish_as_relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(rftuple)) + { + /* Null indicates no filter. */ + rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prqual, + &pub_no_filter); + } + else + { + pub_no_filter = true; + } + } + + if (pub_no_filter) + { + if (rftuple) + ReleaseSysCache(rftuple); + + no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert; + no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate; + no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete; + + /* + * Quick exit if all the DML actions are publicized via this + * publication. + */ + if (no_filter[PUBACTION_INSERT] && + no_filter[PUBACTION_UPDATE] && + no_filter[PUBACTION_DELETE]) + { + has_filter = false; + break; + } + + /* No additional work for this publication. Next one. */ + continue; + } + + /* Form the per pubaction row filter lists. */ + if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT]) + rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT], + TextDatumGetCString(rfdatum)); + if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE]) + rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE], + TextDatumGetCString(rfdatum)); + if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE]) + rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE], + TextDatumGetCString(rfdatum)); + + ReleaseSysCache(rftuple); + } /* loop all subscribed publications */ + + /* Clean the row filter */ + for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) + { + if (no_filter[idx]) + { + list_free_deep(rfnodes[idx]); + rfnodes[idx] = NIL; + } + } + + if (has_filter) + { + Relation relation = RelationIdGetRelation(entry->publish_as_relid); + + Assert(entry->cache_expr_cxt == NULL); + + /* Create the memory context for row filters */ + entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx, + "Row filter expressions", + ALLOCSET_DEFAULT_SIZES); + + MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt, + RelationGetRelationName(relation)); + + /* + * Now all the filters for all pubactions are known. Combine them when + * their pubactions are the same. + */ + oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt); + entry->estate = create_estate_for_relation(relation); + for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++) + { + List *filters = NIL; + Expr *rfnode; + + if (rfnodes[idx] == NIL) + continue; + + foreach(lc, rfnodes[idx]) + filters = lappend(filters, stringToNode((char *) lfirst(lc))); + + /* combine the row filter and cache the ExprState */ + rfnode = make_orclause(filters); + entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate); + } /* for each pubaction */ + MemoryContextSwitchTo(oldctx); + + RelationClose(relation); + } +} + +/* + * Initialize the slot for storing new and old tuples, and build the map that + * will be used to convert the relation's tuples into the ancestor's format. + */ +static void +init_tuple_slot(PGOutputData *data, Relation relation, + RelationSyncEntry *entry) +{ + MemoryContext oldctx; + TupleDesc oldtupdesc; + TupleDesc newtupdesc; + + oldctx = MemoryContextSwitchTo(data->cachectx); + + /* + * Create tuple table slots. Create a copy of the TupleDesc as it needs to + * live as long as the cache remains. + */ + oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation)); + newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation)); + + entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple); + entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple); + + MemoryContextSwitchTo(oldctx); + + /* + * Cache the map that will be used to convert the relation's tuples into + * the ancestor's format, if needed. + */ + if (entry->publish_as_relid != RelationGetRelid(relation)) + { + Relation ancestor = RelationIdGetRelation(entry->publish_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc); + + MemoryContextSwitchTo(oldctx); + RelationClose(ancestor); + } +} + +/* + * Change is checked against the row filter if any. + * + * Returns true if the change is to be replicated, else false. + * + * For inserts, evaluate the row filter for new tuple. + * For deletes, evaluate the row filter for old tuple. + * For updates, evaluate the row filter for old and new tuple. + * + * For updates, if both evaluations are true, we allow sending the UPDATE and + * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if + * only one of the tuples matches the row filter expression, we transform + * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the + * following rules: + * + * Case 1: old-row (no match) new-row (no match) -> (drop change) + * Case 2: old-row (no match) new row (match) -> INSERT + * Case 3: old-row (match) new-row (no match) -> DELETE + * Case 4: old-row (match) new row (match) -> UPDATE + * + * The new action is updated in the action parameter. + * + * The new slot could be updated when transforming the UPDATE into INSERT, + * because the original new tuple might not have column values from the replica + * identity. + * + * Examples: + * Let's say the old tuple satisfies the row filter but the new tuple doesn't. + * Since the old tuple satisfies, the initial table synchronization copied this + * row (or another method was used to guarantee that there is data + * consistency). However, after the UPDATE the new tuple doesn't satisfy the + * row filter, so from a data consistency perspective, that row should be + * removed on the subscriber. The UPDATE should be transformed into a DELETE + * statement and be sent to the subscriber. Keeping this row on the subscriber + * is undesirable because it doesn't reflect what was defined in the row filter + * expression on the publisher. This row on the subscriber would likely not be + * modified by replication again. If someone inserted a new row with the same + * old identifier, replication could stop due to a constraint violation. + * + * Let's say the old tuple doesn't match the row filter but the new tuple does. + * Since the old tuple doesn't satisfy, the initial table synchronization + * probably didn't copy this row. However, after the UPDATE the new tuple does + * satisfy the row filter, so from a data consistency perspective, that row + * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE + * statements have no effect (it matches no row -- see + * apply_handle_update_internal()). So, the UPDATE should be transformed into a + * INSERT statement and be sent to the subscriber. However, this might surprise + * someone who expects the data set to satisfy the row filter expression on the + * provider. + */ +static bool +pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, + TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, + ReorderBufferChangeType *action) +{ + TupleDesc desc; + int i; + bool old_matched, + new_matched, + result; + TupleTableSlot *tmp_new_slot; + TupleTableSlot *new_slot = *new_slot_ptr; + ExprContext *ecxt; + ExprState *filter_exprstate; + + /* + * We need this map to avoid relying on ReorderBufferChangeType enums + * having specific values. + */ + static const int map_changetype_pubaction[] = { + [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT, + [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE, + [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE + }; + + Assert(*action == REORDER_BUFFER_CHANGE_INSERT || + *action == REORDER_BUFFER_CHANGE_UPDATE || + *action == REORDER_BUFFER_CHANGE_DELETE); + + Assert(new_slot || old_slot); + + /* Get the corresponding row filter */ + filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]]; + + /* Bail out if there is no row filter */ + if (!filter_exprstate) + return true; + + elog(DEBUG3, "table \"%s.%s\" has row filter", + get_namespace_name(RelationGetNamespace(relation)), + RelationGetRelationName(relation)); + + ResetPerTupleExprContext(entry->estate); + + ecxt = GetPerTupleExprContext(entry->estate); + + /* + * For the following occasions where there is only one tuple, we can + * evaluate the row filter for that tuple and return. + * + * For inserts, we only have the new tuple. + * + * For updates, we can have only a new tuple when none of the replica + * identity columns changed but we still need to evaluate the row filter + * for new tuple as the existing values of those columns might not match + * the filter. Also, users can use constant expressions in the row filter, + * so we anyway need to evaluate it for the new tuple. + * + * For deletes, we only have the old tuple. + */ + if (!new_slot || !old_slot) + { + ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot; + result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt); + + return result; + } + + /* + * Both the old and new tuples must be valid only for updates and need to + * be checked against the row filter. + */ + Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE); + + slot_getallattrs(new_slot); + slot_getallattrs(old_slot); + + tmp_new_slot = NULL; + desc = RelationGetDescr(relation); + + /* + * The new tuple might not have all the replica identity columns, in which + * case it needs to be copied over from the old tuple. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + /* + * if the column in the new tuple or old tuple is null, nothing to do + */ + if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i]) + continue; + + /* + * Unchanged toasted replica identity columns are only logged in the + * old tuple. Copy this over to the new tuple. The changed (or WAL + * Logged) toast values are always assembled in memory and set as + * VARTAG_INDIRECT. See ReorderBufferToastReplace. + */ + if (att->attlen == -1 && + VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) && + !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i])) + { + if (!tmp_new_slot) + { + tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual); + ExecClearTuple(tmp_new_slot); + + memcpy(tmp_new_slot->tts_values, new_slot->tts_values, + desc->natts * sizeof(Datum)); + memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull, + desc->natts * sizeof(bool)); + } + + tmp_new_slot->tts_values[i] = old_slot->tts_values[i]; + tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i]; + } + } + + ecxt->ecxt_scantuple = old_slot; + old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt); + + if (tmp_new_slot) + { + ExecStoreVirtualTuple(tmp_new_slot); + ecxt->ecxt_scantuple = tmp_new_slot; + } + else + ecxt->ecxt_scantuple = new_slot; + + new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt); + + /* + * Case 1: if both tuples don't match the row filter, bailout. Send + * nothing. + */ + if (!old_matched && !new_matched) + return false; + + /* + * Case 2: if the old tuple doesn't satisfy the row filter but the new + * tuple does, transform the UPDATE into INSERT. + * + * Use the newly transformed tuple that must contain the column values for + * all the replica identity columns. This is required to ensure that the + * while inserting the tuple in the downstream node, we have all the + * required column values. + */ + if (!old_matched && new_matched) + { + *action = REORDER_BUFFER_CHANGE_INSERT; + + if (tmp_new_slot) + *new_slot_ptr = tmp_new_slot; + } + + /* + * Case 3: if the old tuple satisfies the row filter but the new tuple + * doesn't, transform the UPDATE into DELETE. + * + * This transformation does not require another tuple. The Old tuple will + * be used for DELETE. + */ + else if (old_matched && !new_matched) + *action = REORDER_BUFFER_CHANGE_DELETE; + + /* + * Case 4: if both tuples match the row filter, transformation isn't + * required. (*action is default UPDATE). + */ + + return true; +} + +/* * Sends the decoded DML over wire. * * This is called both in streaming and non-streaming modes. @@ -638,6 +1145,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; + Relation targetrel = relation; + ReorderBufferChangeType action = change->action; + TupleTableSlot *old_slot = NULL; + TupleTableSlot *new_slot = NULL; if (!is_publishable_relation(relation)) return; @@ -651,10 +1162,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = change->txn->xid; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ - switch (change->action) + switch (action) { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) @@ -675,80 +1186,149 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - maybe_send_schema(ctx, change, relation, relentry); - /* Send the data */ - switch (change->action) + switch (action) { case REORDER_BUFFER_CHANGE_INSERT: - { - HeapTuple tuple = &change->data.tp.newtuple->tuple; + new_slot = relentry->new_slot; + ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, + new_slot, false); - /* Switch relation if publishing via root. */ - if (relentry->publish_as_relid != RelationGetRelid(relation)) + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + targetrel = ancestor; + /* Convert tuple if needed. */ + if (relentry->attrmap) { - Assert(relation->rd_rel->relispartition); - ancestor = RelationIdGetRelation(relentry->publish_as_relid); - relation = ancestor; - /* Convert tuple if needed. */ - if (relentry->map) - tuple = execute_attr_map_tuple(tuple, relentry->map); + TupleDesc tupdesc = RelationGetDescr(targetrel); + + new_slot = execute_attr_map_slot(relentry->attrmap, + new_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); } + } - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); - OutputPluginWrite(ctx, true); + /* Check row filter */ + if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, + &action)) break; - } + + /* + * Schema should be sent using the original relation because it + * also sends the ancestor's relation. + */ + maybe_send_schema(ctx, change, relation, relentry); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, + data->binary); + OutputPluginWrite(ctx, true); + break; case REORDER_BUFFER_CHANGE_UPDATE: + if (change->data.tp.oldtuple) { - HeapTuple oldtuple = change->data.tp.oldtuple ? - &change->data.tp.oldtuple->tuple : NULL; - HeapTuple newtuple = &change->data.tp.newtuple->tuple; + old_slot = relentry->old_slot; + ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, + old_slot, false); + } - /* Switch relation if publishing via root. */ - if (relentry->publish_as_relid != RelationGetRelid(relation)) + new_slot = relentry->new_slot; + ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, + new_slot, false); + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + targetrel = ancestor; + /* Convert tuples if needed. */ + if (relentry->attrmap) { - Assert(relation->rd_rel->relispartition); - ancestor = RelationIdGetRelation(relentry->publish_as_relid); - relation = ancestor; - /* Convert tuples if needed. */ - if (relentry->map) - { - if (oldtuple) - oldtuple = execute_attr_map_tuple(oldtuple, - relentry->map); - newtuple = execute_attr_map_tuple(newtuple, - relentry->map); - } + TupleDesc tupdesc = RelationGetDescr(targetrel); + + if (old_slot) + old_slot = execute_attr_map_slot(relentry->attrmap, + old_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); + + new_slot = execute_attr_map_slot(relentry->attrmap, + new_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); } + } - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); - OutputPluginWrite(ctx, true); + /* Check row filter */ + if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, + relentry, &action)) break; + + maybe_send_schema(ctx, change, relation, relentry); + + OutputPluginPrepareWrite(ctx, true); + + /* + * Updates could be transformed to inserts or deletes based on the + * results of the row filter for old and new tuple. + */ + switch (action) + { + case REORDER_BUFFER_CHANGE_INSERT: + logicalrep_write_insert(ctx->out, xid, targetrel, + new_slot, data->binary); + break; + case REORDER_BUFFER_CHANGE_UPDATE: + logicalrep_write_update(ctx->out, xid, targetrel, + old_slot, new_slot, data->binary); + break; + case REORDER_BUFFER_CHANGE_DELETE: + logicalrep_write_delete(ctx->out, xid, targetrel, + old_slot, data->binary); + break; + default: + Assert(false); } + + OutputPluginWrite(ctx, true); + break; case REORDER_BUFFER_CHANGE_DELETE: if (change->data.tp.oldtuple) { - HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + old_slot = relentry->old_slot; + + ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, + old_slot, false); /* Switch relation if publishing via root. */ if (relentry->publish_as_relid != RelationGetRelid(relation)) { Assert(relation->rd_rel->relispartition); ancestor = RelationIdGetRelation(relentry->publish_as_relid); - relation = ancestor; + targetrel = ancestor; /* Convert tuple if needed. */ - if (relentry->map) - oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + if (relentry->attrmap) + { + TupleDesc tupdesc = RelationGetDescr(targetrel); + + old_slot = execute_attr_map_slot(relentry->attrmap, + old_slot, + MakeTupleTableSlot(tupdesc, &TTSOpsVirtual)); + } } + /* Check row filter */ + if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, + relentry, &action)) + break; + + maybe_send_schema(ctx, change, relation, relentry); + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, xid, relation, oldtuple, - data->binary); + logicalrep_write_delete(ctx->out, xid, targetrel, + old_slot, data->binary); OutputPluginWrite(ctx, true); } else @@ -798,7 +1378,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -873,8 +1453,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, /* * Shutdown the output plugin. * - * Note, we don't need to clean the data->context as it's child context - * of the ctx->context so it will be cleaned up by logical decoding machinery. + * Note, we don't need to clean the data->context and data->cachectx as + * they are child context of the ctx->context so it will be cleaned up by + * logical decoding machinery. */ static void pgoutput_shutdown(LogicalDecodingContext *ctx) @@ -1122,11 +1703,12 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) * when publishing. */ static RelationSyncEntry * -get_rel_sync_entry(PGOutputData *data, Oid relid) +get_rel_sync_entry(PGOutputData *data, Relation relation) { RelationSyncEntry *entry; bool found; MemoryContext oldctx; + Oid relid = RelationGetRelid(relation); Assert(RelationSyncCache != NULL); @@ -1144,9 +1726,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->new_slot = NULL; + entry->old_slot = NULL; + memset(entry->exprstate, 0, sizeof(entry->exprstate)); + entry->cache_expr_cxt = NULL; entry->publish_as_relid = InvalidOid; - entry->map = NULL; /* will be set by maybe_send_schema() if - * needed */ + entry->attrmap = NULL; } /* Validate the entry */ @@ -1165,6 +1750,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid publish_as_relid = relid; bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); + List *rel_publications = NIL; /* Reload publications if needed before use. */ if (!publications_valid) @@ -1193,17 +1779,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - if (entry->map) - { - /* - * Must free the TupleDescs contained in the map explicitly, - * because free_conversion_map() doesn't. - */ - FreeTupleDesc(entry->map->indesc); - FreeTupleDesc(entry->map->outdesc); - free_conversion_map(entry->map); - } - entry->map = NULL; + + /* + * Tuple slots cleanups. (Will be rebuilt later if needed). + */ + if (entry->old_slot) + ExecDropSingleTupleTableSlot(entry->old_slot); + if (entry->new_slot) + ExecDropSingleTupleTableSlot(entry->new_slot); + + entry->old_slot = NULL; + entry->new_slot = NULL; + + if (entry->attrmap) + free_attrmap(entry->attrmap); + entry->attrmap = NULL; + + /* + * Row filter cache cleanups. + */ + if (entry->cache_expr_cxt) + MemoryContextDelete(entry->cache_expr_cxt); + + entry->cache_expr_cxt = NULL; + entry->estate = NULL; + memset(entry->exprstate, 0, sizeof(entry->exprstate)); /* * Build publication cache. We can't use one provided by relcache as @@ -1234,28 +1834,17 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) */ if (am_partition) { + Oid ancestor; List *ancestors = get_partition_ancestors(relid); - ListCell *lc2; - /* - * Find the "topmost" ancestor that is in this - * publication. - */ - foreach(lc2, ancestors) + ancestor = GetTopMostAncestorInPublication(pub->oid, + ancestors); + + if (ancestor != InvalidOid) { - Oid ancestor = lfirst_oid(lc2); - List *apubids = GetRelationPublications(ancestor); - List *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); - - if (list_member_oid(apubids, pub->oid) || - list_member_oid(aschemaPubids, pub->oid)) - { - ancestor_published = true; - if (pub->pubviaroot) - publish_as_relid = ancestor; - } - list_free(apubids); - list_free(aschemaPubids); + ancestor_published = true; + if (pub->pubviaroot) + publish_as_relid = ancestor; } } @@ -1277,17 +1866,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + + rel_publications = lappend(rel_publications, pub); } + } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; + entry->publish_as_relid = publish_as_relid; + + /* + * Initialize the tuple slot, map, and row filter. These are only used + * when publishing inserts, updates, or deletes. + */ + if (entry->pubactions.pubinsert || entry->pubactions.pubupdate || + entry->pubactions.pubdelete) + { + /* Initialize the tuple slot and map */ + init_tuple_slot(data, relation, entry); + + /* Initialize the row filter */ + pgoutput_row_filter_init(data, rel_publications, entry); } list_free(pubids); list_free(schemaPubids); + list_free(rel_publications); - entry->publish_as_relid = publish_as_relid; entry->replicate_valid = true; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2707fed12f4..fccffce5729 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -66,6 +66,7 @@ #include "catalog/schemapg.h" #include "catalog/storage.h" #include "commands/policy.h" +#include "commands/publicationcmds.h" #include "commands/trigger.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -2419,8 +2420,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc) bms_free(relation->rd_pkattr); bms_free(relation->rd_idattr); bms_free(relation->rd_hotblockingattr); - if (relation->rd_pubactions) - pfree(relation->rd_pubactions); + if (relation->rd_pubdesc) + pfree(relation->rd_pubdesc); if (relation->rd_options) pfree(relation->rd_options); if (relation->rd_indextuple) @@ -5523,38 +5524,57 @@ RelationGetExclusionInfo(Relation indexRelation, } /* - * Get publication actions for the given relation. + * Get the publication information for the given relation. + * + * Traverse all the publications which the relation is in to get the + * publication actions and validate the row filter expressions for such + * publications if any. We consider the row filter expression as invalid if it + * references any column which is not part of REPLICA IDENTITY. + * + * To avoid fetching the publication information repeatedly, we cache the + * publication actions and row filter validation information. */ -struct PublicationActions * -GetRelationPublicationActions(Relation relation) +void +RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) { List *puboids; ListCell *lc; MemoryContext oldcxt; Oid schemaid; - PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); + List *ancestors = NIL; + Oid relid = RelationGetRelid(relation); /* * If not publishable, it publishes no actions. (pgoutput_change() will * ignore it.) */ if (!is_publishable_relation(relation)) - return pubactions; + { + memset(pubdesc, 0, sizeof(PublicationDesc)); + pubdesc->rf_valid_for_update = true; + pubdesc->rf_valid_for_delete = true; + return; + } + + if (relation->rd_pubdesc) + { + memcpy(pubdesc, relation->rd_pubdesc, sizeof(PublicationDesc)); + return; + } - if (relation->rd_pubactions) - return memcpy(pubactions, relation->rd_pubactions, - sizeof(PublicationActions)); + memset(pubdesc, 0, sizeof(PublicationDesc)); + pubdesc->rf_valid_for_update = true; + pubdesc->rf_valid_for_delete = true; /* Fetch the publication membership info. */ - puboids = GetRelationPublications(RelationGetRelid(relation)); + puboids = GetRelationPublications(relid); schemaid = RelationGetNamespace(relation); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); if (relation->rd_rel->relispartition) { /* Add publications that the ancestors are in too. */ - List *ancestors = get_partition_ancestors(RelationGetRelid(relation)); - ListCell *lc; + ancestors = get_partition_ancestors(relid); foreach(lc, ancestors) { @@ -5582,35 +5602,53 @@ GetRelationPublicationActions(Relation relation) pubform = (Form_pg_publication) GETSTRUCT(tup); - pubactions->pubinsert |= pubform->pubinsert; - pubactions->pubupdate |= pubform->pubupdate; - pubactions->pubdelete |= pubform->pubdelete; - pubactions->pubtruncate |= pubform->pubtruncate; + pubdesc->pubactions.pubinsert |= pubform->pubinsert; + pubdesc->pubactions.pubupdate |= pubform->pubupdate; + pubdesc->pubactions.pubdelete |= pubform->pubdelete; + pubdesc->pubactions.pubtruncate |= pubform->pubtruncate; + + /* + * Check if all columns referenced in the filter expression are part of + * the REPLICA IDENTITY index or not. + * + * If the publication is FOR ALL TABLES then it means the table has no + * row filters and we can skip the validation. + */ + if (!pubform->puballtables && + (pubform->pubupdate || pubform->pubdelete) && + contain_invalid_rfcolumn(pubid, relation, ancestors, + pubform->pubviaroot)) + { + if (pubform->pubupdate) + pubdesc->rf_valid_for_update = false; + if (pubform->pubdelete) + pubdesc->rf_valid_for_delete = false; + } ReleaseSysCache(tup); /* - * If we know everything is replicated, there is no point to check for - * other publications. + * If we know everything is replicated and the row filter is invalid + * for update and delete, there is no point to check for other + * publications. */ - if (pubactions->pubinsert && pubactions->pubupdate && - pubactions->pubdelete && pubactions->pubtruncate) + if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate && + pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate && + !pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete) break; } - if (relation->rd_pubactions) + if (relation->rd_pubdesc) { - pfree(relation->rd_pubactions); - relation->rd_pubactions = NULL; + pfree(relation->rd_pubdesc); + relation->rd_pubdesc = NULL; } - /* Now save copy of the actions in the relcache entry. */ + /* Now save copy of the descriptor in the relcache entry. */ oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - relation->rd_pubactions = palloc(sizeof(PublicationActions)); - memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions)); + relation->rd_pubdesc = palloc(sizeof(PublicationDesc)); + memcpy(relation->rd_pubdesc, pubdesc, sizeof(PublicationDesc)); MemoryContextSwitchTo(oldcxt); - - return pubactions; } /* @@ -6163,7 +6201,7 @@ load_relcache_init_file(bool shared) rel->rd_pkattr = NULL; rel->rd_idattr = NULL; rel->rd_hotblockingattr = NULL; - rel->rd_pubactions = NULL; + rel->rd_pubdesc = NULL; rel->rd_statvalid = false; rel->rd_statlist = NIL; rel->rd_fkeyvalid = false; |
