diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 153 |
1 files changed, 145 insertions, 8 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d8b12d94bc3..697fb23634c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -113,6 +113,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "utils/acl.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -702,12 +703,13 @@ fetch_remote_table_info(char *nspname, char *relname, StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; bool isnull; int natt; ListCell *lc; bool first; + Bitmapset *included_cols = NULL; lrel->nspname = nspname; lrel->relname = relname; @@ -748,10 +750,110 @@ fetch_remote_table_info(char *nspname, char *relname, ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - /* Now fetch columns. */ + + /* + * Get column lists for each relation. + * + * For initial synchronization, column lists can be ignored in following + * cases: + * + * 1) one of the subscribed publications for the table hasn't specified + * any column list + * + * 2) one of the subscribed publications has puballtables set to true + * + * 3) one of the subscribed publications is declared as ALL TABLES IN + * SCHEMA that includes this relation + * + * We need to do this before fetching info about column names and types, + * so that we can skip columns that should not be replicated. + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + WalRcvExecResult *pubres; + TupleTableSlot *slot; + Oid attrsRow[] = {INT2OID}; + StringInfoData pub_names; + bool first = true; + + initStringInfo(&pub_names); + foreach(lc, MySubscription->publications) + { + if (!first) + appendStringInfo(&pub_names, ", "); + appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc)))); + first = false; + } + + /* + * Fetch info about column lists for the relation (from all the + * publications). We unnest the int2vector values, because that + * makes it easier to combine lists by simply adding the attnums + * to a new bitmap (without having to parse the int2vector data). + * This preserves NULL values, so that if one of the publications + * has no column list, we'll know that. + */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT unnest" + " FROM pg_publication p" + " LEFT OUTER JOIN pg_publication_rel pr" + " ON (p.oid = pr.prpubid AND pr.prrelid = %u)" + " LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE," + " LATERAL pg_get_publication_tables(p.pubname) gpt" + " WHERE gpt.relid = %u" + " AND p.pubname IN ( %s )", + lrel->remoteid, + lrel->remoteid, + pub_names.data); + + pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrsRow), attrsRow); + + if (pubres->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s", + nspname, relname, pubres->err))); + + /* + * Merge the column lists (from different publications) by creating + * a single bitmap with all the attnums. If we find a NULL value, + * that means one of the publications has no column list for the + * table we're syncing. + */ + slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + { + Datum cfval = slot_getattr(slot, 1, &isnull); + + /* NULL means empty column list, so we're done. */ + if (isnull) + { + bms_free(included_cols); + included_cols = NULL; + break; + } + + included_cols = bms_add_member(included_cols, + DatumGetInt16(cfval)); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(pubres); + + pfree(pub_names.data); + } + + /* + * Now fetch column names and types. + */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT a.attname," + "SELECT a.attnum," + " a.attname," " a.atttypid," " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" @@ -779,16 +881,35 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + /* + * Store the columns as a list of names. Ignore those that are not + * present in the column list, if there is one. + */ natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = - TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + char *rel_colname; + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + + /* If the column is not in the column list, skip it. */ + if (included_cols != NULL && !bms_is_member(attnum, included_cols)) + { + ExecClearTuple(slot); + continue; + } + + rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + lrel->attnames[natt] = rel_colname; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull)); Assert(!isnull); - if (DatumGetBool(slot_getattr(slot, 3, &isnull))) + + if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); /* Should never happen. */ @@ -931,8 +1052,24 @@ copy_table(Relation rel) /* Regular table with no row filter */ if (lrel.relkind == RELKIND_RELATION && qual == NIL) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + + /* + * XXX Do we need to list the columns in all cases? Maybe we're replicating + * all columns? + */ + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* |