mqttdevicemanager/mediamtx.py

69 lines
2.1 KiB
Python
Executable File

#! /usr/bin/env python3
import socket
import signal
import asyncio
from dataclasses import dataclass
import aiohttp
from mqtthandler.command import command
from mqtthandler.handler import MQTTConfig, MQTTHandler, task
from prometheus_client.parser import text_string_to_metric_families
@dataclass
class MediaMTXConfig:
host: str = "http://localhost"
metrics_path: str = ":9998/metrics"
username: str | None = "admin"
password: str | None = "admin"
class MediaMTXHandler(MQTTHandler):
    """MQTT handler that mirrors MediaMTX metrics into MQTT properties.

    The metrics endpoint is polled once per second and every sample whose
    value differs from the previously seen one is forwarded through
    ``set_property``.
    """

    def __init__(
        self,
        mqtt_config: MQTTConfig,
        handler_id: str,
        mediamtx_config: MediaMTXConfig,
    ):
        """Initialise the base handler and remember the MediaMTX settings.

        Args:
            mqtt_config: MQTT settings forwarded to ``MQTTHandler``.
            handler_id: Identifier forwarded to ``MQTTHandler``.
            mediamtx_config: Where and how to fetch the MediaMTX metrics.
        """
        super().__init__(mqtt_config, handler_id)
        self.config = mediamtx_config

    def _build_auth(self):
        """Return HTTP basic auth for the configured credentials, if any.

        ``aiohttp.BasicAuth`` raises ``ValueError`` for a ``None`` login or
        password, while the config allows both to be ``None``; without a
        username no auth is sent, and a missing password becomes ``""``.
        """
        username = self.config.username
        if username is None:
            return None
        return aiohttp.BasicAuth(login=username, password=self.config.password or "")

    async def _publish_changes(self, metrics: str, cache: dict) -> None:
        """Publish every sample in *metrics* whose value changed.

        Args:
            metrics: Prometheus text exposition as returned by the endpoint.
            cache: Last published value per topic; updated in place.
        """
        for family in text_string_to_metric_families(metrics):
            for sample in family.samples:
                # Metric name "a_b_c" becomes topic "a/b/c".
                # NOTE(review): only the sample name is used, so samples that
                # differ only by labels share one topic - confirm intended.
                topic = sample.name.replace("_", "/")
                if cache.get(topic) != sample.value:
                    cache[topic] = sample.value
                    print(topic, sample.value)
                    # Trailing arguments presumably mean qos=0, retain=True -
                    # TODO confirm against MQTTHandler.set_property.
                    await self.set_property(topic, sample.value, 0, True)

    @task
    async def retrieve_metrics(self):
        """Poll the metrics endpoint forever, reconnecting after failures.

        A successful poll is followed by a 1 second pause. When the request
        fails (connection error, timeout or HTTP error status) the session is
        discarded, "Connection dropped!" is printed and a new session is
        opened after 10 seconds. The change cache survives reconnects.
        """
        cache = {}
        while True:
            try:
                async with aiohttp.ClientSession(auth=self._build_auth()) as session:
                    while True:
                        url = f"{self.config.host}{self.config.metrics_path}"
                        async with session.get(url) as r:
                            # Do not feed an error page to the metrics parser.
                            r.raise_for_status()
                            metrics = await r.text()
                        await self._publish_changes(metrics, cache)
                        await asyncio.sleep(1)
            except (aiohttp.ClientError, asyncio.TimeoutError):
                # Without this handler the inner loop could only be left by an
                # exception that ended the whole task, so the back-off below
                # was never reached.
                print("Connection dropped!")
            await asyncio.sleep(10)
async def main():
    """Create a MediaMTX handler for this host and run it.

    The handler id is ``mediamtx-<hostname>``, the MQTT config uses host
    127.0.0.1, and the MediaMTX settings are the dataclass defaults. SIGINT
    is routed to ``handler.stop()``.
    """
    hostname = socket.gethostname()
    broker = MQTTConfig(host="127.0.0.1")
    handler = MediaMTXHandler(broker, f"mediamtx-{hostname}", MediaMTXConfig())

    def _on_sigint(signum, frame):
        # Ask the handler to stop instead of raising KeyboardInterrupt.
        handler.stop()

    signal.signal(signal.SIGINT, _on_sigint)
    await handler.run()
if __name__ == "__main__":
    # Only start the event loop when executed as a script, not on import.
    asyncio.run(main())