diff options
author | Jim Mussared <jim.mussared@gmail.com> | 2019-10-15 17:45:21 +1100 |
---|---|---|
committer | Damien George <damien.p.george@gmail.com> | 2019-10-18 13:36:51 +1100 |
commit | 25a228af7e5809fe3a582f665c13a2d38eb5ed40 (patch) | |
tree | d6d81589b78df5989e7a123b4b339f6eca834c12 /examples/bluetooth/ble_uart_repl.py | |
parent | ebf8332104c1c6fbd01b521a713e49d040c62920 (diff) |
examples/bluetooth: Add basic BLE peripheral examples.
Consisting of:
- ble_advertising.py -- helper to generate advertising payload.
- ble_temperature.py -- simple temperature device.
- ble_uart_periperhal.py -- BLE UART wrapper.
- ble_uart_repl.py -- dupterm-compatible uart.
Diffstat (limited to 'examples/bluetooth/ble_uart_repl.py')
-rw-r--r-- | examples/bluetooth/ble_uart_repl.py | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_uart_repl.py b/examples/bluetooth/ble_uart_repl.py new file mode 100644 index 000000000..430a571a6 --- /dev/null +++ b/examples/bluetooth/ble_uart_repl.py @@ -0,0 +1,80 @@ +# Proof-of-concept of a REPL over BLE UART. +# +# Tested with the Adafruit Bluefruit app on Android. +# Set the EoL characters to \r\n. + +import bluetooth +import io +import os +import micropython +import machine + +from ble_uart_peripheral import BLEUART + +_MP_STREAM_POLL = const(3) +_MP_STREAM_POLL_RD = const(0x0001) + +# TODO: Remove this when STM32 gets machine.Timer. +if hasattr(machine, 'Timer'): + _timer = machine.Timer(-1) +else: + _timer = None + +# Batch writes into 50ms intervals. +def schedule_in(handler, delay_ms): + def _wrap(_arg): + handler() + if _timer: + _timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap) + else: + micropython.schedule(_wrap, None) + +# Simple buffering stream to support the dupterm requirements. +class BLEUARTStream(io.IOBase): + def __init__(self, uart): + self._uart = uart + self._tx_buf = bytearray() + self._uart.irq(self._on_rx) + + def _on_rx(self): + # Needed for ESP32. + if hasattr(os, 'dupterm_notify'): + os.dupterm_notify(None) + + def read(self, sz=None): + return self._uart.read(sz) + + def readinto(self, buf): + avail = self._uart.read(len(buf)) + if not avail: + return None + for i in range(len(avail)): + buf[i] = avail[i] + return len(avail) + + def ioctl(self, op, arg): + if op == _MP_STREAM_POLL: + if self._uart.any(): + return _MP_STREAM_POLL_RD + return 0 + + def _flush(self): + data = self._tx_buf[0:100] + self._tx_buf = self._tx_buf[100:] + self._uart.write(data) + if self._tx_buf: + schedule_in(self._flush, 50) + + def write(self, buf): + empty = not self._tx_buf + self._tx_buf += buf + if empty: + schedule_in(self._flush, 50) + + +def start(): + ble = bluetooth.BLE() + uart = BLEUART(ble, name='mpy-repl') + stream = BLEUARTStream(uart) + + os.dupterm(stream) |