summaryrefslogtreecommitdiff
path: root/tests/micropython/ringio_async.py
blob: 2a4befc35276642820732b132e01715c7d0de9d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Check that micropython.RingIO works correctly with asyncio.Stream.

import micropython

try:
    import asyncio

    asyncio.StreamWriter
    micropython.RingIO
except (AttributeError, ImportError):
    print("SKIP")
    raise SystemExit

rb = micropython.RingIO(16)
rba = asyncio.StreamWriter(rb)

data = b"ABC123" * 20
print("w", len(data))


async def data_writer():
    global data
    rba.write(data)
    await rba.drain()


async def main():
    task = asyncio.create_task(data_writer())
    await asyncio.sleep_ms(10)
    read = await rba.readexactly(len(data))
    print(read)
    print("r", len(read))
    print(read == data)


asyncio.run(main())