summaryrefslogtreecommitdiff
path: root/src/include/replication/worker_internal.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/replication/worker_internal.h')
-rw-r--r--src/include/replication/worker_internal.h22
1 files changed, 19 insertions, 3 deletions
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index e23fa9a4514..f081619f151 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType
{
WORKERTYPE_UNKNOWN = 0,
WORKERTYPE_TABLESYNC,
+ WORKERTYPE_SEQUENCESYNC,
WORKERTYPE_APPLY,
WORKERTYPE_PARALLEL_APPLY,
} LogicalRepWorkerType;
@@ -106,6 +107,8 @@ typedef struct LogicalRepWorker
TimestampTz last_recv_time;
XLogRecPtr reply_lsn;
TimestampTz reply_time;
+
+ TimestampTz last_seqsync_start_time;
} LogicalRepWorker;
/*
@@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_reset_seqsync_start_time(void);
extern int logicalrep_sync_worker_count(Oid subid);
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
@@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
+extern void ProcessSequencesForSync(void);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
+extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
+ Oid relid, TimestampTz *last_start_time);
extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
-extern bool FetchRelationStates(bool *started_tx);
+extern void FetchRelationStates(bool *has_pending_subtables,
+ bool *has_pending_sequences, bool *started_tx);
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);
@@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->in_use && \
+#define isTableSyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+#define isSequenceSyncWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_SEQUENCESYNC)
static inline bool
am_tablesync_worker(void)
{
- return isTablesyncWorker(MyLogicalRepWorker);
+ return isTableSyncWorker(MyLogicalRepWorker);
+}
+
+static inline bool
+am_sequencesync_worker(void)
+{
+ return isSequenceSyncWorker(MyLogicalRepWorker);
}
static inline bool