summaryrefslogtreecommitdiff
path: root/tests/extmod/select_poll_eintr.py
blob: fdc5ee5074a6182d56fea753a32726be1e9f0cbc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Test interruption of select.poll by EINTR signal, when
# MICROPY_PY_SELECT_POSIX_OPTIMISATIONS is enabled.

try:
    import time, gc, select, socket, _thread

    time.time_ns  # Check for time_ns on MicroPython
    select.poll  # Raises AttributeError for CPython implementations without poll()
except (ImportError, AttributeError):
    print("SKIP")
    raise SystemExit

# Use a new UDP socket for tests, which should be writable but not readable.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
localhost_addr_info = socket.getaddrinfo("127.0.0.1", 8000)
try:
    s.bind(localhost_addr_info[0][-1])
except OSError:
    # Target can't bind to localhost.
    # Most likely it doesn't have a NIC and the test cannot be run.
    s.close()
    print("SKIP")
    raise SystemExit


def thread_main():
    lock.acquire()
    time.sleep(0.2)
    print("thread gc start")
    # The unix gc.collect() implementation will raise EINTR on other threads.
    # Could possibly use _thread._interrupt_main() instead if MicroPython had it.
    gc.collect()
    print("thread gc end")


# Pre-allocate global variables here so the global dict is not resized by the main
# thread while the secondary thread runs.  This is a workaround for the bug described
# in https://github.com/micropython/micropython/pull/11604
poller = None
t0 = None
result = None
dt_ms = None

# Start a thread to interrupt the main thread during its call to poll.
lock = _thread.allocate_lock()
lock.acquire()
_thread.start_new_thread(thread_main, ())

# Create the poller object.
poller = select.poll()
poller.register(s, select.POLLIN)

# Poll on the UDP socket for a set timeout, which should be reached.
print("poll")
lock.release()
t0 = time.time_ns()
result = poller.poll(400)
dt_ms = (time.time_ns() - t0) / 1e6
print("result:", result)
if 380 <= dt_ms <= 600:
    print("dt in range")
else:
    print("dt not in range:", dt_ms)

# Clean up.
s.close()