#! /usr/bin/env python3 import socket import signal import asyncio from dataclasses import dataclass import aiohttp from mqtthandler.command import command from mqtthandler.handler import MQTTConfig, MQTTHandler, task from prometheus_client.parser import text_string_to_metric_families @dataclass class MediaMTXConfig: host: str = "http://localhost" metrics_path: str = ":9998/metrics" username: str | None = "admin" password: str | None = "admin" class MediaMTXHandler(MQTTHandler): def __init__( self, mqtt_config: MQTTConfig, handler_id: str, mediamtx_config: MediaMTXConfig, ): super().__init__(mqtt_config, handler_id) self.config = mediamtx_config @task async def retrieve_metrics(self): cache = {} while True: auth = aiohttp.BasicAuth(login=self.config.username, password=self.config.password) async with aiohttp.ClientSession(auth=auth) as session: while True: async with session.get(f"{self.config.host}{self.config.metrics_path}") as r: metrics = await r.text() for family in text_string_to_metric_families(metrics): for sample in family.samples: topic = sample.name.replace("_", "/") if cache.get(topic) != sample.value: cache[topic] = sample.value print(topic, sample.value) await self.set_property(topic, sample.value, 0, True) await asyncio.sleep(1) print("Connection dropped!") await asyncio.sleep(10) async def main(): handler_id = f"mediamtx-{socket.gethostname()}" mqtt_config = MQTTConfig(host="127.0.0.1") handler = MediaMTXHandler(mqtt_config, handler_id, MediaMTXConfig()) signal.signal(signal.SIGINT, lambda signum, frame: handler.stop()) await handler.run() if __name__ == "__main__": asyncio.run(main())