1
0
Fork 0

Merge pull request #1 from gmrandazzo/main

This commit is contained in:
Daniele Tricoli 2023-03-07 22:19:50 +01:00 committed by GitHub
commit ab7f3c3b72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 6 deletions

View File

@ -20,8 +20,6 @@ __all__ = [
DEFAULT_TIMEOUT = 5.0
NORDIC_UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
NORDIC_UART_TX_CHARACTERISTIC = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
NORDIC_UART_RX_CHARACTERISTIC = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
async def scan(adapter: str, timeout: float = DEFAULT_TIMEOUT):
@ -39,6 +37,8 @@ class Client:
self._receive_callback = None
self.disconnect_event = asyncio.Event()
self._client = None
self.uart_rx_char = None
self.uart_tx_char = None
async def connect(self, timeout: float = DEFAULT_TIMEOUT):
device = await BleakScanner.find_device_by_address(
@ -46,9 +46,14 @@ class Client:
)
if device is None:
raise RuntimeError(f"device with address {self.address} not found")
self._client = BleakClient(device, disconnected_callback=self.on_disconnect)
await self._client.__aenter__()
for service in self._client.services:
for char in service.characteristics:
if "write" in char.properties:
self.uart_tx_char = char
elif "notify" in char.properties:
self.uart_rx_char = char
async def disconnect(self):
if self._client is not None:
@ -57,11 +62,11 @@ class Client:
async def start(self):
if self._client is not None:
await self._client.start_notify(NORDIC_UART_RX_CHARACTERISTIC, self.on_rx)
await self._client.start_notify(self.uart_rx_char, self.on_rx)
async def stop(self):
if self._client is not None:
await self._client.stop_notify(NORDIC_UART_RX_CHARACTERISTIC)
await self._client.stop_notify(self.uart_rx_char)
def set_receive_callback(self, callback: Callable[[Any], None]):
self._receive_callback = callback
@ -82,7 +87,7 @@ class Client:
async def send(self, data):
if self._client is not None:
await self._client.write_gatt_char(NORDIC_UART_TX_CHARACTERISTIC, data)
await self._client.write_gatt_char(self.uart_tx_char, data)
async def send_forever(self, data: bytes, sleep_time: float):
while True: