diff --git a/ubxhandler.py b/ubxhandler.py index a5e951e..b9756f7 100644 --- a/ubxhandler.py +++ b/ubxhandler.py @@ -2,35 +2,40 @@ import asyncio import aioserial import aiomqtt import pyubx2 +import io +import logging from command import command from handler import MQTTHandler, task +# The pyubx2 library spams the console with errors that aren't errors. +# I don't care if you failed to parse an incomplete buffer. +logging.getLogger("pyubx2").setLevel(logging.FATAL) -class StreamWrapper: + +class UBXAsyncParser: def __init__(self): self.buffer = bytearray() - def read(self, n=1): - if not self.buffer: - raise BlockingIOError - out = self.buffer[:n] - del self.buffer[:n] - return bytes(out) + def feed(self, data: bytes): + self.buffer.extend(data) - def readline(self): - """Return bytes up to and including the first newline.""" - newline_index = self.buffer.find(b"\n") - if newline_index == -1: - # No newline yet, mimic non-blocking behavior - raise BlockingIOError - # Include the newline - line = self.buffer[: newline_index + 1] - del self.buffer[: newline_index + 1] - return bytes(line) + def parse(self): + while True: + stream = io.BytesIO(self.buffer) + try: + ubr = pyubx2.UBXReader(stream, parsing=True) + raw, parsed = ubr.read() + if raw is None: + return - def extend(self, chunk): - self.buffer.extend(chunk) + consumed = stream.tell() + del self.buffer[:consumed] + yield parsed + + except pyubx2.UBXStreamError: + # incomplete frame + return class UBXHandler(MQTTHandler): @@ -43,31 +48,22 @@ class UBXHandler(MQTTHandler): super().__init__(mqtt_client, handler_id) self.serial_port = serial_port - async def read_serial(self): - buffer = StreamWrapper() - ubr = pyubx2.UBXReader(buffer, parsing=True) - - while True: - chunk = await self.serial_port.read_async(256) - if chunk: - buffer.extend(chunk) - - try: - while True: - raw, parsed = ubr.read() - if raw is None: - break - yield parsed - except BlockingIOError: - pass # ordinary behaviour - except (pyubx2.UBXStreamError, Exception) as e: - print("Some kinda error: ", type(e), e) - else: - await asyncio.sleep(0) - @task async def handle_ubx_messages(self): - async for message in self.read_serial(): + + async def read_serial(): + parser = UBXAsyncParser() + while True: + chunk = await self.serial_port.read_async(64) + if not chunk: + await asyncio.sleep(0) + continue + + parser.feed(chunk) + for message in parser.parse(): + yield message + + async for message in read_serial(): if isinstance(message, pyubx2.UBXMessage): for name, value in vars(message).items(): # Skip 'private' members