1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
/*-------------------------------------------------------------------------
* syncutils.c
* PostgreSQL logical replication: common synchronization code
*
* Copyright (c) 2025, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/syncutils.c
*
* NOTES
* This file contains code common for synchronization workers.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_subscription_rel.h"
#include "pgstat.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
/*
* Enum for phases of the subscription relations state.
*
* SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
* state is no longer valid, and the subscription relations should be rebuilt.
*
* SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
* relations state is being rebuilt.
*
* SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
* up-to-date and valid.
*/
typedef enum
{
SYNC_RELATIONS_STATE_NEEDS_REBUILD,
SYNC_RELATIONS_STATE_REBUILD_STARTED,
SYNC_RELATIONS_STATE_VALID,
} SyncingRelationsState;
static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
/*
* Exit routine for synchronization worker.
*/
pg_noreturn void
FinishSyncWorker(void)
{
/*
* Commit any outstanding transaction. This is the usual case, unless
* there was nothing to do for the table.
*/
if (IsTransactionState())
{
CommitTransactionCommand();
pgstat_report_stat(true);
}
/* And flush all writes. */
XLogFlush(GetXLogWriteRecPtr());
StartTransactionCommand();
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
MySubscription->name,
get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
/* Find the leader apply worker and signal it. */
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
proc_exit(0);
}
/*
* Callback from syscache invalidation.
*/
void
InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
{
relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
}
/*
* Process possible state change(s) of relations that are being synchronized.
*/
void
ProcessSyncingRelations(XLogRecPtr current_lsn)
{
switch (MyLogicalRepWorker->type)
{
case WORKERTYPE_PARALLEL_APPLY:
/*
* Skip for parallel apply workers because they only operate on
* tables that are in a READY state. See pa_can_start() and
* should_apply_changes_for_rel().
*/
break;
case WORKERTYPE_TABLESYNC:
ProcessSyncingTablesForSync(current_lsn);
break;
case WORKERTYPE_APPLY:
ProcessSyncingTablesForApply(current_lsn);
break;
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
}
}
/*
* Common code to fetch the up-to-date sync state info into the static lists.
*
* Returns true if subscription has 1 or more tables, else false.
*
* Note: If this function started the transaction (indicated by the parameter)
* then it is the caller's responsibility to commit it.
*/
bool
FetchRelationStates(bool *started_tx)
{
static bool has_subtables = false;
*started_tx = false;
if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
{
MemoryContext oldctx;
List *rstates;
ListCell *lc;
SubscriptionRelState *rstate;
relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
table_states_not_ready = NIL;
if (!IsTransactionState())
{
StartTransactionCommand();
*started_tx = true;
}
/* Fetch tables and sequences that are in non-ready state. */
rstates = GetSubscriptionRelations(MySubscription->oid, true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach(lc, rstates)
{
rstate = palloc(sizeof(SubscriptionRelState));
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
table_states_not_ready = lappend(table_states_not_ready, rstate);
}
MemoryContextSwitchTo(oldctx);
/*
* Does the subscription have tables?
*
* If there were not-READY tables found then we know it does. But if
* table_states_not_ready was empty we still need to check again to
* see if there are 0 tables.
*/
has_subtables = (table_states_not_ready != NIL) ||
HasSubscriptionTables(MySubscription->oid);
/*
* If the subscription relation cache has been invalidated since we
* entered this routine, we still use and return the relations we just
* finished constructing, to avoid infinite loops, but we leave the
* table states marked as stale so that we'll rebuild it again on next
* access. Otherwise, we mark the table states as valid.
*/
if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
relation_states_validity = SYNC_RELATIONS_STATE_VALID;
}
return has_subtables;
}
|