1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
// SPDX-License-Identifier: GPL-2.0
/*
* Test for DSQ operations including create, destroy, and peek operations.
*
* Copyright (c) 2025 Meta Platforms, Inc. and affiliates.
* Copyright (c) 2025 Ryan Newton <ryan.newton@alum.mit.edu>
*/
#include <bpf/bpf.h>
#include <scx/common.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sched.h>
#include "peek_dsq.bpf.skel.h"
#include "scx_test.h"
#define NUM_WORKERS 4
static bool workload_running = true;
static pthread_t workload_threads[NUM_WORKERS];
/**
* Background workload thread that sleeps and wakes rapidly to exercise
* the scheduler's enqueue operations and ensure DSQ operations get tested.
*/
static void *workload_thread_fn(void *arg)
{
while (workload_running) {
/* Sleep for a very short time to trigger scheduler activity */
usleep(1000); /* 1ms sleep */
/* Yield to ensure we go through the scheduler */
sched_yield();
}
return NULL;
}
static enum scx_test_status setup(void **ctx)
{
struct peek_dsq *skel;
skel = peek_dsq__open();
SCX_FAIL_IF(!skel, "Failed to open");
SCX_ENUM_INIT(skel);
SCX_FAIL_IF(peek_dsq__load(skel), "Failed to load skel");
*ctx = skel;
return SCX_TEST_PASS;
}
static int print_observed_pids(struct bpf_map *map, int max_samples, const char *dsq_name)
{
long count = 0;
printf("Observed %s DSQ peek pids:\n", dsq_name);
for (int i = 0; i < max_samples; i++) {
long pid;
int err;
err = bpf_map_lookup_elem(bpf_map__fd(map), &i, &pid);
if (err == 0) {
if (pid == 0) {
printf(" Sample %d: NULL peek\n", i);
} else if (pid > 0) {
printf(" Sample %d: pid %ld\n", i, pid);
count++;
}
} else {
printf(" Sample %d: error reading pid (err=%d)\n", i, err);
}
}
printf("Observed ~%ld pids in the %s DSQ(s)\n", count, dsq_name);
return count;
}
static enum scx_test_status run(void *ctx)
{
struct peek_dsq *skel = ctx;
bool failed = false;
int seconds = 3;
int err;
/* Enable the scheduler to test DSQ operations */
printf("Enabling scheduler to test DSQ insert operations...\n");
struct bpf_link *link =
bpf_map__attach_struct_ops(skel->maps.peek_dsq_ops);
if (!link) {
SCX_ERR("Failed to attach struct_ops");
return SCX_TEST_FAIL;
}
printf("Starting %d background workload threads...\n", NUM_WORKERS);
workload_running = true;
for (int i = 0; i < NUM_WORKERS; i++) {
err = pthread_create(&workload_threads[i], NULL, workload_thread_fn, NULL);
if (err) {
SCX_ERR("Failed to create workload thread %d: %s", i, strerror(err));
/* Stop already created threads */
workload_running = false;
for (int j = 0; j < i; j++)
pthread_join(workload_threads[j], NULL);
bpf_link__destroy(link);
return SCX_TEST_FAIL;
}
}
printf("Waiting for enqueue events.\n");
sleep(seconds);
while (skel->data->enqueue_count <= 0) {
printf(".");
fflush(stdout);
sleep(1);
seconds++;
if (seconds >= 30) {
printf("\n\u2717 Timeout waiting for enqueue events\n");
/* Stop workload threads and cleanup */
workload_running = false;
for (int i = 0; i < NUM_WORKERS; i++)
pthread_join(workload_threads[i], NULL);
bpf_link__destroy(link);
return SCX_TEST_FAIL;
}
}
workload_running = false;
for (int i = 0; i < NUM_WORKERS; i++) {
err = pthread_join(workload_threads[i], NULL);
if (err) {
SCX_ERR("Failed to join workload thread %d: %s", i, strerror(err));
bpf_link__destroy(link);
return SCX_TEST_FAIL;
}
}
printf("Background workload threads stopped.\n");
SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_NONE));
/* Detach the scheduler */
bpf_link__destroy(link);
printf("Enqueue/dispatch count over %d seconds: %d / %d\n", seconds,
skel->data->enqueue_count, skel->data->dispatch_count);
printf("Debug: ksym_exists=%d\n",
skel->bss->debug_ksym_exists);
/* Check DSQ insert result */
printf("DSQ insert test done on cpu: %d\n", skel->data->insert_test_cpu);
if (skel->data->insert_test_cpu != -1)
printf("\u2713 DSQ insert succeeded !\n");
else {
printf("\u2717 DSQ insert failed or not attempted\n");
failed = true;
}
/* Check DSQ peek results */
printf(" DSQ peek result 1 (before insert): %d\n",
skel->data->dsq_peek_result1);
if (skel->data->dsq_peek_result1 == 0)
printf("\u2713 DSQ peek verification success: peek returned NULL!\n");
else {
printf("\u2717 DSQ peek verification failed\n");
failed = true;
}
printf(" DSQ peek result 2 (after insert): %ld\n",
skel->data->dsq_peek_result2);
printf(" DSQ peek result 2, expected: %ld\n",
skel->data->dsq_peek_result2_expected);
if (skel->data->dsq_peek_result2 ==
skel->data->dsq_peek_result2_expected)
printf("\u2713 DSQ peek verification success: peek returned the inserted task!\n");
else {
printf("\u2717 DSQ peek verification failed\n");
failed = true;
}
printf(" Inserted test task -> pid: %ld\n", skel->data->dsq_inserted_pid);
printf(" DSQ peek result 2 -> pid: %ld\n", skel->data->dsq_peek_result2_pid);
int pid_count;
pid_count = print_observed_pids(skel->maps.peek_results,
skel->data->max_samples, "DSQ pool");
printf("Total non-null peek observations: %ld out of %ld\n",
skel->data->successful_peeks, skel->data->total_peek_attempts);
if (skel->bss->debug_ksym_exists && pid_count == 0) {
printf("\u2717 DSQ pool test failed: no successful peeks in native mode\n");
failed = true;
}
if (skel->bss->debug_ksym_exists && pid_count > 0)
printf("\u2713 DSQ pool test success: observed successful peeks in native mode\n");
if (failed)
return SCX_TEST_FAIL;
else
return SCX_TEST_PASS;
}
static void cleanup(void *ctx)
{
struct peek_dsq *skel = ctx;
if (workload_running) {
workload_running = false;
for (int i = 0; i < NUM_WORKERS; i++)
pthread_join(workload_threads[i], NULL);
}
peek_dsq__destroy(skel);
}
struct scx_test peek_dsq = {
.name = "peek_dsq",
.description =
"Test DSQ create/destroy operations and future peek functionality",
.setup = setup,
.run = run,
.cleanup = cleanup,
};
REGISTER_SCX_TEST(&peek_dsq)
|