diff options
Diffstat (limited to 'src/backend/replication/logical')
| -rw-r--r-- | src/backend/replication/logical/Makefile | 1 | ||||
| -rw-r--r-- | src/backend/replication/logical/launcher.c | 57 | ||||
| -rw-r--r-- | src/backend/replication/logical/meson.build | 1 | ||||
| -rw-r--r-- | src/backend/replication/logical/sequencesync.c | 745 | ||||
| -rw-r--r-- | src/backend/replication/logical/syncutils.c | 139 | ||||
| -rw-r--r-- | src/backend/replication/logical/tablesync.c | 77 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 76 |
7 files changed, 997 insertions, 99 deletions
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c62c8c67521..c719af1f8a9 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -26,6 +26,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ syncutils.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 95b5cae9a55..6c6d4015ba7 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -248,9 +248,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * Walks the workers array and searches for one that matches given worker type, * subscription id, and relation id. * - * For apply workers, the relid should be set to InvalidOid, as they manage - * changes across all tables. For table sync workers, the relid should be set - * to the OID of the relation being synchronized. + * For both apply workers and sequencesync workers, the relid should be set to + * InvalidOid, as these workers handle changes across all tables and sequences + * respectively, rather than targeting a specific relation. For tablesync + * workers, the relid should be set to the OID of the relation being + * synchronized. */ LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, @@ -334,6 +336,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -422,7 +425,8 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -478,6 +482,7 @@ retry: TIMESTAMP_NOBEGIN(worker->last_recv_time); worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); + worker->last_seqsync_start_time = 0; /* Before releasing lock, remember generation for future identification. */ generation = worker->generation; @@ -511,8 +516,16 @@ retry: memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -849,6 +862,33 @@ logicalrep_launcher_onexit(int code, Datum arg) } /* + * Reset the last_seqsync_start_time of the sequencesync worker in the + * subscription's apply worker. + * + * Note that this value is not stored in the sequencesync worker, because that + * has finished already and is about to exit. + */ +void +logicalrep_reset_seqsync_start_time(void) +{ + LogicalRepWorker *worker; + + /* + * The apply worker can't access last_seqsync_start_time concurrently, so + * it is okay to use SHARED lock here. See ProcessSequencesForSync(). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + true); + if (worker) + worker->last_seqsync_start_time = 0; + + LWLockRelease(LogicalRepWorkerLock); +} + +/* * Cleanup function. * * Called on logical replication worker exit. @@ -896,7 +936,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w))) res++; } @@ -1610,7 +1650,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1650,6 +1690,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_PARALLEL_APPLY: values[9] = CStringGetTextDatum("parallel apply"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 9283e996ef4..a2268d8361e 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -12,6 +12,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'syncutils.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 00000000000..717c82328f2 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,745 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: sequence synchronization + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + * Sequences requiring synchronization are tracked in the pg_subscription_rel + * catalog. + * + * Sequences to be synchronized will be added with state INIT when either of + * the following commands is executed: + * CREATE SUBSCRIPTION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * + * Executing the following command resets all sequences in the subscription to + * state INIT, triggering re-synchronization: + * ALTER SUBSCRIPTION ... REFRESH SEQUENCES + * + * The apply worker periodically scans pg_subscription_rel for sequences in + * INIT state. When such sequences are found, it spawns a sequencesync worker + * to handle synchronization. + * + * A single sequencesync worker is responsible for synchronizing all sequences. + * It begins by retrieving the list of sequences that are flagged for + * synchronization, i.e., those in the INIT state. These sequences are then + * processed in batches, allowing multiple entries to be synchronized within a + * single transaction. The worker fetches the current sequence values and page + * LSNs from the remote publisher, updates the corresponding sequences on the + * local subscriber, and finally marks each sequence as READY upon successful + * synchronization. + * + * Sequence state transitions follow this pattern: + * INIT -> READY + * + * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH + * sequences are synchronized per transaction. The locks on the sequence + * relation will be periodically released at each transaction commit. + * + * XXX: We didn't choose launcher process to maintain the launch of sequencesync + * worker as it didn't have database connection to access the sequences from the + * pg_subscription_rel system catalog that need to be synchronized. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_sequence.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/fmgroids.h" +#include "utils/guc.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" +#include "utils/syscache.h" +#include "utils/usercontext.h" + +#define REMOTE_SEQ_COL_COUNT 10 + +typedef enum CopySeqResult +{ + COPYSEQ_SUCCESS, + COPYSEQ_MISMATCH, + COPYSEQ_INSUFFICIENT_PERM, + COPYSEQ_SKIPPED +} CopySeqResult; + +static List *seqinfos = NIL; + +/* + * Apply worker determines if sequence synchronization is needed. + * + * Start a sequencesync worker if one is not already running. The active + * sequencesync worker will handle all pending sequence synchronization. If any + * sequences remain unsynchronized after it exits, a new worker can be started + * in the next iteration. + */ +void +ProcessSequencesForSync(void) +{ + LogicalRepWorker *sequencesync_worker; + int nsyncworkers; + bool has_pending_sequences; + bool started_tx; + + FetchRelationStates(NULL, &has_pending_sequences, &started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + if (!has_pending_sequences) + return; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if there is a sequencesync worker already running? */ + sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC, + MyLogicalRepWorker->subid, + InvalidOid, true); + if (sequencesync_worker) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * Count running sync workers for this subscription, while we have the + * lock. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + LWLockRelease(LogicalRepWorkerLock); + + /* + * It is okay to read/update last_seqsync_start_time here in apply worker + * as we have already ensured that sync worker doesn't exist. + */ + launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid, + &MyLogicalRepWorker->last_seqsync_start_time); +} + +/* + * get_sequences_string + * + * Build a comma-separated string of schema-qualified sequence names + * for the given list of sequence indexes. + */ +static void +get_sequences_string(List *seqindexes, StringInfo buf) +{ + resetStringInfo(buf); + foreach_int(seqidx, seqindexes) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx); + + if (buf->len > 0) + appendStringInfoString(buf, ", "); + + appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname); + } +} + +/* + * report_sequence_errors + * + * Report discrepancies found during sequence synchronization between + * the publisher and subscriber. Emits warnings for: + * a) mismatched definitions or concurrent rename + * b) insufficient privileges + * c) missing sequences on the subscriber + * Then raises an ERROR to indicate synchronization failure. + */ +static void +report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, + List *missing_seqs_idx) +{ + StringInfo seqstr; + + /* Quick exit if there are no errors to report */ + if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx) + return; + + seqstr = makeStringInfo(); + + if (mismatched_seqs_idx) + { + get_sequences_string(mismatched_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("mismatched or renamed sequence on subscriber (%s)", + "mismatched or renamed sequences on subscriber (%s)", + list_length(mismatched_seqs_idx), + seqstr->data)); + } + + if (insuffperm_seqs_idx) + { + get_sequences_string(insuffperm_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("insufficient privileges on sequence (%s)", + "insufficient privileges on sequences (%s)", + list_length(insuffperm_seqs_idx), + seqstr->data)); + } + + if (missing_seqs_idx) + { + get_sequences_string(missing_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("missing sequence on publisher (%s)", + "missing sequences on publisher (%s)", + list_length(missing_seqs_idx), + seqstr->data)); + } + + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication sequence synchronization failed for subscription \"%s\"", + MySubscription->name)); +} + +/* + * get_and_validate_seq_info + * + * Extracts remote sequence information from the tuple slot received from the + * publisher, and validates it against the corresponding local sequence + * definition. + */ +static CopySeqResult +get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, + LogicalRepSequenceInfo **seqinfo, int *seqidx) +{ + bool isnull; + int col = 0; + Oid remote_typid; + int64 remote_start; + int64 remote_increment; + int64 remote_min; + int64 remote_max; + bool remote_cycle; + CopySeqResult result = COPYSEQ_SUCCESS; + HeapTuple tup; + Form_pg_sequence local_seq; + LogicalRepSequenceInfo *seqinfo_local; + + *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Identify the corresponding local sequence for the given index. */ + *seqinfo = seqinfo_local = + (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); + + seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Sanity check */ + Assert(col == REMOTE_SEQ_COL_COUNT); + + seqinfo_local->found_on_pub = true; + + *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock); + + /* Sequence was concurrently dropped? */ + if (!*sequence_rel) + return COPYSEQ_SKIPPED; + + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid)); + + /* Sequence was concurrently dropped? */ + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for sequence %u", + seqinfo_local->localrelid); + + local_seq = (Form_pg_sequence) GETSTRUCT(tup); + + /* Sequence parameters for remote/local are the same? */ + if (local_seq->seqtypid != remote_typid || + local_seq->seqstart != remote_start || + local_seq->seqincrement != remote_increment || + local_seq->seqmin != remote_min || + local_seq->seqmax != remote_max || + local_seq->seqcycle != remote_cycle) + result = COPYSEQ_MISMATCH; + + /* Sequence was concurrently renamed? */ + if (strcmp(seqinfo_local->nspname, + get_namespace_name(RelationGetNamespace(*sequence_rel))) || + strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel))) + result = COPYSEQ_MISMATCH; + + ReleaseSysCache(tup); + return result; +} + +/* + * Apply remote sequence state to local sequence and mark it as + * synchronized (READY). + */ +static CopySeqResult +copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner) +{ + UserContext ucxt; + AclResult aclresult; + bool run_as_owner = MySubscription->runasowner; + Oid seqoid = seqinfo->localrelid; + + /* + * If the user did not opt to run as the owner of the subscription + * ('run_as_owner'), then copy the sequence as the owner of the sequence. + */ + if (!run_as_owner) + SwitchToUntrustedUser(seqowner, &ucxt); + + aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE); + + if (aclresult != ACLCHECK_OK) + { + if (!run_as_owner) + RestoreUserContext(&ucxt); + + return COPYSEQ_INSUFFICIENT_PERM; + } + + /* + * The log counter (log_cnt) tracks how many sequence values are still + * unused locally. It is only relevant to the local node and managed + * internally by nextval() when allocating new ranges. Since log_cnt does + * not affect the visible sequence state (like last_value or is_called) + * and is only used for local caching, it need not be copied to the + * subscriber during synchronization. + */ + SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called); + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + /* + * Record the remote sequence's LSN in pg_subscription_rel and mark the + * sequence as READY. + */ + UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, + seqinfo->page_lsn, false); + + return COPYSEQ_SUCCESS; +} + +/* + * Copy existing data of sequences from the publisher. + */ +static void +copy_sequences(WalReceiverConn *conn) +{ + int cur_batch_base_index = 0; + int n_seqinfos = list_length(seqinfos); + List *mismatched_seqs_idx = NIL; + List *missing_seqs_idx = NIL; + List *insuffperm_seqs_idx = NIL; + StringInfo seqstr = makeStringInfo(); + StringInfo cmd = makeStringInfo(); + MemoryContext oldctx; + +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + elog(DEBUG1, + "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", + MySubscription->name, n_seqinfos); + + while (cur_batch_base_index < n_seqinfos) + { + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, + BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; + int batch_size = 0; + int batch_succeeded_count = 0; + int batch_mismatched_count = 0; + int batch_skipped_count = 0; + int batch_insuffperm_count = 0; + int batch_missing_count; + Relation sequence_rel; + + WalRcvExecResult *res; + TupleTableSlot *slot; + + StartTransactionCommand(); + + for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + if (seqstr->len > 0) + appendStringInfoString(seqstr, ", "); + + appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)", + seqinfo->nspname, seqinfo->seqname, idx); + + if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH) + break; + } + + /* + * We deliberately avoid acquiring a local lock on the sequence before + * querying the publisher to prevent potential distributed deadlocks + * in bi-directional replication setups. + * + * Example scenario: + * + * - On each node, a background worker acquires a lock on a sequence + * as part of a sync operation. + * + * - Concurrently, a user transaction attempts to alter the same + * sequence, waiting on the background worker's lock. + * + * - Meanwhile, a query from the other node tries to access metadata + * that depends on the completion of the alter operation. + * + * - This creates a circular wait across nodes: + * + * Node-1: Query -> waits on Alter -> waits on Sync Worker + * + * Node-2: Query -> waits on Alter -> waits on Sync Worker + * + * Since each node only sees part of the wait graph, the deadlock may + * go undetected, leading to indefinite blocking. + * + * Note: Each entry in VALUES includes an index 'seqidx' that + * represents the sequence's position in the local 'seqinfos' list. + * This index is propagated to the query results and later used to + * directly map the fetched publisher sequence rows back to their + * corresponding local entries without relying on result order or name + * matching. + */ + appendStringInfo(cmd, + "SELECT s.seqidx, ps.*, seq.seqtypid,\n" + " seq.seqstart, seq.seqincrement, seq.seqmin,\n" + " seq.seqmax, seq.seqcycle\n" + "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n" + "JOIN pg_namespace n ON n.nspname = s.schname\n" + "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" + "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" + "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n", + seqstr->data); + + res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch sequence information from the publisher: %s", + res->err)); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + CopySeqResult sync_status; + LogicalRepSequenceInfo *seqinfo; + int seqidx; + + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + sync_status = get_and_validate_seq_info(slot, &sequence_rel, + &seqinfo, &seqidx); + if (sync_status == COPYSEQ_SUCCESS) + sync_status = copy_sequence(seqinfo, + sequence_rel->rd_rel->relowner); + + switch (sync_status) + { + case COPYSEQ_SUCCESS: + elog(DEBUG1, + "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", + MySubscription->name, seqinfo->nspname, + seqinfo->seqname); + batch_succeeded_count++; + break; + case COPYSEQ_MISMATCH: + + /* + * Remember mismatched sequences in a long-lived memory + * context since these will be used after the transaction + * is committed. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + mismatched_seqs_idx = lappend_int(mismatched_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_mismatched_count++; + break; + case COPYSEQ_INSUFFICIENT_PERM: + + /* + * Remember sequences with insufficient privileges in a + * long-lived memory context since these will be used + * after the transaction is committed. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_insuffperm_count++; + break; + case COPYSEQ_SKIPPED: + ereport(LOG, + errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently", + seqinfo->nspname, + seqinfo->seqname)); + batch_skipped_count++; + break; + } + + if (sequence_rel) + table_close(sequence_rel, NoLock); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + resetStringInfo(seqstr); + resetStringInfo(cmd); + + batch_missing_count = batch_size - (batch_succeeded_count + + batch_mismatched_count + + batch_insuffperm_count + + batch_skipped_count); + + elog(DEBUG1, + "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped", + MySubscription->name, + (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, + batch_size, batch_succeeded_count, batch_mismatched_count, + batch_insuffperm_count, batch_missing_count, batch_skipped_count); + + /* Commit this batch, and prepare for next batch */ + CommitTransactionCommand(); + + if (batch_missing_count) + { + for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + /* If the sequence was not found on publisher, record it */ + if (!seqinfo->found_on_pub) + missing_seqs_idx = lappend_int(missing_seqs_idx, idx); + } + } + + /* + * cur_batch_base_index is not incremented sequentially because some + * sequences may be missing, and the number of fetched rows may not + * match the batch size. + */ + cur_batch_base_index += batch_size; + } + + /* Report mismatches, permission issues, or missing sequences */ + report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, + missing_seqs_idx); +} + +/* + * Identifies sequences that require synchronization and initiates the + * synchronization process. + */ +static void +LogicalRepSyncSequences(void) +{ + char *err; + bool must_use_password; + Relation rel; + HeapTuple tup; + ScanKeyData skey[2]; + SysScanDesc scan; + Oid subid = MyLogicalRepWorker->subid; + StringInfoData app_name; + + StartTransactionCommand(); + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + ScanKeyInit(&skey[1], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(SUBREL_STATE_INIT)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 2, skey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + LogicalRepSequenceInfo *seq; + Relation sequence_rel; + MemoryContext oldctx; + + CHECK_FOR_INTERRUPTS(); + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock); + + /* Skip if sequence was dropped concurrently */ + if (!sequence_rel) + continue; + + /* Skip if the relation is not a sequence */ + if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE) + { + table_close(sequence_rel, NoLock); + continue; + } + + /* + * Worker needs to process sequences across transaction boundary, so + * allocate them under long-lived context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + + seq = palloc0_object(LogicalRepSequenceInfo); + seq->localrelid = subrel->srrelid; + seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); + seq->seqname = pstrdup(RelationGetRelationName(sequence_rel)); + seqinfos = lappend(seqinfos, seq); + + MemoryContextSwitchTo(oldctx); + + table_close(sequence_rel, NoLock); + } + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + CommitTransactionCommand(); + + /* + * Exit early if no catalog entries found, likely due to concurrent drops. + */ + if (!seqinfos) + return; + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + initStringInfo(&app_name); + appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT, + MySubscription->oid, GetSystemIdentifier()); + + /* + * Establish the connection to the publisher for sequence synchronization. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + app_name.data, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s", + MySubscription->name, err)); + + pfree(app_name.data); + + copy_sequences(LogRepWorkerWalRcvConn); +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if required. + * + * Note that we don't handle FATAL errors which are probably because of system + * resource error and are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication sequencesync worker entry point */ +void +SequenceSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + FinishSyncWorker(); +} diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index ae8c9385916..a696b1f2dc0 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -16,6 +16,7 @@ #include "catalog/pg_subscription_rel.h" #include "pgstat.h" +#include "replication/logicallauncher.h" #include "replication/worker_internal.h" #include "storage/ipc.h" #include "utils/lsyscache.h" @@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE pg_noreturn void FinishSyncWorker(void) { + Assert(am_sequencesync_worker() || am_tablesync_worker()); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -61,16 +64,31 @@ FinishSyncWorker(void) /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); + if (am_sequencesync_worker()) + { + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); + + /* + * Reset last_seqsync_start_time, so that next time a sequencesync + * worker is needed it can be started promptly. + */ + logicalrep_reset_seqsync_start_time(); + } + else + { + StartTransactionCommand(); + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + CommitTransactionCommand(); - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, - InvalidOid); + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, + InvalidOid); + } /* Stop gracefully */ proc_exit(0); @@ -86,7 +104,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Process possible state change(s) of relations that are being synchronized. + * Attempt to launch a sync worker for one or more sequences or a table, if + * a worker slot is available and the retry interval has elapsed. + * + * wtype: sync worker type. + * nsyncworkers: Number of currently running sync workers for the subscription. + * relid: InvalidOid for sequencesync worker, actual relid for tablesync + * worker. + * last_start_time: Pointer to the last start time of the worker. + */ +void +launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, + TimestampTz *last_start_time) +{ + TimestampTz now; + + Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) || + (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid))); + + /* If there is a free sync worker slot, start a new sync worker */ + if (nsyncworkers >= max_sync_workers_per_subscription) + return; + + now = GetCurrentTimestamp(); + + if (!(*last_start_time) || + TimestampDifferenceExceeds(*last_start_time, now, + wal_retrieve_retry_interval)) + { + /* + * Set the last_start_time even if we fail to start the worker, so + * that we won't retry until wal_retrieve_retry_interval has elapsed. + */ + *last_start_time = now; + (void) logicalrep_worker_launch(wtype, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + relid, DSM_HANDLE_INVALID, false); + } +} + +/* + * Process possible state change(s) of relations that are being synchronized + * and start new tablesync workers for the newly added tables. Also, start a + * new sequencesync worker for the newly added sequences. */ void ProcessSyncingRelations(XLogRecPtr current_lsn) @@ -108,6 +171,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) case WORKERTYPE_APPLY: ProcessSyncingTablesForApply(current_lsn); + ProcessSequencesForSync(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to process relations"); break; case WORKERTYPE_UNKNOWN: @@ -117,17 +186,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) } /* - * Common code to fetch the up-to-date sync state info into the static lists. + * Common code to fetch the up-to-date sync state info for tables and sequences. * - * Returns true if subscription has 1 or more tables, else false. + * The pg_subscription_rel catalog is shared by tables and sequences. Changes + * to either sequences or tables can affect the validity of relation states, so + * we identify non-READY tables and non-READY sequences together to ensure + * consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * has_pending_subtables: true if the subscription has one or more tables that + * are not in READY state, otherwise false. + * has_pending_subsequences: true if the subscription has one or more sequences + * that are not in READY state, otherwise false. */ -bool -FetchRelationStates(bool *started_tx) +void +FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_subsequences, + bool *started_tx) { + /* + * has_subtables and has_subsequences_non_ready are declared as static, + * since the same value can be used until the system table is invalidated. + */ static bool has_subtables = false; + static bool has_subsequences_non_ready = false; *started_tx = false; @@ -135,10 +216,10 @@ FetchRelationStates(bool *started_tx) { MemoryContext oldctx; List *rstates; - ListCell *lc; SubscriptionRelState *rstate; relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + has_subsequences_non_ready = false; /* Clean the old lists. */ list_free_deep(table_states_not_ready); @@ -150,17 +231,23 @@ FetchRelationStates(bool *started_tx) *started_tx = true; } - /* Fetch tables that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true, false, + /* Fetch tables and sequences that are in non-READY state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, true, true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) + foreach_ptr(SubscriptionRelState, subrel, rstates) { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); + if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE) + has_subsequences_non_ready = true; + else + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, subrel, sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, + rstate); + } } MemoryContextSwitchTo(oldctx); @@ -185,5 +272,9 @@ FetchRelationStates(bool *started_tx) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } - return has_subtables; + if (has_pending_subtables) + *has_pending_subtables = has_subtables; + + if (has_pending_subsequences) + *has_pending_subsequences = has_subsequences_non_ready; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 58c98488d7b..e5a2856fd17 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -374,14 +374,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) }; static HTAB *last_start_times = NULL; ListCell *lc; - bool started_tx = false; + bool started_tx; bool should_exit = false; Relation rel = NULL; Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchRelationStates(&started_tx); + FetchRelationStates(NULL, NULL, &started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -415,6 +415,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE); + if (rstate->state == SUBREL_STATE_SYNCDONE) { /* @@ -428,11 +436,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. @@ -552,43 +555,19 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) */ int nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + struct tablesync_start_time_mapping *hentry; + bool found; /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); - /* - * If there are free sync worker slot(s), start a new sync - * worker for the table. - */ - if (nsyncworkers < max_sync_workers_per_subscription) - { - TimestampTz now = GetCurrentTimestamp(); - struct tablesync_start_time_mapping *hentry; - bool found; + hentry = hash_search(last_start_times, &rstate->relid, + HASH_ENTER, &found); + if (!found) + hentry->last_start_time = 0; - hentry = hash_search(last_start_times, &rstate->relid, - HASH_ENTER, &found); - - if (!found || - TimestampDifferenceExceeds(hentry->last_start_time, now, - wal_retrieve_retry_interval)) - { - /* - * Set the last_start_time even if we fail to start - * the worker, so that we won't retry until - * wal_retrieve_retry_interval has elapsed. - */ - hentry->last_start_time = now; - (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC, - MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, - rstate->relid, - DSM_HANDLE_INVALID, - false); - } - } + launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers, + rstate->relid, &hentry->last_start_time); } } } @@ -1432,8 +1411,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) } /* - * Make sure that the copy command runs as the table owner, unless the - * user has opted out of that behaviour. + * If the user did not opt to run as the owner of the subscription + * ('run_as_owner'), then copy the table as the owner of the table. */ run_as_owner = MySubscription->runasowner; if (!run_as_owner) @@ -1596,7 +1575,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); @@ -1618,11 +1597,11 @@ TablesyncWorkerMain(Datum main_arg) bool AllTablesyncsReady(void) { - bool started_tx = false; - bool has_subrels = false; + bool started_tx; + bool has_tables; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchRelationStates(&started_tx); + FetchRelationStates(&has_tables, NULL, &started_tx); if (started_tx) { @@ -1634,7 +1613,7 @@ AllTablesyncsReady(void) * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. */ - return has_subrels && (table_states_not_ready == NIL); + return has_tables && (table_states_not_ready == NIL); } /* @@ -1649,10 +1628,10 @@ bool HasSubscriptionTablesCached(void) { bool started_tx; - bool has_subrels; + bool has_tables; /* We need up-to-date subscription tables info here */ - has_subrels = FetchRelationStates(&started_tx); + FetchRelationStates(&has_tables, NULL, &started_tx); if (started_tx) { @@ -1660,7 +1639,7 @@ HasSubscriptionTablesCached(void) pgstat_report_stat(true); } - return has_subrels; + return has_tables; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7edd1c9cf06..e1c757c911e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to apply changes"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) AcceptInvalidationMessages(); maybe_reread_subscription(); - /* Process any table synchronization changes. */ + /* + * Process any relations that are being synchronized in parallel + * and any newly added tables or sequences. + */ ProcessSyncingRelations(last_received); } @@ -5700,8 +5726,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. + * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. @@ -5809,13 +5835,17 @@ InitializeLogRepWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + else if (am_sequencesync_worker()) + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name)); else ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" has started", - MySubscription->name))); + errmsg("logical replication apply worker for subscription \"%s\" has started", + MySubscription->name)); CommitTransactionCommand(); } @@ -5831,14 +5861,16 @@ replorigin_reset(int code, Datum arg) replorigin_session_origin_timestamp = 0; } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync and sequencesync worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -5921,9 +5953,15 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); + if (am_leader_apply_worker() || am_tablesync_worker()) + { + /* + * Report the worker failed during either table synchronization or + * apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + !am_tablesync_worker()); + } /* Disable the subscription */ StartTransactionCommand(); |
