import asyncio import aioserial import aiomqtt import pyubx2 import io import logging from command import command from handler import MQTTHandler, task # The pyubx2 library spams the console with errors that aren't errors. # I don't care if you failed to parse an incomplete buffer. logging.getLogger("pyubx2").setLevel(logging.FATAL) class UBXAsyncParser: def __init__(self): self.buffer = bytearray() def feed(self, data: bytes): self.buffer.extend(data) def parse(self): while True: stream = io.BytesIO(self.buffer) try: ubr = pyubx2.UBXReader(stream, parsing=True) raw, parsed = ubr.read() if raw is None: return consumed = stream.tell() del self.buffer[:consumed] yield parsed except pyubx2.UBXStreamError: # incomplete frame return class UBXHandler(MQTTHandler): def __init__( self, mqtt_client: aiomqtt.Client, handler_id: str, serial_port: aioserial.AioSerial, ): super().__init__(mqtt_client, handler_id) self.serial_port = serial_port @task async def handle_ubx_messages(self): async def read_serial(): parser = UBXAsyncParser() while True: chunk = await self.serial_port.read_async(64) if not chunk: await asyncio.sleep(0) continue parser.feed(chunk) for message in parser.parse(): yield message async for message in read_serial(): if isinstance(message, pyubx2.UBXMessage): for name, value in vars(message).items(): # Skip 'private' members if name.startswith("_"): continue topic = f"{self.property_topic}/{message.identity}/{name}" await self.mqtt_client.publish(topic, value, qos=1, retain=True) else: # print("Unexpected response:", message) pass @command({"type": "number"}, description="An example command") async def example_cmd(self, args): print(f"Executing command with args {args}") @command( { "type": "object", "properties": { "portID": { "type": "integer", }, "baudRate": { "type": "integer", "enum": [ 110, 300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 38400, 57600, 115200, 230400, 460800, 921600, ], }, }, "required": ["portID", "baudRate"], "additionalProperties": False, }, description="Reconfigure the serial port for the UBX simulator.", ) async def configure_port(self, args): message = pyubx2.UBXMessage( "CFG", "CFG-PRT", pyubx2.ubxtypes_core.SET, portID=args["portID"], baudRate=args["baudRate"], ) num_bytes = await self.serial_port.write_async(message.serialize()) return num_bytes @command( { "type": "object", "properties": { "measRate": { "type": "integer", "minimum": 50, "maximum": 60000, "description": "Measurement period in milliseconds", }, "navRate": { "type": "integer", "minimum": 1, "maximum": 127, "description": "Number of measurement cycles per navigation solution", }, "timeRef": { "type": "integer", "enum": [0, 1], "description": "Time reference (0=UTC, 1=GPS)", }, }, "required": ["measRate", "navRate", "timeRef"], "additionalProperties": False, }, description="Reconfigure the rate properties for the UBX device.", ) async def configure_rate(self, args): message = pyubx2.UBXMessage( "CFG", "CFG-RATE", pyubx2.ubxtypes_core.SET, measRate=args["measRate"], navRate=args["navRate"], timeRef=args["timeRef"], ) num_bytes = await self.serial_port.write_async(message.serialize()) return num_bytes