diff --git a/console/src/components/dashboard/CommandsWidget.vue b/console/src/components/dashboard/CommandsWidget.vue index f33f855..291cdb6 100644 --- a/console/src/components/dashboard/CommandsWidget.vue +++ b/console/src/components/dashboard/CommandsWidget.vue @@ -104,10 +104,10 @@ onMounted(() => { const command = parts[3] const field = parts[4] - if (field === 'description') { + if (field === '$description') { updateCommand(device, command, 'description', payload) } - if (field === 'schema') { + if (field === '$schema') { const schema = JSON.parse(payload) updateCommand(device, command, 'schema', schema) } diff --git a/data/StarWars60.mp3 b/data/StarWars60.mp3 new file mode 100644 index 0000000..6284b4e Binary files /dev/null and b/data/StarWars60.mp3 differ diff --git a/data/drum-cadence-a.ogg b/data/drum-cadence-a.ogg new file mode 100644 index 0000000..38026a7 Binary files /dev/null and b/data/drum-cadence-a.ogg differ diff --git a/mediamtx.py b/mediamtx.py index 2f302d1..32cc59b 100755 --- a/mediamtx.py +++ b/mediamtx.py @@ -8,7 +8,7 @@ import aiohttp from mqtthandler.command import command from mqtthandler.handler import MQTTConfig, MQTTHandler, task - +from prometheus_client.parser import text_string_to_metric_families @dataclass class MediaMTXConfig: @@ -38,16 +38,16 @@ class MediaMTXHandler(MQTTHandler): while True: async with session.get(f"{self.config.host}{self.config.metrics_path}") as r: metrics = await r.text() - - for line in metrics.split("\n")[:-1]: - metric, value = line.split(" ") - topic = metric.replace("_", "/") - full_topic = f"{self.property_topic}/{topic}" - if cache.get(full_topic) != value: - cache[full_topic] = value - await self.mqtt_client.publish(full_topic, value, retain=True) + for family in text_string_to_metric_families(metrics): + for sample in family.samples: + topic = sample.name.replace("_", "/") + if cache.get(topic) != sample.value: + cache[topic] = sample.value + print(topic, sample.value) + + await self.set_property(topic, sample.value, 0, True) await asyncio.sleep(1) diff --git a/mqtthandler/command.py b/mqtthandler/command.py index 6a49c2e..da49795 100644 --- a/mqtthandler/command.py +++ b/mqtthandler/command.py @@ -1,5 +1,6 @@ import json import jsonschema +import inflection from dataclasses import dataclass, asdict @@ -12,8 +13,11 @@ class CommandExecutionError(RuntimeError): class Command: - def __init__(self, name: str, schema, handler=None, **kwargs): + def __init__( + self, name: str, description: str, schema: dict, handler=None, **kwargs + ): self.name = name + self.description = description self.schema = schema self.handler = handler self.additional_properties = kwargs @@ -39,9 +43,15 @@ class Command: raise CommandExecutionError(f"Command execution failed internally.") -def command(schema, **kwargs): +def command(schema: dict, description: str, name: str | None = None, **kwargs): def decorator(func): - return Command(func.__name__, schema, handler=func, **kwargs) + return Command( + name if name is not None else func.__name__, + description, + schema, + handler=func, + **kwargs, + ) return decorator diff --git a/mqtthandler/handler.py b/mqtthandler/handler.py index b7255f8..5f0229e 100644 --- a/mqtthandler/handler.py +++ b/mqtthandler/handler.py @@ -24,6 +24,7 @@ class MQTTConfig: class MQTTHandler: + STATUS = "status" def __init__( self, @@ -36,11 +37,10 @@ class MQTTHandler: self.topic_base = f"device/{handler_id}" self.command_topic = f"{self.topic_base}/command" self.property_topic = f"{self.topic_base}/property" - self.status_topic = f"{self.property_topic}/status" self._shutdown_event = asyncio.Event() will = aiomqtt.Will( - topic=self.status_topic, payload="OFFLINE", qos=1, retain=True + topic=f"{self.property_topic}/{MQTTHandler.STATUS}", payload="OFFLINE", qos=1, retain=True ) self.mqtt_client = aiomqtt.Client( @@ -51,31 +51,53 @@ class MQTTHandler: will=will, ) + self.commands = self.get_available_commands() + def get_available_commands(self): commands = {} for base in self.__class__.__mro__: for name, attr in vars(base).items(): if isinstance(attr, Command): - commands[name] = attr + print(f"Registering method {type(self).__name__}.{name} as command '{attr.name}'") + commands[attr.name] = attr return commands - async def publish_commands(self): - for name, command in self.get_available_commands().items(): - await self.mqtt_client.publish( - f"{self.command_topic}/{command.name}/schema", - json.dumps(command.schema), - qos=1, - retain=True, - ) - for k, v in command.additional_properties.items(): + async def register_commands(self): + for name, command in self.commands.items(): + for k, v in { + "schema": json.dumps(command.schema), + "description": command.description, + **command.additional_properties, + }.items(): await self.mqtt_client.publish( - f"{self.command_topic}/{command.name}/{k}", + f"{self.command_topic}/{command.name}/${k}", str(v), qos=1, retain=True, ) + async def register_property(self, property: str, description: str | None = None, schema: dict | None = None): + data = { + "schema": json.dumps(schema), + "description": description, + } + for k, v in {k:v for k,v in data.items() if v is not None}.items(): + await self.mqtt_client.publish( + f"{self.property_topic}/{property}/${k}", + str(v), + qos=1, + retain=True, + ) + + async def set_property(self, property: str, value, qos=0, retain=False): + await self.mqtt_client.publish( + f"{self.property_topic}/{property}", + str(value), + qos=qos, + retain=retain, + ) + async def execute_command( self, command_name: str, @@ -101,7 +123,7 @@ class MQTTHandler: ) try: - command = self.get_available_commands()[command_name] + command = self.commands[command_name] result = await command(self, payload) await respond(True, result) except (CommandArgumentError, CommandExecutionError) as e: @@ -121,7 +143,7 @@ class MQTTHandler: async def shutdown_watcher(self): await self._shutdown_event.wait() - await self.mqtt_client.publish(self.status_topic, "OFFLINE", qos=1, retain=True) + await self.set_property(MQTTHandler.STATUS, "OFFLINE", qos=1, retain=True) def stop(self): self._shutdown_event.set() @@ -133,12 +155,14 @@ class MQTTHandler: try: async with self.mqtt_client as client: await client.subscribe(f"{self.command_topic}/+") - await self.publish_commands() + await self.register_commands() # announce that we are online - await client.publish( - self.status_topic, "ONLINE", qos=1, retain=True - ) + await self.set_property(MQTTHandler.STATUS, "ONLINE", qos=1, retain=True) + await self.register_property("status", "Indicates the status of the device.", { + "type": "string", + "enum": ["ONLINE", "OFFLINE"] + }) tasks = [self.mqtt_command_writer_task(), self.shutdown_watcher()] diff --git a/radio.py b/radio.py new file mode 100755 index 0000000..b8b3b9d --- /dev/null +++ b/radio.py @@ -0,0 +1,60 @@ +#! /usr/bin/env python3 + +import socket +import signal +import asyncio +from dataclasses import dataclass + +from mqtthandler.command import command +from mqtthandler.handler import MQTTConfig, MQTTHandler, task + +from streamer.fileradio import FileRadio + +class RadioHandler(MQTTHandler): + def __init__( + self, + mqtt_config: MQTTConfig, + handler_id: str, + ): + super().__init__(mqtt_config, handler_id) + + self.radio = FileRadio("./data/StarWars60.mp3", handler_id) + + @task + async def publish_stream_path(self): + await self.set_property("path", self.radio.stream_path(), 1, True) + await self.set_property("file", self.radio.path, 1, True) + + @command({"type": "object"}, "Start the radio stream.") + async def start(self, args): + print("Start requested") + self.radio.start_stream() + + # Manually renamed this one bc it conflicts with the base class version + @command({"type": "object"}, "Stop the radio stream.", "stop") + async def stop_stream(self, args): + print("Stop requested") + self.radio.end_stream() + + @command({"type": "string"}, "Change the stream path.") + async def change_path(self, args): + self.radio = FileRadio(self.radio.path, args) + await self.publish_stream_path() + + @command({"type": "string"}, "Tune the radio for different playback.") + async def tune(self, args): + print("Tune request: ", args) + self.radio = FileRadio(args, self.radio.name) + await self.publish_stream_path() + +async def main(): + handler_id = f"radio-{socket.gethostname()}" + mqtt_config = MQTTConfig(host="127.0.0.1") + handler = RadioHandler(mqtt_config, handler_id) + + signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) + await handler.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt index a15df68..3e8150b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ pyubxutils jsonschema aiohttp aiodns +inflection +prometheus-client diff --git a/streamer/__init__.py b/streamer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/streamer/fileradio.py b/streamer/fileradio.py new file mode 100644 index 0000000..8fb243c --- /dev/null +++ b/streamer/fileradio.py @@ -0,0 +1,60 @@ +from .streamer import Streamer, is_alive +import subprocess +import time +import sys +import os + + +class FileRadio(Streamer): + REST_PATH = "sample" + + def __init__(self, path, name=None): + super().__init__() + self.path = path + self.basename = os.path.basename(self.path) + if name is None: + self.name, self.ext = os.path.splitext(self.basename) + else: + self.name = name + + def _stream_thread(self): + self.playback = subprocess.Popen( + [ + "/usr/bin/ffmpeg", + "-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag + "-stream_loop", # Loop the stream - + "-1", # ...indefinitely + "-i", + self.path, + "-c:a", + "aac", + "-f", + "rtsp", + "-rtsp_transport", + "tcp", + self.stream_address("localhost"), + ], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + while self.run: + if not is_alive(self.playback): + print("Playback failed, aborting stream.", file=sys.stderr) + break + time.sleep(0.1) + + self.run = False + + self.playback.kill() + self.playback.wait() + self.playback = None + + +if __name__ == "__main__": + fr = FileRadio("./data/sampleaudio/taunt.mp3") + fr.start_stream() + + while True: + time.sleep(1) diff --git a/streamer/requirements.txt b/streamer/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/streamer/streamer.py b/streamer/streamer.py new file mode 100644 index 0000000..de92ba0 --- /dev/null +++ b/streamer/streamer.py @@ -0,0 +1,39 @@ +from threading import Thread + +def is_alive(subprocess): + return True if (subprocess and subprocess.poll() is None) else False + + +class Streamer: + PROTOCOL = "rtsp" + REST_PATH = "stream" + + def __init__(self): + self.run = False + self.thread = None + self.name = None + + def stream_path(self): + return f"{type(self).REST_PATH}/{self.name}" + + def stream_address(self, host, port=8554): + return f"{type(self).PROTOCOL}://{host}:{port}/{self.stream_path()}" + + def is_streaming(self): + return True if (self.thread and self.thread.is_alive()) else False + + def start_stream(self): + if self.is_streaming(): + raise RuntimeError("Stream thread is already running") + + self.run = True + self.thread = Thread(target=self._stream_thread, daemon=True, args=()) + self.thread.start() + + def end_stream(self): + if self.thread is None: + raise RuntimeError("No stream thread to terminate") + + self.run = False + self.thread.join() + self.thread = None diff --git a/ubx.py b/ubx.py index 3d69695..e851568 100755 --- a/ubx.py +++ b/ubx.py @@ -76,13 +76,13 @@ class UBXHandler(MQTTHandler): if name.startswith("_"): continue - topic = f"{self.property_topic}/{message.identity}/{name}" - await self.mqtt_client.publish(topic, value, qos=1, retain=True) + property = f"{message.identity}/{name}" + await self.set_property(property, value, 1, True) else: # print("Unexpected response:", message) pass - @command({"type": "number"}, description="An example command") + @command({"type": "number"}, "An example command", foo="bar") async def example_cmd(self, args): print(f"Executing command with args {args}") @@ -118,7 +118,7 @@ class UBXHandler(MQTTHandler): "required": ["portID", "baudRate"], "additionalProperties": False, }, - description="Reconfigure the serial port for the UBX simulator.", + "Reconfigure the serial port for the UBX simulator.", ) async def configure_port(self, args): message = pyubx2.UBXMessage( @@ -158,7 +158,7 @@ class UBXHandler(MQTTHandler): "required": ["measRate", "navRate", "timeRef"], "additionalProperties": False, }, - description="Reconfigure the rate properties for the UBX device.", + "Reconfigure the rate properties for the UBX device.", ) async def configure_rate(self, args): message = pyubx2.UBXMessage(