#! /usr/bin/env python3
"""Bridge a serial GNSS receiver to MQTT.

Bytes read from the serial port are parsed with ``pyubx2`` and published
under ``asset/<handler_id>/...``; payloads received on
``asset/<handler_id>/command`` are written back to the serial port.
"""
import asyncio
import logging
import uuid

import aiomqtt
import aioserial
import pyubx2

BAUD = 115200

# Upper bound on bytes held back while waiting for a frame to complete:
# a UBX frame carries a 16-bit payload length plus 8 framing bytes.
_MAX_PENDING_BYTES = 0x10000 + 8

logger = logging.getLogger(__name__)


class _ReplayBuffer:
    """In-memory byte stream whose reads can be undone.

    The parser reads synchronously, but serial data arrives in arbitrary
    chunks. Reads advance a cursor instead of deleting data, so a parse
    attempt that runs out of bytes can be rolled back and retried once more
    data has been fed, rather than losing the part it already consumed.
    """

    def __init__(self):
        self._data = bytearray()
        self._pos = 0  # bytes before this index are consumed, not yet committed

    def __len__(self):
        return len(self._data)

    def feed(self, chunk):
        """Append freshly received bytes."""
        self._data.extend(chunk)

    def clear(self):
        """Drop everything that is buffered."""
        self._data.clear()
        self._pos = 0

    def read(self, n=1):
        """Return exactly ``n`` bytes, or raise BlockingIOError if unavailable."""
        end = self._pos + n
        if end > len(self._data):
            # Never hand out a short read: the caller retries after more
            # data arrives instead of seeing a truncated frame.
            raise BlockingIOError
        out = bytes(self._data[self._pos:end])
        self._pos = end
        return out

    def readline(self):
        """Return bytes up to and including the next newline.

        Raises BlockingIOError when no complete line is buffered yet.
        NOTE(review): presumably what pyubx2 calls for NMEA sentences -
        confirm against the installed pyubx2 version.
        """
        newline = self._data.find(b"\n", self._pos)
        if newline < 0:
            raise BlockingIOError
        end = newline + 1
        out = bytes(self._data[self._pos:end])
        self._pos = end
        return out

    def commit(self):
        """Discard the bytes consumed so far; they formed a complete message."""
        del self._data[:self._pos]
        self._pos = 0

    def rollback(self):
        """Un-read everything consumed since the last commit."""
        self._pos = 0

    def skip(self):
        """Discard the consumed bytes (at least one) to get past bad input."""
        del self._data[:max(self._pos, 1)]
        self._pos = 0


