| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
 | /* -------------------------------------------------------------------------
 *
 * pgstat_replslot.c
 *	  Implementation of replication slot statistics.
 *
 * This file contains the implementation of replication slot statistics. It is kept
 * separate from pgstat.c to enforce the line between the statistics access /
 * storage implementation and the details about individual types of
 * statistics.
 *
 * Replication slot stats work a bit different than other variable-numbered
 * stats. Slots do not have oids (so they can be created on physical
 * replicas). Use the slot index as object id while running. However, the slot
 * index can change when restarting. That is addressed by using the name when
 * (de-)serializing. After a restart it is possible for slots to have been
 * dropped while shut down, which is addressed by not restoring stats for
 * slots that cannot be found by name when starting up.
 *
 * Copyright (c) 2001-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/utils/activity/pgstat_replslot.c
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "replication/slot.h"
#include "utils/pgstat_internal.h"
static int	get_replslot_index(const char *name, bool need_lock);
/*
 * Reset counters for a single replication slot.
 *
 * Permission checking for this function is managed through the normal
 * GRANT system.
 */
void
pgstat_reset_replslot(const char *name)
{
	ReplicationSlot *slot;
	Assert(name != NULL);
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	/* Check if the slot exits with the given name. */
	slot = SearchNamedReplicationSlot(name, false);
	if (!slot)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	/*
	 * Reset stats if it is a logical slot. Nothing to do for physical slots
	 * as we collect stats only for logical slots.
	 */
	if (SlotIsLogical(slot))
		pgstat_reset(PGSTAT_KIND_REPLSLOT, InvalidOid,
					 ReplicationSlotIndex(slot));
	LWLockRelease(ReplicationSlotControlLock);
}
/*
 * Report replication slot statistics.
 *
 * We can rely on the stats for the slot to exist and to belong to this
 * slot. We can only get here if pgstat_create_replslot() or
 * pgstat_acquire_replslot() have already been called.
 */
void
pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
{
	PgStat_EntryRef *entry_ref;
	PgStatShared_ReplSlot *shstatent;
	PgStat_StatReplSlotEntry *statent;
	entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
											ReplicationSlotIndex(slot), false);
	shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
	statent = &shstatent->stats;
	/* Update the replication slot statistics */
#define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld
	REPLSLOT_ACC(spill_txns);
	REPLSLOT_ACC(spill_count);
	REPLSLOT_ACC(spill_bytes);
	REPLSLOT_ACC(stream_txns);
	REPLSLOT_ACC(stream_count);
	REPLSLOT_ACC(stream_bytes);
	REPLSLOT_ACC(mem_exceeded_count);
	REPLSLOT_ACC(total_txns);
	REPLSLOT_ACC(total_bytes);
#undef REPLSLOT_ACC
	pgstat_unlock_entry(entry_ref);
}
/*
 * Report replication slot creation.
 *
 * NB: This gets called with ReplicationSlotAllocationLock already held, be
 * careful about calling back into slot.c.
 */
void
pgstat_create_replslot(ReplicationSlot *slot)
{
	PgStat_EntryRef *entry_ref;
	PgStatShared_ReplSlot *shstatent;
	Assert(LWLockHeldByMeInMode(ReplicationSlotAllocationLock, LW_EXCLUSIVE));
	entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
											ReplicationSlotIndex(slot), false);
	shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
	/*
	 * NB: need to accept that there might be stats from an older slot, e.g.
	 * if we previously crashed after dropping a slot.
	 */
	memset(&shstatent->stats, 0, sizeof(shstatent->stats));
	pgstat_unlock_entry(entry_ref);
}
/*
 * Report replication slot has been acquired.
 *
 * This guarantees that a stats entry exists during later
 * pgstat_report_replslot() calls.
 *
 * If we previously crashed, no stats data exists. But if we did not crash,
 * the stats do belong to this slot:
 * - the stats cannot belong to a dropped slot, pgstat_drop_replslot() would
 *   have been called
 * - if the slot was removed while shut down,
 *   pgstat_replslot_from_serialized_name_cb() returning false would have
 *   caused the stats to be dropped
 */
void
pgstat_acquire_replslot(ReplicationSlot *slot)
{
	pgstat_get_entry_ref(PGSTAT_KIND_REPLSLOT, InvalidOid,
						 ReplicationSlotIndex(slot), true, NULL);
}
/*
 * Report replication slot drop.
 */
void
pgstat_drop_replslot(ReplicationSlot *slot)
{
	Assert(LWLockHeldByMeInMode(ReplicationSlotAllocationLock, LW_EXCLUSIVE));
	if (!pgstat_drop_entry(PGSTAT_KIND_REPLSLOT, InvalidOid,
						   ReplicationSlotIndex(slot)))
		pgstat_request_entry_refs_gc();
}
/*
 * Support function for the SQL-callable pgstat* functions. Returns
 * a pointer to the replication slot statistics struct.
 */
PgStat_StatReplSlotEntry *
pgstat_fetch_replslot(NameData slotname)
{
	int			idx;
	PgStat_StatReplSlotEntry *slotentry = NULL;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	idx = get_replslot_index(NameStr(slotname), false);
	if (idx != -1)
		slotentry = (PgStat_StatReplSlotEntry *) pgstat_fetch_entry(PGSTAT_KIND_REPLSLOT,
																	InvalidOid, idx);
	LWLockRelease(ReplicationSlotControlLock);
	return slotentry;
}
void
pgstat_replslot_to_serialized_name_cb(const PgStat_HashKey *key, const PgStatShared_Common *header, NameData *name)
{
	/*
	 * This is only called late during shutdown. The set of existing slots
	 * isn't allowed to change at this point, we can assume that a slot exists
	 * at the offset.
	 */
	if (!ReplicationSlotName(key->objid, name))
		elog(ERROR, "could not find name for replication slot index %" PRIu64,
			 key->objid);
}
bool
pgstat_replslot_from_serialized_name_cb(const NameData *name, PgStat_HashKey *key)
{
	int			idx = get_replslot_index(NameStr(*name), true);
	/* slot might have been deleted */
	if (idx == -1)
		return false;
	key->kind = PGSTAT_KIND_REPLSLOT;
	key->dboid = InvalidOid;
	key->objid = idx;
	return true;
}
void
pgstat_replslot_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts)
{
	((PgStatShared_ReplSlot *) header)->stats.stat_reset_timestamp = ts;
}
static int
get_replslot_index(const char *name, bool need_lock)
{
	ReplicationSlot *slot;
	Assert(name != NULL);
	slot = SearchNamedReplicationSlot(name, need_lock);
	if (!slot)
		return -1;
	return ReplicationSlotIndex(slot);
}
 |