diff options
| author | Amit Kapila <akapila@postgresql.org> | 2025-11-05 05:54:25 +0000 |
|---|---|---|
| committer | Amit Kapila <akapila@postgresql.org> | 2025-11-05 05:59:58 +0000 |
| commit | 5509055d6956745532e65ab218e15b99d87d66ce (patch) | |
| tree | 6d3776b6e028dbe5a465657395c5c624b0b21a75 /src/include/replication/worker_internal.h | |
| parent | 1fd981f05369340a8afa4d013a350b0b2ac6e33e (diff) | |
Add sequence synchronization for logical replication.
This patch introduces sequence synchronization. Sequences that are synced
will have 2 states:
- INIT (needs [re]synchronizing)
- READY (is already synchronized)
A new sequencesync worker is launched as needed to synchronize sequences.
A single sequencesync worker is responsible for synchronizing all
sequences. It begins by retrieving the list of sequences that are flagged
for synchronization, i.e., those in the INIT state. These sequences are
then processed in batches, allowing multiple entries to be synchronized
within a single transaction. The worker fetches the current sequence
values and page LSNs from the remote publisher, updates the corresponding
sequences on the local subscriber, and finally marks each sequence as
READY upon successful synchronization.
Sequence synchronization occurs in 3 places:
1) CREATE SUBSCRIPTION
- The command syntax remains unchanged.
- The subscriber retrieves sequences associated with publications.
- Published sequences are added to pg_subscription_rel with INIT
state.
- Initiate the sequencesync worker to synchronize all sequences.
2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION
- The command syntax remains unchanged.
- Dropped published sequences are removed from pg_subscription_rel.
- Newly published sequences are added to pg_subscription_rel with INIT
state.
- Initiate the sequencesync worker to synchronize only newly added
sequences.
3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES
- A new command introduced for PG19 by f0b3573c3a.
- All sequences in pg_subscription_rel are reset to INIT state.
- Initiate the sequencesync worker to synchronize all sequences.
- Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command,
addition and removal of missing sequences will not be done in this
case.
Author: Vignesh C <vignesh21@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
Diffstat (limited to 'src/include/replication/worker_internal.h')
| -rw-r--r-- | src/include/replication/worker_internal.h | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e23fa9a4514..f081619f151 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -106,6 +107,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz last_seqsync_start_time; } LogicalRepWorker; /* @@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); +extern void logicalrep_reset_seqsync_start_time(void); extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, @@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSequencesForSync(void); pg_noreturn extern void FinishSyncWorker(void); extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); +extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, + Oid relid, TimestampTz *last_start_time); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); -extern bool FetchRelationStates(bool *started_tx); +extern void FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_sequences, bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); @@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool |
