diff --git a/command.py b/command.py index 55e21ca..6a49c2e 100644 --- a/command.py +++ b/command.py @@ -1,5 +1,14 @@ +import json import jsonschema -from dataclasses import dataclass +from dataclasses import dataclass, asdict + + +class CommandArgumentError(ValueError): + pass + + +class CommandExecutionError(RuntimeError): + pass class Command: @@ -9,19 +18,25 @@ class Command: self.handler = handler self.additional_properties = kwargs - self._validator = jsonschema.validators.validator_for( - schema, default=jsonschema.validators.Draft7Validator - )(schema=schema) - - def validate(self, o) -> bool: - return self._validator.is_valid(o) - - def __call__(self, handler_instance, args): - if not self.validate(args): - raise ValueError(f"Invalid arguments for command '{self.name}'") + def __call__(self, handler_instance, payload): if self.handler is None: - raise RuntimeError(f"No handler bound for command '{self.name}'") - return self.handler(handler_instance, args) + raise NotImplementedError(f"No handler bound for command '{self.name}'") + + try: + arguments = json.loads(payload) + jsonschema.validate(arguments, self.schema) + except json.decoder.JSONDecodeError as e: + raise CommandArgumentError( + f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}" + ) + except jsonschema.ValidationError as e: + raise CommandArgumentError(f"Schema error in {e.json_path}: {e.message}") + + try: + return self.handler(handler_instance, arguments) + except Exception as e: + print("Internal command error: ", e) + raise CommandExecutionError(f"Command execution failed internally.") def command(schema, **kwargs): @@ -36,3 +51,6 @@ class CommandResponse: success: bool message: str = None correlation: str = None + + def __str__(self): + return json.dumps(asdict(self)) diff --git a/handler.py b/handler.py index c57ea84..8362545 100644 --- a/handler.py +++ b/handler.py @@ -1,10 +1,14 @@ -import asyncio import aiomqtt -import json -from command import Command, CommandResponse +import asyncio import inspect -from paho.mqtt.properties import Properties -from dataclasses import asdict +import paho + +from command import ( + Command, + CommandResponse, + CommandArgumentError, + CommandExecutionError, +) class MQTTHandler: @@ -47,9 +51,11 @@ class MQTTHandler: ) async def execute_command( - self, command_name: str, payload: str, properties: Properties = None + self, + command_name: str, + payload: str, + properties: paho.mqtt.properties.Properties = None, ): - async def respond(success: bool, message: str = None): if ( properties is not None @@ -63,23 +69,17 @@ class MQTTHandler: ) await self.mqtt_client.publish( properties.ResponseTopic, - json.dumps( - asdict(CommandResponse(success, str(message), correlation)) - ), + str(CommandResponse(success, str(message), correlation)), qos=1, retain=False, ) try: command = self.get_available_commands()[command_name] - argument = json.loads(payload) - result = await command(self, argument) + result = await command(self, payload) await respond(True, result) - - except json.decoder.JSONDecodeError as e: - await respond(False, f"Failed to parse payload as JSON: {e}") - except ValueError as e: - await respond(False, f"Command payload does not match expected schema: {e}") + except (CommandArgumentError, CommandExecutionError) as e: + await respond(False, f"{e}") except Exception as e: print(f"Failed to execute command {command_name} with unknown cause: ", e) await respond(False, "Unexpected error") diff --git a/main.py b/main.py index e729313..ea7bf12 100755 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ import aioserial import asyncio import paho import uuid +import socket from ubxhandler import UBXHandler @@ -12,7 +13,7 @@ BAUD = 115200 async def main(): - handler_id = "example-gps" + handler_id = f"example-gps-{socket.gethostname()}" mqtt_host = "127.0.0.1" mqtt_port = 1883 @@ -30,7 +31,7 @@ async def main(): timeout=0.05, # 50 ms ) - handler = UBXHandler(mqtt_client, "example-gps", serial_port) + handler = UBXHandler(mqtt_client, handler_id, serial_port) await handler.run() diff --git a/ubxhandler.py b/ubxhandler.py index 7a928a7..32b17cc 100644 --- a/ubxhandler.py +++ b/ubxhandler.py @@ -70,10 +70,10 @@ class UBXHandler(MQTTHandler): if name.startswith("_"): continue - topic = f"{self.topic_base}/{message.identity}/{name}" + topic = f"{self.property_topic}/{message.identity}/{name}" await self.mqtt_client.publish(topic, value, qos=1, retain=True) else: - #print("Unexpected response:", message) + # print("Unexpected response:", message) pass @command({"type": "number"}, description="An example command")