1
0
Fork 0
freakble/src/freakble/ble.py

158 lines
4.5 KiB
Python

# Copyright © 2023 Daniele Tricoli <eriol@mornie.org>
# SPDX-License-Identifier: BSD-3-Clause
"""BLE related stuff for freakble."""
import asyncio
import logging
from typing import Any, Callable
from bleak import BleakClient, BleakScanner
from .repl import REPL
# Public names exported by a star-import of this module.
__all__ = [
"Client",
"repl_loop",
"scan",
"send_text",
]
# Default value for the ``timeout`` parameter of scan() and Client.connect().
# NOTE(review): presumably seconds, as it is forwarded to bleak — confirm.
DEFAULT_TIMEOUT = 5.0
# UUID of the Nordic UART service; used both to filter scan results and to
# look up the UART service on a connected device.
NORDIC_UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
async def scan(adapter: str, timeout: float = DEFAULT_TIMEOUT):
    """Discover Bluetooth LE devices advertising the Nordic UART service.

    Args:
        adapter: Bluetooth adapter handed to ``BleakScanner.discover``.
        timeout: Discovery timeout handed to ``BleakScanner.discover``.

    Returns:
        Whatever ``BleakScanner.discover`` returns for the filtered scan.
    """
    # Restrict discovery to devices exposing the UART service.
    wanted_services = [NORDIC_UART_SERVICE_UUID]
    discovered = await BleakScanner.discover(
        adapter=adapter,
        timeout=timeout,
        service_uuids=wanted_services,
    )
    return discovered
class Client:
    """Simple client for UART devices.

    Wraps a ``BleakClient`` together with the two characteristics found on
    the device's Nordic UART service: the one :meth:`send` writes to and the
    one whose notifications are forwarded to the receive callback.
    """

    def __init__(self, adapter: str, address: str) -> None:
        """Remember the adapter and device address; nothing is connected yet."""
        self.adapter = adapter
        self.address = address
        # Optional callable invoked by on_rx() with each decoded chunk.
        self._receive_callback = None
        # Set by on_disconnect(); awaited by wait_until_disconnect().
        self.disconnect_event = asyncio.Event()
        # BleakClient instance, created by connect().
        self._client = None
        # Characteristic with the "notify" property (device -> us).
        self.uart_rx_char = None
        # Characteristic with the "write" property (us -> device).
        self.uart_tx_char = None

    async def connect(self, timeout: float = DEFAULT_TIMEOUT) -> None:
        """Find the device, connect to it and locate its UART characteristics.

        Args:
            timeout: Timeout forwarded to ``BleakScanner.find_device_by_address``.

        Raises:
            RuntimeError: If no device with ``self.address`` is found, or if
                the connected device has no Nordic UART service.
        """
        device = await BleakScanner.find_device_by_address(
            self.address, adapter=self.adapter, timeout=timeout
        )
        if device is None:
            raise RuntimeError(f"device with address {self.address} not found")

        self._client = BleakClient(device, disconnected_callback=self.on_disconnect)
        # Use the public connect() instead of invoking __aenter__ by hand;
        # teardown is done explicitly in disconnect().
        await self._client.connect()

        uart_service = self._client.services.get_service(NORDIC_UART_SERVICE_UUID)
        if uart_service is None:
            raise RuntimeError(
                f"device with address {self.address} doesn't have an UART service"
            )

        # A characteristic advertising "write" is used for sending; otherwise
        # one advertising "notify" is used for receiving.
        for char in uart_service.characteristics:
            if "write" in char.properties:
                self.uart_tx_char = char
            elif "notify" in char.properties:
                self.uart_rx_char = char

    async def disconnect(self) -> None:
        """Stop notifications and disconnect; a no-op if never connected."""
        if self._client is None:
            return
        try:
            await self.stop()
        finally:
            # Always release the connection, even when stopping notifications
            # fails; the error from stop() still propagates afterwards.
            await self._client.disconnect()

    async def start(self) -> None:
        """Subscribe to notifications, routing them to :meth:`on_rx`."""
        if self._client and self.uart_rx_char:
            await self._client.start_notify(self.uart_rx_char, self.on_rx)

    async def stop(self) -> None:
        """Unsubscribe from notifications on the receive characteristic."""
        if self._client and self.uart_rx_char:
            await self._client.stop_notify(self.uart_rx_char)

    def set_receive_callback(self, callback: Callable[[Any], None]) -> None:
        """Register the callable that receives each decoded text chunk."""
        self._receive_callback = callback

    def on_rx(self, characteristic, data) -> None:
        """Handle a notification: decode it and pass it to the callback.

        The payload is decoded as UTF-8 and stripped of trailing whitespace
        before being handed to the receive callback, if one is set.
        """
        # The first argument of logging.debug() is the format string; passing
        # the characteristic there made logging try ``str(char) % data``.
        logging.debug("%s: %r", characteristic, data)
        data = data.decode("utf-8").rstrip()
        if self._receive_callback is not None:
            self._receive_callback(data)

    async def wait_until_disconnect(self) -> None:
        """Block until :meth:`on_disconnect` has been called."""
        await self.disconnect_event.wait()

    def on_disconnect(self, client: BleakClient) -> None:
        """Disconnection callback given to ``BleakClient``; sets the event."""
        logging.debug("Disconnect...")
        self.disconnect_event.set()

    async def send(self, data) -> None:
        """Write ``data`` to the transmit characteristic, if connected."""
        if self._client and self.uart_tx_char:
            # We always need to perform a write-with-response because on MacOS
            # we will not write at all.
            await self._client.write_gatt_char(self.uart_tx_char, data, response=True)

    async def send_forever(self, data: bytes, sleep_time: float) -> None:
        """Send ``data`` repeatedly, sleeping ``sleep_time`` seconds in between.

        Never returns on its own; it ends only when :meth:`send` raises or the
        surrounding task is cancelled.
        """
        while True:
            await self.send(data)
            await asyncio.sleep(sleep_time)
