From 2c7ea57e56ca5f668c32d4266e0a3e45b455bef5 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 7 Apr 2022 18:13:13 +0200 Subject: Revert "Logical decoding of sequences" This reverts a sequence of commits, implementing features related to logical decoding and replication of sequences: - 0da92dc530c9251735fc70b20cd004d9630a1266 - 80901b32913ffa59bf157a4d88284b2b3a7511d9 - b779d7d8fdae088d70da5ed9fcd8205035676df3 - d5ed9da41d96988d905b49bebb273a9b2d6e2915 - a180c2b34de0989269fdb819bff241a249bf5380 - 75b1521dae1ff1fde17fda2e30e591f2e5d64b6a - 2d2232933b02d9396113662e44dca5f120d6830e - 002c9dd97a0c874fd1693a570383e2dd38cd40d5 - 05843b1aa49df2ecc9b97c693b755bd1b6f856a9 The implementation has issues, mostly due to combining transactional and non-transactional behavior of sequences. It's not clear how this could be fixed, but it'll require reworking significant part of the patch. Discussion: https://postgr.es/m/95345a19-d508-63d1-860a-f5c2f41e8d40@enterprisedb.com --- src/backend/commands/subscriptioncmds.c | 101 ++++---------------------------- 1 file changed, 13 insertions(+), 88 deletions(-) (limited to 'src/backend/commands/subscriptioncmds.c') diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 057ab4b6a3f..2e8d8afead8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -90,7 +90,6 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); -static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -639,9 +638,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *relations; + List *tables; ListCell *lc; - char sync_state; + char table_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -658,17 +657,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* - * Get the table and sequence list from publisher and build - * local relation sync status info. + * Get the table list from publisher and build local table status + * info. */ - relations = fetch_table_list(wrconn, publications); - relations = list_concat(relations, - fetch_sequence_list(wrconn, publications)); - - foreach(lc, relations) + tables = fetch_table_list(wrconn, publications); + foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; @@ -679,7 +675,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - AddSubscriptionRelState(subid, relid, sync_state, + AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -705,12 +701,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * * Note that if tables were specified but copy_data is false * then it is safe to enable two_phase up-front because those - * relations are already initially in READY state. When the - * subscription has no relations, we leave the twophase state - * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH + * tables are already initially in READY state. When the + * subscription has no tables, we leave the twophase state as + * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH * PUBLICATION to work. */ - if (opts.twophase && !opts.copy_data && relations != NIL) + if (opts.twophase && !opts.copy_data && tables != NIL) twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, @@ -786,10 +782,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); - /* Get the list of relations from publisher. */ + /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); - pubrel_names = list_concat(pubrel_names, - fetch_sequence_list(wrconn, sub->publications)); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid); @@ -1813,75 +1807,6 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } -/* - * Get the list of sequences which belong to specified publications on the - * publisher connection. - */ -static List * -fetch_sequence_list(WalReceiverConn *wrconn, List *publications) -{ - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; - ListCell *lc; - bool first; - List *tablelist = NIL; - - Assert(list_length(publications) > 0); - - initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" - " FROM pg_catalog.pg_publication_sequences s\n" - " WHERE s.pubname IN ("); - first = true; - foreach(lc, publications) - { - char *pubname = strVal(lfirst(lc)); - - if (first) - first = false; - else - appendStringInfoString(&cmd, ", "); - - appendStringInfoString(&cmd, quote_literal_cstr(pubname)); - } - appendStringInfoChar(&cmd, ')'); - - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); - pfree(cmd.data); - - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errmsg("could not receive list of replicated sequences from the publisher: %s", - res->err))); - - /* Process sequences. */ - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - char *nspname; - char *relname; - bool isnull; - RangeVar *rv; - - nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); - Assert(!isnull); - relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); - Assert(!isnull); - - rv = makeRangeVar(nspname, relname, -1); - tablelist = lappend(tablelist, rv); - - ExecClearTuple(slot); - } - ExecDropSingleTupleTableSlot(slot); - - walrcv_clear_result(res); - - return tablelist; -} - /* * This is to report the connection failure while dropping replication slots. * Here, we report the WARNING for all tablesync slots so that user can drop -- cgit v1.2.3