summaryrefslogtreecommitdiff
path: root/extmod/uasyncio/funcs.py
blob: a1d38fbcbf975888c5839a881182aa4326232584 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2022 Damien P. George

from . import core


def _run(waiter, aw):
    try:
        result = await aw
        status = True
    except BaseException as er:
        result = None
        status = er
    if waiter.data is None:
        # The waiter is still waiting, cancel it.
        if waiter.cancel():
            # Waiter was cancelled by us, change its CancelledError to an instance of
            # CancelledError that contains the status and result of waiting on aw.
            # If the wait_for task subsequently gets cancelled externally then this
            # instance will be reset to a CancelledError instance without arguments.
            waiter.data = core.CancelledError(status, result)


async def wait_for(aw, timeout, sleep=core.sleep):
    aw = core._promote_to_task(aw)
    if timeout is None:
        return await aw

    # Run aw in a separate runner task that manages its exceptions.
    runner_task = core.create_task(_run(core.cur_task, aw))

    try:
        # Wait for the timeout to elapse.
        await sleep(timeout)
    except core.CancelledError as er:
        status = er.value
        if status is None:
            # This wait_for was cancelled externally, so cancel aw and re-raise.
            runner_task.cancel()
            raise er
        elif status is True:
            # aw completed successfully and cancelled the sleep, so return aw's result.
            return er.args[1]
        else:
            # aw raised an exception, propagate it out to the caller.
            raise status

    # The sleep finished before aw, so cancel aw and raise TimeoutError.
    runner_task.cancel()
    await runner_task
    raise core.TimeoutError


def wait_for_ms(aw, timeout):
    return wait_for(aw, timeout, core.sleep_ms)


class _Remove:
    @staticmethod
    def remove(t):
        pass


async def gather(*aws, return_exceptions=False):
    def done(t, er):
        # Sub-task "t" has finished, with exception "er".
        nonlocal state
        if gather_task.data is not _Remove:
            # The main gather task has already been scheduled, so do nothing.
            # This happens if another sub-task already raised an exception and
            # woke the main gather task (via this done function), or if the main
            # gather task was cancelled externally.
            return
        elif not return_exceptions and not isinstance(er, StopIteration):
            # A sub-task raised an exception, indicate that to the gather task.
            state = er
        else:
            state -= 1
            if state:
                # Still some sub-tasks running.
                return
        # Gather waiting is done, schedule the main gather task.
        core._task_queue.push(gather_task)

    ts = [core._promote_to_task(aw) for aw in aws]
    for i in range(len(ts)):
        if ts[i].state is not True:
            # Task is not running, gather not currently supported for this case.
            raise RuntimeError("can't gather")
        # Register the callback to call when the task is done.
        ts[i].state = done

    # Set the state for execution of the gather.
    gather_task = core.cur_task
    state = len(ts)
    cancel_all = False

    # Wait for the a sub-task to need attention.
    gather_task.data = _Remove
    try:
        yield
    except core.CancelledError as er:
        cancel_all = True
        state = er

    # Clean up tasks.
    for i in range(len(ts)):
        if ts[i].state is done:
            # Sub-task is still running, deregister the callback and cancel if needed.
            ts[i].state = True
            if cancel_all:
                ts[i].cancel()
        elif isinstance(ts[i].data, StopIteration):
            # Sub-task ran to completion, get its return value.
            ts[i] = ts[i].data.value
        else:
            # Sub-task had an exception with return_exceptions==True, so get its exception.
            ts[i] = ts[i].data

    # Either this gather was cancelled, or one of the sub-tasks raised an exception with
    # return_exceptions==False, so reraise the exception here.
    if state is not 0:
        raise state

    # Return the list of return values of each sub-task.
    return ts