// Source path: root/tests/ports/webassembly/asyncio_top_level_await.mjs
// Blob: 234b7a6ce6fb9071432070696219cda12318fed2
// Test top-level await on asyncio primitives: Task, Event.

// Dynamically import the module whose path is passed as the first
// command-line argument (process.argv[2]), then await its
// loadMicroPython() export. The resulting object is used by every test
// below through mp.runPythonAsync().
const micropythonModule = await import(process.argv[2]);
const mp = await micropythonModule.loadMicroPython();

/**********************************************************/
// Top-level await for an Event which is set by a JavaScript
// callback.
//
// The embedded Python program:
//   1. creates an asyncio.Event,
//   2. registers callback() through js.setTimeout(callback, 100),
//   3. prints "top-level wait event" and awaits event.wait() directly
//      at the top level of the program (not inside a coroutine),
//   4. prints "top-level end" once the wait returns.
// callback() prints "callback set event" and sets the event, which is
// what allows the top-level wait to finish.
//
// NOTE(review): the exact text and order of the printed lines are
// presumably compared against a recorded expected output -- confirm
// before changing any string here.

// Banner marking the start of this test in the output.
console.log("= TEST 1 ==========");

// The program text below is passed verbatim to runPythonAsync; the
// JavaScript side awaits the returned value before continuing.
await mp.runPythonAsync(`
import asyncio
import js

event = asyncio.Event()

def callback():
    print("callback set event")
    event.set()

js.setTimeout(callback, 100)

print("top-level wait event")
await event.wait()
print("top-level end")
`);

// Printed only after the awaited runPythonAsync call has completed.
console.log("finished");

/**********************************************************/
// Top-level await for a Task which is cancelled by a
// JavaScript callback.
//
// The embedded Python program:
//   1. defines task(), which prints "task start", awaits
//      asyncio.sleep(5) and then prints "task end",
//   2. creates the task with asyncio.create_task() and stores it in t,
//   3. registers callback() through js.setTimeout(callback, 100);
//      callback() prints "callback cancel task" and calls t.cancel(),
//   4. prints "top-level wait task" and awaits t at the top level,
//      inside a try block that records the start time with time.time().
// When the await raises asyncio.CancelledError, the handler prints
// "top-level task CancelledError" followed by the boolean dt < 1, where
// dt is the elapsed time.time() difference. Only the boolean is printed,
// not the raw duration -- presumably so the output does not depend on
// exact timing.
//
// There is no else/finally branch: if the await returned normally the
// program would print nothing further.

// Banner marking the start of this test in the output.
console.log("= TEST 2 ==========");

// The program text below is passed verbatim to runPythonAsync; the
// JavaScript side awaits the returned value before continuing.
await mp.runPythonAsync(`
import asyncio
import js
import time

async def task():
    print("task start")
    await asyncio.sleep(5)
    print("task end")

def callback():
    print("callback cancel task")
    t.cancel()

t  = asyncio.create_task(task())
js.setTimeout(callback, 100)

print("top-level wait task")
try:
    t0 = time.time()
    await t
except asyncio.CancelledError:
    dt = time.time() - t0
    print("top-level task CancelledError", dt < 1)
`);

// Printed only after the awaited runPythonAsync call has completed.
console.log("finished");

/**********************************************************/
// Top-level await for an Event and a Task, with the task
// setting the event.
//
// Unlike the two tests above, this program does not import js and
// registers no JavaScript callback; everything is driven from Python.
//
// The embedded Python program:
//   1. defines task(event), which prints "task set event", sets the
//      event, prints "task sleep", awaits asyncio.sleep(0.1) and then
//      prints "task end",
//   2. creates an asyncio.Event and a task running task(event),
//   3. prints "top-level wait event" and awaits event.wait() at the
//      top level,
//   4. prints "top-level wait task" and awaits the task itself,
//   5. prints "top-level end".
// The top level therefore performs two consecutive top-level awaits:
// first on the Event, then on the Task that set it.

// Banner marking the start of this test in the output.
console.log("= TEST 3 ==========");

// The program text below is passed verbatim to runPythonAsync; the
// JavaScript side awaits the returned value before continuing.
await mp.runPythonAsync(`
import asyncio

async def task(event):
    print("task set event")
    event.set()
    print("task sleep")
    await asyncio.sleep(0.1)
    print("task end")

event = asyncio.Event()
t = asyncio.create_task(task(event))

print("top-level wait event")
await event.wait()
print("top-level wait task")
await t
print("top-level end")
`);

// Printed only after the awaited runPythonAsync call has completed.
console.log("finished");