diff options
author | Jim Mussared <jim.mussared@gmail.com> | 2023-06-08 16:01:38 +1000 |
---|---|---|
committer | Damien George <damien@micropython.org> | 2023-06-19 17:33:03 +1000 |
commit | 6027c41c8f5b8f1a9e7b85b2bb93b3e6f2718e54 (patch) | |
tree | 08f41a4d0cd48fa5c0bc49519832ac2faba6923a /tests/extmod/asyncio_heaplock.py | |
parent | 2fbc08c462e247e7f78460783c9a07c76c5b762e (diff) |
tests: Rename uasyncio to asyncio.
This work was funded through GitHub Sponsors.
Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
Diffstat (limited to 'tests/extmod/asyncio_heaplock.py')
-rw-r--r-- | tests/extmod/asyncio_heaplock.py | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/tests/extmod/asyncio_heaplock.py b/tests/extmod/asyncio_heaplock.py new file mode 100644 index 000000000..8326443f0 --- /dev/null +++ b/tests/extmod/asyncio_heaplock.py @@ -0,0 +1,79 @@ +# test that the following do not use the heap: +# - basic scheduling of tasks +# - asyncio.sleep_ms +# - StreamWriter.write, stream is blocked and data to write is a bytes object +# - StreamWriter.write, when stream is not blocked + +import micropython + +# strict stackless builds can't call functions without allocating a frame on the heap +try: + # force bytecode (in case we're running with emit=native) and verify + # that bytecode-calling-bytecode doesn't allocate + @micropython.bytecode + def f(x): + x and f(x - 1) + + micropython.heap_lock() + f(1) + micropython.heap_unlock() +except RuntimeError: + # RuntimeError (max recursion depth) not MemoryError because effectively + # the recursion depth is at the limit while the heap is locked with + # stackless + print("SKIP") + raise SystemExit + +try: + import asyncio +except ImportError: + print("SKIP") + raise SystemExit + + +class TestStream: + def __init__(self, blocked): + self.blocked = blocked + + def write(self, data): + print("TestStream.write", data) + if self.blocked: + return None + return len(data) + + +async def task(id, n, t): + for i in range(n): + print(id, i) + await asyncio.sleep_ms(t) + + +async def main(): + t1 = asyncio.create_task(task(1, 4, 100)) + t2 = asyncio.create_task(task(2, 2, 250)) + + # test scheduling tasks, and calling sleep_ms + micropython.heap_lock() + print("start") + await asyncio.sleep_ms(5) + print("sleep") + await asyncio.sleep_ms(350) + print("finish") + micropython.heap_unlock() + + # test writing to a stream, when the underlying stream is blocked + s = asyncio.StreamWriter(TestStream(True), None) + micropython.heap_lock() + s.write(b"12") + micropython.heap_unlock() + + # test writing to a stream, when the underlying stream is not blocked + buf = bytearray(b"56") + s = asyncio.StreamWriter(TestStream(False), None) + micropython.heap_lock() + s.write(b"34") + s.write(buf) + micropython.heap_unlock() + + +asyncio.run(main()) |