diff options
Diffstat (limited to 'extmod/asyncio')
| -rw-r--r-- | extmod/asyncio/__init__.py | 31 | ||||
| -rw-r--r-- | extmod/asyncio/core.py | 302 | ||||
| -rw-r--r-- | extmod/asyncio/event.py | 66 | ||||
| -rw-r--r-- | extmod/asyncio/funcs.py | 130 | ||||
| -rw-r--r-- | extmod/asyncio/lock.py | 55 | ||||
| -rw-r--r-- | extmod/asyncio/manifest.py | 15 | ||||
| -rw-r--r-- | extmod/asyncio/stream.py | 189 | ||||
| -rw-r--r-- | extmod/asyncio/task.py | 177 |
8 files changed, 965 insertions, 0 deletions
diff --git a/extmod/asyncio/__init__.py b/extmod/asyncio/__init__.py new file mode 100644 index 000000000..1f83750c5 --- /dev/null +++ b/extmod/asyncio/__init__.py @@ -0,0 +1,31 @@ +# MicroPython asyncio module +# MIT license; Copyright (c) 2019 Damien P. George + +from .core import * + +__version__ = (3, 0, 0) + +_attrs = { + "wait_for": "funcs", + "wait_for_ms": "funcs", + "gather": "funcs", + "Event": "event", + "ThreadSafeFlag": "event", + "Lock": "lock", + "open_connection": "stream", + "start_server": "stream", + "StreamReader": "stream", + "StreamWriter": "stream", +} + + +# Lazy loader, effectively does: +# global attr +# from .mod import attr +def __getattr__(attr): + mod = _attrs.get(attr, None) + if mod is None: + raise AttributeError(attr) + value = getattr(__import__(mod, None, None, True, 1), attr) + globals()[attr] = value + return value diff --git a/extmod/asyncio/core.py b/extmod/asyncio/core.py new file mode 100644 index 000000000..be5119ba6 --- /dev/null +++ b/extmod/asyncio/core.py @@ -0,0 +1,302 @@ +# MicroPython asyncio module +# MIT license; Copyright (c) 2019 Damien P. George + +from time import ticks_ms as ticks, ticks_diff, ticks_add +import sys, select + +# Import TaskQueue and Task, preferring built-in C code over Python code +try: + from _asyncio import TaskQueue, Task +except: + from .task import TaskQueue, Task + + +################################################################################ +# Exceptions + + +class CancelledError(BaseException): + pass + + +class TimeoutError(Exception): + pass + + +# Used when calling Loop.call_exception_handler +_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None} + + +################################################################################ +# Sleep functions + + +# "Yield" once, then raise StopIteration +class SingletonGenerator: + def __init__(self): + self.state = None + self.exc = StopIteration() + + def __iter__(self): + return self + + def __next__(self): + if self.state is not None: + _task_queue.push(cur_task, self.state) + self.state = None + return None + else: + self.exc.__traceback__ = None + raise self.exc + + +# Pause task execution for the given time (integer in milliseconds, uPy extension) +# Use a SingletonGenerator to do it without allocating on the heap +def sleep_ms(t, sgen=SingletonGenerator()): + assert sgen.state is None + sgen.state = ticks_add(ticks(), max(0, t)) + return sgen + + +# Pause task execution for the given time (in seconds) +def sleep(t): + return sleep_ms(int(t * 1000)) + + +################################################################################ +# Queue and poller for stream IO + + +class IOQueue: + def __init__(self): + self.poller = select.poll() + self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream] + + def _enqueue(self, s, idx): + if id(s) not in self.map: + entry = [None, None, s] + entry[idx] = cur_task + self.map[id(s)] = entry + self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) + else: + sm = self.map[id(s)] + assert sm[idx] is None + assert sm[1 - idx] is not None + sm[idx] = cur_task + self.poller.modify(s, select.POLLIN | select.POLLOUT) + # Link task to this IOQueue so it can be removed if needed + cur_task.data = self + + def _dequeue(self, s): + del self.map[id(s)] + self.poller.unregister(s) + + def queue_read(self, s): + self._enqueue(s, 0) + + def queue_write(self, s): + self._enqueue(s, 1) + + def remove(self, task): + while True: + del_s = None + for k in self.map: # Iterate without allocating on the heap + q0, q1, s = self.map[k] + if q0 is task or q1 is task: + del_s = s + break + if del_s is not None: + self._dequeue(s) + else: + break + + def wait_io_event(self, dt): + for s, ev in self.poller.ipoll(dt): + sm = self.map[id(s)] + # print('poll', s, sm, ev) + if ev & ~select.POLLOUT and sm[0] is not None: + # POLLIN or error + _task_queue.push(sm[0]) + sm[0] = None + if ev & ~select.POLLIN and sm[1] is not None: + # POLLOUT or error + _task_queue.push(sm[1]) + sm[1] = None + if sm[0] is None and sm[1] is None: + self._dequeue(s) + elif sm[0] is None: + self.poller.modify(s, select.POLLOUT) + else: + self.poller.modify(s, select.POLLIN) + + +################################################################################ +# Main run loop + + +# Ensure the awaitable is a task +def _promote_to_task(aw): + return aw if isinstance(aw, Task) else create_task(aw) + + +# Create and schedule a new task from a coroutine +def create_task(coro): + if not hasattr(coro, "send"): + raise TypeError("coroutine expected") + t = Task(coro, globals()) + _task_queue.push(t) + return t + + +# Keep scheduling tasks until there are none left to schedule +def run_until_complete(main_task=None): + global cur_task + excs_all = (CancelledError, Exception) # To prevent heap allocation in loop + excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop + while True: + # Wait until the head of _task_queue is ready to run + dt = 1 + while dt > 0: + dt = -1 + t = _task_queue.peek() + if t: + # A task waiting on _task_queue; "ph_key" is time to schedule task at + dt = max(0, ticks_diff(t.ph_key, ticks())) + elif not _io_queue.map: + # No tasks can be woken so finished running + return + # print('(poll {})'.format(dt), len(_io_queue.map)) + _io_queue.wait_io_event(dt) + + # Get next task to run and continue it + t = _task_queue.pop() + cur_task = t + try: + # Continue running the coroutine, it's responsible for rescheduling itself + exc = t.data + if not exc: + t.coro.send(None) + else: + # If the task is finished and on the run queue and gets here, then it + # had an exception and was not await'ed on. Throwing into it now will + # raise StopIteration and the code below will catch this and run the + # call_exception_handler function. + t.data = None + t.coro.throw(exc) + except excs_all as er: + # Check the task is not on any event queue + assert t.data is None + # This task is done, check if it's the main task and then loop should stop + if t is main_task: + if isinstance(er, StopIteration): + return er.value + raise er + if t.state: + # Task was running but is now finished. + waiting = False + if t.state is True: + # "None" indicates that the task is complete and not await'ed on (yet). + t.state = None + elif callable(t.state): + # The task has a callback registered to be called on completion. + t.state(t, er) + t.state = False + waiting = True + else: + # Schedule any other tasks waiting on the completion of this task. + while t.state.peek(): + _task_queue.push(t.state.pop()) + waiting = True + # "False" indicates that the task is complete and has been await'ed on. + t.state = False + if not waiting and not isinstance(er, excs_stop): + # An exception ended this detached task, so queue it for later + # execution to handle the uncaught exception if no other task retrieves + # the exception in the meantime (this is handled by Task.throw). + _task_queue.push(t) + # Save return value of coro to pass up to caller. + t.data = er + elif t.state is None: + # Task is already finished and nothing await'ed on the task, + # so call the exception handler. + _exc_context["exception"] = exc + _exc_context["future"] = t + Loop.call_exception_handler(_exc_context) + + +# Create a new task from a coroutine and run it until it finishes +def run(coro): + return run_until_complete(create_task(coro)) + + +################################################################################ +# Event loop wrapper + + +async def _stopper(): + pass + + +_stop_task = None + + +class Loop: + _exc_handler = None + + def create_task(coro): + return create_task(coro) + + def run_forever(): + global _stop_task + _stop_task = Task(_stopper(), globals()) + run_until_complete(_stop_task) + # TODO should keep running until .stop() is called, even if there're no tasks left + + def run_until_complete(aw): + return run_until_complete(_promote_to_task(aw)) + + def stop(): + global _stop_task + if _stop_task is not None: + _task_queue.push(_stop_task) + # If stop() is called again, do nothing + _stop_task = None + + def close(): + pass + + def set_exception_handler(handler): + Loop._exc_handler = handler + + def get_exception_handler(): + return Loop._exc_handler + + def default_exception_handler(loop, context): + print(context["message"]) + print("future:", context["future"], "coro=", context["future"].coro) + sys.print_exception(context["exception"]) + + def call_exception_handler(context): + (Loop._exc_handler or Loop.default_exception_handler)(Loop, context) + + +# The runq_len and waitq_len arguments are for legacy uasyncio compatibility +def get_event_loop(runq_len=0, waitq_len=0): + return Loop + + +def current_task(): + return cur_task + + +def new_event_loop(): + global _task_queue, _io_queue + # TaskQueue of Task instances + _task_queue = TaskQueue() + # Task queue and poller for stream IO + _io_queue = IOQueue() + return Loop + + +# Initialise default event loop +new_event_loop() diff --git a/extmod/asyncio/event.py b/extmod/asyncio/event.py new file mode 100644 index 000000000..e0b41f732 --- /dev/null +++ b/extmod/asyncio/event.py @@ -0,0 +1,66 @@ +# MicroPython asyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +from . import core + + +# Event class for primitive events that can be waited on, set, and cleared +class Event: + def __init__(self): + self.state = False # False=unset; True=set + self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event + + def is_set(self): + return self.state + + def set(self): + # Event becomes set, schedule any tasks waiting on it + # Note: This must not be called from anything except the thread running + # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). + while self.waiting.peek(): + core._task_queue.push(self.waiting.pop()) + self.state = True + + def clear(self): + self.state = False + + # async + def wait(self): + if not self.state: + # Event not set, put the calling task on the event's waiting queue + self.waiting.push(core.cur_task) + # Set calling task's data to the event's queue so it can be removed if needed + core.cur_task.data = self.waiting + yield + return True + + +# MicroPython-extension: This can be set from outside the asyncio event loop, +# such as other threads, IRQs or scheduler context. Implementation is a stream +# that asyncio will poll until a flag is set. +# Note: Unlike Event, this is self-clearing after a wait(). +try: + import io + + class ThreadSafeFlag(io.IOBase): + def __init__(self): + self.state = 0 + + def ioctl(self, req, flags): + if req == 3: # MP_STREAM_POLL + return self.state * flags + return None + + def set(self): + self.state = 1 + + def clear(self): + self.state = 0 + + async def wait(self): + if not self.state: + yield core._io_queue.queue_read(self) + self.state = 0 + +except ImportError: + pass diff --git a/extmod/asyncio/funcs.py b/extmod/asyncio/funcs.py new file mode 100644 index 000000000..599091dfb --- /dev/null +++ b/extmod/asyncio/funcs.py @@ -0,0 +1,130 @@ +# MicroPython asyncio module +# MIT license; Copyright (c) 2019-2022 Damien P. George + +from . import core + + +async def _run(waiter, aw): + try: + result = await aw + status = True + except BaseException as er: + result = None + status = er + if waiter.data is None: + # The waiter is still waiting, cancel it. + if waiter.cancel(): + # Waiter was cancelled by us, change its CancelledError to an instance of + # CancelledError that contains the status and result of waiting on aw. + # If the wait_for task subsequently gets cancelled externally then this + # instance will be reset to a CancelledError instance without arguments. + waiter.data = core.CancelledError(status, result) + + +async def wait_for(aw, timeout, sleep=core.sleep): + aw = core._promote_to_task(aw) + if timeout is None: + return await aw + + # Run aw in a separate runner task that manages its exceptions. + runner_task = core.create_task(_run(core.cur_task, aw)) + + try: + # Wait for the timeout to elapse. + await sleep(timeout) + except core.CancelledError as er: + status = er.value + if status is None: + # This wait_for was cancelled externally, so cancel aw and re-raise. + runner_task.cancel() + raise er + elif status is True: + # aw completed successfully and cancelled the sleep, so return aw's result. + return er.args[1] + else: + # aw raised an exception, propagate it out to the caller. + raise status + + # The sleep finished before aw, so cancel aw and raise TimeoutError. + runner_task.cancel() + await runner_task + raise core.TimeoutError + + +def wait_for_ms(aw, timeout): + return wait_for(aw, timeout, core.sleep_ms) + + +class _Remove: + @staticmethod + def remove(t): + pass + + +# async +def gather(*aws, return_exceptions=False): + if not aws: + return [] + + def done(t, er): + # Sub-task "t" has finished, with exception "er". + nonlocal state + if gather_task.data is not _Remove: + # The main gather task has already been scheduled, so do nothing. + # This happens if another sub-task already raised an exception and + # woke the main gather task (via this done function), or if the main + # gather task was cancelled externally. + return + elif not return_exceptions and not isinstance(er, StopIteration): + # A sub-task raised an exception, indicate that to the gather task. + state = er + else: + state -= 1 + if state: + # Still some sub-tasks running. + return + # Gather waiting is done, schedule the main gather task. + core._task_queue.push(gather_task) + + ts = [core._promote_to_task(aw) for aw in aws] + for i in range(len(ts)): + if ts[i].state is not True: + # Task is not running, gather not currently supported for this case. + raise RuntimeError("can't gather") + # Register the callback to call when the task is done. + ts[i].state = done + + # Set the state for execution of the gather. + gather_task = core.cur_task + state = len(ts) + cancel_all = False + + # Wait for the a sub-task to need attention. + gather_task.data = _Remove + try: + yield + except core.CancelledError as er: + cancel_all = True + state = er + + # Clean up tasks. + for i in range(len(ts)): + if ts[i].state is done: + # Sub-task is still running, deregister the callback and cancel if needed. + ts[i].state = True + if cancel_all: + ts[i].cancel() + elif isinstance(ts[i].data, StopIteration): + # Sub-task ran to completion, get its return value. + ts[i] = ts[i].data.value + else: + # Sub-task had an exception with return_exceptions==True, so get its exception. + ts[i] = ts[i].data + + # Either this gather was cancelled, or one of the sub-tasks raised an exception with + # return_exceptions==False, so reraise the exception here. + if state: + raise state + + # Return the list of return values of each sub-task. + return ts diff --git a/extmod/asyncio/lock.py b/extmod/asyncio/lock.py new file mode 100644 index 000000000..0a46ac326 --- /dev/null +++ b/extmod/asyncio/lock.py @@ -0,0 +1,55 @@ +# MicroPython asyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +from . import core + + +# Lock class for primitive mutex capability +class Lock: + def __init__(self): + # The state can take the following values: + # - 0: unlocked + # - 1: locked + # - <Task>: unlocked but this task has been scheduled to acquire the lock next + self.state = 0 + # Queue of Tasks waiting to acquire this Lock + self.waiting = core.TaskQueue() + + def locked(self): + return self.state == 1 + + def release(self): + if self.state != 1: + raise RuntimeError("Lock not acquired") + if self.waiting.peek(): + # Task(s) waiting on lock, schedule next Task + self.state = self.waiting.pop() + core._task_queue.push(self.state) + else: + # No Task waiting so unlock + self.state = 0 + + # async + def acquire(self): + if self.state != 0: + # Lock unavailable, put the calling Task on the waiting queue + self.waiting.push(core.cur_task) + # Set calling task's data to the lock's queue so it can be removed if needed + core.cur_task.data = self.waiting + try: + yield + except core.CancelledError as er: + if self.state == core.cur_task: + # Cancelled while pending on resume, schedule next waiting Task + self.state = 1 + self.release() + raise er + # Lock available, set it as locked + self.state = 1 + return True + + async def __aenter__(self): + return await self.acquire() + + async def __aexit__(self, exc_type, exc, tb): + return self.release() diff --git a/extmod/asyncio/manifest.py b/extmod/asyncio/manifest.py new file mode 100644 index 000000000..0aba5b185 --- /dev/null +++ b/extmod/asyncio/manifest.py @@ -0,0 +1,15 @@ +# This list of package files doesn't include task.py because that's provided +# by the C module. +package( + "asyncio", + ( + "__init__.py", + "core.py", + "event.py", + "funcs.py", + "lock.py", + "stream.py", + ), + base_path="..", + opt=3, +) diff --git a/extmod/asyncio/stream.py b/extmod/asyncio/stream.py new file mode 100644 index 000000000..c47c48cf0 --- /dev/null +++ b/extmod/asyncio/stream.py @@ -0,0 +1,189 @@ +# MicroPython asyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +from . import core + + +class Stream: + def __init__(self, s, e={}): + self.s = s + self.e = e + self.out_buf = b"" + + def get_extra_info(self, v): + return self.e[v] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.close() + + def close(self): + pass + + async def wait_closed(self): + # TODO yield? + self.s.close() + + # async + def read(self, n=-1): + r = b"" + while True: + yield core._io_queue.queue_read(self.s) + r2 = self.s.read(n) + if r2 is not None: + if n >= 0: + return r2 + if not len(r2): + return r + r += r2 + + # async + def readinto(self, buf): + yield core._io_queue.queue_read(self.s) + return self.s.readinto(buf) + + # async + def readexactly(self, n): + r = b"" + while n: + yield core._io_queue.queue_read(self.s) + r2 = self.s.read(n) + if r2 is not None: + if not len(r2): + raise EOFError + r += r2 + n -= len(r2) + return r + + # async + def readline(self): + l = b"" + while True: + yield core._io_queue.queue_read(self.s) + l2 = self.s.readline() # may do multiple reads but won't block + l += l2 + if not l2 or l[-1] == 10: # \n (check l in case l2 is str) + return l + + def write(self, buf): + if not self.out_buf: + # Try to write immediately to the underlying stream. + ret = self.s.write(buf) + if ret == len(buf): + return + if ret is not None: + buf = buf[ret:] + self.out_buf += buf + + # async + def drain(self): + if not self.out_buf: + # Drain must always yield, so a tight loop of write+drain can't block the scheduler. + return (yield from core.sleep_ms(0)) + mv = memoryview(self.out_buf) + off = 0 + while off < len(mv): + yield core._io_queue.queue_write(self.s) + ret = self.s.write(mv[off:]) + if ret is not None: + off += ret + self.out_buf = b"" + + +# Stream can be used for both reading and writing to save code size +StreamReader = Stream +StreamWriter = Stream + + +# Create a TCP stream connection to a remote host +# +# async +def open_connection(host, port): + from errno import EINPROGRESS + import socket + + ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! + s = socket.socket(ai[0], ai[1], ai[2]) + s.setblocking(False) + ss = Stream(s) + try: + s.connect(ai[-1]) + except OSError as er: + if er.errno != EINPROGRESS: + raise er + yield core._io_queue.queue_write(s) + return ss, ss + + +# Class representing a TCP stream server, can be closed and used in "async with" +class Server: + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + self.close() + await self.wait_closed() + + def close(self): + self.task.cancel() + + async def wait_closed(self): + await self.task + + async def _serve(self, s, cb): + # Accept incoming connections + while True: + try: + yield core._io_queue.queue_read(s) + except core.CancelledError: + # Shutdown server + s.close() + return + try: + s2, addr = s.accept() + except: + # Ignore a failed accept + continue + s2.setblocking(False) + s2s = Stream(s2, {"peername": addr}) + core.create_task(cb(s2s, s2s)) + + +# Helper function to start a TCP stream server, running as a new task +# TODO could use an accept-callback on socket read activity instead of creating a task +async def start_server(cb, host, port, backlog=5): + import socket + + # Create and bind server socket. + host = socket.getaddrinfo(host, port)[0] # TODO this is blocking! + s = socket.socket() + s.setblocking(False) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(host[-1]) + s.listen(backlog) + + # Create and return server object and task. + srv = Server() + srv.task = core.create_task(srv._serve(s, cb)) + return srv + + +################################################################################ +# Legacy uasyncio compatibility + + +async def stream_awrite(self, buf, off=0, sz=-1): + if off != 0 or sz != -1: + buf = memoryview(buf) + if sz == -1: + sz = len(buf) + buf = buf[off : off + sz] + self.write(buf) + await self.drain() + + +Stream.aclose = Stream.wait_closed +Stream.awrite = stream_awrite +Stream.awritestr = stream_awrite # TODO explicitly convert to bytes? diff --git a/extmod/asyncio/task.py b/extmod/asyncio/task.py new file mode 100644 index 000000000..30be2170e --- /dev/null +++ b/extmod/asyncio/task.py @@ -0,0 +1,177 @@ +# MicroPython asyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George + +# This file contains the core TaskQueue based on a pairing heap, and the core Task class. +# They can optionally be replaced by C implementations. + +from . import core + + +# pairing-heap meld of 2 heaps; O(1) +def ph_meld(h1, h2): + if h1 is None: + return h2 + if h2 is None: + return h1 + lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0 + if lt: + if h1.ph_child is None: + h1.ph_child = h2 + else: + h1.ph_child_last.ph_next = h2 + h1.ph_child_last = h2 + h2.ph_next = None + h2.ph_rightmost_parent = h1 + return h1 + else: + h1.ph_next = h2.ph_child + h2.ph_child = h1 + if h1.ph_next is None: + h2.ph_child_last = h1 + h1.ph_rightmost_parent = h2 + return h2 + + +# pairing-heap pairing operation; amortised O(log N) +def ph_pairing(child): + heap = None + while child is not None: + n1 = child + child = child.ph_next + n1.ph_next = None + if child is not None: + n2 = child + child = child.ph_next + n2.ph_next = None + n1 = ph_meld(n1, n2) + heap = ph_meld(heap, n1) + return heap + + +# pairing-heap delete of a node; stable, amortised O(log N) +def ph_delete(heap, node): + if node is heap: + child = heap.ph_child + node.ph_child = None + return ph_pairing(child) + # Find parent of node + parent = node + while parent.ph_next is not None: + parent = parent.ph_next + parent = parent.ph_rightmost_parent + # Replace node with pairing of its children + if node is parent.ph_child and node.ph_child is None: + parent.ph_child = node.ph_next + node.ph_next = None + return heap + elif node is parent.ph_child: + child = node.ph_child + next = node.ph_next + node.ph_child = None + node.ph_next = None + node = ph_pairing(child) + parent.ph_child = node + else: + n = parent.ph_child + while node is not n.ph_next: + n = n.ph_next + child = node.ph_child + next = node.ph_next + node.ph_child = None + node.ph_next = None + node = ph_pairing(child) + if node is None: + node = n + else: + n.ph_next = node + node.ph_next = next + if next is None: + node.ph_rightmost_parent = parent + parent.ph_child_last = node + return heap + + +# TaskQueue class based on the above pairing-heap functions. +class TaskQueue: + def __init__(self): + self.heap = None + + def peek(self): + return self.heap + + def push(self, v, key=None): + assert v.ph_child is None + assert v.ph_next is None + v.data = None + v.ph_key = key if key is not None else core.ticks() + self.heap = ph_meld(v, self.heap) + + def pop(self): + v = self.heap + assert v.ph_next is None + self.heap = ph_pairing(v.ph_child) + v.ph_child = None + return v + + def remove(self, v): + self.heap = ph_delete(self.heap, v) + + +# Task class representing a coroutine, can be waited on and cancelled. +class Task: + def __init__(self, coro, globals=None): + self.coro = coro # Coroutine of this Task + self.data = None # General data for queue it is waiting on + self.state = True # None, False, True, a callable, or a TaskQueue instance + self.ph_key = 0 # Pairing heap + self.ph_child = None # Paring heap + self.ph_child_last = None # Paring heap + self.ph_next = None # Paring heap + self.ph_rightmost_parent = None # Paring heap + + def __iter__(self): + if not self.state: + # Task finished, signal that is has been await'ed on. + self.state = False + elif self.state is True: + # Allocated head of linked list of Tasks waiting on completion of this task. + self.state = TaskQueue() + elif type(self.state) is not TaskQueue: + # Task has state used for another purpose, so can't also wait on it. + raise RuntimeError("can't wait") + return self + + def __next__(self): + if not self.state: + # Task finished, raise return value to caller so it can continue. + raise self.data + else: + # Put calling task on waiting queue. + self.state.push(core.cur_task) + # Set calling task's data to this task that it waits on, to double-link it. + core.cur_task.data = self + + def done(self): + return not self.state + + def cancel(self): + # Check if task is already finished. + if not self.state: + return False + # Can't cancel self (not supported yet). + if self is core.cur_task: + raise RuntimeError("can't cancel self") + # If Task waits on another task then forward the cancel to the one it's waiting on. + while isinstance(self.data, Task): + self = self.data + # Reschedule Task as a cancelled task. + if hasattr(self.data, "remove"): + # Not on the main running queue, remove the task from the queue it's on. + self.data.remove(self) + core._task_queue.push(self) + elif core.ticks_diff(self.ph_key, core.ticks()) > 0: + # On the main running queue but scheduled in the future, so bring it forward to now. + core._task_queue.remove(self) + core._task_queue.push(self) + self.data = core.CancelledError + return True |
