summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c72
1 files changed, 38 insertions, 34 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 994c7a09d92..670c6fcada5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -753,17 +753,6 @@ fetch_remote_table_info(char *nspname, char *relname,
/*
* Get column lists for each relation.
*
- * For initial synchronization, column lists can be ignored in following
- * cases:
- *
- * 1) one of the subscribed publications for the table hasn't specified
- * any column list
- *
- * 2) one of the subscribed publications has puballtables set to true
- *
- * 3) one of the subscribed publications is declared as ALL TABLES IN
- * SCHEMA that includes this relation
- *
* We need to do this before fetching info about column names and types,
* so that we can skip columns that should not be replicated.
*/
@@ -771,7 +760,7 @@ fetch_remote_table_info(char *nspname, char *relname,
{
WalRcvExecResult *pubres;
TupleTableSlot *slot;
- Oid attrsRow[] = {INT2OID};
+ Oid attrsRow[] = {INT2VECTOROID};
StringInfoData pub_names;
bool first = true;
@@ -786,19 +775,17 @@ fetch_remote_table_info(char *nspname, char *relname,
/*
* Fetch info about column lists for the relation (from all the
- * publications). We unnest the int2vector values, because that makes
- * it easier to combine lists by simply adding the attnums to a new
- * bitmap (without having to parse the int2vector data). This
- * preserves NULL values, so that if one of the publications has no
- * column list, we'll know that.
+ * publications).
*/
resetStringInfo(&cmd);
appendStringInfo(&cmd,
- "SELECT DISTINCT unnest"
+ "SELECT DISTINCT"
+ " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
+ " THEN NULL ELSE gpt.attrs END)"
" FROM pg_publication p,"
- " LATERAL pg_get_publication_tables(p.pubname) gpt"
- " LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE"
- " WHERE gpt.relid = %u"
+ " LATERAL pg_get_publication_tables(p.pubname) gpt,"
+ " pg_class c"
+ " WHERE gpt.relid = %u AND c.oid = gpt.relid"
" AND p.pubname IN ( %s )",
lrel->remoteid,
pub_names.data);
@@ -813,26 +800,43 @@ fetch_remote_table_info(char *nspname, char *relname,
nspname, relname, pubres->err)));
/*
- * Merge the column lists (from different publications) by creating a
- * single bitmap with all the attnums. If we find a NULL value, that
- * means one of the publications has no column list for the table
- * we're syncing.
+ * We don't support the case where the column list is different for
+ * the same table when combining publications. See comments atop
+ * fetch_table_list. So there should be only one row returned.
+ * Although we already checked this when creating the subscription, we
+ * still need to check here in case the column list was changed after
+ * creating the subscription and before the sync worker is started.
+ */
+ if (tuplestore_tuple_count(pubres->tuplestore) > 1)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+ nspname, relname));
+
+ /*
+ * Get the column list and build a single bitmap with the attnums.
+ *
+ * If we find a NULL value, it means all the columns should be
+ * replicated.
*/
slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
- while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+ if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
{
Datum cfval = slot_getattr(slot, 1, &isnull);
- /* NULL means empty column list, so we're done. */
- if (isnull)
+ if (!isnull)
{
- bms_free(included_cols);
- included_cols = NULL;
- break;
- }
+ ArrayType *arr;
+ int nelems;
+ int16 *elems;
- included_cols = bms_add_member(included_cols,
- DatumGetInt16(cfval));
+ arr = DatumGetArrayTypeP(cfval);
+ nelems = ARR_DIMS(arr)[0];
+ elems = (int16 *) ARR_DATA_PTR(arr);
+
+ for (natt = 0; natt < nelems; natt++)
+ included_cols = bms_add_member(included_cols, elems[natt]);
+ }
ExecClearTuple(slot);
}