diff options
Diffstat (limited to 'src/include/replication/worker_internal.h')
| -rw-r--r-- | src/include/replication/worker_internal.h | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e23fa9a4514..f081619f151 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -106,6 +107,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz last_seqsync_start_time; } LogicalRepWorker; /* @@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); +extern void logicalrep_reset_seqsync_start_time(void); extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, @@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSequencesForSync(void); pg_noreturn extern void FinishSyncWorker(void); extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); +extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, + Oid relid, TimestampTz *last_start_time); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); -extern bool FetchRelationStates(bool *started_tx); +extern void FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_sequences, bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); @@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool |
