diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 142 |
1 file changed, 134 insertions, 8 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e596b69d466..1659964571c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -690,19 +690,23 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. This function also returns the relation + * qualifications to be used in the COPY command. */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid qualRow[] = {TEXTOID}; bool isnull; int natt; + ListCell *lc; + bool first; lrel->nspname = nspname; lrel->relname = relname; @@ -798,6 +802,98 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->natts = natt; walrcv_clear_result(res); + + /* + * Get relation's row filter expressions. DISTINCT avoids the same + * expression of a table in multiple publications from being included + * multiple times in the final expression. + * + * We need to copy the row even if it matches just one of the + * publications, so we later combine all the quals with OR. + * + * For initial synchronization, row filtering can be ignored in following + * cases: + * + * 1) one of the subscribed publications for the table hasn't specified + * any row filter + * + * 2) one of the subscribed publications has puballtables set to true + * + * 3) one of the subscribed publications is declared as ALL TABLES IN + * SCHEMA that includes this relation + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + StringInfoData pub_names; + + /* Build the pubname list. 
*/ + initStringInfo(&pub_names); + first = true; + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&pub_names, ", "); + + appendStringInfoString(&pub_names, quote_literal_cstr(pubname)); + } + + /* Check for row filters. */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)" + " FROM pg_publication p" + " LEFT OUTER JOIN pg_publication_rel pr" + " ON (p.oid = pr.prpubid AND pr.prrelid = %u)," + " LATERAL pg_get_publication_tables(p.pubname) gpt" + " WHERE gpt.relid = %u" + " AND p.pubname IN ( %s )", + lrel->remoteid, + lrel->remoteid, + pub_names.data); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + /* + * Multiple row filter expressions for the same table will be combined + * by COPY using OR. If any of the filter expressions for this table + * are null, it means the whole table will be copied. In this case it + * is not necessary to construct a unified row filter expression at + * all. + */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); + + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + else + { + /* Ignore filters and cleanup as necessary. 
*/ + if (*qual) + { + list_free_deep(*qual); + *qual = NIL; + } + break; + } + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + } + pfree(cmd.data); } @@ -811,6 +907,7 @@ copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; + List *qual = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyFromState cstate; @@ -819,7 +916,7 @@ copy_table(Relation rel) /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, &qual); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -830,14 +927,18 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); - if (lrel.relkind == RELKIND_RELATION) + + /* Regular table with no row filter */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL) appendStringInfo(&cmd, "COPY %s TO STDOUT", quote_qualified_identifier(lrel.nspname, lrel.relname)); else { /* - * For non-tables, we need to do COPY (SELECT ...), but we can't just - * do SELECT * because we need to not copy generated columns. + * For non-tables and tables with row filters, we need to do COPY + * (SELECT ...), but we can't just do SELECT * because we need to not + * copy generated columns. For tables with any row filters, build a + * SELECT query with OR'ed row filters for COPY. */ appendStringInfoString(&cmd, "COPY (SELECT "); for (int i = 0; i < lrel.natts; i++) @@ -846,8 +947,33 @@ copy_table(Relation rel) if (i < lrel.natts - 1) appendStringInfoString(&cmd, ", "); } - appendStringInfo(&cmd, " FROM %s) TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + + appendStringInfoString(&cmd, " FROM "); + + /* + * For regular tables, make sure we don't copy data from a child that + * inherits the named table as those will be copied separately. 
+ */ + if (lrel.relkind == RELKIND_RELATION) + appendStringInfoString(&cmd, "ONLY "); + + appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname)); + /* list of OR'ed filters */ + if (qual != NIL) + { + ListCell *lc; + char *q = strVal(linitial(qual)); + + appendStringInfo(&cmd, " WHERE %s", q); + for_each_from(lc, qual, 1) + { + q = strVal(lfirst(lc)); + appendStringInfo(&cmd, " OR %s", q); + } + list_free_deep(qual); + } + + appendStringInfoString(&cmd, ") TO STDOUT"); } res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); |