import json import jsonschema class PropertyValueError(ValueError): pass class Property: """ Presumes that the handler will take the same arguments as aiomqtt.Client.publish ie: async publish( topic: str, payload: str | bytes | bytearray | int | float | None = None, qos: int = 0, retain: bool = False, properties: Properties | None = None, *args: Any, timeout: float | None = None, **kwargs: Any ) → None """ def __init__( self, name: str, description: str | None = None, schema: dict | None = None, handler=None, **kwargs, ): self.name = name self.description = description self.schema = schema self.handler = handler self.additional_properties = kwargs def __call__(self, handler_instance, payload, **kwargs): if self.handler is None: raise NotImplementedError(f"No handler bound for property '{self.name}'") try: value = json.loads(payload) if self.schema is not None: jsonschema.validate(value, self.schema) except json.decoder.JSONDecodeError as e: raise PropertyValueError( f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}" ) except jsonschema.ValidationError as e: raise PropertyValueError(f"Schema error in {e.json_path}: {e.message}") try: return self.handler(self.name, payload, **kwargs) except Exception as e: print("Failed to set property: ", e) raise RuntimeError(f"Failed to set property.")