summaryrefslogtreecommitdiff
path: root/tests/extmod/asyncio_heaplock.py
diff options
context:
space:
mode:
authorJim Mussared <jim.mussared@gmail.com>2023-06-08 16:01:38 +1000
committerDamien George <damien@micropython.org>2023-06-19 17:33:03 +1000
commit6027c41c8f5b8f1a9e7b85b2bb93b3e6f2718e54 (patch)
tree08f41a4d0cd48fa5c0bc49519832ac2faba6923a /tests/extmod/asyncio_heaplock.py
parent2fbc08c462e247e7f78460783c9a07c76c5b762e (diff)
tests: Rename uasyncio to asyncio.
This work was funded through GitHub Sponsors. Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
Diffstat (limited to 'tests/extmod/asyncio_heaplock.py')
-rw-r--r--tests/extmod/asyncio_heaplock.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/tests/extmod/asyncio_heaplock.py b/tests/extmod/asyncio_heaplock.py
new file mode 100644
index 000000000..8326443f0
--- /dev/null
+++ b/tests/extmod/asyncio_heaplock.py
@@ -0,0 +1,79 @@
+# test that the following do not use the heap:
+# - basic scheduling of tasks
+# - asyncio.sleep_ms
+# - StreamWriter.write, stream is blocked and data to write is a bytes object
+# - StreamWriter.write, when stream is not blocked
+
+import micropython
+
+# strict stackless builds can't call functions without allocating a frame on the heap
+try:
+ # force bytecode (in case we're running with emit=native) and verify
+ # that bytecode-calling-bytecode doesn't allocate
+ @micropython.bytecode
+ def f(x):
+ x and f(x - 1)
+
+ micropython.heap_lock()
+ f(1)
+ micropython.heap_unlock()
+except RuntimeError:
+ # RuntimeError (max recursion depth) not MemoryError because effectively
+ # the recursion depth is at the limit while the heap is locked with
+ # stackless
+ print("SKIP")
+ raise SystemExit
+
+try:
+ import asyncio
+except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+
+class TestStream:
+ def __init__(self, blocked):
+ self.blocked = blocked
+
+ def write(self, data):
+ print("TestStream.write", data)
+ if self.blocked:
+ return None
+ return len(data)
+
+
+async def task(id, n, t):
+ for i in range(n):
+ print(id, i)
+ await asyncio.sleep_ms(t)
+
+
+async def main():
+ t1 = asyncio.create_task(task(1, 4, 100))
+ t2 = asyncio.create_task(task(2, 2, 250))
+
+ # test scheduling tasks, and calling sleep_ms
+ micropython.heap_lock()
+ print("start")
+ await asyncio.sleep_ms(5)
+ print("sleep")
+ await asyncio.sleep_ms(350)
+ print("finish")
+ micropython.heap_unlock()
+
+ # test writing to a stream, when the underlying stream is blocked
+ s = asyncio.StreamWriter(TestStream(True), None)
+ micropython.heap_lock()
+ s.write(b"12")
+ micropython.heap_unlock()
+
+ # test writing to a stream, when the underlying stream is not blocked
+ buf = bytearray(b"56")
+ s = asyncio.StreamWriter(TestStream(False), None)
+ micropython.heap_lock()
+ s.write(b"34")
+ s.write(buf)
+ micropython.heap_unlock()
+
+
+asyncio.run(main())