Added optional property validation against a schema

This commit is contained in:
Jono Targett 2026-03-19 23:29:37 +10:30
parent 93d7e9e5d1
commit a0421f07d0
11 changed files with 337 additions and 91 deletions

58
broker/acl.conf Normal file
View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% EMQX ACL configuration
%%--------------------------------------------------------------------
%% =========================
%% Device user permissions
%% =========================
%% Devices can publish ONLY to their own namespace
{allow, {user, "device"}, publish, ["device/${clientid}/meta/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/property/#"]}.
{allow, {user, "device"}, publish, ["device/${clientid}/command/#"]}.
%% Devices can receive commands
{allow, {user, "device"}, subscribe, ["device/${clientid}/command/#"]}.
%% =========================
%% Authenticated users
%% =========================
{allow, {user, "bob"}, subscribe, ["device/#"]}.
%% Any authenticated user can read all device topics
{allow, {user, all}, subscribe, ["device/+/meta/#"]}.
{allow, {user, all}, subscribe, ["device/+/property/#"]}.
{allow, {user, all}, subscribe, ["device/+/command/#"]}.
%% Any authenticated user can publish commands to any device
{allow, {user, all}, publish, ["device/+/command/+"]}.
%% =========================
%% Response topic mechanism
%% =========================
%% Clients can SUBSCRIBE to their own response inbox
{allow, {user, all}, subscribe, ["client/${clientid}/responses/#"]}.
%% Authenticated users can PUBLISH to any client response inbox
{allow, {user, all}, publish, ["client/+/responses/#"]}.
%% (No subscribe permission for others -> enforced by default deny)
%% =========================
%% Unauthenticated users
%% =========================
%% Allow anonymous users to read ONLY meta topics
{allow, {ipaddr, "0.0.0.0/0"}, subscribe, ["device/+/meta/#"]}.
%% =========================
%% Default deny
%% =========================
{deny, all}.

81
broker/emqx.conf Normal file
View File

@ -0,0 +1,81 @@
## Place read-only configurations in this file.
## To define configurations that can later be overridden through UI/API/CLI, add them to `etc/base.hocon`.
##
## Config precedence order:
## etc/base.hocon < cluster.hocon < emqx.conf < environment variables
##
## See https://docs.emqx.com/en/enterprise/latest/configuration/configuration.html for more information.
## Configuration full example can be found in etc/examples
node {
name = "emqx@127.0.0.1"
cookie = "emqx50elixir"
data_dir = "data"
}
cluster {
name = emqxcl
discovery_strategy = manual
}
dashboard {
listeners {
http.bind = 18083
# https.bind = 18084
https {
ssl_options {
certfile = "${EMQX_ETC_DIR}/certs/cert.pem"
keyfile = "${EMQX_ETC_DIR}/certs/key.pem"
}
}
}
}
##--------------------------------------------------------------------
## Authentication
##--------------------------------------------------------------------
## Load users from file
authn {
enable = true
sources = [
{
type = file
path = "etc/passwd"
password_hash_algorithm {
name = plain
}
}
]
}
##--------------------------------------------------------------------
## Authorization (ACL)
##--------------------------------------------------------------------
authorization {
sources = [
{
type = file
path = "etc/acl.conf"
}
]
no_match = deny
}
##--------------------------------------------------------------------
## Anonymous access
##--------------------------------------------------------------------
allow_anonymous = true
##--------------------------------------------------------------------
## Listener (basic)
##--------------------------------------------------------------------
listeners.tcp.default {
bind = "0.0.0.0:1883"
}

3
broker/passwd Normal file
View File

@ -0,0 +1,3 @@
device:devicesecret
alice:alicepass
bob:bobpass

3
broker/users.csv Normal file
View File

@ -0,0 +1,3 @@
device1,secret1
device2,secret2
dashboard,adminpass
1 device1 secret1
2 device2 secret2
3 dashboard adminpass

View File

