mqttdevicemanager/ubx.py

194 lines
5.5 KiB
Python
Executable File

#! /usr/bin/env python3
import asyncio
import aioserial
import aiomqtt
import pyubx2
import io
import logging
import aioserial
import asyncio
import socket
import signal
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
# The pyubx2 library spams the console with errors that aren't errors.
# I don't care if you failed to parse an incomplete buffer.
logging.getLogger("pyubx2").setLevel(logging.FATAL)
class UBXAsyncParser:
def __init__(self):
self.buffer = bytearray()
def feed(self, data: bytes):
self.buffer.extend(data)
def parse(self):
while True:
stream = io.BytesIO(self.buffer)
try:
ubr = pyubx2.UBXReader(stream, parsing=True)
raw, parsed = ubr.read()
if raw is None:
return
consumed = stream.tell()
del self.buffer[:consumed]
yield parsed
except pyubx2.UBXStreamError:
# incomplete frame
return
class UBXHandler(MQTTHandler):
def __init__(
self,
mqtt_client: aiomqtt.Client,
handler_id: str,
serial_port: aioserial.AioSerial,
):
super().__init__(mqtt_client, handler_id)
self.serial_port = serial_port
@task
async def handle_ubx_messages(self):
async def read_serial():
parser = UBXAsyncParser()
while True:
chunk = await self.serial_port.read_async(64)
if not chunk:
await asyncio.sleep(0)
continue
parser.feed(chunk)
for message in parser.parse():
yield message
async for message in read_serial():
if isinstance(message, pyubx2.UBXMessage):
for name, value in vars(message).items():
# Skip 'private' members
if name.startswith("_"):
continue
property = f"{message.identity}/{name}"
await self.set_property(property, value, 1, True)
else:
# print("Unexpected response:", message)
pass
@command({"type": "number"}, "An example command", foo="bar")
async def example_cmd(self, args):
print(f"Executing command with args {args}")
@command(
{
"type": "object",
"properties": {
"portID": {
"type": "integer",
},
"baudRate": {
"type": "integer",
"enum": [
110,
300,
600,
1200,
2400,
4800,
9600,
14400,
19200,
38400,
57600,
115200,
230400,
460800,
921600,
],
"default": 9600
},
},
"required": ["portID", "baudRate"],
"additionalProperties": False,
},
"Reconfigure the serial port for the UBX simulator.",
)
async def configure_port(self, args):
message = pyubx2.UBXMessage(
"CFG",
"CFG-PRT",
pyubx2.ubxtypes_core.SET,
portID=args["portID"],
baudRate=args["baudRate"],
)
num_bytes = await self.serial_port.write_async(message.serialize())
return num_bytes
@command(
{
"type": "object",
"properties": {
"measRate": {
"type": "integer",
"minimum": 50,
"maximum": 60000,
"description": "Measurement period in milliseconds",
"default": 1000,
},
"navRate": {
"type": "integer",
"minimum": 1,
"maximum": 127,
"description": "Number of measurement cycles per navigation solution",
"default": 1
},
"timeRef": {
"type": "integer",
"enum": [0, 1],
"description": "Time reference (0=UTC, 1=GPS)",
},
},
"required": ["measRate", "navRate", "timeRef"],
"additionalProperties": False,
},
"Reconfigure the rate properties for the UBX device.",
)
async def configure_rate(self, args):
message = pyubx2.UBXMessage(
"CFG",
"CFG-RATE",
pyubx2.ubxtypes_core.SET,
measRate=args["measRate"],
navRate=args["navRate"],
timeRef=args["timeRef"],
)
num_bytes = await self.serial_port.write_async(message.serialize())
return num_bytes
async def main():
handler_id = f"example-gps-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1", port=1883)
serial_port = aioserial.AioSerial(
port="/tmp/ttyV0",
baudrate=115200,
timeout=0.05, # 50 ms
)
handler = UBXHandler(mqtt_config, handler_id, serial_port)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
if __name__ == "__main__":
asyncio.run(main())