mqttdevicemanager/assethandler.py

146 lines
4.9 KiB
Python
Executable File

#! /usr/bin/env python3
import asyncio
import uuid
import aioserial
import aiomqtt
import pyubx2
import json
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import paho.mqtt.client as mqtt
from command import Command
# Baud rate used when opening the serial port (passed to AioSerial as `baudrate`).
BAUD = 115200
class SerialMQTTHandler:
    """Bridge between a serial device and an MQTT broker.

    Fields of parsed UBX messages are published under
    ``asset/<handler_id>/<message identity>/<field>`` and commands are
    received on ``asset/<handler_id>/command/<command name>``.
    """

    def __init__(
        self,
        handler_id: str,
        serial_port: str,
        mqtt_host: str = "127.0.0.1",
        mqtt_port: int = 1883,
    ):
        """Open the serial port and build the MQTT client.

        Args:
            handler_id: Identifier used in the topic base and client id.
            serial_port: Device path handed to ``aioserial.AioSerial``.
            mqtt_host: Broker host name or address.
            mqtt_port: Broker TCP port.
        """
        self.handler_id = handler_id
        self.serial_port = serial_port
        self.mqtt_host = mqtt_host
        self.mqtt_port = mqtt_port

        # Serial
        self.ser = aioserial.AioSerial(
            port=serial_port,
            baudrate=BAUD,
            timeout=0.05,  # 50 ms
        )

        # MQTT client; the random suffix keeps client ids unique per instance.
        self.client_id = f"{handler_id}-{uuid.uuid4()}"
        self.mqtt_client = aiomqtt.Client(
            mqtt_host, port=mqtt_port, identifier=self.client_id, protocol=mqtt.MQTTv5
        )

        # Topic base
        self.topic_base = f"asset/{handler_id}"
        self.command_topic = f"{self.topic_base}/command"

        # Add an arbitrary command
        self.commands = {}
        example_command = Command(
            "example-cmd", {"type": "number"}, description="An example command"
        )
        self.commands[example_command.name] = example_command

    async def parse_serial(self):
        """Yield ``(raw, parsed)`` pairs produced by ``pyubx2.UBXReader``.

        Serial data is read in chunks of up to 200 bytes into a local buffer
        that the reader consumes through a small file-like wrapper. The
        generator loops forever; it ends only when it is closed or cancelled.
        """
        buffer = bytearray()

        class StreamWrapper:
            """File-like adapter that lets ``UBXReader`` consume ``buffer``."""

            def read(inner_self, n=1):
                if not buffer:
                    # Nothing buffered: abort the current parse attempt so the
                    # outer loop can fetch more serial data.
                    raise BlockingIOError
                out = buffer[:n]
                del buffer[:n]
                return bytes(out)

        ubr = pyubx2.UBXReader(StreamWrapper(), parsing=True)
        while True:
            chunk = await self.ser.read_async(200)
            if not chunk:
                await asyncio.sleep(0)
                continue
            buffer.extend(chunk)
            try:
                while True:
                    raw, parsed = ubr.read()
                    if raw is None:
                        break
                    yield raw, parsed
            except (BlockingIOError, pyubx2.UBXStreamError):
                # Expected when the buffer runs dry (BlockingIOError comes from
                # StreamWrapper.read). UBXStreamError is presumably the
                # reader's reaction to a short read - confirm against pyubx2.
                # NOTE(review): bytes already removed from the buffer by the
                # aborted parse are not restored, so a message split across
                # two chunks can be lost.
                pass
            except Exception as exc:
                # Best effort: keep the reader alive, but do not hide
                # unexpected failures completely.
                print(f"[{self.handler_id}] Serial parse error: {exc!r}")

    async def serial_reader_task(self):
        """Publish every public attribute of each parsed UBX message.

        Each attribute is published retained with QoS 1 on
        ``<topic_base>/<message identity>/<attribute name>``.
        """
        async for raw, parsed in self.parse_serial():
            if not isinstance(parsed, pyubx2.UBXMessage):
                continue
            for name, value in vars(parsed).items():
                if name.startswith("_"):
                    continue
                topic = f"{self.topic_base}/{parsed.identity}/{name}"
                await self.mqtt_client.publish(topic, value, qos=1, retain=True)

    async def mqtt_command_writer_task(self):
        """Handle incoming command messages.

        For a known command the JSON payload is validated (the result is only
        printed) and, if the message carries a response topic, the original
        payload is echoed to it. Unknown commands and undecodable payloads are
        reported and skipped so that one bad message cannot stop the task.
        """
        async for message in self.mqtt_client.messages:
            topic = str(message.topic)
            if not topic.startswith(self.command_topic):
                continue

            command_name = topic.removeprefix(f"{self.command_topic}/")
            command = self.commands.get(command_name)
            if command is None:
                print("Unknown command:", topic, message.payload)
                continue

            try:
                payload = json.loads(message.payload.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                print("Invalid command payload:", topic, message.payload, exc)
                continue

            print(topic, payload, "valid:", command.validate(payload), message.properties)

            # The properties object only has attributes that were actually
            # set, so ResponseTopic must be looked up defensively.
            response_topic = getattr(message.properties, "ResponseTopic", None)
            if response_topic is not None:
                await self.mqtt_client.publish(response_topic, message.payload, qos=1)

    async def _publish_command_metadata(self):
        """Publish the retained schema and additional properties of each command."""
        for command in self.commands.values():
            props = Properties(PacketTypes.PUBLISH)
            props.ResponseTopic = "asset/client/response"
            props.CorrelationData = b"req-42"
            # NOTE(review): str() yields the Python repr of the schema, which
            # is not JSON if the schema is a dict - confirm intended format.
            await self.mqtt_client.publish(
                f"{self.command_topic}/{command.name}/schema",
                str(command.schema),
                qos=1,
                retain=True,
                properties=props,
            )
            for k, v in command.additional_properties.items():
                await self.mqtt_client.publish(
                    f"{self.command_topic}/{command.name}/{k}",
                    str(v),
                    qos=1,
                    retain=True,
                )

    async def _run_tasks(self):
        """Run the reader and command tasks until one of them stops.

        When either task fails (or this coroutine is cancelled) the other one
        is cancelled and awaited, so no stale task keeps using the serial port
        or the MQTT client after a reconnect.
        """
        tasks = [
            asyncio.create_task(self.serial_reader_task()),
            asyncio.create_task(self.mqtt_command_writer_task()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            for task in tasks:
                task.cancel()
            # Wait for the cancellations to finish; outcomes are ignored here
            # because the original failure (if any) is already propagating.
            await asyncio.gather(*tasks, return_exceptions=True)

    async def run(self):
        """Connect to the broker and serve, reconnecting after MQTT errors.

        Subscribes to ``<command_topic>/+``, publishes the command metadata
        and then runs the serial reader and the command handler concurrently.
        On ``aiomqtt.MqttError`` the error is printed and the connection is
        retried after 5 seconds. Other exceptions propagate to the caller.
        """
        interval = 5
        while True:
            try:
                async with self.mqtt_client as client:
                    await client.subscribe(f"{self.command_topic}/+")
                    await self._publish_command_metadata()
                    await self._run_tasks()
            except aiomqtt.MqttError as e:
                print(
                    f"[{self.handler_id}] MQTT connection error: {e}. Reconnecting in {interval}s..."
                )
                await asyncio.sleep(interval)
if __name__ == "__main__":

    async def main():
        """Create the example handler on /tmp/ttyV0 and run it."""
        await SerialMQTTHandler("example-gps", "/tmp/ttyV0").run()

    asyncio.run(main())