async def send_text(
    adapter: str,
    text: str,
    device: str,
    loop: bool,
    sleep_time: float,
    timeout: float,
    callback=None,
):
    """Connect to a device, send ``text`` over Bluetooth LE, then disconnect.

    Convenience wrapper that owns the whole connection life cycle.

    Args:
        adapter: Bluetooth adapter passed to ``Client``.
        text: Text to transmit; it is encoded as UTF-8.
        device: Address of the target device, passed to ``Client``.
        loop: When true, keep re-sending ``text`` instead of sending it once.
        sleep_time: Pause between two sends when ``loop`` is true.
        timeout: Timeout passed to ``Client.connect``.
        callback: Optional receive callback registered on the client.
    """
    ble_client = Client(adapter, device)
    if callback is not None:
        ble_client.set_receive_callback(callback)

    try:
        await ble_client.connect(timeout)
        await ble_client.start()
        if not loop:
            # One-shot mode: a single write and we are done.
            await ble_client.send(bytes(text, "utf-8"))
        else:
            # Repeating mode: keep sending while also watching for the
            # disconnect event.
            await asyncio.gather(
                ble_client.wait_until_disconnect(),
                ble_client.send_forever(bytes(text, "utf-8"), sleep_time),
            )
    finally:
        # Runs on success and on failure alike.
        await ble_client.disconnect()
async def repl_loop(
    adapter: str,
    device: str,
    timeout: float,
):
    """Connect to a device and run an interactive REPL over Bluetooth LE.

    Convenience wrapper that owns the whole connection life cycle: the client
    is always disconnected when the function exits.

    Args:
        adapter: Bluetooth adapter passed to ``Client``.
        device: Address of the target device, passed to ``Client``.
        timeout: Timeout passed to ``Client.connect``.
    """
    ble_client = Client(adapter, device)
    try:
        shell = REPL(ble_client)
        await ble_client.connect(timeout)
        await ble_client.start()
        # Run the shell alongside the disconnect watcher.
        awaitables = (ble_client.wait_until_disconnect(), shell.shell())
        await asyncio.gather(*awaitables)
    except AssertionError:
        # Propagated to the caller unchanged.
        raise
    except asyncio.CancelledError:
        # Cancellation is swallowed here; cleanup still runs below.
        pass
    finally:
        await ble_client.disconnect()