diff --git a/broker/acl.conf b/broker/acl.conf new file mode 100644 index 0000000..3b4b389 --- /dev/null +++ b/broker/acl.conf @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% EMQX ACL configuration +%%-------------------------------------------------------------------- + +%% ========================= +%% Device user permissions +%% ========================= + +%% Devices can publish ONLY to their own namespace +{allow, {user, "device"}, publish, ["device/${clientid}/meta/#"]}. +{allow, {user, "device"}, publish, ["device/${clientid}/property/#"]}. +{allow, {user, "device"}, publish, ["device/${clientid}/command/#"]}. + +%% Devices can receive commands +{allow, {user, "device"}, subscribe, ["device/${clientid}/command/#"]}. + + +%% ========================= +%% Authenticated users +%% ========================= + +{allow, {user, "bob"}, subscribe, ["device/#"]}. + +%% Any authenticated user can read all device topics +{allow, {user, all}, subscribe, ["device/+/meta/#"]}. +{allow, {user, all}, subscribe, ["device/+/property/#"]}. +{allow, {user, all}, subscribe, ["device/+/command/#"]}. + +%% Any authenticated user can publish commands to any device +{allow, {user, all}, publish, ["device/+/command/+"]}. + + +%% ========================= +%% Response topic mechanism +%% ========================= + +%% Clients can SUBSCRIBE to their own response inbox +{allow, {user, all}, subscribe, ["client/${clientid}/responses/#"]}. + +%% Authenticated users can PUBLISH to any client response inbox +{allow, {user, all}, publish, ["client/+/responses/#"]}. + +%% (No subscribe permission for others -> enforced by default deny) + + +%% ========================= +%% Unauthenticated users +%% ========================= + +%% Allow anonymous users to read ONLY meta topics +{allow, {ipaddr, "0.0.0.0/0"}, subscribe, ["device/+/meta/#"]}. + + +%% ========================= +%% Default deny +%% ========================= + +{deny, all}. \ No newline at end of file diff --git a/broker/emqx.conf b/broker/emqx.conf new file mode 100644 index 0000000..8699627 --- /dev/null +++ b/broker/emqx.conf @@ -0,0 +1,81 @@ +## Place read-only configurations in this file. +## To define configurations that can later be overridden through UI/API/CLI, add them to `etc/base.hocon`. +## +## Config precedence order: +## etc/base.hocon < cluster.hocon < emqx.conf < environment variables +## +## See https://docs.emqx.com/en/enterprise/latest/configuration/configuration.html for more information. +## Configuration full example can be found in etc/examples + +node { + name = "emqx@127.0.0.1" + cookie = "emqx50elixir" + data_dir = "data" +} + +cluster { + name = emqxcl + discovery_strategy = manual +} + +dashboard { + listeners { + http.bind = 18083 + # https.bind = 18084 + https { + ssl_options { + certfile = "${EMQX_ETC_DIR}/certs/cert.pem" + keyfile = "${EMQX_ETC_DIR}/certs/key.pem" + } + } + } +} + +##-------------------------------------------------------------------- +## Authentication +##-------------------------------------------------------------------- + +## Load users from file +authn { + enable = true + sources = [ + { + type = file + path = "etc/passwd" + password_hash_algorithm { + name = plain + } + } + ] +} + + +##-------------------------------------------------------------------- +## Authorization (ACL) +##-------------------------------------------------------------------- + +authorization { + sources = [ + { + type = file + path = "etc/acl.conf" + } + ] + no_match = deny +} + + +##-------------------------------------------------------------------- +## Anonymous access +##-------------------------------------------------------------------- + +allow_anonymous = true + + +##-------------------------------------------------------------------- +## Listener (basic) +##-------------------------------------------------------------------- + +listeners.tcp.default { + bind = "0.0.0.0:1883" +} \ No newline at end of file diff --git a/broker/passwd b/broker/passwd new file mode 100644 index 0000000..af98d44 --- /dev/null +++ b/broker/passwd @@ -0,0 +1,3 @@ +device:devicesecret +alice:alicepass +bob:bobpass \ No newline at end of file diff --git a/broker/users.csv b/broker/users.csv new file mode 100644 index 0000000..c365ad6 --- /dev/null +++ b/broker/users.csv @@ -0,0 +1,3 @@ +device1,secret1 +device2,secret2 +dashboard,adminpass diff --git a/docker-compose.yaml b/docker-compose.yaml index dd1b961..1437034 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -9,6 +9,10 @@ services: - 8084:8084 - 8883:8883 - 18083:18083 + volumes: + - ./broker/emqx.conf:/opt/emqx/etc/emqx.conf:z + - ./broker/acl.conf:/opt/emqx/etc/acl.conf:z + - ./broker/passwd:/opt/emqx/etc/passwd:z mediamtx: container_name: mediamtx diff --git a/mediamtx.py b/mediamtx.py index 32cc59b..22eb91c 100755 --- a/mediamtx.py +++ b/mediamtx.py @@ -47,7 +47,7 @@ class MediaMTXHandler(MQTTHandler): cache[topic] = sample.value print(topic, sample.value) - await self.set_property(topic, sample.value, 0, True) + await self.set_property(topic, sample.value, qos=0, retain=True) await asyncio.sleep(1) diff --git a/mqtthandler/command.py b/mqtthandler/command.py index da49795..0c604e9 100644 --- a/mqtthandler/command.py +++ b/mqtthandler/command.py @@ -64,3 +64,13 @@ class CommandResponse: def __str__(self): return json.dumps(asdict(self)) + + +def enumerate_commands(obj: object): + commands = {} + for base in obj.__class__.__mro__: + for name, attr in vars(base).items(): + if isinstance(attr, Command): + commands[attr.name] = attr + + return commands diff --git a/mqtthandler/handler.py b/mqtthandler/handler.py index 5f0229e..e990aa5 100644 --- a/mqtthandler/handler.py +++ b/mqtthandler/handler.py @@ -4,101 +4,114 @@ import inspect import paho import signal import json -from dataclasses import dataclass +from enum import Enum, auto from .command import ( Command, CommandResponse, CommandArgumentError, CommandExecutionError, + enumerate_commands, ) +from .property import Property -@dataclass -class MQTTConfig: - host: str - port: int = 1883 - username: str | None = None - password: str | None = None - keepalive: int = 60 + +class Status(Enum): + ONLINE = auto() + OFFLINE = auto() class MQTTHandler: + DEVICE = "device" + META = "meta" + PROPERTY = "property" + COMMAND = "command" STATUS = "status" def __init__( self, - mqtt_config: MQTTConfig, handler_id: str, ): self.handler_id = handler_id - self.mqtt_config = mqtt_config - self.topic_base = f"device/{handler_id}" - self.command_topic = f"{self.topic_base}/command" - self.property_topic = f"{self.topic_base}/property" + self.topic_base = f"{MQTTHandler.DEVICE}/{handler_id}" + self.meta_topic = f"{self.topic_base}/{MQTTHandler.META}" + self.command_topic = f"{self.topic_base}/{MQTTHandler.COMMAND}" + self.property_topic = f"{self.topic_base}/{MQTTHandler.PROPERTY}" self._shutdown_event = asyncio.Event() - will = aiomqtt.Will( - topic=f"{self.property_topic}/{MQTTHandler.STATUS}", payload="OFFLINE", qos=1, retain=True + + self._mqtt_client = None + + self._commands = enumerate_commands(self) + self._properties = {} + self._meta = {} + + async def set_property(self, name: str, value, **kwargs): + if name in self._properties: + await self._properties[name](value, **kwargs) + else: + #print(f"Warning: proeprty {name} is unregistered") + await self._publish(f"{self.property_topic}/{name}", value, **kwargs) + + async def register_property( + self, name: str, description: str | None = None, schema: dict | None = None + ): + property = self._register_property( + f"{self.property_topic}/{name}", description, schema ) + self._properties[name] = property - self.mqtt_client = aiomqtt.Client( - self.mqtt_config.host, - port=self.mqtt_config.port, - identifier=handler_id, - protocol=paho.mqtt.client.MQTTv5, - will=will, - ) + async def _register_property( + self, name: str, description: str | None = None, schema: dict | None = None + ): + property = Property(name, description, schema, self._publish) + data = { + "schema": json.dumps(schema), + "description": description, + } + for k, v in {k: v for k, v in data.items() if v is not None}.items(): + await self._mqtt_client.publish( + f"{name}/${k}", + str(v), + qos=1, + retain=True, + ) - self.commands = self.get_available_commands() + return property - def get_available_commands(self): - commands = {} - for base in self.__class__.__mro__: - for name, attr in vars(base).items(): - if isinstance(attr, Command): - print(f"Registering method {type(self).__name__}.{name} as command '{attr.name}'") - commands[attr.name] = attr + async def _publish(self, name: str, value, **kwargs): + await self._mqtt_client.publish(f"{name}", value, **kwargs) - return commands - - async def register_commands(self): - for name, command in self.commands.items(): + async def _register_commands(self): + for name, command in self._commands.items(): for k, v in { "schema": json.dumps(command.schema), "description": command.description, **command.additional_properties, }.items(): - await self.mqtt_client.publish( + await self._mqtt_client.publish( f"{self.command_topic}/{command.name}/${k}", str(v), qos=1, retain=True, ) - async def register_property(self, property: str, description: str | None = None, schema: dict | None = None): - data = { - "schema": json.dumps(schema), - "description": description, - } - for k, v in {k:v for k,v in data.items() if v is not None}.items(): - await self.mqtt_client.publish( - f"{self.property_topic}/{property}/${k}", - str(v), - qos=1, - retain=True, - ) + async def _announce(self): + # announce that we are online + await self._register_commands() - async def set_property(self, property: str, value, qos=0, retain=False): - await self.mqtt_client.publish( - f"{self.property_topic}/{property}", - str(value), - qos=qos, - retain=retain, + self._meta[MQTTHandler.STATUS] = await self._register_property( + f"{self.meta_topic}/{MQTTHandler.STATUS}", + "Indicates the status of the device.", + {"type": "string", "enum": list(Status.__members__.keys())}, + ) + await self._meta[MQTTHandler.STATUS]( + self, json.dumps(Status.ONLINE.name), qos=1, retain=True ) - async def execute_command( + async def _execute_command( self, command_name: str, payload: str, @@ -115,7 +128,7 @@ class MQTTHandler: if hasattr(properties, "CorrelationData") else None ) - await self.mqtt_client.publish( + await self._mqtt_client.publish( properties.ResponseTopic, str(CommandResponse(success, str(message), correlation)), qos=1, @@ -123,7 +136,7 @@ class MQTTHandler: ) try: - command = self.commands[command_name] + command = self._commands[command_name] result = await command(self, payload) await respond(True, result) except (CommandArgumentError, CommandExecutionError) as e: @@ -132,39 +145,53 @@ class MQTTHandler: print(f"Failed to execute command {command_name} with unknown cause: ", e) await respond(False, "Unexpected error") - async def mqtt_command_writer_task(self): - async for message in self.mqtt_client.messages: + async def _command_executor(self): + await self._mqtt_client.subscribe(f"{self.command_topic}/+") + + async for message in self._mqtt_client.messages: topic = str(message.topic) payload = message.payload.decode("utf-8") if topic.startswith(self.command_topic): command_name = topic.removeprefix(f"{self.command_topic}/") - await self.execute_command(command_name, payload, message.properties) + await self._execute_command(command_name, payload, message.properties) - async def shutdown_watcher(self): + async def _shutdown_watcher(self): await self._shutdown_event.wait() - await self.set_property(MQTTHandler.STATUS, "OFFLINE", qos=1, retain=True) + await self._meta[MQTTHandler.STATUS]( + self, json.dumps(Status.OFFLINE.name), qos=1, retain=True + ) def stop(self): self._shutdown_event.set() signal.signal(signal.SIGINT, signal.SIG_DFL) - async def run(self): + async def run(self, host: str, **kwargs): INTERVAL = 5 + + will = aiomqtt.Will( + topic=f"{self.meta_topic}/{MQTTHandler.STATUS}", + payload=json.dumps(Status.OFFLINE.name), + qos=1, + retain=True, + ) + while True: try: - async with self.mqtt_client as client: - await client.subscribe(f"{self.command_topic}/+") - await self.register_commands() + async with aiomqtt.Client( + host, + protocol=paho.mqtt.client.MQTTv5, + will=will, + identifier=self.handler_id, + **kwargs, + ) as client: + self._mqtt_client = client - # announce that we are online - await self.set_property(MQTTHandler.STATUS, "ONLINE", qos=1, retain=True) - await self.register_property("status", "Indicates the status of the device.", { - "type": "string", - "enum": ["ONLINE", "OFFLINE"] - }) - - tasks = [self.mqtt_command_writer_task(), self.shutdown_watcher()] + tasks = [ + self._command_executor(), + self._shutdown_watcher(), + self._announce(), + ] # Inspect instance methods for attr_name in dir(self): @@ -184,6 +211,9 @@ class MQTTHandler: ) await asyncio.sleep(INTERVAL) + finally: + self._mqtt_client = None + def task(func): """Decorator to mark async methods for automatic gathering.""" diff --git a/mqtthandler/property.py b/mqtthandler/property.py new file mode 100644 index 0000000..c650fe7 --- /dev/null +++ b/mqtthandler/property.py @@ -0,0 +1,58 @@ +import json +import jsonschema + + +class PropertyValueError(ValueError): + pass + + +class Property: + """ + Presumes that the handler will take the same arguments as aiomqtt.Client.publish + + ie: async publish( + topic: str, + payload: str | bytes | bytearray | int | float | None = None, + qos: int = 0, + retain: bool = False, + properties: Properties | None = None, + *args: Any, + timeout: float | None = None, + **kwargs: Any + ) → None + """ + + def __init__( + self, + name: str, + description: str | None = None, + schema: dict | None = None, + handler=None, + **kwargs, + ): + self.name = name + self.description = description + self.schema = schema + self.handler = handler + self.additional_properties = kwargs + + def __call__(self, handler_instance, payload, **kwargs): + if self.handler is None: + raise NotImplementedError(f"No handler bound for property '{self.name}'") + + try: + value = json.loads(payload) + if self.schema is not None: + jsonschema.validate(value, self.schema) + except json.decoder.JSONDecodeError as e: + raise PropertyValueError( + f"Invalid JSON at line {e.lineno} column {e.colno}: {e.msg}" + ) + except jsonschema.ValidationError as e: + raise PropertyValueError(f"Schema error in {e.json_path}: {e.message}") + + try: + return self.handler(self.name, payload, **kwargs) + except Exception as e: + print("Failed to set property: ", e) + raise RuntimeError(f"Failed to set property.") diff --git a/radio.py b/radio.py index b8b3b9d..d2de1b5 100755 --- a/radio.py +++ b/radio.py @@ -22,8 +22,8 @@ class RadioHandler(MQTTHandler): @task async def publish_stream_path(self): - await self.set_property("path", self.radio.stream_path(), 1, True) - await self.set_property("file", self.radio.path, 1, True) + await self.set_property("path", self.radio.stream_path(), qos=1, qos=True) + await self.set_property("file", self.radio.path, qos=1, qos=True) @command({"type": "object"}, "Start the radio stream.") async def start(self, args): diff --git a/ubx.py b/ubx.py index e851568..f8116fa 100755 --- a/ubx.py +++ b/ubx.py @@ -12,7 +12,7 @@ import socket import signal from mqtthandler.command import command -from mqtthandler.handler import MQTTConfig, MQTTHandler, task +from mqtthandler.handler import MQTTHandler, task # The pyubx2 library spams the console with errors that aren't errors. # I don't care if you failed to parse an incomplete buffer. @@ -47,11 +47,10 @@ class UBXAsyncParser: class UBXHandler(MQTTHandler): def __init__( self, - mqtt_client: aiomqtt.Client, handler_id: str, serial_port: aioserial.AioSerial, ): - super().__init__(mqtt_client, handler_id) + super().__init__(handler_id) self.serial_port = serial_port @task @@ -77,7 +76,7 @@ class UBXHandler(MQTTHandler): continue property = f"{message.identity}/{name}" - await self.set_property(property, value, 1, True) + await self.set_property(property, value, qos=0, retain=True) else: # print("Unexpected response:", message) pass @@ -112,7 +111,7 @@ class UBXHandler(MQTTHandler): 460800, 921600, ], - "default": 9600 + "default": 9600, }, }, "required": ["portID", "baudRate"], @@ -147,7 +146,7 @@ class UBXHandler(MQTTHandler): "minimum": 1, "maximum": 127, "description": "Number of measurement cycles per navigation solution", - "default": 1 + "default": 1, }, "timeRef": { "type": "integer", @@ -175,18 +174,18 @@ class UBXHandler(MQTTHandler): async def main(): handler_id = f"example-gps-{socket.gethostname()}" - mqtt_config = MQTTConfig(host="127.0.0.1", port=1883) - serial_port = aioserial.AioSerial( - port="/tmp/ttyV0", - baudrate=115200, - timeout=0.05, # 50 ms + handler = UBXHandler( + handler_id, + aioserial.AioSerial( + port="/tmp/ttyV0", + baudrate=115200, + timeout=0.05, # 50 ms + ), ) - - handler = UBXHandler(mqtt_config, handler_id, serial_port) - signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) - await handler.run() + + await handler.run("127.0.0.1", port=1883, username="device", password="devicesecret") if __name__ == "__main__":