Compare commits

...

12 Commits

25 changed files with 723 additions and 355 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

1
data/sampleaudio/SOURCE Normal file
View File

@ -0,0 +1 @@
https://www2.cs.uic.edu/~i101/SoundFiles/

Binary file not shown.

Binary file not shown.

BIN
data/sampleaudio/taunt.mp3 Normal file

Binary file not shown.

55
fileradio.py Normal file
View File

@ -0,0 +1,55 @@
from streamer import Streamer, is_alive
import subprocess
import time
import sys
import os
class FileRadio(Streamer):
REST_PATH = "sample"
def __init__(self, path):
super().__init__()
self.path = path
self.basename = os.path.basename(self.path)
self.name, self.ext = os.path.splitext(self.basename)
def _stream_thread(self):
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream -
"-1", # ...indefinitely
"-i",
self.path,
"-c",
"copy",
"-f",
"rtsp",
self.stream_address("localhost"),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while self.run:
if not is_alive(self.playback):
print("Playback failed, aborting stream.", file=sys.stderr)
break
time.sleep(0.1)
self.run = False
self.playback.kill()
self.playback.wait()
self.playback = None
if __name__ == "__main__":
fr = FileRadio("./data/sampleaudio/taunt.mp3")
fr.start_stream()
while True:
time.sleep(1)

View File

