summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDamien George <damien.p.george@gmail.com>2020-03-12 16:46:20 +1100
committerDamien George <damien.p.george@gmail.com>2020-03-26 01:25:45 +1100
commitbc009fdd62f913e36443f8267ffb6133f537fff3 (patch)
tree977959e2e1f54abaf233924188e8fcc5e61c8ab8
parent081d06766223b326b6d7eeceae817b7a3a3f57b0 (diff)
extmod/uasyncio: Add optional implementation of core uasyncio in C.
Implements Task and TaskQueue classes in C, using a pairing-heap data structure. Using this reduces RAM use of each Task, and improves overall performance of the uasyncio scheduler.
-rw-r--r--extmod/moduasyncio.c294
-rw-r--r--extmod/uasyncio/core.py7
-rw-r--r--py/builtin.h1
-rw-r--r--py/mpconfig.h4
-rw-r--r--py/objmodule.c3
-rw-r--r--py/py.mk1
6 files changed, 308 insertions, 2 deletions
diff --git a/extmod/moduasyncio.c b/extmod/moduasyncio.c
new file mode 100644
index 000000000..a1aecc5d4
--- /dev/null
+++ b/extmod/moduasyncio.c
@@ -0,0 +1,294 @@
+/*
+ * This file is part of the MicroPython project, http://micropython.org/
+ *
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2020 Damien P. George
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "py/runtime.h"
+#include "py/smallint.h"
+#include "py/pairheap.h"
+#include "py/mphal.h"
+
+#if MICROPY_PY_UASYNCIO
+
+typedef struct _mp_obj_task_t {
+ mp_pairheap_t pairheap;
+ mp_obj_t coro;
+ mp_obj_t data;
+ mp_obj_t waiting;
+
+ mp_obj_t ph_key;
+} mp_obj_task_t;
+
+typedef struct _mp_obj_task_queue_t {
+ mp_obj_base_t base;
+ mp_obj_task_t *heap;
+} mp_obj_task_queue_t;
+
+STATIC const mp_obj_type_t task_queue_type;
+STATIC const mp_obj_type_t task_type;
+
+STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args);
+
+/******************************************************************************/
+// Ticks for task ordering in pairing heap
+
+STATIC mp_obj_t ticks(void) {
+ return MP_OBJ_NEW_SMALL_INT(mp_hal_ticks_ms() & (MICROPY_PY_UTIME_TICKS_PERIOD - 1));
+}
+
+STATIC mp_int_t ticks_diff(mp_obj_t t1_in, mp_obj_t t0_in) {
+ mp_uint_t t0 = MP_OBJ_SMALL_INT_VALUE(t0_in);
+ mp_uint_t t1 = MP_OBJ_SMALL_INT_VALUE(t1_in);
+ mp_int_t diff = ((t1 - t0 + MICROPY_PY_UTIME_TICKS_PERIOD / 2) & (MICROPY_PY_UTIME_TICKS_PERIOD - 1))
+ - MICROPY_PY_UTIME_TICKS_PERIOD / 2;
+ return diff;
+}
+
+STATIC int task_lt(mp_pairheap_t *n1, mp_pairheap_t *n2) {
+ mp_obj_task_t *t1 = (mp_obj_task_t *)n1;
+ mp_obj_task_t *t2 = (mp_obj_task_t *)n2;
+ return MP_OBJ_SMALL_INT_VALUE(ticks_diff(t1->ph_key, t2->ph_key)) < 0;
+}
+
+/******************************************************************************/
+// TaskQueue class
+
+STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
+ (void)args;
+ mp_arg_check_num(n_args, n_kw, 0, 0, false);
+ mp_obj_task_queue_t *self = m_new_obj(mp_obj_task_queue_t);
+ self->base.type = type;
+ self->heap = (mp_obj_task_t *)mp_pairheap_new(task_lt);
+ return MP_OBJ_FROM_PTR(self);
+}
+
+STATIC mp_obj_t task_queue_peek(mp_obj_t self_in) {
+ mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
+ if (self->heap == NULL) {
+ return mp_const_none;
+ } else {
+ return MP_OBJ_FROM_PTR(self->heap);
+ }
+}
+STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_peek_obj, task_queue_peek);
+
+STATIC mp_obj_t task_queue_push_sorted(size_t n_args, const mp_obj_t *args) {
+ mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(args[0]);
+ mp_obj_task_t *task = MP_OBJ_TO_PTR(args[1]);
+ task->data = mp_const_none;
+ if (n_args == 2) {
+ task->ph_key = ticks();
+ } else {
+ assert(mp_obj_is_small_int(args[2]));
+ task->ph_key = args[2];
+ }
+ self->heap = (mp_obj_task_t *)mp_pairheap_push(task_lt, &self->heap->pairheap, &task->pairheap);
+ return mp_const_none;
+}
+STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(task_queue_push_sorted_obj, 2, 3, task_queue_push_sorted);
+
+STATIC mp_obj_t task_queue_pop_head(mp_obj_t self_in) {
+ mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
+ mp_obj_task_t *head = (mp_obj_task_t *)mp_pairheap_peek(task_lt, &self->heap->pairheap);
+ if (head == NULL) {
+ mp_raise_msg(&mp_type_IndexError, "empty heap");
+ }
+ self->heap = (mp_obj_task_t *)mp_pairheap_pop(task_lt, &self->heap->pairheap);
+ return MP_OBJ_FROM_PTR(head);
+}
+STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_pop_head_obj, task_queue_pop_head);
+
+STATIC mp_obj_t task_queue_remove(mp_obj_t self_in, mp_obj_t task_in) {
+ mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
+ mp_obj_task_t *task = MP_OBJ_TO_PTR(task_in);
+ self->heap = (mp_obj_task_t *)mp_pairheap_delete(task_lt, &self->heap->pairheap, &task->pairheap);
+ return mp_const_none;
+}
+STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_queue_remove_obj, task_queue_remove);
+
+STATIC const mp_rom_map_elem_t task_queue_locals_dict_table[] = {
+ { MP_ROM_QSTR(MP_QSTR_peek), MP_ROM_PTR(&task_queue_peek_obj) },
+ { MP_ROM_QSTR(MP_QSTR_push_sorted), MP_ROM_PTR(&task_queue_push_sorted_obj) },
+ { MP_ROM_QSTR(MP_QSTR_push_head), MP_ROM_PTR(&task_queue_push_sorted_obj) },
+ { MP_ROM_QSTR(MP_QSTR_pop_head), MP_ROM_PTR(&task_queue_pop_head_obj) },
+ { MP_ROM_QSTR(MP_QSTR_remove), MP_ROM_PTR(&task_queue_remove_obj) },
+};
+STATIC MP_DEFINE_CONST_DICT(task_queue_locals_dict, task_queue_locals_dict_table);
+
+STATIC const mp_obj_type_t task_queue_type = {
+ { &mp_type_type },
+ .name = MP_QSTR_TaskQueue,
+ .make_new = task_queue_make_new,
+ .locals_dict = (mp_obj_dict_t *)&task_queue_locals_dict,
+};
+
+/******************************************************************************/
+// Task class
+
+// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
+STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
+
+STATIC mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
+ mp_arg_check_num(n_args, n_kw, 1, 2, false);
+ mp_obj_task_t *self = m_new_obj(mp_obj_task_t);
+ self->pairheap.base.type = type;
+ mp_pairheap_init_node(task_lt, &self->pairheap);
+ self->coro = args[0];
+ self->data = mp_const_none;
+ self->waiting = mp_const_none;
+ self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
+ if (n_args == 2) {
+ uasyncio_context = args[1];
+ }
+ return MP_OBJ_FROM_PTR(self);
+}
+
+STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
+ mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
+ // Check if task is already finished.
+ if (self->coro == mp_const_none) {
+ return mp_const_false;
+ }
+ // Can't cancel self (not supported yet).
+ mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
+ if (self_in == cur_task) {
+ mp_raise_msg(&mp_type_RuntimeError, "cannot cancel self");
+ }
+ // If Task waits on another task then forward the cancel to the one it's waiting on.
+ while (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&task_type))) {
+ self = MP_OBJ_TO_PTR(self->data);
+ }
+
+ mp_obj_t _task_queue = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__task_queue));
+
+ // Reschedule Task as a cancelled task.
+ mp_obj_t dest[3];
+ mp_load_method_maybe(self->data, MP_QSTR_remove, dest);
+ if (dest[0] != MP_OBJ_NULL) {
+ // Not on the main running queue, remove the task from the queue it's on.
+ dest[2] = MP_OBJ_FROM_PTR(self);
+ mp_call_method_n_kw(1, 0, dest);
+ // _task_queue.push_head(self)
+ dest[0] = _task_queue;
+ dest[1] = MP_OBJ_FROM_PTR(self);
+ task_queue_push_sorted(2, dest);
+ } else if (ticks_diff(self->ph_key, ticks()) > 0) {
+ // On the main running queue but scheduled in the future, so bring it forward to now.
+ // _task_queue.remove(self)
+ task_queue_remove(_task_queue, MP_OBJ_FROM_PTR(self));
+ // _task_queue.push_head(self)
+ dest[0] = _task_queue;
+ dest[1] = MP_OBJ_FROM_PTR(self);
+ task_queue_push_sorted(2, dest);
+ }
+
+ self->data = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError));
+
+ return mp_const_true;
+}
+STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
+
+STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
+ mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
+ if (dest[0] == MP_OBJ_NULL) {
+ // Load
+ if (attr == MP_QSTR_coro) {
+ dest[0] = self->coro;
+ } else if (attr == MP_QSTR_data) {
+ dest[0] = self->data;
+ } else if (attr == MP_QSTR_waiting) {
+ if (self->waiting != mp_const_none) {
+ dest[0] = self->waiting;
+ }
+ } else if (attr == MP_QSTR_cancel) {
+ dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
+ dest[1] = self_in;
+ } else if (attr == MP_QSTR_ph_key) {
+ dest[0] = self->ph_key;
+ }
+ } else if (dest[1] != MP_OBJ_NULL) {
+ // Store
+ if (attr == MP_QSTR_coro) {
+ self->coro = dest[1];
+ dest[0] = MP_OBJ_NULL;
+ } else if (attr == MP_QSTR_data) {
+ self->data = dest[1];
+ dest[0] = MP_OBJ_NULL;
+ } else if (attr == MP_QSTR_waiting) {
+ self->waiting = dest[1];
+ dest[0] = MP_OBJ_NULL;
+ }
+ }
+}
+
+STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
+ (void)iter_buf;
+ mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
+ if (self->waiting == mp_const_none) {
+ self->waiting = task_queue_make_new(&task_queue_type, 0, 0, NULL);
+ }
+ return self_in;
+}
+
+STATIC mp_obj_t task_iternext(mp_obj_t self_in) {
+ mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
+ if (self->coro == mp_const_none) {
+ // Task finished, raise return value to caller so it can continue.
+ nlr_raise(self->data);
+ } else {
+ // Put calling task on waiting queue.
+ mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
+ mp_obj_t args[2] = { self->waiting, cur_task };
+ task_queue_push_sorted(2, args);
+ // Set calling task's data to this task that it waits on, to double-link it.
+ ((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
+ }
+ return mp_const_none;
+}
+
+STATIC const mp_obj_type_t task_type = {
+ { &mp_type_type },
+ .name = MP_QSTR_Task,
+ .make_new = task_make_new,
+ .attr = task_attr,
+ .getiter = task_getiter,
+ .iternext = task_iternext,
+};
+
+/******************************************************************************/
+// C-level uasyncio module
+
+STATIC const mp_rom_map_elem_t mp_module_uasyncio_globals_table[] = {
+ { MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR__uasyncio) },
+ { MP_ROM_QSTR(MP_QSTR_TaskQueue), MP_ROM_PTR(&task_queue_type) },
+ { MP_ROM_QSTR(MP_QSTR_Task), MP_ROM_PTR(&task_type) },
+};
+STATIC MP_DEFINE_CONST_DICT(mp_module_uasyncio_globals, mp_module_uasyncio_globals_table);
+
+const mp_obj_module_t mp_module_uasyncio = {
+ .base = { &mp_type_module },
+ .globals = (mp_obj_dict_t *)&mp_module_uasyncio_globals,
+};
+
+#endif // MICROPY_PY_UASYNCIO
diff --git a/extmod/uasyncio/core.py b/extmod/uasyncio/core.py
index 049e2537f..5d7191d7f 100644
--- a/extmod/uasyncio/core.py
+++ b/extmod/uasyncio/core.py
@@ -4,8 +4,11 @@
from time import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select
-# Import TaskQueue and Task
-from .task import TaskQueue, Task
+# Import TaskQueue and Task, preferring built-in C code over Python code
+try:
+ from _uasyncio import TaskQueue, Task
+except:
+ from .task import TaskQueue, Task
################################################################################
diff --git a/py/builtin.h b/py/builtin.h
index 2dbe8a782..1e4769cd6 100644
--- a/py/builtin.h
+++ b/py/builtin.h
@@ -103,6 +103,7 @@ extern const mp_obj_module_t mp_module_thread;
extern const mp_obj_dict_t mp_module_builtins_globals;
// extmod modules
+extern const mp_obj_module_t mp_module_uasyncio;
extern const mp_obj_module_t mp_module_uerrno;
extern const mp_obj_module_t mp_module_uctypes;
extern const mp_obj_module_t mp_module_uzlib;
diff --git a/py/mpconfig.h b/py/mpconfig.h
index c5829a3e0..d96687b81 100644
--- a/py/mpconfig.h
+++ b/py/mpconfig.h
@@ -1291,6 +1291,10 @@ typedef double mp_float_t;
// Extended modules
+#ifndef MICROPY_PY_UASYNCIO
+#define MICROPY_PY_UASYNCIO (0)
+#endif
+
#ifndef MICROPY_PY_UCTYPES
#define MICROPY_PY_UCTYPES (0)
#endif
diff --git a/py/objmodule.c b/py/objmodule.c
index 79047009f..060e1bc1e 100644
--- a/py/objmodule.c
+++ b/py/objmodule.c
@@ -170,6 +170,9 @@ STATIC const mp_rom_map_elem_t mp_builtin_module_table[] = {
// extmod modules
+ #if MICROPY_PY_UASYNCIO
+ { MP_ROM_QSTR(MP_QSTR__uasyncio), MP_ROM_PTR(&mp_module_uasyncio) },
+ #endif
#if MICROPY_PY_UERRNO
{ MP_ROM_QSTR(MP_QSTR_uerrno), MP_ROM_PTR(&mp_module_uerrno) },
#endif
diff --git a/py/py.mk b/py/py.mk
index 1a56dcce8..8c90beff7 100644
--- a/py/py.mk
+++ b/py/py.mk
@@ -168,6 +168,7 @@ PY_CORE_O_BASENAME = $(addprefix py/,\
)
PY_EXTMOD_O_BASENAME = \
+ extmod/moduasyncio.o \
extmod/moductypes.o \
extmod/modujson.o \
extmod/modure.o \