mqttdevicemanager/command.py

39 lines
1.0 KiB
Python

from dataclasses import dataclass
from typing import Any, Callable, Optional

import jsonschema
class Command:
    """A named command whose arguments are validated against a JSON schema.

    Calling an instance validates the given arguments and, when they conform
    to the schema, forwards them to the bound handler.
    """

    def __init__(
        self,
        name: str,
        schema: Any,
        handler: Optional[Callable[..., Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Create a command.

        Args:
            name: Name of the command; it is included in error messages.
            schema: JSON schema that call arguments must satisfy.
            handler: Callable invoked with the validated arguments. May be
                left unset, in which case calling the command raises
                ``RuntimeError``.
            **kwargs: Extra keyword arguments, stored unchanged in
                ``additional_properties``.
        """
        self.name = name
        self.schema = schema
        self.handler = handler
        self.additional_properties = kwargs
        # Pick the validator class matching the schema's own "$schema"
        # declaration, falling back to Draft 7 when none is declared.
        validator_cls = jsonschema.validators.validator_for(
            schema, default=jsonschema.validators.Draft7Validator
        )
        self._validator = validator_cls(schema=schema)

    def validate(self, o: Any) -> bool:
        """Return ``True`` if *o* conforms to this command's schema."""
        return self._validator.is_valid(o)

    def __call__(self, args: Any) -> Any:
        """Validate *args* and pass them to the handler.

        Args:
            args: The argument object to validate and hand to the handler.

        Returns:
            Whatever the handler returns.

        Raises:
            ValueError: If *args* does not pass ``validate``. The message
                includes the first schema violation when one is available.
            RuntimeError: If no handler is bound to this command.
        """
        if not self.validate(args):
            # Report *why* validation failed instead of only that it did.
            # ``validate`` may be overridden, so there might be no schema
            # error to report; in that case the plain message is used.
            first_error = next(iter(self._validator.iter_errors(args)), None)
            detail = f": {first_error.message}" if first_error is not None else ""
            raise ValueError(
                f"Invalid arguments for command '{self.name}'{detail}"
            )
        if self.handler is None:
            raise RuntimeError(f"No handler bound for command '{self.name}'")
        return self.handler(args)
def command(schema, **kwargs):
    """Return a decorator that replaces a function with a ``Command``.

    The resulting ``Command`` is named after the decorated function, uses
    *schema* for argument validation, has the function bound as its handler,
    and receives any extra keyword arguments unchanged.
    """

    def bind(func):
        # The decorated function's own name becomes the command name.
        command_name = func.__name__
        return Command(command_name, schema, handler=func, **kwargs)

    return bind
@dataclass
class CommandResponse:
    """Result record for a command: a success flag plus optional details."""

    # Whether the command succeeded.
    success: bool
    # Optional human-readable text accompanying the result.
    message: Optional[str] = None
    # Optional correlation value; presumably ties the response to the
    # originating request -- confirm against the code that builds responses.
    correlation: Optional[str] = None