| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
 | /*-------------------------------------------------------------------------
 *
 * basebackup_throttle.c
 *	  Basebackup sink implementing throttling. Data is forwarded to the
 *	  next base backup sink in the chain at a rate no greater than the
 *	  configured maximum.
 *
 * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/basebackup_throttle.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "miscadmin.h"
#include "replication/basebackup_sink.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
typedef struct bbsink_throttle
{
	/* Common information for all types of sink. */
	bbsink		base;
	/* The actual number of bytes, transfer of which may cause sleep. */
	uint64		throttling_sample;
	/* Amount of data already transferred but not yet throttled.  */
	int64		throttling_counter;
	/* The minimum time required to transfer throttling_sample bytes. */
	TimeOffset	elapsed_min_unit;
	/* The last check of the transfer rate. */
	TimestampTz throttled_last;
} bbsink_throttle;
static void bbsink_throttle_begin_backup(bbsink *sink);
static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
static void throttle(bbsink_throttle *sink, size_t increment);
const bbsink_ops bbsink_throttle_ops = {
	.begin_backup = bbsink_throttle_begin_backup,
	.begin_archive = bbsink_forward_begin_archive,
	.archive_contents = bbsink_throttle_archive_contents,
	.end_archive = bbsink_forward_end_archive,
	.begin_manifest = bbsink_forward_begin_manifest,
	.manifest_contents = bbsink_throttle_manifest_contents,
	.end_manifest = bbsink_forward_end_manifest,
	.end_backup = bbsink_forward_end_backup,
	.cleanup = bbsink_forward_cleanup
};
/*
 * How frequently to throttle, as a fraction of the specified rate-second.
 */
#define THROTTLING_FREQUENCY	8
/*
 * Create a new basebackup sink that performs throttling and forwards data
 * to a successor sink.
 */
bbsink *
bbsink_throttle_new(bbsink *next, uint32 maxrate)
{
	bbsink_throttle *sink;
	Assert(next != NULL);
	Assert(maxrate > 0);
	sink = palloc0(sizeof(bbsink_throttle));
	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
	sink->base.bbs_next = next;
	sink->throttling_sample =
		(int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
	/*
	 * The minimum amount of time for throttling_sample bytes to be
	 * transferred.
	 */
	sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
	return &sink->base;
}
/*
 * There's no real work to do here, but we need to record the current time so
 * that it can be used for future calculations.
 */
static void
bbsink_throttle_begin_backup(bbsink *sink)
{
	bbsink_throttle *mysink = (bbsink_throttle *) sink;
	bbsink_forward_begin_backup(sink);
	/* The 'real data' starts now (header was ignored). */
	mysink->throttled_last = GetCurrentTimestamp();
}
/*
 * First throttle, and then pass archive contents to next sink.
 */
static void
bbsink_throttle_archive_contents(bbsink *sink, size_t len)
{
	throttle((bbsink_throttle *) sink, len);
	bbsink_forward_archive_contents(sink, len);
}
/*
 * First throttle, and then pass manifest contents to next sink.
 */
static void
bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
{
	throttle((bbsink_throttle *) sink, len);
	bbsink_forward_manifest_contents(sink, len);
}
/*
 * Increment the network transfer counter by the given number of bytes,
 * and sleep if necessary to comply with the requested network transfer
 * rate.
 */
static void
throttle(bbsink_throttle *sink, size_t increment)
{
	TimeOffset	elapsed_min;
	Assert(sink->throttling_counter >= 0);
	sink->throttling_counter += increment;
	if (sink->throttling_counter < sink->throttling_sample)
		return;
	/* How much time should have elapsed at minimum? */
	elapsed_min = sink->elapsed_min_unit *
		(sink->throttling_counter / sink->throttling_sample);
	/*
	 * Since the latch could be set repeatedly because of concurrently WAL
	 * activity, sleep in a loop to ensure enough time has passed.
	 */
	for (;;)
	{
		TimeOffset	elapsed,
					sleep;
		int			wait_result;
		/* Time elapsed since the last measurement (and possible wake up). */
		elapsed = GetCurrentTimestamp() - sink->throttled_last;
		/* sleep if the transfer is faster than it should be */
		sleep = elapsed_min - elapsed;
		if (sleep <= 0)
			break;
		ResetLatch(MyLatch);
		/* We're eating a potentially set latch, so check for interrupts */
		CHECK_FOR_INTERRUPTS();
		/*
		 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
		 * the maximum time to sleep. Thus the cast to long is safe.
		 */
		wait_result = WaitLatch(MyLatch,
								WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
								(long) (sleep / 1000),
								WAIT_EVENT_BASE_BACKUP_THROTTLE);
		if (wait_result & WL_LATCH_SET)
			CHECK_FOR_INTERRUPTS();
		/* Done waiting? */
		if (wait_result & WL_TIMEOUT)
			break;
	}
	/*
	 * As we work with integers, only whole multiple of throttling_sample was
	 * processed. The rest will be done during the next call of this function.
	 */
	sink->throttling_counter %= sink->throttling_sample;
	/*
	 * Time interval for the remaining amount and possible next increments
	 * starts now.
	 */
	sink->throttled_last = GetCurrentTimestamp();
}
 |