diff --git a/main.py b/main.py index 646ce38..e729313 100755 --- a/main.py +++ b/main.py @@ -10,6 +10,7 @@ from ubxhandler import UBXHandler BAUD = 115200 + async def main(): handler_id = "example-gps" mqtt_host = "127.0.0.1" @@ -24,10 +25,10 @@ async def main(): ) serial_port = aioserial.AioSerial( - port="/tmp/ttyV0", - baudrate=BAUD, - timeout=0.05, # 50 ms - ) + port="/tmp/ttyV0", + baudrate=BAUD, + timeout=0.05, # 50 ms + ) handler = UBXHandler(mqtt_client, "example-gps", serial_port) await handler.run() diff --git a/ubxhandler.py b/ubxhandler.py index 8ff5770..dff8c90 100644 --- a/ubxhandler.py +++ b/ubxhandler.py @@ -7,36 +7,48 @@ from command import command from handler import MQTTHandler, task +class StreamWrapper: + def __init__(self): + self.buffer = bytearray() + + def read(self, n=1): + if not self.buffer: + raise BlockingIOError + out = self.buffer[:n] + del self.buffer[:n] + return bytes(out) + + def readline(self): + """Return bytes up to and including the first newline.""" + newline_index = self.buffer.find(b"\n") + if newline_index == -1: + # No newline yet, mimic non-blocking behavior + raise BlockingIOError + # Include the newline + line = self.buffer[: newline_index + 1] + del self.buffer[: newline_index + 1] + return bytes(line) + + def extend(self, chunk): + self.buffer.extend(chunk) + + class UBXHandler(MQTTHandler): def __init__( self, mqtt_client: aiomqtt.Client, handler_id: str, - serial_port: aioserial.AioSerial + serial_port: aioserial.AioSerial, ): super().__init__(mqtt_client, handler_id) - self.serial_port = serial_port - @command({"type": "number"}, description="An example command") - async def example_cmd(args): - print(f"Executing command with args {args}") - - async def parse_serial(self): - buffer = bytearray() - - class StreamWrapper: - def read(inner_self, n=1): - if not buffer: - raise BlockingIOError - out = buffer[:n] - del buffer[:n] - return bytes(out) - - ubr = pyubx2.UBXReader(StreamWrapper(), parsing=True) + async def read_serial(self): + buffer = StreamWrapper() + ubr = pyubx2.UBXReader(buffer, parsing=True) while True: - chunk = await self.serial_port.read_async(200) + chunk = await self.serial_port.read_async(256) if chunk: buffer.extend(chunk) @@ -45,18 +57,26 @@ class UBXHandler(MQTTHandler): raw, parsed = ubr.read() if raw is None: break - yield raw, parsed - except (pyubx2.UBXStreamError, BlockingIOError, Exception): - pass + yield parsed + except BlockingIOError: + pass # ordinary behaviour + except (pyubx2.UBXStreamError, Exception) as e: + print("Some kinda error: ", type(e), e) else: await asyncio.sleep(0) @task - async def serial_reader_task(self): - async for raw, parsed in self.parse_serial(): - if isinstance(parsed, pyubx2.UBXMessage): - for name, value in vars(parsed).items(): + async def handle_ubx_messages(self): + async for message in self.read_serial(): + if isinstance(message, pyubx2.UBXMessage): + for name, value in vars(message).items(): + # Skip 'private' members if name.startswith("_"): continue - topic = f"{self.topic_base}/{parsed.identity}/{name}" + + topic = f"{self.topic_base}/{message.identity}/{name}" await self.mqtt_client.publish(topic, value, qos=1, retain=True) + + @command({"type": "number"}, description="An example command") + async def example_cmd(args): + print(f"Executing command with args {args}")