diff options
| author | Amit Kapila <akapila@postgresql.org> | 2022-02-22 07:54:12 +0530 |
|---|---|---|
| committer | Amit Kapila <akapila@postgresql.org> | 2022-02-22 08:11:50 +0530 |
| commit | 52e4f0cd472d39d07732b99559989ea3b615be78 (patch) | |
| tree | e40cc7b7690f82c7cfb945fd55afdf55e9bc944f /src/backend/replication/logical/proto.c | |
| parent | ebf6c5249b7db525e59563fb149642665c88f747 (diff) | |
Allow specifying row filters for logical replication of tables.
This feature adds row filtering for publication tables. When a publication
is defined or modified, an optional WHERE clause can be specified. Rows
that don't satisfy this WHERE clause will be filtered out. This allows a
set of tables to be partially replicated. The row filter is per table. A
new row filter can be added simply by specifying a WHERE clause after the
table name. The WHERE clause must be enclosed by parentheses.
The row filter WHERE clause for a table added to a publication that
publishes UPDATE and/or DELETE operations must contain only columns that
are covered by REPLICA IDENTITY. The row filter WHERE clause for a table
added to a publication that publishes INSERT can use any column. If the
row filter evaluates to NULL, it is regarded as "false". The WHERE clause
only allows simple expressions that don't have user-defined functions,
user-defined operators, user-defined types, user-defined collations,
non-immutable built-in functions, or references to system columns. These
restrictions could be addressed in the future.
If you choose to do the initial table synchronization, only data that
satisfies the row filters is copied to the subscriber. If the subscription
has several publications in which a table has been published with
different WHERE clauses, rows that satisfy ANY of the expressions will be
copied. If a subscriber is a pre-15 version, the initial table
synchronization won't use row filters even if they are defined in the
publisher.
The row filters are applied before publishing the changes. If the
subscription has several publications in which the same table has been
published with different filters (for the same publish operation), those
expressions get OR'ed together so that rows satisfying any of the
expressions will be replicated.
This means all the other filters become redundant if (a) one of the
publications have no filter at all, (b) one of the publications was
created using FOR ALL TABLES, (c) one of the publications was created
using FOR ALL TABLES IN SCHEMA and the table belongs to that same schema.
If your publication contains a partitioned table, the publication
parameter publish_via_partition_root determines if it uses the partition's
row filter (if the parameter is false, the default) or the root
partitioned table's row filter.
Psql commands \dRp+ and \d <table-name> will display any row filters.
Author: Hou Zhijie, Euler Taveira, Peter Smith, Ajin Cherian
Reviewed-by: Greg Nancarrow, Haiying Tang, Amit Kapila, Tomas Vondra, Dilip Kumar, Vignesh C, Alvaro Herrera, Andres Freund, Wei Wang
Discussion: https://www.postgresql.org/message-id/flat/CAHE3wggb715X%2BmK_DitLXF25B%3DjE6xyNCH4YOwM860JR7HarGQ%40mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/proto.c')
| -rw-r--r-- | src/backend/replication/logical/proto.c | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 953942692ce..c9b0eeefd7e 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -31,8 +31,8 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); - + TupleTableSlot *slot, + bool binary); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + TupleTableSlot *newslot, bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newslot, binary); } /* @@ -442,7 +442,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + TupleTableSlot *oldslot, TupleTableSlot *newslot, + bool binary) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -457,17 +458,17 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); - if (oldtuple != NULL) + if (oldslot != NULL) { if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldslot, binary); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newslot, binary); } /* @@ -516,7 +517,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, */ void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, bool binary) + TupleTableSlot *oldslot, bool binary) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -536,7 +537,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldslot, binary); } /* @@ -749,11 +750,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, + bool binary) { TupleDesc desc; - Datum values[MaxTupleAttributeNumber]; - bool isnull[MaxTupleAttributeNumber]; + Datum *values; + bool *isnull; int i; uint16 nliveatts = 0; @@ -767,11 +769,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar } pq_sendint16(out, nliveatts); - /* try to allocate enough memory from the get-go */ - enlargeStringInfo(out, tuple->t_len + - nliveatts * (1 + 4)); - - heap_deform_tuple(tuple, desc, values, isnull); + slot_getallattrs(slot); + values = slot->tts_values; + isnull = slot->tts_isnull; /* Write the values */ for (i = 0; i < desc->natts; i++) |
