summaryrefslogtreecommitdiff
path: root/examples/bluetooth/ble_bonding_peripheral.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/bluetooth/ble_bonding_peripheral.py')
-rw-r--r--examples/bluetooth/ble_bonding_peripheral.py194
1 files changed, 194 insertions, 0 deletions
diff --git a/examples/bluetooth/ble_bonding_peripheral.py b/examples/bluetooth/ble_bonding_peripheral.py
new file mode 100644
index 000000000..bd7596dbc
--- /dev/null
+++ b/examples/bluetooth/ble_bonding_peripheral.py
@@ -0,0 +1,194 @@
+# This example demonstrates a simple temperature sensor peripheral.
+#
+# The sensor's local value updates every second, and it will notify
+# any connected central every 10 seconds.
+#
+# Work-in-progress demo of implementing bonding and passkey auth.
+
+import bluetooth
+import random
+import struct
+import time
+import json
+import binascii
+from ble_advertising import advertising_payload
+
+from micropython import const
+
+_IRQ_CENTRAL_CONNECT = const(1)
+_IRQ_CENTRAL_DISCONNECT = const(2)
+_IRQ_GATTS_INDICATE_DONE = const(20)
+
+_IRQ_ENCRYPTION_UPDATE = const(28)
+_IRQ_PASSKEY_ACTION = const(31)
+
+_IRQ_GET_SECRET = const(29)
+_IRQ_SET_SECRET = const(30)
+
+_FLAG_READ = const(0x0002)
+_FLAG_NOTIFY = const(0x0010)
+_FLAG_INDICATE = const(0x0020)
+
+_FLAG_READ_ENCRYPTED = const(0x0200)
+
+# org.bluetooth.service.environmental_sensing
+_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
+# org.bluetooth.characteristic.temperature
+_TEMP_CHAR = (
+ bluetooth.UUID(0x2A6E),
+ _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE | _FLAG_READ_ENCRYPTED,
+)
+_ENV_SENSE_SERVICE = (
+ _ENV_SENSE_UUID,
+ (_TEMP_CHAR,),
+)
+
+# org.bluetooth.characteristic.gap.appearance.xml
+_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
+
+_IO_CAPABILITY_DISPLAY_ONLY = const(0)
+_IO_CAPABILITY_DISPLAY_YESNO = const(1)
+_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
+_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
+_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
+
+_PASSKEY_ACTION_INPUT = const(2)
+_PASSKEY_ACTION_DISP = const(3)
+_PASSKEY_ACTION_NUMCMP = const(4)
+
+
+class BLETemperature:
+ def __init__(self, ble, name="mpy-temp"):
+ self._ble = ble
+ self._load_secrets()
+ self._ble.irq(self._irq)
+ self._ble.config(bond=True)
+ self._ble.config(le_secure=True)
+ self._ble.config(mitm=True)
+ self._ble.config(io=_IO_CAPABILITY_DISPLAY_YESNO)
+ self._ble.active(True)
+ self._ble.config(addr_mode=2)
+ ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
+ self._connections = set()
+ self._payload = advertising_payload(
+ name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
+ )
+ self._advertise()
+
+ def _irq(self, event, data):
+ # Track connections so we can send notifications.
+ if event == _IRQ_CENTRAL_CONNECT:
+ conn_handle, _, _ = data
+ self._connections.add(conn_handle)
+ elif event == _IRQ_CENTRAL_DISCONNECT:
+ conn_handle, _, _ = data
+ self._connections.remove(conn_handle)
+ self._save_secrets()
+ # Start advertising again to allow a new connection.
+ self._advertise()
+ elif event == _IRQ_ENCRYPTION_UPDATE:
+ conn_handle, encrypted, authenticated, bonded, key_size = data
+ print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
+ elif event == _IRQ_PASSKEY_ACTION:
+ conn_handle, action, passkey = data
+ print("passkey action", conn_handle, action, passkey)
+ if action == _PASSKEY_ACTION_NUMCMP:
+ accept = int(input("accept? "))
+ self._ble.gap_passkey(conn_handle, action, accept)
+ elif action == _PASSKEY_ACTION_DISP:
+ print("displaying 123456")
+ self._ble.gap_passkey(conn_handle, action, 123456)
+ elif action == _PASSKEY_ACTION_INPUT:
+ print("prompting for passkey")
+ passkey = int(input("passkey? "))
+ self._ble.gap_passkey(conn_handle, action, passkey)
+ else:
+ print("unknown action")
+ elif event == _IRQ_GATTS_INDICATE_DONE:
+ conn_handle, value_handle, status = data
+ elif event == _IRQ_SET_SECRET:
+ sec_type, key, value = data
+ key = sec_type, bytes(key)
+ value = bytes(value) if value else None
+ print("set secret:", key, value)
+ if value is None:
+ if key in self._secrets:
+ del self._secrets[key]
+ return True
+ else:
+ return False
+ else:
+ self._secrets[key] = value
+ return True
+ elif event == _IRQ_GET_SECRET:
+ sec_type, index, key = data
+ print("get secret:", sec_type, index, bytes(key) if key else None)
+ if key is None:
+ i = 0
+ for (t, _key), value in self._secrets.items():
+ if t == sec_type:
+ if i == index:
+ return value
+ i += 1
+ return None
+ else:
+ key = sec_type, bytes(key)
+ return self._secrets.get(key, None)
+
+ def set_temperature(self, temp_deg_c, notify=False, indicate=False):
+ # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
+ # Write the local value, ready for a central to read.
+ self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c * 100)))
+ if notify or indicate:
+ for conn_handle in self._connections:
+ if notify:
+ # Notify connected centrals.
+ self._ble.gatts_notify(conn_handle, self._handle)
+ if indicate:
+ # Indicate connected centrals.
+ self._ble.gatts_indicate(conn_handle, self._handle)
+
+ def _advertise(self, interval_us=500000):
+ self._ble.config(addr_mode=2)
+ self._ble.gap_advertise(interval_us, adv_data=self._payload)
+
+ def _load_secrets(self):
+ self._secrets = {}
+ try:
+ with open("secrets.json", "r") as f:
+ entries = json.load(f)
+ for sec_type, key, value in entries:
+ self._secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
+ except:
+ print("no secrets available")
+
+ def _save_secrets(self):
+ try:
+ with open("secrets.json", "w") as f:
+ json_secrets = [
+ (sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
+ for (sec_type, key), value in self._secrets.items()
+ ]
+ json.dump(json_secrets, f)
+ except:
+ print("failed to save secrets")
+
+
+def demo():
+ ble = bluetooth.BLE()
+ temp = BLETemperature(ble)
+
+ t = 25
+ i = 0
+
+ while True:
+ # Write every second, notify every 10 seconds.
+ i = (i + 1) % 10
+ temp.set_temperature(t, notify=i == 0, indicate=False)
+ # Random walk the temperature.
+ t += random.uniform(-0.5, 0.5)
+ time.sleep_ms(1000)
+
+
+if __name__ == "__main__":
+ demo()