@ -10,8 +10,9 @@
import numpy, math, sys, time
from numpy import fft
def impulse(mask):
''' Convert frequency domain mask to time-domain '''
"""Convert frequency domain mask to time-domain"""
# Negative side, a mirror of positive side
negatives = mask[1:-1]
negatives.reverse()
@ -22,15 +23,15 @@ def impulse(mask):
impulse_response = fft.ifft(mask).real.tolist()
# swap left and right sides
left = impulse_response[:fft_length // 2]
right = impulse_response[fft_length // 2:]
left = impulse_response[: fft_length // 2]
right = impulse_response[fft_length // 2 :]
impulse_response = right + left
return impulse_response
def lo_mask(sample_rate, tap_count, freq, dboct):
''' Create a freq domain mask for a lowpass filter '''
"""Create a freq domain mask for a lowpass filter"""
order = dboct / 6
max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0)
@ -38,14 +39,14 @@ def lo_mask(sample_rate, tap_count, freq, dboct):
freq /= f2s
l = tap_count // 2
mask = []
for f in range(0, l+1):
H = 1.0 / ( 1 + (f / freq) ** (2 * order) ) ** 0.5
for f in range(0, l + 1):
H = 1.0 / (1 + (f / freq) ** (2 * order)) ** 0.5
mask.append(H)
return mask
def hi_mask(sample_rate, tap_count, freq, dboct):
''' Create a freq domain mask for a highpass filter '''
"""Create a freq domain mask for a highpass filter"""
order = dboct / 6
max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0)
@ -53,25 +54,25 @@ def hi_mask(sample_rate, tap_count, freq, dboct):
freq /= f2s
l = tap_count // 2
mask = []
for f in range(0, l+1):
H = 1.0 / ( 1 + (freq / (f + 0.0001)) ** (2 * order) ) ** 0.5
for f in range(0, l + 1):
H = 1.0 / (1 + (freq / (f + 0.0001)) ** (2 * order)) ** 0.5
mask.append(H)
return mask
def combine_masks(mask1, mask2):
''' Combine two filter masks '''
"""Combine two filter masks"""
assert len(mask1) == len(mask2)
return [ mask1[i] * mask2[i] for i in range(0, len(mask1)) ]
return [mask1[i] * mask2[i] for i in range(0, len(mask1))]
def taps(sample_rate, freq, dboct, is_highpass):
cutoff_octaves = 60 / dboct
if is_highpass:
cutoff = freq / 2 ** cutoff_octaves
cutoff = freq / 2**cutoff_octaves
else:
cutoff = freq * 2 ** cutoff_octaves
cutoff = freq * 2**cutoff_octaves
cutoff = min(cutoff, sample_rate / 2)
transition_band = abs(freq - cutoff)
@ -87,8 +88,8 @@ class filter:
def feed(self, original):
unfiltered = numpy.concatenate((self.buf, original))
self.buf = unfiltered[-len(self.coefs):]
filtered = numpy.convolve(unfiltered, self.coefs, mode='valid')
self.buf = unfiltered[-len(self.coefs) :]
filtered = numpy.convolve(unfiltered, self.coefs, mode="valid")
assert len(filtered) == len(original) + 1
return filtered[1:]
@ -98,7 +99,7 @@ class low_pass(filter):
tap_count = taps(sample_rate, f, dbo, False)
mask = lo_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
self.buf = [0 for n in self.coefs]
class high_pass(filter):
@ -106,18 +107,19 @@ class high_pass(filter):
tap_count = taps(sample_rate, f, dbo, True)
mask = hi_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
self.buf = [0 for n in self.coefs]
class band_pass(filter):
def __init__(self, sample_rate, lo, hi, dbo):
tap_count = max(taps(sample_rate, lo, dbo, True),
taps(sample_rate, hi, dbo, False))
tap_count = max(
taps(sample_rate, lo, dbo, True), taps(sample_rate, hi, dbo, False)
)
lomask = lo_mask(sample_rate, tap_count, hi, dbo)
himask = hi_mask(sample_rate, tap_count, lo, dbo)
mask = combine_masks(lomask, himask)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
self.buf = [0 for n in self.coefs]
class deemphasis(filter):
@ -131,8 +133,9 @@ class deemphasis(filter):
# slope in dB/octave of deemphasis filter
dedbo = 10 / octaves
tap_count = max(taps(sample_rate, lo, dedbo, False),
taps(sample_rate, hi, final_dbo, False))
tap_count = max(
taps(sample_rate, lo, dedbo, False), taps(sample_rate, hi, final_dbo, False)
)
# Calculate deemphasis filter
demask = lo_mask(sample_rate, tap_count, lo, dedbo)
@ -141,7 +144,7 @@ class deemphasis(filter):
mask = combine_masks(demask, fmask)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
self.buf = [0 for n in self.coefs]
class decimator(filter):
@ -155,11 +158,11 @@ class decimator(filter):
# Gets the last n-th sample of every n (n = factor)
# If e.g. gets 12 samples, gets s[4] and s[9], and
# stoves s[10:] to the next round
'''
"""
decimated = [ original[ self.factor * i + self.factor - 1 ] \
for i in range(0, len(original) // self.factor) ]
'''
decimated = original[(self.factor - 1)::self.factor]
self.buf2 = original[:-len(original) % self.factor]
"""
decimated = original[(self.factor - 1) :: self.factor]
self.buf2 = original[: -len(original) % self.factor]
return decimated

View File

@ -19,12 +19,24 @@ from formats import FORMATS
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true')
parser.add_argument('-f', '--format', choices=list(FORMATS.keys()), help='Input sample format', required=True)
parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True)
parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True)
parser.add_argument(
"-v", "--verbose", help="Print additional informational output", action="store_true"
)
parser.add_argument(
"-f",
"--format",
choices=list(FORMATS.keys()),
help="Input sample format",
required=True,
)
parser.add_argument(
"-s", "--sample-rate", metavar="rate", help="Source sample rate (Hz)", required=True
)
parser.add_argument(
"-d", "--demod-rate", metavar="rate", help="Output sample rate (Hz)", required=True
)
# TODO JMT: Output to file
#parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.')
# parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.')
args = parser.parse_args()
INPUT_RATE = int(prefixed.Float(args.sample_rate))
@ -32,7 +44,10 @@ OUTPUT_RATE = int(prefixed.Float(args.demod_rate))
DECIMATION = INPUT_RATE / OUTPUT_RATE
if DECIMATION != math.floor(DECIMATION):
print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr)
print(
f"The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}",
file=sys.stderr,
)
sys.exit(1)
FORMAT = FORMATS[args.format].numpy
@ -63,15 +78,17 @@ decimate2 = filters.decimator(DECIMATION)
lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
# Band-pass filter for stereo (L-R) modulated audio
hi = filters.band_pass(INPUT_RATE,
STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120)
hi = filters.band_pass(
INPUT_RATE, STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120
)
# Filter to extract pilot signal
pilot = filters.band_pass(INPUT_RATE,
STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120)
pilot = filters.band_pass(
INPUT_RATE, STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120
)
last_angle = 0.0
remaining_data = b''
remaining_data = b""
while True:
# Ingest 0.1s worth of data
@ -79,7 +96,7 @@ while True:
if not data:
break
# TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1
#data = remaining_data + data
# data = remaining_data + data
if len(data) < 2 * BYTES_PER_SAMPLE:
remaining_data = data
@ -173,8 +190,7 @@ while True:
last_deviation_avg = 0.0
# Translate rotation to frequency deviation
STEREO_CARRIER /= (1 + (rotation * 1.05) / tau)
STEREO_CARRIER /= 1 + (rotation * 1.05) / tau
# Downsample, Low-pass/deemphasis demodulated L-R
output_jstereo = lo_r.feed(output_jstereo)
@ -196,4 +212,4 @@ while True:
output[1::2] = output_right
output = output.astype(int)
sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output))
sys.stdout.buffer.write(struct.pack("<%dh" % len(output), *output))

View File

@ -2,6 +2,7 @@ import numpy as np
from SoapySDR import *
from dataclasses import dataclass
@dataclass
class FormatSpec:
name: str
@ -9,9 +10,10 @@ class FormatSpec:
numpy: np.dtype
packing: str
FORMATS = {
'CU8': FormatSpec('CU8', SOAPY_SDR_CU8, np.uint8, '=%dB'),
'CS8': FormatSpec('CS8', SOAPY_SDR_CS8, np.int8, '=%db'),
'CS16': FormatSpec('CS16', SOAPY_SDR_CS16, np.int16, '=%dh'),
'CF32': FormatSpec('CF32', SOAPY_SDR_CF32, np.float32, '=%df'),
"CU8": FormatSpec("CU8", SOAPY_SDR_CU8, np.uint8, "=%dB"),
"CS8": FormatSpec("CS8", SOAPY_SDR_CS8, np.int8, "=%db"),
"CS16": FormatSpec("CS16", SOAPY_SDR_CS16, np.int16, "=%dh"),
"CF32": FormatSpec("CF32", SOAPY_SDR_CF32, np.float32, "=%df"),
}

View File

@ -38,10 +38,10 @@ udpMaxPayloadSize: 1472
externalAuthenticationURL:
# Enable the HTTP API.
api: no
api: yes
# Enable Prometheus-compatible metrics.
metrics: no
metrics: yes
# Enable pprof-compatible endpoint to monitor performances.
pprof: no

View File

@ -1,7 +1,12 @@
#! /usr/bin/env python3
import os
import sys
import requests
import SoapySDR as soapy
from radio import Radio
from tuuube import Tuuube
from fileradio import FileRadio
from flask import Flask, jsonify
from flasgger import Swagger
@ -13,8 +18,30 @@ swag = Swagger(app)
radios = {}
@app.route('/radio/report')
@app.route("/report")
def report():
"""Get streams report from the RTSP relay.
---
responses:
200:
description: JSON with streams report.
400:
description: JSON with error message.
"""
try:
r = requests.get("http://localhost:9997/v1/paths/list")
j = r.json()
for item in j["items"]:
del j["items"][item]["conf"]
del j["items"][item]["confName"]
return jsonify(j)
except Exception as e:
return _error_message_json(e.message), 500
@app.route("/radio/report")
def radio_report():
"""List radio devices available to the system.
---
responses:
@ -25,7 +52,8 @@ def report():
devices = [dict(device) for device in soapy.Device.enumerate()]
return jsonify(devices)
@app.route('/radio/<radio>/connect')
@app.route("/radio/<radio>/connect")
def connect(radio):
"""Connect to a radio device, by driver name or serial number.
---
@ -37,29 +65,29 @@ def connect(radio):
required: true
responses:
200:
description: Successfully connected to a radio.
description: JSON with message successfully connected to a radio.
400:
description: No radio device by that name is available.
description: JSON with error message No radio device by that name is available.
500:
description: Failed to connect to radio.
description: JSON with error message failed to connect to radio.
"""
if radio in radios:
return "Radio device already connected", 400
return _error_message_json("Radio device already connected"), 400
devices = [dict(device) for device in soapy.Device.enumerate()]
for device in devices:
if radio in device.values():
try:
radios[radio] = Radio(radio, device)
return "", 200
return jsonify("message", "successfully connected radio"), 200
except Exception as e:
radios.pop(radio)
return str(e), 500
return _error_message_json(e.message), 500
return "Radio device not found", 400
@app.route('/radio/<radio>/disconnect')
@app.route("/radio/<radio>/disconnect")
def disconnect(radio):
"""Disconnect from a radio device.
---
@ -79,11 +107,12 @@ def disconnect(radio):
"""
if radio in radios:
radios.pop(radio)
return "", 200
return jsonify("message", "succesfully disconnected radio"), 200
else:
return "Radio not connected", 400
return _error_message_json("Radio not connected"), 400
@app.route('/radio/<radio>/configure/<frequency>')
@app.route("/radio/<radio>/configure/<frequency>")
def configure(radio, frequency):
"""Tune the radio to a frequency.
You must connect to the radio before attempting to configure it.
@ -102,20 +131,21 @@ def configure(radio, frequency):
required: true
responses:
200:
description: JSON
description: JSON with radio configuration.
400:
description: The specified radio is not connected.
"""
if radio in radios:
return jsonify(radios[radio].configure(frequency))
else:
return "Radio not connected", 400
return _error_message_json("Radio not connected"), 400
@app.route('/radio/<radio>/start')
@app.route("/radio/<radio>/start")
def start_stream(radio):
"""Start the radio stream.
Once the stream has been started, connect to the stream at:
rtsp://[host]:8554/radio/[radio]/stream
rtsp://[host]:8554/radio/[radio]
---
parameters:
- name: radio
@ -123,14 +153,20 @@ def start_stream(radio):
in: path
type: string
required: true
responses:
200:
description: JSON with message successful start of radio stream.
400:
description: JSON with error message.
"""
try:
radios[radio].start_stream()
return "", 200
return jsonify("message", "successfully started radio stream"), 200
except Exception as e:
return str(e), 400
return _error_message_json(e.message), 400
@app.route('/radio/<radio>/end')
@app.route("/radio/<radio>/end")
def end_stream(radio):
"""Terminate the radio stream.
---
@ -140,14 +176,21 @@ def end_stream(radio):
in: path
type: string
required: true
responses:
200:
description: JSON with message successful termination of radio stream.
400:
description: JSON with error message.
"""
try:
radios[radio].end_stream()
return "", 200
return jsonify("message", "successfully ended radio stream"), 200
except Exception as e:
return str(e), 400
error_message = {"error_message": e.message}
return _error_message_json(e.message), 400
@app.route('/radio/<radio>/info')
@app.route("/radio/<radio>/info")
def radio_info(radio):
"""Get information about a radio.
---
@ -159,40 +202,117 @@ def radio_info(radio):
required: true
responses:
200:
description: JSON
description: JSON with radio information.
400:
description: The specified radio is not connected.
description: JSON with error message.
"""
try:
return jsonify(radios[radio].get_info())
except Exception as e:
return str(e), 400
return _error_message_json(e.message), 400
if __name__ == '__main__':
tubes = {}
@app.route("/tuuube/<id>/start")
def start_tuuube_stream(id):
"""Start streaming from a youtube source.
Once the stream has been started, connect to the stream at:
rtsp://[host]:8554/tuuube/[id]
---
parameters:
- name: id
description:
The youtube video ID. That is the part following the `watch?v=` in the URL. For example, `dQw4w9WgXcQ`.\n
Other good options are - \n
`BaW_jenozKc`, yt_dlp package test video.\n
`b2je8uBlgFM`, stereo audio test.\n
`LDU_Txk06tM`, crab rave, a commonly used audio fidelity test.\n
`sPT_epMLkwQ`, Kilsyth CFA major factory fire dispatch radio traffic.
in: path
type: string
required: true
responses:
200:
description: JSON with message successful start of youtube stream.
400:
description: JSON with error message.
"""
if id not in tubes:
tubes[id] = Tuuube(id)
try:
tubes[id].start_stream()
return jsonify("message", "successfully started youtube stream"), 200
except Exception as e:
return _error_message_json(e.message), 400
@app.route("/tuuube/<id>/end")
def end_tuuube_stream(id):
"""Terminate the youtube stream.
---
parameters:
- name: id
description: The youtube video ID.
in: path
type: string
required: true
responses:
200:
description: JSON with message successful termination of youtube stream.
400:
description: JSON with error message.
"""
try:
tubes[id].end_stream()
return jsonify("message", "succesfully ended youtube stream"), 200
except Exception as e:
return _error_message_json(e.message), 400
"""
Helper function to return a JSON error message
parameters: error_message - the error message to return
returns: a JSON object with the error message
"""
def _error_message_json(error_message: str):
error_message = {"error_message": error_message}
return jsonify(error_message)
if __name__ == "__main__":
import subprocess
rtsp_relay = subprocess.Popen(
[
'./dependencies/mediamtx/mediamtx',
'./mediamtx.yml'
],
["./dependencies/mediamtx/mediamtx", "./mediamtx.yml"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
stderr=subprocess.DEVNULL,
)
app.run(
host='0.0.0.0',
threaded=True,
debug=False
)
for path, _, files in os.walk("./data/sampleaudio"):
for file in files:
name, ext = os.path.splitext(file)
if ext == ".mp3":
tubes[name] = FileRadio(f"{path}/{file}")
tubes[name].start_stream()
print('Stopping any currently streaming radios...')
app.run(host="0.0.0.0", threaded=True, debug=False)
print("Stopping any currently streaming radios...")
for radio in radios:
if radios[radio].is_streaming():
radios[radio].end_stream()
radios = None
print('Killing RTSP relay...')
for tube in tubes:
if tubes[tube].is_streaming():
tubes[tube].end_stream()
tubes = None
print("Killing RTSP relay...")
rtsp_relay.kill()
rtsp_relay.wait() # Necessary?

150
radio.py
View File

@ -7,19 +7,19 @@ import subprocess
import struct
from soapyhelpers import *
from samplerates import *
from streamer import Streamer, is_alive
class Radio:
FORMAT = 'CS16'
class Radio(Streamer):
REST_PATH = "radio"
FORMAT = "CS16"
SAMPLES = 8192
PORT = 8554
def __init__(self, name, device_info):
super().__init__()
self.name = name
self.device_info = device_info
self.run = False
self.thread = None
self.device = soapy.Device(device_info)
if self.device is None:
raise RuntimeError("Failed to connect to radio device")
@ -33,7 +33,7 @@ class Radio:
frequency = int(prefixed.Float(frequency))
bandwidth = 200000
sample_rates = preferred_sample_rates(self.capabilities['rx']['sample-rates'])
sample_rates = preferred_sample_rates(self.capabilities["rx"]["sample-rates"])
if len(sample_rates) == 0:
raise RuntimeError("No suitable sample rates are available")
self.sample_rate, self.output_rate = sample_rates[0]
@ -46,72 +46,52 @@ class Radio:
self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True)
return {
'frequency': self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
'sample-rate': self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
'bandwidth': self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
'gain-mode': 'auto' if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) else 'manual',
"frequency": self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
"sample-rate": self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
"bandwidth": self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
"gain-mode": "auto"
if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0)
else "manual",
}
def get_info(self):
return {
'name': self.name,
'device': self.device_info,
'capabilities': self.capabilities,
'stream-path': self._stream_path(),
'streaming': self.is_streaming(),
"name": self.name,
"device": self.device_info,
"capabilities": self.capabilities,
"stream-path": self._stream_path(),
"streaming": self.is_streaming(),
}
def _get_capabilities(self):
def get_direction_capabilities(direction):
return {
'antennas': self.device.listAntennas(direction, 0),
'gains': self.device.listGains(direction, 0),
'frequencies': self.device.listFrequencies(direction, 0),
'sample-rates': self.device.listSampleRates(direction, 0),
'bandwidths': self.device.listBandwidths(direction, 0),
'sensors': self.device.listSensors(direction, 0),
'formats': self.device.getStreamFormats(direction, 0),
"antennas": self.device.listAntennas(direction, 0),
"gains": self.device.listGains(direction, 0),
"frequencies": self.device.listFrequencies(direction, 0),
"sample-rates": self.device.listSampleRates(direction, 0),
"bandwidths": self.device.listBandwidths(direction, 0),
"sensors": self.device.listSensors(direction, 0),
"formats": self.device.getStreamFormats(direction, 0),
}
return {
'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX),
'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX),
'clock-sources': self.device.listClockSources(),
'time-sources': self.device.listTimeSources(),
'register-interfaces': self.device.listRegisterInterfaces(),
'gpios': self.device.listGPIOBanks(),
'uarts': self.device.listUARTs(),
"rx": get_direction_capabilities(soapy.SOAPY_SDR_RX),
"tx": get_direction_capabilities(soapy.SOAPY_SDR_TX),
"clock-sources": self.device.listClockSources(),
"time-sources": self.device.listTimeSources(),
"register-interfaces": self.device.listRegisterInterfaces(),
"gpios": self.device.listGPIOBanks(),
"uarts": self.device.listUARTs(),
}
def is_streaming(self):
return True if (self.thread and self.thread.is_alive()) else False
def start_stream(self):
if self.is_streaming():
raise RuntimeError('Stream thread is already running')
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def end_stream(self):
if self.thread is None:
raise RuntimeError('No stream thread to terminate')
self.run = False
self.thread.join()
self.thread = None
def _stream_thread(self):
self._init_stream()
def is_alive(subprocess):
return (subprocess.poll() is None)
while self.run:
# Check that the child processes are still running
if (not is_alive(self.demod)) or (not is_alive(self.playback)):
print('DSP chain failed, aborting stream.', file=sys.stderr)
print("DSP chain failed, aborting stream.", file=sys.stderr)
break
result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES)
@ -121,14 +101,18 @@ class Radio:
elif result.ret < 0:
error = SoapyError(result.ret)
if error is not SoapyError.Timeout:
print("Stream read failed, aborting stream:", error, file=sys.stderr)
print(
"Stream read failed, aborting stream:", error, file=sys.stderr
)
break
continue
else:
read_size = int(result.ret * 2)
self.demod.stdin.write(
struct.pack(FORMATS[Radio.FORMAT].packing % read_size,
*self.buffer[:read_size])
struct.pack(
FORMATS[Radio.FORMAT].packing % read_size,
*self.buffer[:read_size],
)
)
self._cleanup_stream()
@ -136,23 +120,44 @@ class Radio:
def _init_stream(self):
self.playback = subprocess.Popen(
[
'/usr/bin/ffmpeg', '-f', 's16le', '-ar', str(self.output_rate), '-ac', '2', '-i', '-',
'-f', 'rtsp', f"rtsp://localhost{self._stream_path()}"
"/usr/bin/ffmpeg",
"-f",
"s16le",
"-ar",
str(self.output_rate),
"-ac",
"2",
"-i",
"-",
"-f",
"rtsp",
self.stream_address(),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
stderr=subprocess.DEVNULL,
)
self.demod = subprocess.Popen(
['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)],
[
"/usr/bin/python3",
"fm_demod.py",
"-f",
"CS16",
"-s",
str(self.sample_rate),
"-d",
str(self.output_rate),
],
stdin=subprocess.PIPE,
stdout=self.playback.stdin,
stderr=subprocess.DEVNULL
stderr=subprocess.DEVNULL,
)
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy)
self.stream = self.device.setupStream(
soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy
)
result = self.device.activateStream(self.stream)
if result != 0:
@ -176,29 +181,26 @@ class Radio:
self.playback.wait()
self.playback = None
def _stream_path(self):
return f":{Radio.PORT}/radio/{self.name}"
"""
Quick and dirty test of the Radio class.
"""
if __name__ == '__main__':
if __name__ == "__main__":
import time
sdr = Radio('demo', {'driver': 'sdrplay'})
sdr = Radio("demo", {"driver": "sdrplay"})
print('Configuring...')
sdr.configure('105.5M')
print('Configured.')
print("Configuring...")
sdr.configure("105.5M")
print("Configured.")
print('Starting stream...')
print("Starting stream...")
sdr.start_stream()
print('Stream started.')
print("Stream started.")
# Let the stream play for a while
time.sleep(15)
print('Ending stream...')
print("Ending stream...")
sdr.end_stream()
print('Stream ended.')
print("Stream ended.")

View File

@ -3,3 +3,5 @@ flasgger==0.9.5
Flask==2.2.3
numpy==1.17.4
prefixed==0.7.0
PyYAML==6.0
requests==2.22.0

View File

@ -17,6 +17,7 @@ supported_ouput_rates = [
192000, # Too much.
]
def score(pair, target_output=32000, target_ratio=10):
"""
Heuristic for scoring input & output sample rates. The criteria are:
@ -36,29 +37,38 @@ def score(pair, target_output=32000, target_ratio=10):
ratio = pair[0] // pair[1]
return abs(pair[1] - target_output)/2500 \
+ max(0, target_output - pair[1])/2500 \
+ abs(ratio - target_ratio)**0.8 \
+ max(0, target_ratio - ratio)**2
return (
abs(pair[1] - target_output) / 2500
+ max(0, target_output - pair[1]) / 2500
+ abs(ratio - target_ratio) ** 0.8
+ max(0, target_ratio - ratio) ** 2
)
def flatten(l):
return [item for sublist in l for item in sublist]
def flatten_dict(d):
return [(key,value) for key,rates in d.items() for value in rates]
return [(key, value) for key, rates in d.items() for value in rates]
def get_pairs(input_rate):
return [
(input_rate, rate)
for rate in supported_ouput_rates
if (input_rate % rate == 0)
(input_rate, rate) for rate in supported_ouput_rates if (input_rate % rate == 0)
]
def supported_sample_rates(supported_input_rates):
return {
in_rate: [out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0]
in_rate: [
out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0
]
for in_rate in supported_input_rates
}
def preferred_sample_rates(supported_input_rates):
return sorted(flatten_dict(supported_sample_rates(supported_input_rates)), key=score)
return sorted(
flatten_dict(supported_sample_rates(supported_input_rates)), key=score
)

8
scripts/wav2mp3.sh Executable file
View File

@ -0,0 +1,8 @@
#! /bin/bash
for arg in "$@"; do
path="${arg%/*}"
file="${arg##*/}"
filename="${file%%.*}"
ffmpeg -i $path/$file -vn -ar 44100 -ac 2 -b:a 100k $path/$filename.mp3
done

View File

@ -8,6 +8,10 @@ setup() {
sudo xargs apt-get install -y < requirements.apt
sudo pip install -r requirements.pip
# Requires yt-dlp, but a very recent version (as youtube broke their API for the pip version)
# https://stackoverflow.com/a/75504772
python3 -m pip install --force-reinstall https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz
# Install device driver
./scripts/SDRplay_RSP_API-Linux-3.07.1.run

View File

@ -1,6 +1,7 @@
import SoapySDR as soapy
from enum import IntEnum
class SoapyError(IntEnum):
Timeout = soapy.SOAPY_SDR_TIMEOUT
StreamError = soapy.SOAPY_SDR_STREAM_ERROR
@ -10,6 +11,7 @@ class SoapyError(IntEnum):
TimeError = soapy.SOAPY_SDR_TIME_ERROR
Underflow = soapy.SOAPY_SDR_UNDERFLOW
class SoapyFlag(IntEnum):
EndBurst = soapy.SOAPY_SDR_END_BURST
HasTime = soapy.SOAPY_SDR_HAS_TIME

51
streamer.py Normal file
View File

@ -0,0 +1,51 @@
from threading import Thread
import yaml
with open("mediamtx.yml", "r") as config_file:
MEDIASERVER_CONFIG = yaml.safe_load(config_file)
def is_alive(subprocess):
return True if (subprocess and subprocess.poll() is None) else False
class Streamer:
PROTOCOL = "rtsp"
REST_PATH = "stream"
def __init__(self):
self.run = False
self.thread = None
self.name = None
def stream_path(self):
return f"{MEDIASERVER_CONFIG['rtspAddress']}/{type(self).REST_PATH}/{self.name}"
def stream_address(self, host):
return f"{Streamer.PROTOCOL}://{host}{self.stream_path()}"
def is_streaming(self):
return True if (self.thread and self.thread.is_alive()) else False
def start_stream(self):
if self.is_streaming():
raise RuntimeError("Stream thread is already running")
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def end_stream(self):
if self.thread is None:
raise RuntimeError("No stream thread to terminate")
self.run = False
self.thread.join()
self.thread = None
if __name__ == "__main__":
from pprint import pprint
pprint(MEDIASERVER_CONFIG)

92
tuuube.py Executable file
View File

@ -0,0 +1,92 @@
#! /usr/bin/env python3
"""
We aren't using either the apt or the pip repositories for the youtube_dl
as there is a known bug affecting those two versions. The youtube API has changed
since their release, causing downloads to fail.
Make sure you use the ./setup.sh script to obtain the latest github release of
yt_dlp, as this version carries the latest fixes.
"""
# import youtube_dl
import yt_dlp as youtube_dl
from streamer import Streamer, is_alive
import subprocess
import time
import sys
import os
class Tuuube(Streamer):
REST_PATH = "tuuube"
def __init__(self, name):
super().__init__()
self.name = name
self.playback = None
def source_path(self):
return f"/tmp/{self.name}.mp3"
def _stream_thread(self):
if not os.path.exists(self.source_path()) or not os.path.isfile(
self.source_path()
):
ydl_opts = {
"format": "bestaudio/best",
"outtmpl": f"/tmp/{self.name}.%(ext)s", # yt_dlp will append %(ext) if not specified,
"postprocessors": [ # resulting in `/tmp/file.mp3.mp3` :/
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "192",
}
],
}
try:
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([f"https://www.youtube.com/watch?v={self.name}"])
except Exception as e:
print(f"File sourcing failed, aborting stream. {e}", file=sys.stderr)
self.run = False
return
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re",
"-stream_loop",
"-1",
"-i",
self.source_path(),
"-c",
"copy",
"-f",
"rtsp",
self.stream_address("localhost"),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while self.run:
if not is_alive(self.playback):
print("Playback failed, aborting stream.", file=sys.stderr)
break
time.sleep(0.1)
self.run = False
self.playback.kill()
self.playback.wait()
self.playback = None
if __name__ == "__main__":
tube = Tuuube("BaW_jenozKc")
tube.start_stream()
while True:
print(tube.is_streaming())
time.sleep(1)