diff options
Diffstat (limited to 'src/backend/replication/logical/sequencesync.c')
| -rw-r--r-- | src/backend/replication/logical/sequencesync.c | 745 |
1 files changed, 745 insertions, 0 deletions
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 00000000000..717c82328f2 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,745 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: sequence synchronization + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + * Sequences requiring synchronization are tracked in the pg_subscription_rel + * catalog. + * + * Sequences to be synchronized will be added with state INIT when either of + * the following commands is executed: + * CREATE SUBSCRIPTION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * + * Executing the following command resets all sequences in the subscription to + * state INIT, triggering re-synchronization: + * ALTER SUBSCRIPTION ... REFRESH SEQUENCES + * + * The apply worker periodically scans pg_subscription_rel for sequences in + * INIT state. When such sequences are found, it spawns a sequencesync worker + * to handle synchronization. + * + * A single sequencesync worker is responsible for synchronizing all sequences. + * It begins by retrieving the list of sequences that are flagged for + * synchronization, i.e., those in the INIT state. These sequences are then + * processed in batches, allowing multiple entries to be synchronized within a + * single transaction. The worker fetches the current sequence values and page + * LSNs from the remote publisher, updates the corresponding sequences on the + * local subscriber, and finally marks each sequence as READY upon successful + * synchronization. + * + * Sequence state transitions follow this pattern: + * INIT -> READY + * + * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH + * sequences are synchronized per transaction. The locks on the sequence + * relation will be periodically released at each transaction commit. + * + * XXX: We didn't choose launcher process to maintain the launch of sequencesync + * worker as it didn't have database connection to access the sequences from the + * pg_subscription_rel system catalog that need to be synchronized. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_sequence.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/fmgroids.h" +#include "utils/guc.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" +#include "utils/syscache.h" +#include "utils/usercontext.h" + +#define REMOTE_SEQ_COL_COUNT 10 + +typedef enum CopySeqResult +{ + COPYSEQ_SUCCESS, + COPYSEQ_MISMATCH, + COPYSEQ_INSUFFICIENT_PERM, + COPYSEQ_SKIPPED +} CopySeqResult; + +static List *seqinfos = NIL; + +/* + * Apply worker determines if sequence synchronization is needed. + * + * Start a sequencesync worker if one is not already running. The active + * sequencesync worker will handle all pending sequence synchronization. If any + * sequences remain unsynchronized after it exits, a new worker can be started + * in the next iteration. + */ +void +ProcessSequencesForSync(void) +{ + LogicalRepWorker *sequencesync_worker; + int nsyncworkers; + bool has_pending_sequences; + bool started_tx; + + FetchRelationStates(NULL, &has_pending_sequences, &started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + if (!has_pending_sequences) + return; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if there is a sequencesync worker already running? */ + sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC, + MyLogicalRepWorker->subid, + InvalidOid, true); + if (sequencesync_worker) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * Count running sync workers for this subscription, while we have the + * lock. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + LWLockRelease(LogicalRepWorkerLock); + + /* + * It is okay to read/update last_seqsync_start_time here in apply worker + * as we have already ensured that sync worker doesn't exist. + */ + launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid, + &MyLogicalRepWorker->last_seqsync_start_time); +} + +/* + * get_sequences_string + * + * Build a comma-separated string of schema-qualified sequence names + * for the given list of sequence indexes. + */ +static void +get_sequences_string(List *seqindexes, StringInfo buf) +{ + resetStringInfo(buf); + foreach_int(seqidx, seqindexes) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx); + + if (buf->len > 0) + appendStringInfoString(buf, ", "); + + appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname); + } +} + +/* + * report_sequence_errors + * + * Report discrepancies found during sequence synchronization between + * the publisher and subscriber. Emits warnings for: + * a) mismatched definitions or concurrent rename + * b) insufficient privileges + * c) missing sequences on the subscriber + * Then raises an ERROR to indicate synchronization failure. + */ +static void +report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, + List *missing_seqs_idx) +{ + StringInfo seqstr; + + /* Quick exit if there are no errors to report */ + if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx) + return; + + seqstr = makeStringInfo(); + + if (mismatched_seqs_idx) + { + get_sequences_string(mismatched_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("mismatched or renamed sequence on subscriber (%s)", + "mismatched or renamed sequences on subscriber (%s)", + list_length(mismatched_seqs_idx), + seqstr->data)); + } + + if (insuffperm_seqs_idx) + { + get_sequences_string(insuffperm_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("insufficient privileges on sequence (%s)", + "insufficient privileges on sequences (%s)", + list_length(insuffperm_seqs_idx), + seqstr->data)); + } + + if (missing_seqs_idx) + { + get_sequences_string(missing_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("missing sequence on publisher (%s)", + "missing sequences on publisher (%s)", + list_length(missing_seqs_idx), + seqstr->data)); + } + + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication sequence synchronization failed for subscription \"%s\"", + MySubscription->name)); +} + +/* + * get_and_validate_seq_info + * + * Extracts remote sequence information from the tuple slot received from the + * publisher, and validates it against the corresponding local sequence + * definition. + */ +static CopySeqResult +get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, + LogicalRepSequenceInfo **seqinfo, int *seqidx) +{ + bool isnull; + int col = 0; + Oid remote_typid; + int64 remote_start; + int64 remote_increment; + int64 remote_min; + int64 remote_max; + bool remote_cycle; + CopySeqResult result = COPYSEQ_SUCCESS; + HeapTuple tup; + Form_pg_sequence local_seq; + LogicalRepSequenceInfo *seqinfo_local; + + *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Identify the corresponding local sequence for the given index. */ + *seqinfo = seqinfo_local = + (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); + + seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Sanity check */ + Assert(col == REMOTE_SEQ_COL_COUNT); + + seqinfo_local->found_on_pub = true; + + *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock); + + /* Sequence was concurrently dropped? */ + if (!*sequence_rel) + return COPYSEQ_SKIPPED; + + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid)); + + /* Sequence was concurrently dropped? */ + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for sequence %u", + seqinfo_local->localrelid); + + local_seq = (Form_pg_sequence) GETSTRUCT(tup); + + /* Sequence parameters for remote/local are the same? */ + if (local_seq->seqtypid != remote_typid || + local_seq->seqstart != remote_start || + local_seq->seqincrement != remote_increment || + local_seq->seqmin != remote_min || + local_seq->seqmax != remote_max || + local_seq->seqcycle != remote_cycle) + result = COPYSEQ_MISMATCH; + + /* Sequence was concurrently renamed? */ + if (strcmp(seqinfo_local->nspname, + get_namespace_name(RelationGetNamespace(*sequence_rel))) || + strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel))) + result = COPYSEQ_MISMATCH; + + ReleaseSysCache(tup); + return result; +} + +/* + * Apply remote sequence state to local sequence and mark it as + * synchronized (READY). + */ +static CopySeqResult +copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner) +{ + UserContext ucxt; + AclResult aclresult; + bool run_as_owner = MySubscription->runasowner; + Oid seqoid = seqinfo->localrelid; + + /* + * If the user did not opt to run as the owner of the subscription + * ('run_as_owner'), then copy the sequence as the owner of the sequence. + */ + if (!run_as_owner) + SwitchToUntrustedUser(seqowner, &ucxt); + + aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE); + + if (aclresult != ACLCHECK_OK) + { + if (!run_as_owner) + RestoreUserContext(&ucxt); + + return COPYSEQ_INSUFFICIENT_PERM; + } + + /* + * The log counter (log_cnt) tracks how many sequence values are still + * unused locally. It is only relevant to the local node and managed + * internally by nextval() when allocating new ranges. Since log_cnt does + * not affect the visible sequence state (like last_value or is_called) + * and is only used for local caching, it need not be copied to the + * subscriber during synchronization. + */ + SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called); + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + /* + * Record the remote sequence's LSN in pg_subscription_rel and mark the + * sequence as READY. + */ + UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, + seqinfo->page_lsn, false); + + return COPYSEQ_SUCCESS; +} + +/* + * Copy existing data of sequences from the publisher. + */ +static void +copy_sequences(WalReceiverConn *conn) +{ + int cur_batch_base_index = 0; + int n_seqinfos = list_length(seqinfos); + List *mismatched_seqs_idx = NIL; + List *missing_seqs_idx = NIL; + List *insuffperm_seqs_idx = NIL; + StringInfo seqstr = makeStringInfo(); + StringInfo cmd = makeStringInfo(); + MemoryContext oldctx; + +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + elog(DEBUG1, + "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", + MySubscription->name, n_seqinfos); + + while (cur_batch_base_index < n_seqinfos) + { + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, + BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; + int batch_size = 0; + int batch_succeeded_count = 0; + int batch_mismatched_count = 0; + int batch_skipped_count = 0; + int batch_insuffperm_count = 0; + int batch_missing_count; + Relation sequence_rel; + + WalRcvExecResult *res; + TupleTableSlot *slot; + + StartTransactionCommand(); + + for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + if (seqstr->len > 0) + appendStringInfoString(seqstr, ", "); + + appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)", + seqinfo->nspname, seqinfo->seqname, idx); + + if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH) + break; + } + + /* + * We deliberately avoid acquiring a local lock on the sequence before + * querying the publisher to prevent potential distributed deadlocks + * in bi-directional replication setups. + * + * Example scenario: + * + * - On each node, a background worker acquires a lock on a sequence + * as part of a sync operation. + * + * - Concurrently, a user transaction attempts to alter the same + * sequence, waiting on the background worker's lock. + * + * - Meanwhile, a query from the other node tries to access metadata + * that depends on the completion of the alter operation. + * + * - This creates a circular wait across nodes: + * + * Node-1: Query -> waits on Alter -> waits on Sync Worker + * + * Node-2: Query -> waits on Alter -> waits on Sync Worker + * + * Since each node only sees part of the wait graph, the deadlock may + * go undetected, leading to indefinite blocking. + * + * Note: Each entry in VALUES includes an index 'seqidx' that + * represents the sequence's position in the local 'seqinfos' list. + * This index is propagated to the query results and later used to + * directly map the fetched publisher sequence rows back to their + * corresponding local entries without relying on result order or name + * matching. + */ + appendStringInfo(cmd, + "SELECT s.seqidx, ps.*, seq.seqtypid,\n" + " seq.seqstart, seq.seqincrement, seq.seqmin,\n" + " seq.seqmax, seq.seqcycle\n" + "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n" + "JOIN pg_namespace n ON n.nspname = s.schname\n" + "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" + "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" + "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n", + seqstr->data); + + res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch sequence information from the publisher: %s", + res->err)); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + CopySeqResult sync_status; + LogicalRepSequenceInfo *seqinfo; + int seqidx; + + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + sync_status = get_and_validate_seq_info(slot, &sequence_rel, + &seqinfo, &seqidx); + if (sync_status == COPYSEQ_SUCCESS) + sync_status = copy_sequence(seqinfo, + sequence_rel->rd_rel->relowner); + + switch (sync_status) + { + case COPYSEQ_SUCCESS: + elog(DEBUG1, + "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", + MySubscription->name, seqinfo->nspname, + seqinfo->seqname); + batch_succeeded_count++; + break; + case COPYSEQ_MISMATCH: + + /* + * Remember mismatched sequences in a long-lived memory + * context since these will be used after the transaction + * is committed. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + mismatched_seqs_idx = lappend_int(mismatched_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_mismatched_count++; + break; + case COPYSEQ_INSUFFICIENT_PERM: + + /* + * Remember sequences with insufficient privileges in a + * long-lived memory context since these will be used + * after the transaction is committed. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_insuffperm_count++; + break; + case COPYSEQ_SKIPPED: + ereport(LOG, + errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently", + seqinfo->nspname, + seqinfo->seqname)); + batch_skipped_count++; + break; + } + + if (sequence_rel) + table_close(sequence_rel, NoLock); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + resetStringInfo(seqstr); + resetStringInfo(cmd); + + batch_missing_count = batch_size - (batch_succeeded_count + + batch_mismatched_count + + batch_insuffperm_count + + batch_skipped_count); + + elog(DEBUG1, + "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped", + MySubscription->name, + (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, + batch_size, batch_succeeded_count, batch_mismatched_count, + batch_insuffperm_count, batch_missing_count, batch_skipped_count); + + /* Commit this batch, and prepare for next batch */ + CommitTransactionCommand(); + + if (batch_missing_count) + { + for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + /* If the sequence was not found on publisher, record it */ + if (!seqinfo->found_on_pub) + missing_seqs_idx = lappend_int(missing_seqs_idx, idx); + } + } + + /* + * cur_batch_base_index is not incremented sequentially because some + * sequences may be missing, and the number of fetched rows may not + * match the batch size. + */ + cur_batch_base_index += batch_size; + } + + /* Report mismatches, permission issues, or missing sequences */ + report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, + missing_seqs_idx); +} + +/* + * Identifies sequences that require synchronization and initiates the + * synchronization process. + */ +static void +LogicalRepSyncSequences(void) +{ + char *err; + bool must_use_password; + Relation rel; + HeapTuple tup; + ScanKeyData skey[2]; + SysScanDesc scan; + Oid subid = MyLogicalRepWorker->subid; + StringInfoData app_name; + + StartTransactionCommand(); + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + ScanKeyInit(&skey[1], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(SUBREL_STATE_INIT)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 2, skey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + LogicalRepSequenceInfo *seq; + Relation sequence_rel; + MemoryContext oldctx; + + CHECK_FOR_INTERRUPTS(); + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock); + + /* Skip if sequence was dropped concurrently */ + if (!sequence_rel) + continue; + + /* Skip if the relation is not a sequence */ + if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE) + { + table_close(sequence_rel, NoLock); + continue; + } + + /* + * Worker needs to process sequences across transaction boundary, so + * allocate them under long-lived context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + + seq = palloc0_object(LogicalRepSequenceInfo); + seq->localrelid = subrel->srrelid; + seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); + seq->seqname = pstrdup(RelationGetRelationName(sequence_rel)); + seqinfos = lappend(seqinfos, seq); + + MemoryContextSwitchTo(oldctx); + + table_close(sequence_rel, NoLock); + } + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + CommitTransactionCommand(); + + /* + * Exit early if no catalog entries found, likely due to concurrent drops. + */ + if (!seqinfos) + return; + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + initStringInfo(&app_name); + appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT, + MySubscription->oid, GetSystemIdentifier()); + + /* + * Establish the connection to the publisher for sequence synchronization. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + app_name.data, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s", + MySubscription->name, err)); + + pfree(app_name.data); + + copy_sequences(LogRepWorkerWalRcvConn); +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if required. + * + * Note that we don't handle FATAL errors which are probably because of system + * resource error and are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication sequencesync worker entry point */ +void +SequenceSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + FinishSyncWorker(); +} |
