diff options
author | Jim Mussared <jim.mussared@gmail.com> | 2020-06-17 14:57:52 +1000 |
---|---|---|
committer | Damien George <damien@micropython.org> | 2020-07-18 14:34:29 +1000 |
commit | 89a95b7c85cd90967020b80a0a4649b2f745bea8 (patch) | |
tree | e73ea8bdbe5062025b40c4ff8e841772172b31cc /examples/bluetooth/ble_simple_peripheral.py | |
parent | e152d0c19769f26e5416f86fe81afb9376fb531b (diff) |
examples/bluetooth: Add simple UART demo with central and peripheral.
Diffstat (limited to 'examples/bluetooth/ble_simple_peripheral.py')
-rw-r--r-- | examples/bluetooth/ble_simple_peripheral.py | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_simple_peripheral.py b/examples/bluetooth/ble_simple_peripheral.py new file mode 100644 index 000000000..08cd7fa98 --- /dev/null +++ b/examples/bluetooth/ble_simple_peripheral.py @@ -0,0 +1,98 @@ +# This example demonstrates a UART periperhal. + +import bluetooth +import random +import struct +import time +from ble_advertising import advertising_payload + +from micropython import const + +_IRQ_CENTRAL_CONNECT = const(1) +_IRQ_CENTRAL_DISCONNECT = const(2) +_IRQ_GATTS_WRITE = const(3) + +_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") +_UART_TX = ( + bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"), + bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY, +) +_UART_RX = ( + bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"), + bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE, +) +_UART_SERVICE = ( + _UART_UUID, + (_UART_TX, _UART_RX,), +) + + +class BLESimplePeripheral: + def __init__(self, ble, name="mpy-uart"): + self._ble = ble + self._ble.active(True) + self._ble.irq(handler=self._irq) + ((self._handle_tx, self._handle_rx,),) = self._ble.gatts_register_services( + (_UART_SERVICE,) + ) + self._connections = set() + self._write_callback = None + self._payload = advertising_payload(name=name, services=[_UART_UUID],) + self._advertise() + + def _irq(self, event, data): + # Track connections so we can send notifications. + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, _, _, = data + print("New connection", conn_handle) + self._connections.add(conn_handle) + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _, = data + print("Disconnected", conn_handle) + self._connections.remove(conn_handle) + # Start advertising again to allow a new connection. + self._advertise() + elif event == _IRQ_GATTS_WRITE: + conn_handle, value_handle = data + value = self._ble.gatts_read(value_handle) + if value_handle == self._handle_rx and self._write_callback: + self._write_callback(value) + + def send(self, data): + for conn_handle in self._connections: + self._ble.gatts_notify(conn_handle, self._handle_tx, data) + + def is_connected(self): + return len(self._connections) > 0 + + def _advertise(self, interval_us=500000): + print("Starting advertising") + self._ble.gap_advertise(interval_us, adv_data=self._payload) + + def on_write(self, callback): + self._write_callback = callback + + +def demo(): + ble = bluetooth.BLE() + p = BLESimplePeripheral(ble) + + def on_rx(v): + print("RX", v) + + p.on_write(on_rx) + + i = 0 + while True: + if p.is_connected(): + # Short burst of queued notifications. + for _ in range(3): + data = str(i) + "_" + print("TX", data) + p.send(data) + i += 1 + time.sleep_ms(100) + + +if __name__ == "__main__": + demo() |