diff --git a/mediamtx.py b/mediamtx.py index 22eb91c..3b733d4 100755 --- a/mediamtx.py +++ b/mediamtx.py @@ -7,7 +7,7 @@ from dataclasses import dataclass import aiohttp from mqtthandler.command import command -from mqtthandler.handler import MQTTConfig, MQTTHandler, task +from mqtthandler.handler import MQTTHandler, task from prometheus_client.parser import text_string_to_metric_families @dataclass @@ -21,11 +21,10 @@ class MediaMTXConfig: class MediaMTXHandler(MQTTHandler): def __init__( self, - mqtt_config: MQTTConfig, - handler_id: str, + name: str, mediamtx_config: MediaMTXConfig, ): - super().__init__(mqtt_config, handler_id) + super().__init__(name) self.config = mediamtx_config @task @@ -56,12 +55,9 @@ class MediaMTXHandler(MQTTHandler): async def main(): - handler_id = f"mediamtx-{socket.gethostname()}" - mqtt_config = MQTTConfig(host="127.0.0.1") - handler = MediaMTXHandler(mqtt_config, handler_id, MediaMTXConfig()) - + handler = MediaMTXHandler("mediamtx", MediaMTXConfig()) signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) - await handler.run() + await handler.run("127.0.0.1", username="device", password="devicesecret") if __name__ == "__main__": diff --git a/mqtthandler/handler.py b/mqtthandler/handler.py index e990aa5..25c5443 100644 --- a/mqtthandler/handler.py +++ b/mqtthandler/handler.py @@ -4,6 +4,10 @@ import inspect import paho import signal import json +import secrets +import os +import socket +from pathlib import Path from enum import Enum, auto from .command import ( @@ -16,12 +20,31 @@ from .command import ( from .property import Property +def get_identifier(cache_path: Path) -> str: + """ + Determine an MQTT client ID using the following order: + 1. Environment variable IDENTIFIER + 2. Value stored in /tmp/.tmp + 3. Generate a new random ID using secrets.token_urlsafe + + The resulting client ID is written to /tmp/mqtt_client_id.tmp for future use. + """ + + client_id = os.environ.get("IDENTIFIER", None) + if not client_id and cache_path.exists(): + client_id = cache_path.read_text().strip() + elif not client_id: + client_id = generate_identifier() + cache_path.write_text(client_id) + return client_id + +def generate_identifier() -> str: + return secrets.token_urlsafe(6) class Status(Enum): ONLINE = auto() OFFLINE = auto() - class MQTTHandler: DEVICE = "device" META = "meta" @@ -31,14 +54,15 @@ class MQTTHandler: def __init__( self, - handler_id: str, + name: str ): - self.handler_id = handler_id + self.name = name + self.identifier = get_identifier(Path(f"/tmp/{self.name}.tmp")) - self.topic_base = f"{MQTTHandler.DEVICE}/{handler_id}" - self.meta_topic = f"{self.topic_base}/{MQTTHandler.META}" - self.command_topic = f"{self.topic_base}/{MQTTHandler.COMMAND}" - self.property_topic = f"{self.topic_base}/{MQTTHandler.PROPERTY}" + self.topic_base = lambda: f"{MQTTHandler.DEVICE}/{self.identifier}" + self.meta_topic = lambda: f"{self.topic_base()}/{MQTTHandler.META}" + self.command_topic = lambda: f"{self.topic_base()}/{MQTTHandler.COMMAND}" + self.property_topic = lambda: f"{self.topic_base()}/{MQTTHandler.PROPERTY}" self._shutdown_event = asyncio.Event() @@ -53,13 +77,13 @@ class MQTTHandler: await self._properties[name](value, **kwargs) else: #print(f"Warning: proeprty {name} is unregistered") - await self._publish(f"{self.property_topic}/{name}", value, **kwargs) + await self._publish(f"{self.property_topic()}/{name}", value, **kwargs) async def register_property( self, name: str, description: str | None = None, schema: dict | None = None ): property = self._register_property( - f"{self.property_topic}/{name}", description, schema + f"{self.property_topic()}/{name}", description, schema ) self._properties[name] = property @@ -92,7 +116,7 @@ class MQTTHandler: **command.additional_properties, }.items(): await self._mqtt_client.publish( - f"{self.command_topic}/{command.name}/${k}", + f"{self.command_topic()}/{command.name}/${k}", str(v), qos=1, retain=True, @@ -103,7 +127,7 @@ class MQTTHandler: await self._register_commands() self._meta[MQTTHandler.STATUS] = await self._register_property( - f"{self.meta_topic}/{MQTTHandler.STATUS}", + f"{self.meta_topic()}/{MQTTHandler.STATUS}", "Indicates the status of the device.", {"type": "string", "enum": list(Status.__members__.keys())}, ) @@ -111,6 +135,10 @@ class MQTTHandler: self, json.dumps(Status.ONLINE.name), qos=1, retain=True ) + await self._publish(f"{self.meta_topic()}/name", self.name, qos=1, retain=True) + await self._publish(f"{self.meta_topic()}/type", type(self).__name__, qos=1, retain=True) + await self._publish(f"{self.meta_topic()}/host", socket.gethostname(), qos=1, retain=True) + async def _execute_command( self, command_name: str, @@ -146,14 +174,14 @@ class MQTTHandler: await respond(False, "Unexpected error") async def _command_executor(self): - await self._mqtt_client.subscribe(f"{self.command_topic}/+") + await self._mqtt_client.subscribe(f"{self.command_topic()}/+") async for message in self._mqtt_client.messages: topic = str(message.topic) payload = message.payload.decode("utf-8") - if topic.startswith(self.command_topic): - command_name = topic.removeprefix(f"{self.command_topic}/") + if topic.startswith(self.command_topic()): + command_name = topic.removeprefix(f"{self.command_topic()}/") await self._execute_command(command_name, payload, message.properties) async def _shutdown_watcher(self): @@ -170,7 +198,7 @@ class MQTTHandler: INTERVAL = 5 will = aiomqtt.Will( - topic=f"{self.meta_topic}/{MQTTHandler.STATUS}", + topic=f"{self.meta_topic()}/{MQTTHandler.STATUS}", payload=json.dumps(Status.OFFLINE.name), qos=1, retain=True, @@ -182,7 +210,7 @@ class MQTTHandler: host, protocol=paho.mqtt.client.MQTTv5, will=will, - identifier=self.handler_id, + identifier=self.identifier, **kwargs, ) as client: self._mqtt_client = client @@ -207,7 +235,7 @@ class MQTTHandler: except aiomqtt.MqttError as e: print( - f"[{self.handler_id}] MQTT connection error: {e}. Reconnecting in {INTERVAL}s..." + f"MQTT connection error: {e}. Reconnecting in {INTERVAL}s..." ) await asyncio.sleep(INTERVAL) diff --git a/radio.py b/radio.py index d2de1b5..45af3ca 100755 --- a/radio.py +++ b/radio.py @@ -6,24 +6,22 @@ import asyncio from dataclasses import dataclass from mqtthandler.command import command -from mqtthandler.handler import MQTTConfig, MQTTHandler, task +from mqtthandler.handler import MQTTHandler, task from streamer.fileradio import FileRadio class RadioHandler(MQTTHandler): def __init__( self, - mqtt_config: MQTTConfig, - handler_id: str, + name: str, ): - super().__init__(mqtt_config, handler_id) - - self.radio = FileRadio("./data/StarWars60.mp3", handler_id) + super().__init__(name) + self.radio = FileRadio("./data/StarWars60.mp3", name) @task async def publish_stream_path(self): - await self.set_property("path", self.radio.stream_path(), qos=1, qos=True) - await self.set_property("file", self.radio.path, qos=1, qos=True) + await self.set_property("path", self.radio.stream_path(), qos=1, retain=True) + await self.set_property("file", self.radio.path, qos=1, retain=True) @command({"type": "object"}, "Start the radio stream.") async def start(self, args): @@ -48,12 +46,9 @@ class RadioHandler(MQTTHandler): await self.publish_stream_path() async def main(): - handler_id = f"radio-{socket.gethostname()}" - mqtt_config = MQTTConfig(host="127.0.0.1") - handler = RadioHandler(mqtt_config, handler_id) - + handler = RadioHandler("radio") signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) - await handler.run() + await handler.run("127.0.0.1", username="device", password="devicesecret") if __name__ == "__main__": diff --git a/ubx.py b/ubx.py index f8116fa..3050982 100755 --- a/ubx.py +++ b/ubx.py @@ -2,13 +2,11 @@ import asyncio import aioserial -import aiomqtt import pyubx2 import io import logging import aioserial import asyncio -import socket import signal from mqtthandler.command import command @@ -47,10 +45,10 @@ class UBXAsyncParser: class UBXHandler(MQTTHandler): def __init__( self, - handler_id: str, + name: str, serial_port: aioserial.AioSerial, ): - super().__init__(handler_id) + super().__init__(name) self.serial_port = serial_port @task @@ -173,10 +171,8 @@ class UBXHandler(MQTTHandler): async def main(): - handler_id = f"example-gps-{socket.gethostname()}" - handler = UBXHandler( - handler_id, + "example-gps", aioserial.AioSerial( port="/tmp/ttyV0", baudrate=115200,