mqttdevicemanager/assethandler.py

110 lines
3.0 KiB
Python
Executable File

#! /usr/bin/env python3
import asyncio
import uuid
import aioserial
import aiomqtt
import pyubx2
SERIAL_PORT = "/tmp/ttyV0"
BAUD = 115200
SERIAL_PUBLISH_TOPIC = "serial/data"
SERIAL_COMMAND_TOPIC = "serial/command"
async def parse_serial(ser):
"""
Async generator: read raw bytes from serial and yield parsed UBX/NMEA messages.
"""
buffer = bytearray()
# pyubx2 requires a file-like object, so we'll use a memoryview wrapper
class StreamWrapper:
def read(self, n=1):
# grab at most n bytes from buffer
if not buffer:
raise BlockingIOError
out = buffer[:n]
del buffer[:n]
return bytes(out)
ubr = pyubx2.UBXReader(StreamWrapper(), parsing=True)
while True:
# read available data from serial (small chunks)
chunk = await ser.read_async(200)
if chunk:
buffer.extend(chunk)
# attempt to parse as many messages as possible
try:
while True:
raw, parsed = ubr.read()
if raw is None:
break
yield raw, parsed
except (BlockingIOError, Exception):
# incomplete message; wait for more bytes
pass
except pyubx2.UBXStreamError:
pass # silently ignore incomplete packets
else:
await asyncio.sleep(0) # yield control
async def serial_reader(ser, mqtt_client):
"""
Coroutine wrapper around async generator parse_serial.
Feeds parsed UBX/NMEA messages to MQTT.
"""
async for raw, parsed in parse_serial(ser):
# For example, publish raw bytes to MQTT
print(parsed)
await mqtt_client.publish("serial/parsed", str(parsed))
await mqtt_client.publish("serial/raw", raw)
# Optional: inspect parsed fields
# print(parsed)
async def mqtt_command_writer(ser, client):
async for message in client.messages:
topic = str(message.topic)
print(message.topic, message.payload)
if str(topic) == SERIAL_COMMAND_TOPIC:
await ser.write_async(message.payload)
async def main():
client_id = f"jono-test-{uuid.uuid4()}"
interval = 5
ser = aioserial.AioSerial(
port=SERIAL_PORT,
baudrate=BAUD,
timeout=0.05, # 50 ms
)
while True:
try:
async with aiomqtt.Client(
"127.0.0.1",
port=1883,
identifier=client_id,
) as client:
await client.subscribe(SERIAL_COMMAND_TOPIC)
await asyncio.gather(
serial_reader(ser, client),
mqtt_command_writer(ser, client),
)
except aiomqtt.MqttError as e:
print(e)
print(f"Connection lost; reconnecting in {interval} seconds...")
await asyncio.sleep(interval)
if __name__ == "__main__":
asyncio.run(main())