sdrplay-fm-radio/microservice.py
2023-06-15 14:22:05 +09:30

294 lines
7.6 KiB
Python
Executable File

#! /usr/bin/env python3
import os
import sys
import requests
import SoapySDR as soapy
from radio import Radio
from tuuube import Tuuube
from fileradio import FileRadio
from flask import Flask, jsonify
from flasgger import Swagger
# Flask application; the @app.route decorators below register handlers on it.
app = Flask(__name__)
# Flasgger wrapper: builds the API documentation from the YAML section
# (everything after `---`) of each handler's docstring.
swag = Swagger(app)
# Connected radios, keyed by the `radio` path argument given to /connect
# (a driver name or serial number); values are Radio instances.
radios = {}
@app.route('/report')
def report():
    """Get streams report from the RTSP relay.
    ---
    responses:
      200:
        description: JSON
      500:
        description: The RTSP relay could not be queried.
    """
    try:
        # A timeout keeps this handler from blocking forever when the relay
        # process is wedged or not running.
        r = requests.get("http://localhost:9997/v1/paths/list", timeout=5)
        j = r.json()
        # Strip the per-path configuration from the report; pop() tolerates
        # entries that do not carry these keys.
        for entry in j['items'].values():
            entry.pop('conf', None)
            entry.pop('confName', None)
        return jsonify(j)
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute; str(e) is the
        # portable way to get the message text.
        error_message = {"error_message": str(e)}
        return jsonify(error_message), 500
@app.route('/radio/report')
def radio_report():
    """List radio devices available to the system.
    ---
    responses:
      200:
        description: A list of radio devices.
    """
    # Each enumeration result is converted to a plain dict so that it can be
    # serialised to JSON.
    found = []
    for entry in soapy.Device.enumerate():
        found.append(dict(entry))
    return jsonify(found)
@app.route('/radio/<radio>/connect')
def connect(radio):
    """Connect to a radio device, by driver name or serial number.
    ---
    parameters:
      - name: radio
        description: Radio device driver name, or serial number.
        in: path
        type: string
        required: true
    responses:
      200:
        description: Successfully connected to a radio.
      400:
        description: No radio device by that name is available.
      500:
        description: Failed to connect to radio.
    """
    if radio in radios:
        return "Radio device already connected", 400

    # A device matches when the requested string equals any value of its
    # enumeration record; the first match wins.
    devices = (dict(device) for device in soapy.Device.enumerate())
    device = next((d for d in devices if radio in d.values()), None)
    if device is None:
        return "Radio device not found", 400

    try:
        radios[radio] = Radio(radio, device)
    except Exception as e:
        # If Radio() raised, nothing was stored under this key, so a plain
        # pop() would itself raise KeyError; the default keeps cleanup safe.
        radios.pop(radio, None)
        # Python 3 exceptions have no `.message` attribute.
        error_message = {"error_message": str(e)}
        return jsonify(error_message), 500
    return "", 200
@app.route('/radio/<radio>/disconnect')
def disconnect(radio):
    """Disconnect from a radio device.
    ---
    parameters:
      - name: radio
        description: Radio device driver name, or serial number.
        in: path
        type: string
        required: true
    responses:
      200:
        description: Successfully disconnected from the radio.
      400:
        description: The specified radio is not connected.
      500:
        description: An unknown error occurred.
    """
    # Removing the entry drops this module's reference to the Radio object.
    try:
        radios.pop(radio)
    except KeyError:
        error_message = {"error_message": "Radio not connected"}
        return jsonify(error_message), 400
    return "", 200
@app.route('/radio/<radio>/configure/<frequency>')
def configure(radio, frequency):
    """Tune the radio to a frequency.
    You must connect to the radio before attempting to configure it.
    The radio must be configured before streaming can be started.
    ---
    parameters:
      - name: radio
        description: Radio device driver name, or serial number.
        in: path
        type: string
        required: true
      - name: frequency
        description: Frequency (in Hz) to tune the radio to. Suffixing is also supported, ie 87.5M, 108.0M, etc.
        in: path
        type: string
        required: true
    responses:
      200:
        description: JSON
      400:
        description: The specified radio is not connected.
    """
    # Guard clause: reject requests for radios that were never connected.
    if radio not in radios:
        return jsonify({"error_message": "Radio not connected"}), 400

    # The frequency is handed over as the raw path string; whatever the
    # Radio object returns is sent back as JSON.
    outcome = radios[radio].configure(frequency)
    return jsonify(outcome)
@app.route('/radio/<radio>/start')
def start_stream(radio):
    """Start the radio stream.
    Once the stream has been started, connect to the stream at:
    rtsp://[host]:8554/radio/[radio]
    ---
    parameters:
      - name: radio
        description: Radio device driver name, or serial number.
        in: path
        type: string
        required: true
    responses:
      200:
        description: The stream was started.
      400:
        description: The radio is not connected, or the stream could not be started.
    """
    # Same "not connected" response as the configure/disconnect endpoints,
    # instead of surfacing a bare KeyError.
    if radio not in radios:
        error_message = {"error_message": "Radio not connected"}
        return jsonify(error_message), 400
    try:
        radios[radio].start_stream()
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute.
        error_message = {"error_message": str(e)}
        return jsonify(error_message), 400
    return "", 200
