summaryrefslogtreecommitdiff
path: root/tests/extmod/select_poll_custom.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/extmod/select_poll_custom.py')
-rw-r--r--tests/extmod/select_poll_custom.py83
1 files changed, 83 insertions, 0 deletions
diff --git a/tests/extmod/select_poll_custom.py b/tests/extmod/select_poll_custom.py
new file mode 100644
index 000000000..0cbb61032
--- /dev/null
+++ b/tests/extmod/select_poll_custom.py
@@ -0,0 +1,83 @@
+# Test custom pollable objects implemented in Python.
+
+from micropython import const
+
+try:
+ import socket, select, io
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+_MP_STREAM_POLL = const(3)
+_MP_STREAM_GET_FILENO = const(10)
+
+_MP_STREAM_POLL_RD = const(0x0001)
+_MP_STREAM_POLL_WR = const(0x0004)
+
+
+def print_poll_output(lst):
+ print([(type(obj), flags) for obj, flags in lst])
+
+
+class CustomPollable(io.IOBase):
+ def __init__(self):
+ self.poll_state = 0
+
+ def ioctl(self, cmd, arg):
+ if cmd == _MP_STREAM_GET_FILENO:
+ # Bare-metal ports don't call this ioctl, so don't print it.
+ return -1
+
+ print("CustomPollable.ioctl", cmd, arg)
+ if cmd == _MP_STREAM_POLL:
+ if self.poll_state == "delay_rd":
+ self.poll_state = _MP_STREAM_POLL_RD
+ return 0
+ elif self.poll_state < 0:
+ return self.poll_state
+ else:
+ return self.poll_state & arg
+
+
+poller = select.poll()
+
+# Use a new UDP socket for tests, which should be writable but not readable.
+try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1])
+except OSError:
+ print("SKIP")
+ raise SystemExit
+
+x = CustomPollable()
+
+# Register both a file-descriptor-based object and a custom pure-Python object.
+poller.register(s)
+poller.register(x)
+
+# Modify the flags for the custom object.
+poller.modify(x, select.POLLIN)
+
+# Test polling.
+print_poll_output(poller.poll(0))
+x.poll_state = _MP_STREAM_POLL_WR
+print_poll_output(poller.poll(0))
+x.poll_state = _MP_STREAM_POLL_RD
+print_poll_output(poller.poll(0))
+
+# The custom object becomes readable only after being polled.
+poller.modify(s, select.POLLIN)
+x.poll_state = "delay_rd"
+print_poll_output(poller.poll())
+
+# The custom object returns an error.
+x.poll_state = -1000
+try:
+ poller.poll(0)
+except OSError as er:
+ print("OSError", er.errno)
+
+poller.unregister(x)
+poller.unregister(s)
+
+s.close()