mqttdevicemanager/mqtthandler/command.py
2026-03-18 15:39:27 +10:30

57 lines
1.5 KiB
Python

import json
import jsonschema
from dataclasses import dataclass, asdict
class CommandArgumentError(ValueError):
pass
class CommandExecutionError(RuntimeError):
pass
class Command:
def __init__(self, name: str, schema, handler=None, **kwargs):
self.name = name
self.schema = schema
self.handler = handler
self.additional_properties = kwargs
def __call__(self, handler_instance, payload):
if self.handler is None:
raise NotImplementedError(f"No handler bound for command '{self.name}'")
try:
arguments = json.loads(payload)
jsonschema.validate(arguments, self.schema)
except json.decoder.JSONDecodeError as e:
raise CommandArgumentError(
f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}"
)
except jsonschema.ValidationError as e:
raise CommandArgumentError(f"Schema error in {e.json_path}: {e.message}")
try:
return self.handler(handler_instance, arguments)
except Exception as e:
print("Internal command error: ", e)
raise CommandExecutionError(f"Command execution failed internally.")
def command(schema, **kwargs):
def decorator(func):
return Command(func.__name__, schema, handler=func, **kwargs)
return decorator
@dataclass
class CommandResponse:
success: bool
message: str = None
correlation: str = None
def __str__(self):
return json.dumps(asdict(self))