@ -9,6 +9,10 @@ services:
- 8084:8084
- 8883:8883
- 18083:18083
volumes:
- ./broker/emqx.conf:/opt/emqx/etc/emqx.conf:z
- ./broker/acl.conf:/opt/emqx/etc/acl.conf:z
- ./broker/passwd:/opt/emqx/etc/passwd:z
mediamtx:
container_name: mediamtx

View File

@ -47,7 +47,7 @@ class MediaMTXHandler(MQTTHandler):
cache[topic] = sample.value
print(topic, sample.value)
await self.set_property(topic, sample.value, 0, True)
await self.set_property(topic, sample.value, qos=0, retain=True)
await asyncio.sleep(1)

View File

@ -64,3 +64,13 @@ class CommandResponse:
def __str__(self):
return json.dumps(asdict(self))
def enumerate_commands(obj: object):
commands = {}
for base in obj.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
commands[attr.name] = attr
return commands

View File

@ -4,101 +4,114 @@ import inspect
import paho
import signal
import json
from dataclasses import dataclass
from enum import Enum, auto
from .command import (
Command,
CommandResponse,
CommandArgumentError,
CommandExecutionError,
enumerate_commands,
)
from .property import Property
@dataclass
class MQTTConfig:
host: str
port: int = 1883
username: str | None = None
password: str | None = None
keepalive: int = 60
class Status(Enum):
ONLINE = auto()
OFFLINE = auto()
class MQTTHandler:
DEVICE = "device"
META = "meta"
PROPERTY = "property"
COMMAND = "command"
STATUS = "status"
def __init__(
self,
mqtt_config: MQTTConfig,
handler_id: str,
):
self.handler_id = handler_id
self.mqtt_config = mqtt_config
self.topic_base = f"device/{handler_id}"
self.command_topic = f"{self.topic_base}/command"
self.property_topic = f"{self.topic_base}/property"
self.topic_base = f"{MQTTHandler.DEVICE}/{handler_id}"
self.meta_topic = f"{self.topic_base}/{MQTTHandler.META}"
self.command_topic = f"{self.topic_base}/{MQTTHandler.COMMAND}"
self.property_topic = f"{self.topic_base}/{MQTTHandler.PROPERTY}"
self._shutdown_event = asyncio.Event()
will = aiomqtt.Will(
topic=f"{self.property_topic}/{MQTTHandler.STATUS}", payload="OFFLINE", qos=1, retain=True
self._mqtt_client = None
self._commands = enumerate_commands(self)
self._properties = {}
self._meta = {}
async def set_property(self, name: str, value, **kwargs):
if name in self._properties:
await self._properties[name](value, **kwargs)
else:
#print(f"Warning: proeprty {name} is unregistered")
await self._publish(f"{self.property_topic}/{name}", value, **kwargs)
async def register_property(
self, name: str, description: str | None = None, schema: dict | None = None
):
property = self._register_property(
f"{self.property_topic}/{name}", description, schema
)
self._properties[name] = property
self.mqtt_client = aiomqtt.Client(
self.mqtt_config.host,
port=self.mqtt_config.port,
identifier=handler_id,
protocol=paho.mqtt.client.MQTTv5,
will=will,
)
async def _register_property(
self, name: str, description: str | None = None, schema: dict | None = None
):
property = Property(name, description, schema, self._publish)
data = {
"schema": json.dumps(schema),
"description": description,
}
for k, v in {k: v for k, v in data.items() if v is not None}.items():
await self._mqtt_client.publish(
f"{name}/${k}",
str(v),
qos=1,
retain=True,
)
self.commands = self.get_available_commands()
return property
def get_available_commands(self):
commands = {}
for base in self.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
print(f"Registering method {type(self).__name__}.{name} as command '{attr.name}'")
commands[attr.name] = attr
async def _publish(self, name: str, value, **kwargs):
await self._mqtt_client.publish(f"{name}", value, **kwargs)
return commands
async def register_commands(self):
for name, command in self.commands.items():
async def _register_commands(self):
for name, command in self._commands.items():
for k, v in {
"schema": json.dumps(command.schema),
"description": command.description,
**command.additional_properties,
}.items():
await self.mqtt_client.publish(
await self._mqtt_client.publish(
f"{self.command_topic}/{command.name}/${k}",
str(v),
qos=1,
retain=True,
)
async def register_property(self, property: str, description: str | None = None, schema: dict | None = None):
data = {
"schema": json.dumps(schema),
"description": description,
}
for k, v in {k:v for k,v in data.items() if v is not None}.items():
await self.mqtt_client.publish(
f"{self.property_topic}/{property}/${k}",
str(v),
qos=1,
retain=True,
)
async def _announce(self):
# announce that we are online
await self._register_commands()
async def set_property(self, property: str, value, qos=0, retain=False):
await self.mqtt_client.publish(
f"{self.property_topic}/{property}",
str(value),
qos=qos,
retain=retain,
self._meta[MQTTHandler.STATUS] = await self._register_property(
f"{self.meta_topic}/{MQTTHandler.STATUS}",
"Indicates the status of the device.",
{"type": "string", "enum": list(Status.__members__.keys())},
)
await self._meta[MQTTHandler.STATUS](
self, json.dumps(Status.ONLINE.name), qos=1, retain=True
)
async def execute_command(
async def _execute_command(
self,
command_name: str,
payload: str,
@ -115,7 +128,7 @@ class MQTTHandler:
if hasattr(properties, "CorrelationData")
else None
)
await self.mqtt_client.publish(
await self._mqtt_client.publish(
properties.ResponseTopic,
str(CommandResponse(success, str(message), correlation)),
qos=1,
@ -123,7 +136,7 @@ class MQTTHandler:
)
try:
command = self.commands[command_name]
command = self._commands[command_name]
result = await command(self, payload)
await respond(True, result)
except (CommandArgumentError, CommandExecutionError) as e:
@ -132,39 +145,53 @@ class MQTTHandler:
print(f"Failed to execute command {command_name} with unknown cause: ", e)
await respond(False, "Unexpected error")
async def mqtt_command_writer_task(self):
async for message in self.mqtt_client.messages:
async def _command_executor(self):
await self._mqtt_client.subscribe(f"{self.command_topic}/+")
async for message in self._mqtt_client.messages:
topic = str(message.topic)
payload = message.payload.decode("utf-8")
if topic.startswith(self.command_topic):
command_name = topic.removeprefix(f"{self.command_topic}/")
await self.execute_command(command_name, payload, message.properties)
await self._execute_command(command_name, payload, message.properties)
async def shutdown_watcher(self):
async def _shutdown_watcher(self):
await self._shutdown_event.wait()
await self.set_property(MQTTHandler.STATUS, "OFFLINE", qos=1, retain=True)
await self._meta[MQTTHandler.STATUS](
self, json.dumps(Status.OFFLINE.name), qos=1, retain=True
)
def stop(self):
self._shutdown_event.set()
signal.signal(signal.SIGINT, signal.SIG_DFL)
async def run(self):
async def run(self, host: str, **kwargs):
INTERVAL = 5
will = aiomqtt.Will(
topic=f"{self.meta_topic}/{MQTTHandler.STATUS}",
payload=json.dumps(Status.OFFLINE.name),
qos=1,
retain=True,
)
while True:
try:
async with self.mqtt_client as client:
await client.subscribe(f"{self.command_topic}/+")
await self.register_commands()
async with aiomqtt.Client(
host,
protocol=paho.mqtt.client.MQTTv5,
will=will,
identifier=self.handler_id,
**kwargs,
) as client:
self._mqtt_client = client
# announce that we are online
await self.set_property(MQTTHandler.STATUS, "ONLINE", qos=1, retain=True)
await self.register_property("status", "Indicates the status of the device.", {
"type": "string",
"enum": ["ONLINE", "OFFLINE"]
})
tasks = [self.mqtt_command_writer_task(), self.shutdown_watcher()]
tasks = [
self._command_executor(),
self._shutdown_watcher(),
self._announce(),
]
# Inspect instance methods
for attr_name in dir(self):
@ -184,6 +211,9 @@ class MQTTHandler:
)
await asyncio.sleep(INTERVAL)
finally:
self._mqtt_client = None
def task(func):
"""Decorator to mark async methods for automatic gathering."""

58
mqtthandler/property.py Normal file
View File

@ -0,0 +1,58 @@
import json
import jsonschema
class PropertyValueError(ValueError):
pass
class Property:
"""
Presumes that the handler will take the same arguments as aiomqtt.Client.publish
ie: async publish(
topic: str,
payload: str | bytes | bytearray | int | float | None = None,
qos: int = 0,
retain: bool = False,
properties: Properties | None = None,
*args: Any,
timeout: float | None = None,
**kwargs: Any
) None
"""
def __init__(
self,
name: str,
description: str | None = None,
schema: dict | None = None,
handler=None,
**kwargs,
):
self.name = name
self.description = description
self.schema = schema
self.handler = handler
self.additional_properties = kwargs
def __call__(self, handler_instance, payload, **kwargs):
if self.handler is None:
raise NotImplementedError(f"No handler bound for property '{self.name}'")
try:
value = json.loads(payload)
if self.schema is not None:
jsonschema.validate(value, self.schema)
except json.decoder.JSONDecodeError as e:
raise PropertyValueError(
f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}"
)
except jsonschema.ValidationError as e:
raise PropertyValueError(f"Schema error in {e.json_path}: {e.message}")
try:
return self.handler(self.name, payload, **kwargs)
except Exception as e:
print("Failed to set property: ", e)
raise RuntimeError(f"Failed to set property.")

View File

@ -22,8 +22,8 @@ class RadioHandler(MQTTHandler):
@task
async def publish_stream_path(self):
await self.set_property("path", self.radio.stream_path(), 1, True)
await self.set_property("file", self.radio.path, 1, True)
await self.set_property("path", self.radio.stream_path(), qos=1, qos=True)
await self.set_property("file", self.radio.path, qos=1, qos=True)
@command({"type": "object"}, "Start the radio stream.")
async def start(self, args):

29
ubx.py
View File

@ -12,7 +12,7 @@ import socket
import signal
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from mqtthandler.handler import MQTTHandler, task
# The pyubx2 library spams the console with errors that aren't errors.
# I don't care if you failed to parse an incomplete buffer.
@ -47,11 +47,10 @@ class UBXAsyncParser:
class UBXHandler(MQTTHandler):
def __init__(
self,
mqtt_client: aiomqtt.Client,
handler_id: str,
serial_port: aioserial.AioSerial,
):
super().__init__(mqtt_client, handler_id)
super().__init__(handler_id)
self.serial_port = serial_port
@task
@ -77,7 +76,7 @@ class UBXHandler(MQTTHandler):
continue
property = f"{message.identity}/{name}"
await self.set_property(property, value, 1, True)
await self.set_property(property, value, qos=0, retain=True)
else:
# print("Unexpected response:", message)
pass
@ -112,7 +111,7 @@ class UBXHandler(MQTTHandler):
460800,
921600,
],
"default": 9600
"default": 9600,
},
},
"required": ["portID", "baudRate"],
@ -147,7 +146,7 @@ class UBXHandler(MQTTHandler):
"minimum": 1,
"maximum": 127,
"description": "Number of measurement cycles per navigation solution",
"default": 1
"default": 1,
},
"timeRef": {
"type": "integer",
@ -175,18 +174,18 @@ class UBXHandler(MQTTHandler):
async def main():
handler_id = f"example-gps-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1", port=1883)
serial_port = aioserial.AioSerial(
port="/tmp/ttyV0",
baudrate=115200,
timeout=0.05, # 50 ms
handler = UBXHandler(
handler_id,
aioserial.AioSerial(
port="/tmp/ttyV0",
baudrate=115200,
timeout=0.05, # 50 ms
),
)
handler = UBXHandler(mqtt_config, handler_id, serial_port)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
await handler.run("127.0.0.1", port=1883, username="device", password="devicesecret")
if __name__ == "__main__":