diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 72 |
1 files changed, 38 insertions, 34 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 994c7a09d92..670c6fcada5 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -753,17 +753,6 @@ fetch_remote_table_info(char *nspname, char *relname, /* * Get column lists for each relation. * - * For initial synchronization, column lists can be ignored in following - * cases: - * - * 1) one of the subscribed publications for the table hasn't specified - * any column list - * - * 2) one of the subscribed publications has puballtables set to true - * - * 3) one of the subscribed publications is declared as ALL TABLES IN - * SCHEMA that includes this relation - * * We need to do this before fetching info about column names and types, * so that we can skip columns that should not be replicated. */ @@ -771,7 +760,7 @@ fetch_remote_table_info(char *nspname, char *relname, { WalRcvExecResult *pubres; TupleTableSlot *slot; - Oid attrsRow[] = {INT2OID}; + Oid attrsRow[] = {INT2VECTOROID}; StringInfoData pub_names; bool first = true; @@ -786,19 +775,17 @@ fetch_remote_table_info(char *nspname, char *relname, /* * Fetch info about column lists for the relation (from all the - * publications). We unnest the int2vector values, because that makes - * it easier to combine lists by simply adding the attnums to a new - * bitmap (without having to parse the int2vector data). This - * preserves NULL values, so that if one of the publications has no - * column list, we'll know that. + * publications). */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT DISTINCT unnest" + "SELECT DISTINCT" + " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)" + " THEN NULL ELSE gpt.attrs END)" " FROM pg_publication p," - " LATERAL pg_get_publication_tables(p.pubname) gpt" - " LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE" - " WHERE gpt.relid = %u" + " LATERAL pg_get_publication_tables(p.pubname) gpt," + " pg_class c" + " WHERE gpt.relid = %u AND c.oid = gpt.relid" " AND p.pubname IN ( %s )", lrel->remoteid, pub_names.data); @@ -813,26 +800,43 @@ fetch_remote_table_info(char *nspname, char *relname, nspname, relname, pubres->err))); /* - * Merge the column lists (from different publications) by creating a - * single bitmap with all the attnums. If we find a NULL value, that - * means one of the publications has no column list for the table - * we're syncing. + * We don't support the case where the column list is different for + * the same table when combining publications. See comments atop + * fetch_table_list. So there should be only one row returned. + * Although we already checked this when creating the subscription, we + * still need to check here in case the column list was changed after + * creating the subscription and before the sync worker is started. + */ + if (tuplestore_tuple_count(pubres->tuplestore) > 1) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); + + /* + * Get the column list and build a single bitmap with the attnums. + * + * If we find a NULL value, it means all the columns should be + * replicated. */ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) { Datum cfval = slot_getattr(slot, 1, &isnull); - /* NULL means empty column list, so we're done. */ - if (isnull) + if (!isnull) { - bms_free(included_cols); - included_cols = NULL; - break; - } + ArrayType *arr; + int nelems; + int16 *elems; - included_cols = bms_add_member(included_cols, - DatumGetInt16(cfval)); + arr = DatumGetArrayTypeP(cfval); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + for (natt = 0; natt < nelems; natt++) + included_cols = bms_add_member(included_cols, elems[natt]); + } ExecClearTuple(slot); } |