diff --git a/assethandler.py b/assethandler.py index 3b8fcd5..a96017a 100755 --- a/assethandler.py +++ b/assethandler.py @@ -1,101 +1,117 @@ #! /usr/bin/env python3 - import asyncio import uuid import aioserial import aiomqtt import pyubx2 -from datetime import datetime, timedelta -SERIAL_PORT = "/tmp/ttyV0" BAUD = 115200 -SERIAL_PUBLISH_TOPIC = "serial/data" -SERIAL_COMMAND_TOPIC = "serial/command" -async def parse_serial(ser): - buffer = bytearray() +class SerialMQTTHandler: + def __init__( + self, + handler_id: str, + serial_port: str, + mqtt_host: str = "127.0.0.1", + mqtt_port: int = 1883, + ): + self.handler_id = handler_id + self.serial_port = serial_port + self.mqtt_host = mqtt_host + self.mqtt_port = mqtt_port - # pyubx2 requires a file-like object, so we'll use a memoryview wrapper - class StreamWrapper: - def read(self, n=1): - # grab at most n bytes from buffer - if not buffer: - raise BlockingIOError - out = buffer[:n] - del buffer[:n] - return bytes(out) + # Serial + self.ser = aioserial.AioSerial( + port=serial_port, + baudrate=BAUD, + timeout=0.05, # 50 ms + ) - ubr = pyubx2.UBXReader(StreamWrapper(), parsing=True) + # MQTT client + self.client_id = f"{handler_id}-{uuid.uuid4()}" + self.mqtt_client = aiomqtt.Client( + mqtt_host, port=mqtt_port, identifier=self.client_id + ) - while True: - chunk = await ser.read_async(200) - if chunk: - buffer.extend(chunk) + # Topic base + self.topic_base = f"asset/{handler_id}" + async def parse_serial(self): + buffer = bytearray() + + class StreamWrapper: + def read(inner_self, n=1): + if not buffer: + raise BlockingIOError + out = buffer[:n] + del buffer[:n] + return bytes(out) + + ubr = pyubx2.UBXReader(StreamWrapper(), parsing=True) + + while True: + chunk = await self.ser.read_async(200) + if chunk: + buffer.extend(chunk) + + try: + while True: + raw, parsed = ubr.read() + if raw is None: + break + yield raw, parsed + except (pyubx2.UBXStreamError, BlockingIOError, Exception): + pass + else: + await asyncio.sleep(0) + + async def serial_reader_task(self): + async for raw, parsed in self.parse_serial(): + if isinstance(parsed, pyubx2.UBXMessage): + for name, value in vars(parsed).items(): + if name.startswith("_"): + continue + topic = f"{self.topic_base}/{parsed.identity}/{name}" + await self.mqtt_client.publish(topic, value, qos=1, retain=True) + else: + print(parsed) + + await self.mqtt_client.publish(f"{self.topic_base}/raw", raw) + await self.mqtt_client.publish(f"{self.topic_base}/parsed", str(parsed)) + + async def mqtt_command_writer_task(self): + command_topic = f"{self.topic_base}/command" + async for message in self.mqtt_client.messages: + topic = str(message.topic) + if topic == command_topic: + await self.ser.write_async(message.payload) + + async def run(self): + command_topic = f"{self.topic_base}/command" + interval = 5 + + while True: try: - while True: - raw, parsed = ubr.read() - if raw is None: - break - yield raw, parsed - except (pyubx2.UBXStreamError, BlockingIOError, Exception): - pass - else: - await asyncio.sleep(0) # yield control + async with self.mqtt_client as client: + await client.subscribe(command_topic) + await asyncio.gather( + self.serial_reader_task(), + self.mqtt_command_writer_task(), + ) -async def serial_reader(ser, mqtt_client): - async for raw, parsed in parse_serial(ser): - if isinstance(parsed, pyubx2.UBXMessage): - for name, value in vars(parsed).items(): - if name.startswith('_'): - continue - await mqtt_client.publish(f"ubx/{parsed.identity}/{name}", value, retain=True) - else: - print(parsed.identity, parsed) - - await mqtt_client.publish(f"ubx/raw", raw) - await mqtt_client.publish(f"ubx/parsed", str(parsed)) - -async def mqtt_command_writer(ser, client): - async for message in client.messages: - topic = str(message.topic) - print(message.topic, message.payload) - - if str(topic) == SERIAL_COMMAND_TOPIC: - await ser.write_async(message.payload) - - -async def main(): - client_id = f"jono-test-{uuid.uuid4()}" - interval = 5 - - ser = aioserial.AioSerial( - port=SERIAL_PORT, - baudrate=BAUD, - timeout=0.05, # 50 ms - ) - - while True: - try: - async with aiomqtt.Client( - "127.0.0.1", - port=1883, - identifier=client_id, - ) as client: - await client.subscribe(SERIAL_COMMAND_TOPIC) - - await asyncio.gather( - serial_reader(ser, client), - mqtt_command_writer(ser, client), + except aiomqtt.MqttError as e: + print( + f"[{self.handler_id}] MQTT connection error: {e}. Reconnecting in {interval}s..." ) - - except aiomqtt.MqttError as e: - print(e) - print(f"Connection lost; reconnecting in {interval} seconds...") - await asyncio.sleep(interval) + await asyncio.sleep(interval) if __name__ == "__main__": + + async def main(): + handler = SerialMQTTHandler("example-gps", "/tmp/ttyV0") + await handler.run() + asyncio.run(main())