Expanded the number of devices to fileradios, some tweaks to how the handlers work in general

This commit is contained in:
Jono Targett 2026-03-18 23:19:12 +10:30
parent 223dd5cdc5
commit 1dac5e35f6
13 changed files with 233 additions and 38 deletions

View File

@ -104,10 +104,10 @@ onMounted(() => {
const command = parts[3]
const field = parts[4]
if (field === 'description') {
if (field === '$description') {
updateCommand(device, command, 'description', payload)
}
if (field === 'schema') {
if (field === '$schema') {
const schema = JSON.parse(payload)
updateCommand(device, command, 'schema', schema)
}

BIN
data/StarWars60.mp3 Normal file

Binary file not shown.

BIN
data/drum-cadence-a.ogg Normal file

Binary file not shown.

View File

@ -8,7 +8,7 @@ import aiohttp
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from prometheus_client.parser import text_string_to_metric_families
@dataclass
class MediaMTXConfig:
@ -38,16 +38,16 @@ class MediaMTXHandler(MQTTHandler):
while True:
async with session.get(f"{self.config.host}{self.config.metrics_path}") as r:
metrics = await r.text()
for line in metrics.split("\n")[:-1]:
metric, value = line.split(" ")
topic = metric.replace("_", "/")
full_topic = f"{self.property_topic}/{topic}"
if cache.get(full_topic) != value:
cache[full_topic] = value
await self.mqtt_client.publish(full_topic, value, retain=True)
for family in text_string_to_metric_families(metrics):
for sample in family.samples:
topic = sample.name.replace("_", "/")
if cache.get(topic) != sample.value:
cache[topic] = sample.value
print(topic, sample.value)
await self.set_property(topic, sample.value, 0, True)
await asyncio.sleep(1)

View File

@ -1,5 +1,6 @@
import json
import jsonschema
import inflection
from dataclasses import dataclass, asdict
@ -12,8 +13,11 @@ class CommandExecutionError(RuntimeError):
class Command:
def __init__(self, name: str, schema, handler=None, **kwargs):
def __init__(
self, name: str, description: str, schema: dict, handler=None, **kwargs
):
self.name = name
self.description = description
self.schema = schema
self.handler = handler
self.additional_properties = kwargs
@ -39,9 +43,15 @@ class Command:
raise CommandExecutionError(f"Command execution failed internally.")
def command(schema, **kwargs):
def command(schema: dict, description: str, name: str | None = None, **kwargs):
def decorator(func):
return Command(func.__name__, schema, handler=func, **kwargs)
return Command(
name if name is not None else func.__name__,
description,
schema,
handler=func,
**kwargs,
)
return decorator

View File

@ -24,6 +24,7 @@ class MQTTConfig:
class MQTTHandler:
STATUS = "status"
def __init__(
self,
@ -36,11 +37,10 @@ class MQTTHandler:
self.topic_base = f"device/{handler_id}"
self.command_topic = f"{self.topic_base}/command"
self.property_topic = f"{self.topic_base}/property"
self.status_topic = f"{self.property_topic}/status"
self._shutdown_event = asyncio.Event()
will = aiomqtt.Will(
topic=self.status_topic, payload="OFFLINE", qos=1, retain=True
topic=f"{self.property_topic}/{MQTTHandler.STATUS}", payload="OFFLINE", qos=1, retain=True
)
self.mqtt_client = aiomqtt.Client(
@ -51,31 +51,53 @@ class MQTTHandler:
will=will,
)
self.commands = self.get_available_commands()
def get_available_commands(self):
commands = {}
for base in self.__class__.__mro__:
for name, attr in vars(base).items():
if isinstance(attr, Command):
commands[name] = attr
print(f"Registering method {type(self).__name__}.{name} as command '{attr.name}'")
commands[attr.name] = attr
return commands
async def publish_commands(self):
for name, command in self.get_available_commands().items():
await self.mqtt_client.publish(
f"{self.command_topic}/{command.name}/schema",
json.dumps(command.schema),
qos=1,
retain=True,
)
for k, v in command.additional_properties.items():
async def register_commands(self):
for name, command in self.commands.items():
for k, v in {
"schema": json.dumps(command.schema),
"description": command.description,
**command.additional_properties,
}.items():
await self.mqtt_client.publish(
f"{self.command_topic}/{command.name}/{k}",
f"{self.command_topic}/{command.name}/${k}",
str(v),
qos=1,
retain=True,
)
async def register_property(self, property: str, description: str | None = None, schema: dict | None = None):
data = {
"schema": json.dumps(schema),
"description": description,
}
for k, v in {k:v for k,v in data.items() if v is not None}.items():
await self.mqtt_client.publish(
f"{self.property_topic}/{property}/${k}",
str(v),
qos=1,
retain=True,
)
async def set_property(self, property: str, value, qos=0, retain=False):
await self.mqtt_client.publish(
f"{self.property_topic}/{property}",
str(value),
qos=qos,
retain=retain,
)
async def execute_command(
self,
command_name: str,
@ -101,7 +123,7 @@ class MQTTHandler:
)
try:
command = self.get_available_commands()[command_name]
command = self.commands[command_name]
result = await command(self, payload)
await respond(True, result)
except (CommandArgumentError, CommandExecutionError) as e:
@ -121,7 +143,7 @@ class MQTTHandler:
async def shutdown_watcher(self):
await self._shutdown_event.wait()
await self.mqtt_client.publish(self.status_topic, "OFFLINE", qos=1, retain=True)
await self.set_property(MQTTHandler.STATUS, "OFFLINE", qos=1, retain=True)
def stop(self):
self._shutdown_event.set()
@ -133,12 +155,14 @@ class MQTTHandler:
try:
async with self.mqtt_client as client:
await client.subscribe(f"{self.command_topic}/+")
await self.publish_commands()
await self.register_commands()
# announce that we are online
await client.publish(
self.status_topic, "ONLINE", qos=1, retain=True
)
await self.set_property(MQTTHandler.STATUS, "ONLINE", qos=1, retain=True)
await self.register_property("status", "Indicates the status of the device.", {
"type": "string",
"enum": ["ONLINE", "OFFLINE"]
})
tasks = [self.mqtt_command_writer_task(), self.shutdown_watcher()]

60
radio.py Executable file
View File

@ -0,0 +1,60 @@
#! /usr/bin/env python3
import socket
import signal
import asyncio
from dataclasses import dataclass
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from streamer.fileradio import FileRadio
class RadioHandler(MQTTHandler):
def __init__(
self,
mqtt_config: MQTTConfig,
handler_id: str,
):
super().__init__(mqtt_config, handler_id)
self.radio = FileRadio("./data/StarWars60.mp3", handler_id)
@task
async def publish_stream_path(self):
await self.set_property("path", self.radio.stream_path(), 1, True)
await self.set_property("file", self.radio.path, 1, True)
@command({"type": "object"}, "Start the radio stream.")
async def start(self, args):
print("Start requested")
self.radio.start_stream()
# Manually renamed this one bc it conflicts with the base class version
@command({"type": "object"}, "Stop the radio stream.", "stop")
async def stop_stream(self, args):
print("Stop requested")
self.radio.end_stream()
@command({"type": "string"}, "Change the stream path.")
async def change_path(self, args):
self.radio = FileRadio(self.radio.path, args)
await self.publish_stream_path()
@command({"type": "string"}, "Tune the radio for different playback.")
async def tune(self, args):
print("Tune request: ", args)
self.radio = FileRadio(args, self.radio.name)
await self.publish_stream_path()
async def main():
handler_id = f"radio-{socket.gethostname()}"
mqtt_config = MQTTConfig(host="127.0.0.1")
handler = RadioHandler(mqtt_config, handler_id)
signal.signal(signal.SIGINT, lambda signum, frame: handler.stop())
await handler.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -6,3 +6,5 @@ pyubxutils
jsonschema
aiohttp
aiodns
inflection
prometheus-client

0
streamer/__init__.py Normal file
View File

60
streamer/fileradio.py Normal file
View File

@ -0,0 +1,60 @@
from .streamer import Streamer, is_alive
import subprocess
import time
import sys
import os
class FileRadio(Streamer):
REST_PATH = "sample"
def __init__(self, path, name=None):
super().__init__()
self.path = path
self.basename = os.path.basename(self.path)
if name is None:
self.name, self.ext = os.path.splitext(self.basename)
else:
self.name = name
def _stream_thread(self):
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream -
"-1", # ...indefinitely
"-i",
self.path,
"-c:a",
"aac",
"-f",
"rtsp",
"-rtsp_transport",
"tcp",
self.stream_address("localhost"),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while self.run:
if not is_alive(self.playback):
print("Playback failed, aborting stream.", file=sys.stderr)
break
time.sleep(0.1)
self.run = False
self.playback.kill()
self.playback.wait()
self.playback = None
if __name__ == "__main__":
fr = FileRadio("./data/sampleaudio/taunt.mp3")
fr.start_stream()
while True:
time.sleep(1)

View File

39
streamer/streamer.py Normal file
View File

@ -0,0 +1,39 @@
from threading import Thread
def is_alive(subprocess):
return True if (subprocess and subprocess.poll() is None) else False
class Streamer:
PROTOCOL = "rtsp"
REST_PATH = "stream"
def __init__(self):
self.run = False
self.thread = None
self.name = None
def stream_path(self):
return f"{type(self).REST_PATH}/{self.name}"
def stream_address(self, host, port=8554):
return f"{type(self).PROTOCOL}://{host}:{port}/{self.stream_path()}"
def is_streaming(self):
return True if (self.thread and self.thread.is_alive()) else False
def start_stream(self):
if self.is_streaming():
raise RuntimeError("Stream thread is already running")
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def end_stream(self):
if self.thread is None:
raise RuntimeError("No stream thread to terminate")
self.run = False
self.thread.join()
self.thread = None

10
ubx.py
View File

@ -76,13 +76,13 @@ class UBXHandler(MQTTHandler):
if name.startswith("_"):
continue
topic = f"{self.property_topic}/{message.identity}/{name}"
await self.mqtt_client.publish(topic, value, qos=1, retain=True)
property = f"{message.identity}/{name}"
await self.set_property(property, value, 1, True)
else:
# print("Unexpected response:", message)
pass
@command({"type": "number"}, description="An example command")
@command({"type": "number"}, "An example command", foo="bar")
async def example_cmd(self, args):
print(f"Executing command with args {args}")
@ -118,7 +118,7 @@ class UBXHandler(MQTTHandler):
"required": ["portID", "baudRate"],
"additionalProperties": False,
},
description="Reconfigure the serial port for the UBX simulator.",
"Reconfigure the serial port for the UBX simulator.",
)
async def configure_port(self, args):
message = pyubx2.UBXMessage(
@ -158,7 +158,7 @@ class UBXHandler(MQTTHandler):
"required": ["measRate", "navRate", "timeRef"],
"additionalProperties": False,
},
description="Reconfigure the rate properties for the UBX device.",
"Reconfigure the rate properties for the UBX device.",
)
async def configure_rate(self, args):
message = pyubx2.UBXMessage(