#! /usr/bin/env python3 import asyncio import uuid import aioserial import aiomqtt import pyubx2 from datetime import datetime, timedelta SERIAL_PORT = "/tmp/ttyV0" BAUD = 115200 SERIAL_PUBLISH_TOPIC = "serial/data" SERIAL_COMMAND_TOPIC = "serial/command" async def parse_serial(ser): buffer = bytearray() # pyubx2 requires a file-like object, so we'll use a memoryview wrapper class StreamWrapper: def read(self, n=1): # grab at most n bytes from buffer if not buffer: raise BlockingIOError out = buffer[:n] del buffer[:n] return bytes(out) ubr = pyubx2.UBXReader(StreamWrapper(), parsing=True) while True: chunk = await ser.read_async(200) if chunk: buffer.extend(chunk) try: while True: raw, parsed = ubr.read() if raw is None: break yield raw, parsed except (pyubx2.UBXStreamError, BlockingIOError, Exception): pass else: await asyncio.sleep(0) # yield control async def serial_reader(ser, mqtt_client): async for raw, parsed in parse_serial(ser): if isinstance(parsed, pyubx2.UBXMessage): for name, value in vars(parsed).items(): if name.startswith('_'): continue await mqtt_client.publish(f"ubx/{parsed.identity}/{name}", value, retain=True) else: print(parsed.identity, parsed) await mqtt_client.publish(f"ubx/raw", raw) await mqtt_client.publish(f"ubx/parsed", str(parsed)) async def mqtt_command_writer(ser, client): async for message in client.messages: topic = str(message.topic) print(message.topic, message.payload) if str(topic) == SERIAL_COMMAND_TOPIC: await ser.write_async(message.payload) async def main(): client_id = f"jono-test-{uuid.uuid4()}" interval = 5 ser = aioserial.AioSerial( port=SERIAL_PORT, baudrate=BAUD, timeout=0.05, # 50 ms ) while True: try: async with aiomqtt.Client( "127.0.0.1", port=1883, identifier=client_id, ) as client: await client.subscribe(SERIAL_COMMAND_TOPIC) await asyncio.gather( serial_reader(ser, client), mqtt_command_writer(ser, client), ) except aiomqtt.MqttError as e: print(e) print(f"Connection lost; reconnecting in {interval} seconds...") await asyncio.sleep(interval) if __name__ == "__main__": asyncio.run(main())