mqttdevicemanager/ubxhandler.py

123 lines
3.5 KiB
Python

import asyncio
import aioserial
import aiomqtt
import pyubx2
import io
import logging
from command import command
from handler import MQTTHandler, task
# pyubx2 logs an error whenever it is asked to parse a buffer that does not
# yet hold a complete frame. Those reports are just noise for this module, so
# only the most severe records from the "pyubx2" logger are let through.
logging.getLogger("pyubx2").setLevel(logging.CRITICAL)
class UBXAsyncParser:
    """Incremental message parser fed with arbitrary chunks of bytes.

    Bytes handed to :meth:`feed` are appended to ``self.buffer``;
    :meth:`parse` then yields every message that ``pyubx2.UBXReader`` can
    read from the buffered data and drops the bytes it consumed.
    """

    def __init__(self) -> None:
        """Start with an empty receive buffer."""
        # Bytes received so far that have not been consumed by parse().
        self.buffer = bytearray()

    def feed(self, data: bytes) -> None:
        """Append ``data`` to the end of the receive buffer."""
        self.buffer.extend(data)

    def parse(self):
        """Yield the parsed messages currently available in the buffer.

        Each pass wraps the current buffer contents in a fresh ``BytesIO``
        and asks a ``pyubx2.UBXReader`` for one message. After a successful
        read, everything up to the stream position is removed from the
        buffer and the parsed message is yielded.

        Iteration ends when the reader returns ``None`` as the raw frame or
        raises ``pyubx2.UBXStreamError`` (treated here as "the buffered frame
        is not complete yet"). In both cases the unread bytes stay in the
        buffer for the next call.
        """
        try:
            while True:
                window = io.BytesIO(self.buffer)
                reader = pyubx2.UBXReader(window, parsing=True)
                raw_frame, parsed_message = reader.read()
                if raw_frame is None:
                    return
                # The stream position tells how many buffered bytes the
                # reader used up to produce this message.
                used = window.tell()
                del self.buffer[:used]
                yield parsed_message
        except pyubx2.UBXStreamError:
            # Incomplete frame: wait for more data to be fed.
            return
class UBXHandler(MQTTHandler):
    """MQTT handler that bridges a UBX serial device.

    Messages read from the serial port are parsed and each public attribute
    of every ``pyubx2.UBXMessage`` is published to its own MQTT topic. The
    class also exposes commands that write to the serial port.

    NOTE(review): ``self.topic_base`` is not set in this class; it is
    presumably provided by ``MQTTHandler`` -- confirm. The exact semantics of
    the ``@task`` and ``@command`` decorators are defined in the ``handler``
    and ``command`` modules, not here.
    """

    def __init__(
        self,
        mqtt_client: aiomqtt.Client,
        handler_id: str,
        serial_port: aioserial.AioSerial,
    ):
        """Store the serial port and initialise the base handler.

        Args:
            mqtt_client: Client used for publishing; forwarded to the base
                class.
            handler_id: Identifier forwarded to the base class.
            serial_port: Serial port that is read from and written to.
        """
        super().__init__(mqtt_client, handler_id)
        self.serial_port = serial_port

    @task
    async def handle_ubx_messages(self):
        """Read the serial port forever and publish UBX message fields.

        Every attribute of a received ``pyubx2.UBXMessage`` whose name does
        not start with an underscore is published to
        ``<topic_base>/<message identity>/<attribute name>`` with ``qos=1``
        and ``retain=True``. Parsed objects that are not ``UBXMessage``
        instances are ignored.
        """

        async def serial_messages():
            # Pull data from the port in reads of 64 bytes and yield each
            # message as soon as the parser can produce it.
            ubx_parser = UBXAsyncParser()
            while True:
                data = await self.serial_port.read_async(64)
                if data:
                    ubx_parser.feed(data)
                    for parsed in ubx_parser.parse():
                        yield parsed
                else:
                    # Nothing was read: give other tasks a chance to run
                    # before trying again.
                    await asyncio.sleep(0)

        async for message in serial_messages():
            if not isinstance(message, pyubx2.UBXMessage):
                continue
            for name, value in vars(message).items():
                # Only public attributes are published.
                if not name.startswith("_"):
                    topic = f"{self.topic_base}/{message.identity}/{name}"
                    await self.mqtt_client.publish(
                        topic, value, qos=1, retain=True
                    )

    @command({"type": "number"}, description="An example command")
    async def example_cmd(self, args):
        """Print the received command arguments to standard output."""
        print(f"Executing command with args {args}")

    @command(
        {
            "type": "object",
            "properties": {
                "portID": {"type": "integer"},
                "baudRate": {
                    "type": "integer",
                    "enum": [
                        110, 300, 600, 1200, 2400,
                        4800, 9600, 14400, 19200, 38400,
                        57600, 115200, 230400, 460800, 921600,
                    ],
                },
            },
            "required": ["portID", "baudRate"],
            "additionalProperties": False,
        },
        description="Reconfigure the serial port for the UBX simulator.",
    )
    async def configure_port(self, args):
        """Send a CFG-PRT SET message built from ``args`` to the device.

        Args:
            args: Mapping with the keys ``"portID"`` and ``"baudRate"``, as
                described by the schema passed to ``@command``.

        Returns:
            The value returned by ``serial_port.write_async`` for the
            serialized message (stored as ``num_bytes`` in the original
            code, i.e. presumably the number of bytes written).
        """
        cfg_prt = pyubx2.UBXMessage(
            "CFG",
            "CFG-PRT",
            pyubx2.ubxtypes_core.SET,
            portID=args["portID"],
            baudRate=args["baudRate"],
        )
        frame = cfg_prt.serialize()
        return await self.serial_port.write_async(frame)