class SerialMQTTHandler:
    """Relay parsed serial messages to MQTT and MQTT commands to serial."""

    def __init__(
        self,
        handler_id: str,
        serial_port: str,
        mqtt_host: str = "127.0.0.1",
        mqtt_port: int = 1883,
    ):
        """Open the serial port and prepare (but do not connect) the MQTT client.

        Args:
            handler_id: Used in the topic base and as the client-id prefix.
            serial_port: Device path handed to ``aioserial.AioSerial``.
            mqtt_host: Broker host name or address.
            mqtt_port: Broker TCP port.
        """
        self.handler_id = handler_id
        self.serial_port = serial_port
        self.mqtt_host = mqtt_host
        self.mqtt_port = mqtt_port

        # Serial
        self.ser = aioserial.AioSerial(
            port=serial_port,
            baudrate=BAUD,
            timeout=0.05,  # 50 ms
        )

        # MQTT client; the random suffix keeps the client id unique per instance.
        self.client_id = f"{handler_id}-{uuid.uuid4()}"
        self.mqtt_client = aiomqtt.Client(
            mqtt_host, port=mqtt_port, identifier=self.client_id
        )

        # Topic base
        self.topic_base = f"asset/{handler_id}"

    async def parse_serial(self):
        """Yield ``(raw, parsed)`` for every message decoded from the serial port.

        Runs until cancelled. Incomplete frames stay buffered until the rest
        arrives; input the parser rejects is logged and skipped.
        """
        stream = _ReplayBuffer()
        ubr = pyubx2.UBXReader(stream, parsing=True)

        while True:
            chunk = await self.ser.read_async(200)
            if not chunk:
                # Read timed out with no data: let other tasks run.
                await asyncio.sleep(0)
                continue

            stream.feed(chunk)
            while True:
                try:
                    raw, parsed = ubr.read()
                except BlockingIOError:
                    # Frame not complete yet: retry from its first byte
                    # once the next chunk has been appended.
                    stream.rollback()
                    break
                except Exception:
                    # Best effort: report the problem, drop the offending
                    # bytes and resume with the next chunk.
                    logger.exception(
                        "[%s] failed to parse serial data", self.handler_id
                    )
                    stream.skip()
                    break
                if raw is None:
                    # Parser reported end of stream; keep the bytes for later.
                    stream.rollback()
                    break
                stream.commit()
                yield raw, parsed

            if len(stream) > _MAX_PENDING_BYTES:
                # Nothing decodable for longer than any frame can be (e.g.
                # line noise or a wrong baud rate): start over.
                logger.warning(
                    "[%s] discarding %d undecodable bytes",
                    self.handler_id,
                    len(stream),
                )
                stream.clear()

    async def serial_reader_task(self):
        """Publish every parsed serial message to MQTT.

        UBX messages are fanned out to one retained topic per public
        attribute; other parsed objects are printed. Every message is also
        published to ``<topic_base>/raw`` and ``<topic_base>/parsed``.
        """
        async for raw, parsed in self.parse_serial():
            if isinstance(parsed, pyubx2.UBXMessage):
                for name, value in vars(parsed).items():
                    if name.startswith("_"):
                        continue
                    topic = f"{self.topic_base}/{parsed.identity}/{name}"
                    await self.mqtt_client.publish(topic, value, qos=1, retain=True)
            else:
                print(parsed)
            await self.mqtt_client.publish(f"{self.topic_base}/raw", raw)
            await self.mqtt_client.publish(f"{self.topic_base}/parsed", str(parsed))

    async def mqtt_command_writer_task(self):
        """Write payloads received on the command topic to the serial port."""
        command_topic = f"{self.topic_base}/command"
        async for message in self.mqtt_client.messages:
            if str(message.topic) == command_topic:
                await self.ser.write_async(message.payload)

    @staticmethod
    async def _run_until_first_failure(*coros):
        """Run coroutines concurrently; if one fails, cancel and await the rest.

        ``asyncio.gather`` alone leaves the surviving coroutines running
        after the first exception, which would let a stale serial reader
        compete with the one started on reconnect.
        """
        tasks = [asyncio.ensure_future(coro) for coro in coros]
        try:
            await asyncio.gather(*tasks)
        finally:
            for task in tasks:
                task.cancel()
            # Wait for the cancellations to finish before returning.
            await asyncio.gather(*tasks, return_exceptions=True)

    async def run(self):
        """Connect to the broker and bridge both directions, reconnecting forever.

        Only ``aiomqtt.MqttError`` triggers a reconnect (after a fixed delay);
        any other exception propagates to the caller.
        """
        command_topic = f"{self.topic_base}/command"
        interval = 5
        while True:
            try:
                async with self.mqtt_client as client:
                    await client.subscribe(command_topic)
                    await self._run_until_first_failure(
                        self.serial_reader_task(),
                        self.mqtt_command_writer_task(),
                    )
            except aiomqtt.MqttError as e:
                print(
                    f"[{self.handler_id}] MQTT connection error: {e}. Reconnecting in {interval}s..."
                )
                await asyncio.sleep(interval)


if __name__ == "__main__":

    async def main():
        handler = SerialMQTTHandler("example-gps", "/tmp/ttyV0")
        await handler.run()

    asyncio.run(main())