summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--extmod/moduasyncio.c69
-rw-r--r--extmod/uasyncio/core.py46
-rw-r--r--extmod/uasyncio/task.py31
-rw-r--r--tests/extmod/uasyncio_cancel_wait_on_finished.py41
-rw-r--r--tests/extmod/uasyncio_cancel_wait_on_finished.py.exp7
5 files changed, 111 insertions, 83 deletions
diff --git a/extmod/moduasyncio.c b/extmod/moduasyncio.c
index fe0f748ca..9717e3856 100644
--- a/extmod/moduasyncio.c
+++ b/extmod/moduasyncio.c
@@ -31,12 +31,19 @@
#if MICROPY_PY_UASYNCIO
+#define TASK_STATE_RUNNING_NOT_WAITED_ON (mp_const_true)
+#define TASK_STATE_DONE_NOT_WAITED_ON (mp_const_none)
+#define TASK_STATE_DONE_WAS_WAITED_ON (mp_const_false)
+
+#define TASK_IS_DONE(task) ( \
+ (task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
+ || (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)
+
typedef struct _mp_obj_task_t {
mp_pairheap_t pairheap;
mp_obj_t coro;
mp_obj_t data;
- mp_obj_t waiting;
-
+ mp_obj_t state;
mp_obj_t ph_key;
} mp_obj_task_t;
@@ -146,9 +153,6 @@ STATIC const mp_obj_type_t task_queue_type = {
/******************************************************************************/
// Task class
-// For efficiency, the task object is stored to the coro entry when the task is done.
-#define TASK_IS_DONE(task) ((task)->coro == MP_OBJ_FROM_PTR(task))
-
// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
@@ -159,7 +163,7 @@ STATIC mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n
mp_pairheap_init_node(task_lt, &self->pairheap);
self->coro = args[0];
self->data = mp_const_none;
- self->waiting = mp_const_none;
+ self->state = TASK_STATE_RUNNING_NOT_WAITED_ON;
self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
if (n_args == 2) {
uasyncio_context = args[1];
@@ -218,24 +222,6 @@ STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
-STATIC mp_obj_t task_throw(mp_obj_t self_in, mp_obj_t value_in) {
- // This task raised an exception which was uncaught; handle that now.
- mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
- // Set the data because it was cleared by the main scheduling loop.
- self->data = value_in;
- if (self->waiting == mp_const_none) {
- // Nothing await'ed on the task so call the exception handler.
- mp_obj_t _exc_context = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__exc_context));
- mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_exception), value_in);
- mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_future), self_in);
- mp_obj_t Loop = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_Loop));
- mp_obj_t call_exception_handler = mp_load_attr(Loop, MP_QSTR_call_exception_handler);
- mp_call_function_1(call_exception_handler, _exc_context);
- }
- return mp_const_none;
-}
-STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_throw_obj, task_throw);
-
STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
if (dest[0] == MP_OBJ_NULL) {
@@ -244,32 +230,24 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
dest[0] = self->coro;
} else if (attr == MP_QSTR_data) {
dest[0] = self->data;
- } else if (attr == MP_QSTR_waiting) {
- if (self->waiting != mp_const_none && self->waiting != mp_const_false) {
- dest[0] = self->waiting;
- }
+ } else if (attr == MP_QSTR_state) {
+ dest[0] = self->state;
} else if (attr == MP_QSTR_done) {
dest[0] = MP_OBJ_FROM_PTR(&task_done_obj);
dest[1] = self_in;
} else if (attr == MP_QSTR_cancel) {
dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
dest[1] = self_in;
- } else if (attr == MP_QSTR_throw) {
- dest[0] = MP_OBJ_FROM_PTR(&task_throw_obj);
- dest[1] = self_in;
} else if (attr == MP_QSTR_ph_key) {
dest[0] = self->ph_key;
}
} else if (dest[1] != MP_OBJ_NULL) {
// Store
- if (attr == MP_QSTR_coro) {
- self->coro = dest[1];
- dest[0] = MP_OBJ_NULL;
- } else if (attr == MP_QSTR_data) {
+ if (attr == MP_QSTR_data) {
self->data = dest[1];
dest[0] = MP_OBJ_NULL;
- } else if (attr == MP_QSTR_waiting) {
- self->waiting = dest[1];
+ } else if (attr == MP_QSTR_state) {
+ self->state = dest[1];
dest[0] = MP_OBJ_NULL;
}
}
@@ -278,15 +256,12 @@ STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
(void)iter_buf;
mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
- if (self->waiting == mp_const_none) {
- // The is the first access of the "waiting" entry.
- if (TASK_IS_DONE(self)) {
- // Signal that the completed-task has been await'ed on.
- self->waiting = mp_const_false;
- } else {
- // Lazily allocate the waiting queue.
- self->waiting = task_queue_make_new(&task_queue_type, 0, 0, NULL);
- }
+ if (TASK_IS_DONE(self)) {
+ // Signal that the completed-task has been await'ed on.
+ self->state = TASK_STATE_DONE_WAS_WAITED_ON;
+ } else if (self->state == TASK_STATE_RUNNING_NOT_WAITED_ON) {
+ // Allocate the waiting queue.
+ self->state = task_queue_make_new(&task_queue_type, 0, 0, NULL);
}
return self_in;
}
@@ -299,7 +274,7 @@ STATIC mp_obj_t task_iternext(mp_obj_t self_in) {
} else {
// Put calling task on waiting queue.
mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
- mp_obj_t args[2] = { self->waiting, cur_task };
+ mp_obj_t args[2] = { self->state, cur_task };
task_queue_push_sorted(2, args);
// Set calling task's data to this task that it waits on, to double-link it.
((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py
index d74763f6a..12833cf0c 100644
--- a/extmod/uasyncio/core.py
+++ b/extmod/uasyncio/core.py
@@ -175,6 +175,10 @@ def run_until_complete(main_task=None):
if not exc:
t.coro.send(None)
else:
+ # If the task is finished and on the run queue and gets here, then it
+ # had an exception and was not await'ed on. Throwing into it now will
+ # raise StopIteration and the code below will catch this and run the
+ # call_exception_handler function.
t.data = None
t.coro.throw(exc)
except excs_all as er:
@@ -185,22 +189,32 @@ def run_until_complete(main_task=None):
if isinstance(er, StopIteration):
return er.value
raise er
- # Schedule any other tasks waiting on the completion of this task
- waiting = False
- if hasattr(t, "waiting"):
- while t.waiting.peek():
- _task_queue.push_head(t.waiting.pop_head())
- waiting = True
- t.waiting = None # Free waiting queue head
- if not waiting and not isinstance(er, excs_stop):
- # An exception ended this detached task, so queue it for later
- # execution to handle the uncaught exception if no other task retrieves
- # the exception in the meantime (this is handled by Task.throw).
- _task_queue.push_head(t)
- # Indicate task is done by setting coro to the task object itself
- t.coro = t
- # Save return value of coro to pass up to caller
- t.data = er
+ if t.state:
+ # Task was running but is now finished.
+ waiting = False
+ if t.state is True:
+ # "None" indicates that the task is complete and not await'ed on (yet).
+ t.state = None
+ else:
+ # Schedule any other tasks waiting on the completion of this task.
+ while t.state.peek():
+ _task_queue.push_head(t.state.pop_head())
+ waiting = True
+ # "False" indicates that the task is complete and has been await'ed on.
+ t.state = False
+ if not waiting and not isinstance(er, excs_stop):
+ # An exception ended this detached task, so queue it for later
+ # execution to handle the uncaught exception if no other task retrieves
+ # the exception in the meantime (this is handled by Task.throw).
+ _task_queue.push_head(t)
+ # Save return value of coro to pass up to caller.
+ t.data = er
+ elif t.state is None:
+ # Task is already finished and nothing await'ed on the task,
+ # so call the exception handler.
+ _exc_context["exception"] = exc
+ _exc_context["future"] = t
+ Loop.call_exception_handler(_exc_context)
# Create a new task from a coroutine and run it until it finishes
diff --git a/extmod/uasyncio/task.py b/extmod/uasyncio/task.py
index 68ddf496f..26df7b172 100644
--- a/extmod/uasyncio/task.py
+++ b/extmod/uasyncio/task.py
@@ -123,6 +123,7 @@ class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
+ self.state = True # None, False, True or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
@@ -130,30 +131,30 @@ class Task:
self.ph_rightmost_parent = None # Paring heap
def __iter__(self):
- if self.coro is self:
- # Signal that the completed-task has been await'ed on.
- self.waiting = None
- elif not hasattr(self, "waiting"):
- # Lazily allocated head of linked list of Tasks waiting on completion of this task.
- self.waiting = TaskQueue()
+ if not self.state:
+ # Task finished, signal that is has been await'ed on.
+ self.state = False
+ elif self.state is True:
+ # Allocated head of linked list of Tasks waiting on completion of this task.
+ self.state = TaskQueue()
return self
def __next__(self):
- if self.coro is self:
+ if not self.state:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
- self.waiting.push_head(core.cur_task)
+ self.state.push_head(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self
def done(self):
- return self.coro is self
+ return not self.state
def cancel(self):
# Check if task is already finished.
- if self.coro is self:
+ if not self.state:
return False
# Can't cancel self (not supported yet).
if self is core.cur_task:
@@ -172,13 +173,3 @@ class Task:
core._task_queue.push_head(self)
self.data = core.CancelledError
return True
-
- def throw(self, value):
- # This task raised an exception which was uncaught; handle that now.
- # Set the data because it was cleared by the main scheduling loop.
- self.data = value
- if not hasattr(self, "waiting"):
- # Nothing await'ed on the task so call the exception handler.
- core._exc_context["exception"] = value
- core._exc_context["future"] = self
- core.Loop.call_exception_handler(core._exc_context)
diff --git a/tests/extmod/uasyncio_cancel_wait_on_finished.py b/tests/extmod/uasyncio_cancel_wait_on_finished.py
new file mode 100644
index 000000000..66b36dd60
--- /dev/null
+++ b/tests/extmod/uasyncio_cancel_wait_on_finished.py
@@ -0,0 +1,41 @@
+# Test cancelling a task that is waiting on a task that just finishes.
+
+try:
+ import uasyncio as asyncio
+except ImportError:
+ try:
+ import asyncio
+ except ImportError:
+ print("SKIP")
+ raise SystemExit
+
+
+async def sleep_task():
+ print("sleep_task sleep")
+ await asyncio.sleep(0)
+ print("sleep_task wake")
+
+
+async def wait_task(t):
+ print("wait_task wait")
+ await t
+ print("wait_task wake")
+
+
+async def main():
+ waiting_task = asyncio.create_task(wait_task(asyncio.create_task(sleep_task())))
+
+ print("main sleep")
+ await asyncio.sleep(0)
+ print("main sleep")
+ await asyncio.sleep(0)
+
+ waiting_task.cancel()
+ print("main wait")
+ try:
+ await waiting_task
+ except asyncio.CancelledError as er:
+ print(repr(er))
+
+
+asyncio.run(main())
diff --git a/tests/extmod/uasyncio_cancel_wait_on_finished.py.exp b/tests/extmod/uasyncio_cancel_wait_on_finished.py.exp
new file mode 100644
index 000000000..60e871bfe
--- /dev/null
+++ b/tests/extmod/uasyncio_cancel_wait_on_finished.py.exp
@@ -0,0 +1,7 @@
+main sleep
+sleep_task sleep
+wait_task wait
+main sleep
+sleep_task wake
+main wait
+CancelledError()