summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
authorTomas Vondra <tomas.vondra@postgresql.org>2022-03-26 00:45:21 +0100
committerTomas Vondra <tomas.vondra@postgresql.org>2022-03-26 01:01:27 +0100
commit923def9a533a7d986acfb524139d8b9e5466d0a5 (patch)
treeb6ce8d5bfe8d932e3cc89e52aba68519558e8033 /src/backend/replication/logical/tablesync.c
parent05843b1aa49df2ecc9b97c693b755bd1b6f856a9 (diff)
Allow specifying column lists for logical replication
This allows specifying an optional column list when adding a table to logical replication. The column list may be specified after the table name, enclosed in parentheses. Columns not included in this list are not sent to the subscriber, allowing the schema on the subscriber to be a subset of the publisher schema. For UPDATE/DELETE publications, the column list needs to cover all REPLICA IDENTITY columns. For INSERT publications, the column list is arbitrary and may omit some REPLICA IDENTITY columns. Furthermore, if the table uses REPLICA IDENTITY FULL, column list is not allowed. The column list can contain only simple column references. Complex expressions, function calls etc. are not allowed. This restriction could be relaxed in the future. During the initial table synchronization, only columns included in the column list are copied to the subscriber. If the subscription has several publications, containing the same table with different column lists, columns specified in any of the lists will be copied. This means all columns are replicated if the table has no column list at all (which is treated as column list with all columns), or when of the publications is defined as FOR ALL TABLES (possibly IN SCHEMA that matches the schema of the table). For partitioned tables, publish_via_partition_root determines whether the column list for the root or the leaf relation will be used. If the parameter is 'false' (the default), the list defined for the leaf relation is used. Otherwise, the column list for the root partition will be used. Psql commands \dRp+ and \d <table-name> now display any column lists. Author: Tomas Vondra, Alvaro Herrera, Rahila Syed Reviewed-by: Peter Eisentraut, Alvaro Herrera, Vignesh C, Ibrar Ahmed, Amit Kapila, Hou zj, Peter Smith, Wang wei, Tang, Shi yu Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c153
1 files changed, 145 insertions, 8 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d8b12d94bc3..697fb23634c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -113,6 +113,7 @@
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -702,12 +703,13 @@ fetch_remote_table_info(char *nspname, char *relname,
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
ListCell *lc;
bool first;
+ Bitmapset *included_cols = NULL;
lrel->nspname = nspname;
lrel->relname = relname;
@@ -748,10 +750,110 @@ fetch_remote_table_info(char *nspname, char *relname,
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
- /* Now fetch columns. */
+
+ /*
+ * Get column lists for each relation.
+ *
+ * For initial synchronization, column lists can be ignored in following
+ * cases:
+ *
+ * 1) one of the subscribed publications for the table hasn't specified
+ * any column list
+ *
+ * 2) one of the subscribed publications has puballtables set to true
+ *
+ * 3) one of the subscribed publications is declared as ALL TABLES IN
+ * SCHEMA that includes this relation
+ *
+ * We need to do this before fetching info about column names and types,
+ * so that we can skip columns that should not be replicated.
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ {
+ WalRcvExecResult *pubres;
+ TupleTableSlot *slot;
+ Oid attrsRow[] = {INT2OID};
+ StringInfoData pub_names;
+ bool first = true;
+
+ initStringInfo(&pub_names);
+ foreach(lc, MySubscription->publications)
+ {
+ if (!first)
+ appendStringInfo(&pub_names, ", ");
+ appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc))));
+ first = false;
+ }
+
+ /*
+ * Fetch info about column lists for the relation (from all the
+ * publications). We unnest the int2vector values, because that
+ * makes it easier to combine lists by simply adding the attnums
+ * to a new bitmap (without having to parse the int2vector data).
+ * This preserves NULL values, so that if one of the publications
+ * has no column list, we'll know that.
+ */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT unnest"
+ " FROM pg_publication p"
+ " LEFT OUTER JOIN pg_publication_rel pr"
+ " ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
+ " LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
+ " LATERAL pg_get_publication_tables(p.pubname) gpt"
+ " WHERE gpt.relid = %u"
+ " AND p.pubname IN ( %s )",
+ lrel->remoteid,
+ lrel->remoteid,
+ pub_names.data);
+
+ pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+ lengthof(attrsRow), attrsRow);
+
+ if (pubres->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, pubres->err)));
+
+ /*
+ * Merge the column lists (from different publications) by creating
+ * a single bitmap with all the attnums. If we find a NULL value,
+ * that means one of the publications has no column list for the
+ * table we're syncing.
+ */
+ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+ {
+ Datum cfval = slot_getattr(slot, 1, &isnull);
+
+ /* NULL means empty column list, so we're done. */
+ if (isnull)
+ {
+ bms_free(included_cols);
+ included_cols = NULL;
+ break;
+ }
+
+ included_cols = bms_add_member(included_cols,
+ DatumGetInt16(cfval));
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(pubres);
+
+ pfree(pub_names.data);
+ }
+
+ /*
+ * Now fetch column names and types.
+ */
resetStringInfo(&cmd);
appendStringInfo(&cmd,
- "SELECT a.attname,"
+ "SELECT a.attnum,"
+ " a.attname,"
" a.atttypid,"
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
@@ -779,16 +881,35 @@ fetch_remote_table_info(char *nspname, char *relname,
lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
lrel->attkeys = NULL;
+ /*
+ * Store the columns as a list of names. Ignore those that are not
+ * present in the column list, if there is one.
+ */
natt = 0;
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
{
- lrel->attnames[natt] =
- TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ char *rel_colname;
+ AttrNumber attnum;
+
+ attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
- lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+
+ /* If the column is not in the column list, skip it. */
+ if (included_cols != NULL && !bms_is_member(attnum, included_cols))
+ {
+ ExecClearTuple(slot);
+ continue;
+ }
+
+ rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ lrel->attnames[natt] = rel_colname;
+ lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
Assert(!isnull);
- if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
+
+ if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
/* Should never happen. */
@@ -931,8 +1052,24 @@ copy_table(Relation rel)
/* Regular table with no row filter */
if (lrel.relkind == RELKIND_RELATION && qual == NIL)
- appendStringInfo(&cmd, "COPY %s TO STDOUT",
+ {
+ appendStringInfo(&cmd, "COPY %s (",
quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+ /*
+ * XXX Do we need to list the columns in all cases? Maybe we're replicating
+ * all columns?
+ */
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
+
+ appendStringInfo(&cmd, ") TO STDOUT");
+ }
else
{
/*