86 lines
2.4 KiB
Python
86 lines
2.4 KiB
Python
import asyncio
|
|
import logging
|
|
from functools import wraps
|
|
|
|
import click
|
|
from ble_serial.bluetooth.ble_interface import BLE_interface
|
|
|
|
|
|
def coro(f):
    """Adapt the coroutine function ``f`` into a plain blocking callable.

    The returned function forwards its positional and keyword arguments to
    ``f``, runs the resulting coroutine to completion with ``asyncio.run``
    and returns its result. Metadata of ``f`` (name, docstring, ...) is
    copied onto the returned function via ``functools.wraps``.
    """

    def run_blocking(*args, **kwargs):
        # Build the coroutine first, then drive it on a fresh event loop.
        pending = f(*args, **kwargs)
        return asyncio.run(pending)

    return wraps(f)(run_blocking)
|
|
|
|
|
|
async def send_conditionally(
    ble: BLE_interface, data: bytes, loop: bool, *, delay: float = 0.1
):
    """Queue ``data`` for sending once, then pause for ``delay`` seconds.

    Args:
        ble: Interface whose ``queue_send`` method receives ``data``.
        data: Payload to queue.
        loop: When true, return normally after the pause so the caller can
            send again. When false, signal "stop" by raising
            ``asyncio.CancelledError`` once the data has been queued and the
            pause has elapsed.
        delay: Seconds to sleep after queueing the data (default ``0.1``).

    Raises:
        asyncio.CancelledError: If ``loop`` is false, after ``data`` has been
            queued once.
    """
    ble.queue_send(data)
    # Presumably this pause gives the concurrently running send loop time to
    # transmit the queued data before we stop or queue it again -- confirm.
    # TODO: Expose ``delay`` to the user as a command line option.
    await asyncio.sleep(delay)
    if not loop:
        raise asyncio.CancelledError
|
|
|
|
|
|
async def send_forever(ble: BLE_interface, data: bytes, loop: bool):
    """Keep sending ``data`` through ``ble`` until interrupted.

    This coroutine never returns normally. It delegates every round to
    ``send_conditionally`` and ends only when an exception propagates out of
    it: ``send_conditionally`` raises ``asyncio.CancelledError`` after the
    first send when ``loop`` is false.
    """
    while True:
        # One send-and-pause round; the stop signal arrives as an exception.
        await send_conditionally(ble, data, loop)
|
|
|
|
|
|
def _receive_callback(data: bytes):
    """Log data received from the BLE device at DEBUG level.

    Args:
        data: Raw bytes handed over by the BLE interface.
    """
    # logging applies %-formatting to the message with the extra arguments,
    # so the message needs a placeholder for ``data``; without one the record
    # fails to format when it is emitted.
    logging.debug("Received: %r", data)
|
|
|
|
|
|
@click.group()
@click.option("--adapter", default="hci0", type=str, help="ble adapter [default: hci0]")
@click.option("--device", required=True, type=str, help="ble device address")
@click.pass_context
def cli(ctx, adapter, device):
    """A simple tool to send messages into FreakWAN."""
    # Make sure the context carries a dict, then fill it with the values the
    # subcommands read back: the adapter name, the device address and a BLE
    # interface bound to that adapter.
    ctx.ensure_object(dict)
    state = ctx.obj
    state["ADAPTER"] = adapter
    state["DEVICE"] = device
    state["BLE"] = BLE_interface(adapter, None)
|
|
|
|
|
|
@cli.command()
@click.pass_context
@click.option("--loop", is_flag=True, default=False, help="send forever the messages")
@click.argument("messages", type=str, nargs=-1)
@coro
async def send(ctx, messages, loop):
    """Send one or more messages over BLE to a specific device."""
    # All message arguments are joined into one space-separated text.
    text = " ".join(messages)
    shared = ctx.obj
    ble = shared["BLE"]
    device = shared["DEVICE"]
    ble.set_receiver(_receive_callback)
    try:
        logging.info(f"Connecting to {device}...")
        await ble.connect(device, "public", 10.0)
        # TODO: Handle WRITE_UUID and READ_UUID.
        await ble.setup_chars(None, None, "rw")

        # Run the interface's send loop next to the producer that queues the
        # message; the producer ends the run by raising CancelledError.
        pump = ble.send_loop()
        producer = send_forever(ble, text.encode("utf-8"), loop)
        await asyncio.gather(pump, producer)
    except asyncio.CancelledError:
        # Expected stop signal, nothing to report.
        pass
    finally:
        # Always attempt to disconnect, whatever happened above.
        await ble.disconnect()
|
|
|
|
|
|
def run():
    """CLI entrypoint.

    Silences logging and hands control to the ``cli`` click group.
    """
    # ble-serial fires a warning on disconnect, but our main use case is to
    # just send a message and disconnect, so we disable logging here.
    # TODO: Make configurable by the user.
    logging.disable()

    # ``cli`` is a regular (synchronous) click group, so it is called
    # directly. The async ``send`` command is already driven to completion by
    # the ``coro`` decorator, which calls ``asyncio.run`` itself. Wrapping
    # this call in another ``asyncio.run`` would hand it a non-coroutine.
    cli()
|