#! /usr/bin/env python3
"""Bridge a u-blox (UBX) serial device to MQTT.

Bytes read from the serial port are parsed with pyubx2, and every public
attribute of each parsed UBX message is published as a retained MQTT
property. Commands defined on the handler write CFG messages back to the
serial port.
"""

import asyncio
import io
import logging
import signal
import socket

import aiomqtt
import aioserial
import pyubx2

from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task

logger = logging.getLogger(__name__)

# The pyubx2 library logs an error whenever it is handed an incomplete
# buffer. That happens constantly while streaming and is not a real error,
# so silence everything below FATAL.
logging.getLogger("pyubx2").setLevel(logging.FATAL)


class UBXAsyncParser:
    """Incremental parser that accumulates raw bytes and extracts messages."""

    def __init__(self):
        # Bytes received so far that have not yet been consumed by parse().
        self.buffer = bytearray()

    def feed(self, data: bytes):
        """Append newly received bytes to the pending buffer."""
        self.buffer.extend(data)

    def parse(self):
        """Yield each message that can currently be parsed from the buffer.

        After every successful read the bytes consumed by the reader are
        removed from the front of the buffer. Iteration ends when the reader
        returns no raw data or raises ``pyubx2.UBXStreamError`` (treated as
        an incomplete frame); in both cases the buffer is left untouched so
        the remaining bytes can be completed by a later ``feed()``.
        """
        while True:
            # A fresh stream over the current buffer contents; tell() then
            # reports how many leading bytes the reader used up.
            stream = io.BytesIO(self.buffer)
            try:
                reader = pyubx2.UBXReader(stream, parsing=True)
                raw, parsed = reader.read()
            except pyubx2.UBXStreamError:
                # Incomplete frame: wait for more data.
                return

            if raw is None:
                return

            consumed = stream.tell()
            del self.buffer[:consumed]
            yield parsed


class UBXHandler(MQTTHandler):
    """MQTT handler that publishes UBX messages and accepts CFG commands."""

    def __init__(
        self,
        mqtt_client: aiomqtt.Client,
        handler_id: str,
        serial_port: aioserial.AioSerial,
    ):
        """Store the serial port and initialise the base handler.

        NOTE(review): ``main()`` passes an ``MQTTConfig`` as the first
        argument although it is annotated as ``aiomqtt.Client`` here; confirm
        which type ``MQTTHandler.__init__`` actually expects.
        """
        super().__init__(mqtt_client, handler_id)
        self.serial_port = serial_port

    @task
    async def handle_ubx_messages(self):
        """Read the serial port forever and publish UBX message fields.

        Each public attribute of a parsed ``pyubx2.UBXMessage`` is published
        to ``<property_topic>/<message identity>/<attribute name>`` with
        QoS 1 and the retain flag set. Anything else the parser returns is
        ignored.
        """

        async def read_serial():
            """Yield parsed messages as serial data arrives."""
            parser = UBXAsyncParser()
            while True:
                chunk = await self.serial_port.read_async(64)
                if not chunk:
                    # Nothing was read; give other tasks a chance to run
                    # before polling the port again.
                    await asyncio.sleep(0)
                    continue

                parser.feed(chunk)
                for message in parser.parse():
                    yield message

        async for message in read_serial():
            if not isinstance(message, pyubx2.UBXMessage):
                logger.debug("Ignoring non-UBX message: %r", message)
                continue

            for name, value in vars(message).items():
                # Skip 'private' members
                if name.startswith("_"):
                    continue

                topic = f"{self.property_topic}/{message.identity}/{name}"
                await self.mqtt_client.publish(topic, value, qos=1, retain=True)

    @command({"type": "number"}, description="An example command")
    async def example_cmd(self, args):
        """Print the received arguments; demonstrates the command decorator."""
        print(f"Executing command with args {args}")

    @command(
        {
            "type": "object",
            "properties": {
                "portID": {
                    "type": "integer",
                },
                "baudRate": {
                    "type": "integer",
                    "enum": [
                        110,
                        300,
                        600,
                        1200,
                        2400,
                        4800,
                        9600,
                        14400,
                        19200,
                        38400,
                        57600,
                        115200,
                        230400,
                        460800,
                        921600,
                    ],
                    "default": 9600,
                },
            },
            "required": ["portID", "baudRate"],
            "additionalProperties": False,
        },
        description="Reconfigure the serial port for the UBX simulator.",
    )
    async def configure_port(self, args):
        """Send a CFG-PRT SET message built from ``args``.

        ``args`` must contain ``portID`` and ``baudRate``. Returns whatever
        ``write_async`` returns (presumably the number of bytes written).
        """
        message = pyubx2.UBXMessage(
            "CFG",
            "CFG-PRT",
            pyubx2.ubxtypes_core.SET,
            portID=args["portID"],
            baudRate=args["baudRate"],
        )
        num_bytes = await self.serial_port.write_async(message.serialize())
        return num_bytes

    @command(
        {
            "type": "object",
            "properties": {
                "measRate": {
                    "type": "integer",
                    "minimum": 50,
                    "maximum": 60000,
                    "description": "Measurement period in milliseconds",
                    "default": 1000,
                },
                "navRate": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 127,
                    "description": "Number of measurement cycles per navigation solution",
                    "default": 1,
                },
                "timeRef": {
                    "type": "integer",
                    "enum": [0, 1],
                    "description": "Time reference (0=UTC, 1=GPS)",
                },
            },
            "required": ["measRate", "navRate", "timeRef"],
            "additionalProperties": False,
        },
        description="Reconfigure the rate properties for the UBX device.",
    )
    async def configure_rate(self, args):
        """Send a CFG-RATE SET message built from ``args``.

        ``args`` must contain ``measRate``, ``navRate`` and ``timeRef``.
        Returns whatever ``write_async`` returns (presumably the number of
        bytes written).
        """
        message = pyubx2.UBXMessage(
            "CFG",
            "CFG-RATE",
            pyubx2.ubxtypes_core.SET,
            measRate=args["measRate"],
            navRate=args["navRate"],
            timeRef=args["timeRef"],
        )
        num_bytes = await self.serial_port.write_async(message.serialize())
        return num_bytes


async def main():
    """Create the serial port and handler, then run until SIGINT."""
    handler_id = f"example-gps-{socket.gethostname()}"
    mqtt_config = MQTTConfig(host="127.0.0.1", port=1883)

    serial_port = aioserial.AioSerial(
        port="/tmp/ttyV0",
        baudrate=115200,
        timeout=0.05,  # 50 ms
    )

    handler = UBXHandler(mqtt_config, handler_id, serial_port)

    def request_stop() -> None:
        handler.stop()

    # Handlers installed with signal.signal() are not allowed to interact
    # with the event loop, so prefer the loop's own signal support and only
    # fall back where the loop does not implement it.
    loop = asyncio.get_running_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, request_stop)
    except NotImplementedError:
        signal.signal(signal.SIGINT, lambda signum, frame: request_stop())

    await handler.run()


if __name__ == "__main__":
    asyncio.run(main())