/*------------------------------------------------------------------------- * sequencesync.c * PostgreSQL logical replication: sequence synchronization * * Copyright (c) 2025, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/replication/logical/sequencesync.c * * NOTES * This file contains code for sequence synchronization for * logical replication. * * Sequences requiring synchronization are tracked in the pg_subscription_rel * catalog. * * Sequences to be synchronized will be added with state INIT when either of * the following commands is executed: * CREATE SUBSCRIPTION * ALTER SUBSCRIPTION ... REFRESH PUBLICATION * * Executing the following command resets all sequences in the subscription to * state INIT, triggering re-synchronization: * ALTER SUBSCRIPTION ... REFRESH SEQUENCES * * The apply worker periodically scans pg_subscription_rel for sequences in * INIT state. When such sequences are found, it spawns a sequencesync worker * to handle synchronization. * * A single sequencesync worker is responsible for synchronizing all sequences. * It begins by retrieving the list of sequences that are flagged for * synchronization, i.e., those in the INIT state. These sequences are then * processed in batches, allowing multiple entries to be synchronized within a * single transaction. The worker fetches the current sequence values and page * LSNs from the remote publisher, updates the corresponding sequences on the * local subscriber, and finally marks each sequence as READY upon successful * synchronization. * * Sequence state transitions follow this pattern: * INIT -> READY * * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH * sequences are synchronized per transaction. The locks on the sequence * relation will be periodically released at each transaction commit. * * XXX: We didn't choose launcher process to maintain the launch of sequencesync * worker as it didn't have database connection to access the sequences from the * pg_subscription_rel system catalog that need to be synchronized. *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/table.h" #include "catalog/pg_sequence.h" #include "catalog/pg_subscription_rel.h" #include "commands/sequence.h" #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/logicalworker.h" #include "replication/worker_internal.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/syscache.h" #include "utils/usercontext.h" #define REMOTE_SEQ_COL_COUNT 10 typedef enum CopySeqResult { COPYSEQ_SUCCESS, COPYSEQ_MISMATCH, COPYSEQ_INSUFFICIENT_PERM, COPYSEQ_SKIPPED } CopySeqResult; static List *seqinfos = NIL; /* * Apply worker determines if sequence synchronization is needed. * * Start a sequencesync worker if one is not already running. The active * sequencesync worker will handle all pending sequence synchronization. If any * sequences remain unsynchronized after it exits, a new worker can be started * in the next iteration. */ void ProcessSequencesForSync(void) { LogicalRepWorker *sequencesync_worker; int nsyncworkers; bool has_pending_sequences; bool started_tx; FetchRelationStates(NULL, &has_pending_sequences, &started_tx); if (started_tx) { CommitTransactionCommand(); pgstat_report_stat(true); } if (!has_pending_sequences) return; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); /* Check if there is a sequencesync worker already running? */ sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC, MyLogicalRepWorker->subid, InvalidOid, true); if (sequencesync_worker) { LWLockRelease(LogicalRepWorkerLock); return; } /* * Count running sync workers for this subscription, while we have the * lock. */ nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); /* * It is okay to read/update last_seqsync_start_time here in apply worker * as we have already ensured that sync worker doesn't exist. */ launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid, &MyLogicalRepWorker->last_seqsync_start_time); } /* * get_sequences_string * * Build a comma-separated string of schema-qualified sequence names * for the given list of sequence indexes. */ static void get_sequences_string(List *seqindexes, StringInfo buf) { resetStringInfo(buf); foreach_int(seqidx, seqindexes) { LogicalRepSequenceInfo *seqinfo = (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx); if (buf->len > 0) appendStringInfoString(buf, ", "); appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname); } } /* * report_sequence_errors * * Report discrepancies found during sequence synchronization between * the publisher and subscriber. Emits warnings for: * a) mismatched definitions or concurrent rename * b) insufficient privileges * c) missing sequences on the subscriber * Then raises an ERROR to indicate synchronization failure. */ static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx) { StringInfo seqstr; /* Quick exit if there are no errors to report */ if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx) return; seqstr = makeStringInfo(); if (mismatched_seqs_idx) { get_sequences_string(mismatched_seqs_idx, seqstr); ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg_plural("mismatched or renamed sequence on subscriber (%s)", "mismatched or renamed sequences on subscriber (%s)", list_length(mismatched_seqs_idx), seqstr->data)); } if (insuffperm_seqs_idx) { get_sequences_string(insuffperm_seqs_idx, seqstr); ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg_plural("insufficient privileges on sequence (%s)", "insufficient privileges on sequences (%s)", list_length(insuffperm_seqs_idx), seqstr->data)); } if (missing_seqs_idx) { get_sequences_string(missing_seqs_idx, seqstr); ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg_plural("missing sequence on publisher (%s)", "missing sequences on publisher (%s)", list_length(missing_seqs_idx), seqstr->data)); } ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical replication sequence synchronization failed for subscription \"%s\"", MySubscription->name)); } /* * get_and_validate_seq_info * * Extracts remote sequence information from the tuple slot received from the * publisher, and validates it against the corresponding local sequence * definition. */ static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx) { bool isnull; int col = 0; Oid remote_typid; int64 remote_start; int64 remote_increment; int64 remote_min; int64 remote_max; bool remote_cycle; CopySeqResult result = COPYSEQ_SUCCESS; HeapTuple tup; Form_pg_sequence local_seq; LogicalRepSequenceInfo *seqinfo_local; *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); /* Identify the corresponding local sequence for the given index. */ *seqinfo = seqinfo_local = (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull)); Assert(!isnull); /* Sanity check */ Assert(col == REMOTE_SEQ_COL_COUNT); seqinfo_local->found_on_pub = true; *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock); /* Sequence was concurrently dropped? */ if (!*sequence_rel) return COPYSEQ_SKIPPED; tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid)); /* Sequence was concurrently dropped? */ if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for sequence %u", seqinfo_local->localrelid); local_seq = (Form_pg_sequence) GETSTRUCT(tup); /* Sequence parameters for remote/local are the same? */ if (local_seq->seqtypid != remote_typid || local_seq->seqstart != remote_start || local_seq->seqincrement != remote_increment || local_seq->seqmin != remote_min || local_seq->seqmax != remote_max || local_seq->seqcycle != remote_cycle) result = COPYSEQ_MISMATCH; /* Sequence was concurrently renamed? */ if (strcmp(seqinfo_local->nspname, get_namespace_name(RelationGetNamespace(*sequence_rel))) || strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel))) result = COPYSEQ_MISMATCH; ReleaseSysCache(tup); return result; } /* * Apply remote sequence state to local sequence and mark it as * synchronized (READY). */ static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner) { UserContext ucxt; AclResult aclresult; bool run_as_owner = MySubscription->runasowner; Oid seqoid = seqinfo->localrelid; /* * If the user did not opt to run as the owner of the subscription * ('run_as_owner'), then copy the sequence as the owner of the sequence. */ if (!run_as_owner) SwitchToUntrustedUser(seqowner, &ucxt); aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE); if (aclresult != ACLCHECK_OK) { if (!run_as_owner) RestoreUserContext(&ucxt); return COPYSEQ_INSUFFICIENT_PERM; } /* * The log counter (log_cnt) tracks how many sequence values are still * unused locally. It is only relevant to the local node and managed * internally by nextval() when allocating new ranges. Since log_cnt does * not affect the visible sequence state (like last_value or is_called) * and is only used for local caching, it need not be copied to the * subscriber during synchronization. */ SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called); if (!run_as_owner) RestoreUserContext(&ucxt); /* * Record the remote sequence's LSN in pg_subscription_rel and mark the * sequence as READY. */ UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, seqinfo->page_lsn, false); return COPYSEQ_SUCCESS; } /* * Copy existing data of sequences from the publisher. */ static void copy_sequences(WalReceiverConn *conn) { int cur_batch_base_index = 0; int n_seqinfos = list_length(seqinfos); List *mismatched_seqs_idx = NIL; List *missing_seqs_idx = NIL; List *insuffperm_seqs_idx = NIL; StringInfo seqstr = makeStringInfo(); StringInfo cmd = makeStringInfo(); MemoryContext oldctx; #define MAX_SEQUENCES_SYNC_PER_BATCH 100 elog(DEBUG1, "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", MySubscription->name, n_seqinfos); while (cur_batch_base_index < n_seqinfos) { Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; int batch_size = 0; int batch_succeeded_count = 0; int batch_mismatched_count = 0; int batch_skipped_count = 0; int batch_insuffperm_count = 0; int batch_missing_count; Relation sequence_rel; WalRcvExecResult *res; TupleTableSlot *slot; StartTransactionCommand(); for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) { char *nspname_literal; char *seqname_literal; LogicalRepSequenceInfo *seqinfo = (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); if (seqstr->len > 0) appendStringInfoString(seqstr, ", "); nspname_literal = quote_literal_cstr(seqinfo->nspname); seqname_literal = quote_literal_cstr(seqinfo->seqname); appendStringInfo(seqstr, "(%s, %s, %d)", nspname_literal, seqname_literal, idx); if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH) break; } /* * We deliberately avoid acquiring a local lock on the sequence before * querying the publisher to prevent potential distributed deadlocks * in bi-directional replication setups. * * Example scenario: * * - On each node, a background worker acquires a lock on a sequence * as part of a sync operation. * * - Concurrently, a user transaction attempts to alter the same * sequence, waiting on the background worker's lock. * * - Meanwhile, a query from the other node tries to access metadata * that depends on the completion of the alter operation. * * - This creates a circular wait across nodes: * * Node-1: Query -> waits on Alter -> waits on Sync Worker * * Node-2: Query -> waits on Alter -> waits on Sync Worker * * Since each node only sees part of the wait graph, the deadlock may * go undetected, leading to indefinite blocking. * * Note: Each entry in VALUES includes an index 'seqidx' that * represents the sequence's position in the local 'seqinfos' list. * This index is propagated to the query results and later used to * directly map the fetched publisher sequence rows back to their * corresponding local entries without relying on result order or name * matching. */ appendStringInfo(cmd, "SELECT s.seqidx, ps.*, seq.seqtypid,\n" " seq.seqstart, seq.seqincrement, seq.seqmin,\n" " seq.seqmax, seq.seqcycle\n" "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n" "JOIN pg_namespace n ON n.nspname = s.schname\n" "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n", seqstr->data); res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not fetch sequence information from the publisher: %s", res->err)); slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { CopySeqResult sync_status; LogicalRepSequenceInfo *seqinfo; int seqidx; CHECK_FOR_INTERRUPTS(); if (ConfigReloadPending) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); } sync_status = get_and_validate_seq_info(slot, &sequence_rel, &seqinfo, &seqidx); if (sync_status == COPYSEQ_SUCCESS) sync_status = copy_sequence(seqinfo, sequence_rel->rd_rel->relowner); switch (sync_status) { case COPYSEQ_SUCCESS: elog(DEBUG1, "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", MySubscription->name, seqinfo->nspname, seqinfo->seqname); batch_succeeded_count++; break; case COPYSEQ_MISMATCH: /* * Remember mismatched sequences in a long-lived memory * context since these will be used after the transaction * is committed. */ oldctx = MemoryContextSwitchTo(ApplyContext); mismatched_seqs_idx = lappend_int(mismatched_seqs_idx, seqidx); MemoryContextSwitchTo(oldctx); batch_mismatched_count++; break; case COPYSEQ_INSUFFICIENT_PERM: /* * Remember sequences with insufficient privileges in a * long-lived memory context since these will be used * after the transaction is committed. */ oldctx = MemoryContextSwitchTo(ApplyContext); insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx, seqidx); MemoryContextSwitchTo(oldctx); batch_insuffperm_count++; break; case COPYSEQ_SKIPPED: ereport(LOG, errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently", seqinfo->nspname, seqinfo->seqname)); batch_skipped_count++; break; } if (sequence_rel) table_close(sequence_rel, NoLock); } ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); resetStringInfo(seqstr); resetStringInfo(cmd); batch_missing_count = batch_size - (batch_succeeded_count + batch_mismatched_count + batch_insuffperm_count + batch_skipped_count); elog(DEBUG1, "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped", MySubscription->name, (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, batch_succeeded_count, batch_mismatched_count, batch_insuffperm_count, batch_missing_count, batch_skipped_count); /* Commit this batch, and prepare for next batch */ CommitTransactionCommand(); if (batch_missing_count) { for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++) { LogicalRepSequenceInfo *seqinfo = (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); /* If the sequence was not found on publisher, record it */ if (!seqinfo->found_on_pub) missing_seqs_idx = lappend_int(missing_seqs_idx, idx); } } /* * cur_batch_base_index is not incremented sequentially because some * sequences may be missing, and the number of fetched rows may not * match the batch size. */ cur_batch_base_index += batch_size; } /* Report mismatches, permission issues, or missing sequences */ report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, missing_seqs_idx); } /* * Identifies sequences that require synchronization and initiates the * synchronization process. */ static void LogicalRepSyncSequences(void) { char *err; bool must_use_password; Relation rel; HeapTuple tup; ScanKeyData skey[2]; SysScanDesc scan; Oid subid = MyLogicalRepWorker->subid; StringInfoData app_name; StartTransactionCommand(); rel = table_open(SubscriptionRelRelationId, AccessShareLock); ScanKeyInit(&skey[0], Anum_pg_subscription_rel_srsubid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(subid)); ScanKeyInit(&skey[1], Anum_pg_subscription_rel_srsubstate, BTEqualStrategyNumber, F_CHAREQ, CharGetDatum(SUBREL_STATE_INIT)); scan = systable_beginscan(rel, InvalidOid, false, NULL, 2, skey); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_subscription_rel subrel; LogicalRepSequenceInfo *seq; Relation sequence_rel; MemoryContext oldctx; CHECK_FOR_INTERRUPTS(); subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock); /* Skip if sequence was dropped concurrently */ if (!sequence_rel) continue; /* Skip if the relation is not a sequence */ if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE) { table_close(sequence_rel, NoLock); continue; } /* * Worker needs to process sequences across transaction boundary, so * allocate them under long-lived context. */ oldctx = MemoryContextSwitchTo(ApplyContext); seq = palloc0_object(LogicalRepSequenceInfo); seq->localrelid = subrel->srrelid; seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); seq->seqname = pstrdup(RelationGetRelationName(sequence_rel)); seqinfos = lappend(seqinfos, seq); MemoryContextSwitchTo(oldctx); table_close(sequence_rel, NoLock); } /* Cleanup */ systable_endscan(scan); table_close(rel, AccessShareLock); CommitTransactionCommand(); /* * Exit early if no catalog entries found, likely due to concurrent drops. */ if (!seqinfos) return; /* Is the use of a password mandatory? */ must_use_password = MySubscription->passwordrequired && !MySubscription->ownersuperuser; initStringInfo(&app_name); appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT, MySubscription->oid, GetSystemIdentifier()); /* * Establish the connection to the publisher for sequence synchronization. */ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, true, must_use_password, app_name.data, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, errcode(ERRCODE_CONNECTION_FAILURE), errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s", MySubscription->name, err)); pfree(app_name.data); copy_sequences(LogRepWorkerWalRcvConn); } /* * Execute the initial sync with error handling. Disable the subscription, * if required. * * Note that we don't handle FATAL errors which are probably because of system * resource error and are not repeatable. */ static void start_sequence_sync() { Assert(am_sequencesync_worker()); PG_TRY(); { /* Call initial sync. */ LogicalRepSyncSequences(); } PG_CATCH(); { if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else { /* * Report the worker failed during sequence synchronization. Abort * the current transaction so that the stats message is sent in an * idle state. */ AbortOutOfAnyTransaction(); pgstat_report_subscription_error(MySubscription->oid, WORKERTYPE_SEQUENCESYNC); PG_RE_THROW(); } } PG_END_TRY(); } /* Logical Replication sequencesync worker entry point */ void SequenceSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); SetupApplyOrSyncWorker(worker_slot); start_sequence_sync(); FinishSyncWorker(); }