diff --git a/assethandler.py b/assethandler.py index a96017a..9fbe2ac 100755 --- a/assethandler.py +++ b/assethandler.py @@ -4,6 +4,12 @@ import uuid import aioserial import aiomqtt import pyubx2 +import json +from paho.mqtt.properties import Properties +from paho.mqtt.packettypes import PacketTypes +import paho.mqtt.client as mqtt + +from command import Command BAUD = 115200 @@ -31,11 +37,20 @@ class SerialMQTTHandler: # MQTT client self.client_id = f"{handler_id}-{uuid.uuid4()}" self.mqtt_client = aiomqtt.Client( - mqtt_host, port=mqtt_port, identifier=self.client_id + mqtt_host, port=mqtt_port, identifier=self.client_id, protocol=mqtt.MQTTv5 ) # Topic base self.topic_base = f"asset/{handler_id}" + self.command_topic = f"{self.topic_base}/command" + + # Add an arbitrary command + self.commands = {} + + example_command = Command( + "example-cmd", {"type": "number"}, description="An example command" + ) + self.commands[example_command.name] = example_command async def parse_serial(self): buffer = bytearray() @@ -74,27 +89,40 @@ class SerialMQTTHandler: continue topic = f"{self.topic_base}/{parsed.identity}/{name}" await self.mqtt_client.publish(topic, value, qos=1, retain=True) - else: - print(parsed) - - await self.mqtt_client.publish(f"{self.topic_base}/raw", raw) - await self.mqtt_client.publish(f"{self.topic_base}/parsed", str(parsed)) async def mqtt_command_writer_task(self): - command_topic = f"{self.topic_base}/command" async for message in self.mqtt_client.messages: topic = str(message.topic) - if topic == command_topic: - await self.ser.write_async(message.payload) + payload = message.payload.decode("utf-8") + payload = json.loads(payload) + if topic.startswith(self.command_topic): + #await self.ser.write_async(message.payload) + command_name = topic.removeprefix(f"{self.command_topic}/") + command = self.commands.get(command_name) + if command is not None: + print(topic, payload, "valid:", command.validate(payload), message.properties) + + if message.properties is not None and message.properties.ResponseTopic is not None: + await self.mqtt_client.publish(message.properties.ResponseTopic, message.payload, qos=1) + else: + print("Unknown command:", topic, message.payload) async def run(self): - command_topic = f"{self.topic_base}/command" interval = 5 while True: try: async with self.mqtt_client as client: - await client.subscribe(command_topic) + await client.subscribe(f"{self.command_topic}/+") + + for command in self.commands.values(): + props = Properties(PacketTypes.PUBLISH) + props.ResponseTopic = "asset/client/response" + props.CorrelationData = b"req-42" + + await self.mqtt_client.publish(f"{self.command_topic}/{command.name}/schema", str(command.schema), qos=1, retain=True, properties=props) + for k,v in command.additional_properties.items(): + await self.mqtt_client.publish(f"{self.command_topic}/{command.name}/{k}", str(v), qos=1, retain=True) await asyncio.gather( self.serial_reader_task(), diff --git a/command.py b/command.py new file mode 100644 index 0000000..e93827f --- /dev/null +++ b/command.py @@ -0,0 +1,15 @@ +import jsonschema + + +class Command: + def __init__(self, name: str, schema, **kwargs): + self.name = name + self.schema = schema + self.additional_properties = kwargs + + self._validator = jsonschema.validators.validator_for( + schema, default=jsonschema.validators.Draft7Validator + )(schema=schema) + + def validate(self, o) -> bool: + return self._validator.is_valid(o) diff --git a/requirements.txt b/requirements.txt index f9f43a4..38b568f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ aioserial black pyubx2 pyubxutils +jsonschema