@app.route('/radio/<radio>/end')
def end_stream(radio):
    """Terminate the radio stream.
    ---
    parameters:
      - name: radio
        description: Radio device driver name, or serial number.
        in: path
        type: string
        required: true
    responses:
      200:
        description: The stream was terminated.
      400:
        description: The radio is not connected, or the stream could not be terminated.
    """
    # Same "not connected" response as the configure/disconnect endpoints,
    # instead of surfacing a bare KeyError.
    if radio not in radios:
        error_message = {"error_message": "Radio not connected"}
        return jsonify(error_message), 400
    try:
        radios[radio].end_stream()
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute.
        error_message = {"error_message": str(e)}
        return jsonify(error_message), 400
    return "", 200
@app.route('/radio/<radio>/info')
def radio_info(radio):
    """Get information about a radio.
    ---
    parameters:
      - name: radio
        description: Radio device driver name, or serial number.
        in: path
        type: string
        required: true
    responses:
      200:
        description: JSON
      400:
        description: The specified radio is not connected.
    """
    if radio not in radios:
        error_message = {"error_message": "Radio not connected"}
        return jsonify(error_message), 400
    try:
        return jsonify(radios[radio].get_info())
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute. The error body
        # goes through jsonify like every other handler in this file.
        error_message = {"error_message": str(e)}
        return jsonify(error_message), 400
# Non-SDR streams: Tuuube objects keyed by youtube video id (created by
# start_tuuube_stream) and FileRadio objects keyed by file name without
# extension (created at startup from ./data/sampleaudio).
tubes = {}
@app.route('/tuuube/<id>/start')
def start_tuuube_stream(id):
    """Start streaming from a youtube source.
    Once the stream has been started, connect to the stream at:
    rtsp://[host]:8554/tuuube/[id]
    ---
    parameters:
      - name: id
        description:
          The youtube video ID. That is the part following the `watch?v=` in the URL. For example, `dQw4w9WgXcQ`.\n
          Other good options are - \n
          `BaW_jenozKc`, yt_dlp package test video.\n
          `b2je8uBlgFM`, stereo audio test.\n
          `LDU_Txk06tM`, crab rave, a commonly used audio fidelity test.\n
          `sPT_epMLkwQ`, Kilsyth CFA major factory fire dispatch radio traffic.
        in: path
        type: string
        required: true
    responses:
      200:
        description: The stream was started.
      400:
        description: The stream could not be started.
    """
    # NOTE: `id` shadows the builtin, but the name is bound to the `<id>`
    # route variable and must stay as-is.
    try:
        # Construction happens inside the try so that a failing Tuuube()
        # is reported as a JSON error like any other start-up failure.
        if id not in tubes:
            tubes[id] = Tuuube(id)
        tubes[id].start_stream()
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute.
        error_message = {"error_message": str(e)}
        return jsonify(error_message), 400
    return "", 200
@app.route('/tuuube/<id>/end')
def end_tuuube_stream(id):
    """Terminate the youtube stream.
    ---
    parameters:
      - name: id
        description: The youtube video ID.
        in: path
        type: string
        required: true
    responses:
      200:
        description: The stream was terminated.
      400:
        description: No stream with that ID exists, or it could not be terminated.
    """
    # NOTE: `id` shadows the builtin, but the name is bound to the `<id>`
    # route variable and must stay as-is.
    if id not in tubes:
        error_message = {"error_message": "Stream not found"}
        return jsonify(error_message), 400
    try:
        tubes[id].end_stream()
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute.
        error_message = {"error_message": str(e)}
        return jsonify(error_message), 400
    return "", 200
if __name__ == '__main__':
    import subprocess

    # Launch the bundled mediamtx RTSP relay as a child process; its output
    # is discarded. Paths are relative to the current working directory.
    rtsp_relay = subprocess.Popen(
        [
            './dependencies/mediamtx/mediamtx',
            './mediamtx.yml'
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )
    try:
        # Every .mp3 under ./data/sampleaudio becomes an always-on stream,
        # registered in `tubes` under the file name without extension.
        for path, _, files in os.walk('./data/sampleaudio'):
            for file in files:
                name, ext = os.path.splitext(file)
                if ext == '.mp3':
                    tubes[name] = FileRadio(f"{path}/{file}")
                    tubes[name].start_stream()

        # Blocks until the server is shut down.
        app.run(
            host='0.0.0.0',
            threaded=True,
            debug=False
        )
    finally:
        # The cleanup runs even when start-up or the server raised, so the
        # relay child process is never left running behind us.
        try:
            print('Stopping any currently streaming radios...')
            for radio in radios:
                if radios[radio].is_streaming():
                    radios[radio].end_stream()
            # Drop the module's references to the stream objects.
            radios = None
            for tube in tubes:
                if tubes[tube].is_streaming():
                    tubes[tube].end_stream()
            tubes = None
        finally:
            print('Killing RTSP relay...')
            rtsp_relay.kill()
            # Reap the killed child so it does not linger as a zombie.
            rtsp_relay.wait()