Compare commits

..

1 Commits

Author SHA1 Message Date
d84268847c Added first run of kafka logging 2023-06-15 13:33:32 +09:30
28 changed files with 506 additions and 724 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1 +0,0 @@
https://www2.cs.uic.edu/~i101/SoundFiles/

Binary file not shown.

Binary file not shown.

Binary file not shown.

43
docker-compose.yml Normal file
View File

@ -0,0 +1,43 @@
version: "3"
services:
kafka:
container_name: kafka
hostname: kafka
image: bitnami/kafka:latest
ports:
# Flip the ports around, external gets default.
- "9092:9094"
- "9094:9092"
networks:
- kafka-internal
volumes:
- "kafka_data:/bitnami"
environment:
# https://hub.docker.com/r/bitnami/kafka/
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
networks:
- kafka-internal
environment:
# https://docs.kafka-ui.provectus.io/configuration/misc-configuration-properties
KAFKA_CLUSTERS_0_NAME: test
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
- kafka
volumes:
kafka_data:
driver: local
networks:
kafka-internal:

View File

@ -1,55 +0,0 @@
from streamer import Streamer, is_alive
import subprocess
import time
import sys
import os
class FileRadio(Streamer):
REST_PATH = "sample"
def __init__(self, path):
super().__init__()
self.path = path
self.basename = os.path.basename(self.path)
self.name, self.ext = os.path.splitext(self.basename)
def _stream_thread(self):
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream -
"-1", # ...indefinitely
"-i",
self.path,
"-c",
"copy",
"-f",
"rtsp",
self.stream_address("localhost"),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while self.run:
if not is_alive(self.playback):
print("Playback failed, aborting stream.", file=sys.stderr)
break
time.sleep(0.1)
self.run = False
self.playback.kill()
self.playback.wait()
self.playback = None
if __name__ == "__main__":
fr = FileRadio("./data/sampleaudio/taunt.mp3")
fr.start_stream()
while True:
time.sleep(1)

View File

@ -10,9 +10,8 @@
import numpy, math, sys, time import numpy, math, sys, time
from numpy import fft from numpy import fft
def impulse(mask): def impulse(mask):
"""Convert frequency domain mask to time-domain""" ''' Convert frequency domain mask to time-domain '''
# Negative side, a mirror of positive side # Negative side, a mirror of positive side
negatives = mask[1:-1] negatives = mask[1:-1]
negatives.reverse() negatives.reverse()
@ -23,15 +22,15 @@ def impulse(mask):
impulse_response = fft.ifft(mask).real.tolist() impulse_response = fft.ifft(mask).real.tolist()
# swap left and right sides # swap left and right sides
left = impulse_response[: fft_length // 2] left = impulse_response[:fft_length // 2]
right = impulse_response[fft_length // 2 :] right = impulse_response[fft_length // 2:]
impulse_response = right + left impulse_response = right + left
return impulse_response return impulse_response
def lo_mask(sample_rate, tap_count, freq, dboct): def lo_mask(sample_rate, tap_count, freq, dboct):
"""Create a freq domain mask for a lowpass filter""" ''' Create a freq domain mask for a lowpass filter '''
order = dboct / 6 order = dboct / 6
max_freq = sample_rate / 2.0 max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0) f2s = max_freq / (tap_count / 2.0)
@ -39,14 +38,14 @@ def lo_mask(sample_rate, tap_count, freq, dboct):
freq /= f2s freq /= f2s
l = tap_count // 2 l = tap_count // 2
mask = [] mask = []
for f in range(0, l + 1): for f in range(0, l+1):
H = 1.0 / (1 + (f / freq) ** (2 * order)) ** 0.5 H = 1.0 / ( 1 + (f / freq) ** (2 * order) ) ** 0.5
mask.append(H) mask.append(H)
return mask return mask
def hi_mask(sample_rate, tap_count, freq, dboct): def hi_mask(sample_rate, tap_count, freq, dboct):
"""Create a freq domain mask for a highpass filter""" ''' Create a freq domain mask for a highpass filter '''
order = dboct / 6 order = dboct / 6
max_freq = sample_rate / 2.0 max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0) f2s = max_freq / (tap_count / 2.0)
@ -54,25 +53,25 @@ def hi_mask(sample_rate, tap_count, freq, dboct):
freq /= f2s freq /= f2s
l = tap_count // 2 l = tap_count // 2
mask = [] mask = []
for f in range(0, l + 1): for f in range(0, l+1):
H = 1.0 / (1 + (freq / (f + 0.0001)) ** (2 * order)) ** 0.5 H = 1.0 / ( 1 + (freq / (f + 0.0001)) ** (2 * order) ) ** 0.5
mask.append(H) mask.append(H)
return mask return mask
def combine_masks(mask1, mask2): def combine_masks(mask1, mask2):
"""Combine two filter masks""" ''' Combine two filter masks '''
assert len(mask1) == len(mask2) assert len(mask1) == len(mask2)
return [mask1[i] * mask2[i] for i in range(0, len(mask1))] return [ mask1[i] * mask2[i] for i in range(0, len(mask1)) ]
def taps(sample_rate, freq, dboct, is_highpass): def taps(sample_rate, freq, dboct, is_highpass):
cutoff_octaves = 60 / dboct cutoff_octaves = 60 / dboct
if is_highpass: if is_highpass:
cutoff = freq / 2**cutoff_octaves cutoff = freq / 2 ** cutoff_octaves
else: else:
cutoff = freq * 2**cutoff_octaves cutoff = freq * 2 ** cutoff_octaves
cutoff = min(cutoff, sample_rate / 2) cutoff = min(cutoff, sample_rate / 2)
transition_band = abs(freq - cutoff) transition_band = abs(freq - cutoff)
@ -88,8 +87,8 @@ class filter:
def feed(self, original): def feed(self, original):
unfiltered = numpy.concatenate((self.buf, original)) unfiltered = numpy.concatenate((self.buf, original))
self.buf = unfiltered[-len(self.coefs) :] self.buf = unfiltered[-len(self.coefs):]
filtered = numpy.convolve(unfiltered, self.coefs, mode="valid") filtered = numpy.convolve(unfiltered, self.coefs, mode='valid')
assert len(filtered) == len(original) + 1 assert len(filtered) == len(original) + 1
return filtered[1:] return filtered[1:]
@ -99,7 +98,7 @@ class low_pass(filter):
tap_count = taps(sample_rate, f, dbo, False) tap_count = taps(sample_rate, f, dbo, False)
mask = lo_mask(sample_rate, tap_count, f, dbo) mask = lo_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask) self.coefs = impulse(mask)
self.buf = [0 for n in self.coefs] self.buf = [ 0 for n in self.coefs ]
class high_pass(filter): class high_pass(filter):
@ -107,19 +106,18 @@ class high_pass(filter):
tap_count = taps(sample_rate, f, dbo, True) tap_count = taps(sample_rate, f, dbo, True)
mask = hi_mask(sample_rate, tap_count, f, dbo) mask = hi_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask) self.coefs = impulse(mask)
self.buf = [0 for n in self.coefs] self.buf = [ 0 for n in self.coefs ]
class band_pass(filter): class band_pass(filter):
def __init__(self, sample_rate, lo, hi, dbo): def __init__(self, sample_rate, lo, hi, dbo):
tap_count = max( tap_count = max(taps(sample_rate, lo, dbo, True),
taps(sample_rate, lo, dbo, True), taps(sample_rate, hi, dbo, False) taps(sample_rate, hi, dbo, False))
)
lomask = lo_mask(sample_rate, tap_count, hi, dbo) lomask = lo_mask(sample_rate, tap_count, hi, dbo)
himask = hi_mask(sample_rate, tap_count, lo, dbo) himask = hi_mask(sample_rate, tap_count, lo, dbo)
mask = combine_masks(lomask, himask) mask = combine_masks(lomask, himask)
self.coefs = impulse(mask) self.coefs = impulse(mask)
self.buf = [0 for n in self.coefs] self.buf = [ 0 for n in self.coefs ]
class deemphasis(filter): class deemphasis(filter):
@ -133,9 +131,8 @@ class deemphasis(filter):
# slope in dB/octave of deemphasis filter # slope in dB/octave of deemphasis filter
dedbo = 10 / octaves dedbo = 10 / octaves
tap_count = max( tap_count = max(taps(sample_rate, lo, dedbo, False),
taps(sample_rate, lo, dedbo, False), taps(sample_rate, hi, final_dbo, False) taps(sample_rate, hi, final_dbo, False))
)
# Calculate deemphasis filter # Calculate deemphasis filter
demask = lo_mask(sample_rate, tap_count, lo, dedbo) demask = lo_mask(sample_rate, tap_count, lo, dedbo)
@ -144,7 +141,7 @@ class deemphasis(filter):
mask = combine_masks(demask, fmask) mask = combine_masks(demask, fmask)
self.coefs = impulse(mask) self.coefs = impulse(mask)
self.buf = [0 for n in self.coefs] self.buf = [ 0 for n in self.coefs ]
class decimator(filter): class decimator(filter):
@ -158,11 +155,11 @@ class decimator(filter):
# Gets the last n-th sample of every n (n = factor) # Gets the last n-th sample of every n (n = factor)
# If e.g. gets 12 samples, gets s[4] and s[9], and # If e.g. gets 12 samples, gets s[4] and s[9], and
# stoves s[10:] to the next round # stoves s[10:] to the next round
""" '''
decimated = [ original[ self.factor * i + self.factor - 1 ] \ decimated = [ original[ self.factor * i + self.factor - 1 ] \
for i in range(0, len(original) // self.factor) ] for i in range(0, len(original) // self.factor) ]
""" '''
decimated = original[(self.factor - 1) :: self.factor] decimated = original[(self.factor - 1)::self.factor]
self.buf2 = original[: -len(original) % self.factor] self.buf2 = original[:-len(original) % self.factor]
return decimated return decimated

View File

@ -19,24 +19,12 @@ from formats import FORMATS
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true')
"-v", "--verbose", help="Print additional informational output", action="store_true" parser.add_argument('-f', '--format', choices=list(FORMATS.keys()), help='Input sample format', required=True)
) parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True)
parser.add_argument( parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True)
"-f",
"--format",
choices=list(FORMATS.keys()),
help="Input sample format",
required=True,
)
parser.add_argument(
"-s", "--sample-rate", metavar="rate", help="Source sample rate (Hz)", required=True
)
parser.add_argument(
"-d", "--demod-rate", metavar="rate", help="Output sample rate (Hz)", required=True
)
# TODO JMT: Output to file # TODO JMT: Output to file
# parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.') #parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.')
args = parser.parse_args() args = parser.parse_args()
INPUT_RATE = int(prefixed.Float(args.sample_rate)) INPUT_RATE = int(prefixed.Float(args.sample_rate))
@ -44,10 +32,7 @@ OUTPUT_RATE = int(prefixed.Float(args.demod_rate))
DECIMATION = INPUT_RATE / OUTPUT_RATE DECIMATION = INPUT_RATE / OUTPUT_RATE
if DECIMATION != math.floor(DECIMATION): if DECIMATION != math.floor(DECIMATION):
print( print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr)
f"The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}",
file=sys.stderr,
)
sys.exit(1) sys.exit(1)
FORMAT = FORMATS[args.format].numpy FORMAT = FORMATS[args.format].numpy
@ -78,17 +63,15 @@ decimate2 = filters.decimator(DECIMATION)
lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120) lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
# Band-pass filter for stereo (L-R) modulated audio # Band-pass filter for stereo (L-R) modulated audio
hi = filters.band_pass( hi = filters.band_pass(INPUT_RATE,
INPUT_RATE, STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120 STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120)
)
# Filter to extract pilot signal # Filter to extract pilot signal
pilot = filters.band_pass( pilot = filters.band_pass(INPUT_RATE,
INPUT_RATE, STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120 STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120)
)
last_angle = 0.0 last_angle = 0.0
remaining_data = b"" remaining_data = b''
while True: while True:
# Ingest 0.1s worth of data # Ingest 0.1s worth of data
@ -96,7 +79,7 @@ while True:
if not data: if not data:
break break
# TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1 # TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1
# data = remaining_data + data #data = remaining_data + data
if len(data) < 2 * BYTES_PER_SAMPLE: if len(data) < 2 * BYTES_PER_SAMPLE:
remaining_data = data remaining_data = data
@ -190,7 +173,8 @@ while True:
last_deviation_avg = 0.0 last_deviation_avg = 0.0
# Translate rotation to frequency deviation # Translate rotation to frequency deviation
STEREO_CARRIER /= 1 + (rotation * 1.05) / tau STEREO_CARRIER /= (1 + (rotation * 1.05) / tau)
# Downsample, Low-pass/deemphasis demodulated L-R # Downsample, Low-pass/deemphasis demodulated L-R
output_jstereo = lo_r.feed(output_jstereo) output_jstereo = lo_r.feed(output_jstereo)
@ -212,4 +196,4 @@ while True:
output[1::2] = output_right output[1::2] = output_right
output = output.astype(int) output = output.astype(int)
sys.stdout.buffer.write(struct.pack("<%dh" % len(output), *output)) sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output))

View File

@ -2,7 +2,6 @@ import numpy as np
from SoapySDR import * from SoapySDR import *
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass
class FormatSpec: class FormatSpec:
name: str name: str
@ -10,10 +9,9 @@ class FormatSpec:
numpy: np.dtype numpy: np.dtype
packing: str packing: str
FORMATS = { FORMATS = {
"CU8": FormatSpec("CU8", SOAPY_SDR_CU8, np.uint8, "=%dB"), 'CU8': FormatSpec('CU8', SOAPY_SDR_CU8, np.uint8, '=%dB'),
"CS8": FormatSpec("CS8", SOAPY_SDR_CS8, np.int8, "=%db"), 'CS8': FormatSpec('CS8', SOAPY_SDR_CS8, np.int8, '=%db'),
"CS16": FormatSpec("CS16", SOAPY_SDR_CS16, np.int16, "=%dh"), 'CS16': FormatSpec('CS16', SOAPY_SDR_CS16, np.int16, '=%dh'),
"CF32": FormatSpec("CF32", SOAPY_SDR_CF32, np.float32, "=%df"), 'CF32': FormatSpec('CF32', SOAPY_SDR_CF32, np.float32, '=%df'),
} }

64
kafkalogging.py Normal file
View File

@ -0,0 +1,64 @@
from kafka import KafkaProducer
import subprocess
import os
kafka_servers = ['localhost:9092']
producers = {}
# Do anything that might take more than a few ms early, otherwise you hit overflow errors
def prep_io(process, process_name=None):
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
if pipe is None:
continue
if os.get_blocking(pipe.fileno()):
os.set_blocking(pipe.fileno(), False)
if process.pid not in producers:
producers[process.pid] = {}
if pipe_name not in producers[process.pid]:
producers[process.pid][pipe_name] = KafkaProducer(
bootstrap_servers=kafka_servers,
max_block_ms=200
)
# Inspired by gist:
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
def check_io(process, process_name=None):
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
if pipe is None:
continue
topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"
while True:
# The python official docs recommend using Popen.communicate() instead of reading
# from the pipes directly. Using Popen.communicate is a blocking call though,
# which would stall threads for processes producing no output.
output = pipe.readline()
if output:
producers[process.pid][pipe_name].send(topic_name, value=output)
else:
break
producers[process.pid][pipe_name].flush()
if __name__ == '__main__':
import time
ping = subprocess.Popen(
[
'/usr/bin/ping',
'1.1.1.1',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
while True:
check_io(ping, 'ping')
time.sleep(0.1)

View File

@ -38,10 +38,10 @@ udpMaxPayloadSize: 1472
externalAuthenticationURL: externalAuthenticationURL:
# Enable the HTTP API. # Enable the HTTP API.
api: yes api: no
# Enable Prometheus-compatible metrics. # Enable Prometheus-compatible metrics.
metrics: yes metrics: no
# Enable pprof-compatible endpoint to monitor performances. # Enable pprof-compatible endpoint to monitor performances.
pprof: no pprof: no

View File

@ -1,12 +1,7 @@
#! /usr/bin/env python3 #! /usr/bin/env python3
import os
import sys
import requests
import SoapySDR as soapy import SoapySDR as soapy
from radio import Radio from radio import Radio
from tuuube import Tuuube
from fileradio import FileRadio
from flask import Flask, jsonify from flask import Flask, jsonify
from flasgger import Swagger from flasgger import Swagger
@ -18,30 +13,8 @@ swag = Swagger(app)
radios = {} radios = {}
@app.route("/report") @app.route('/radio/report')
def report(): def report():
"""Get streams report from the RTSP relay.
---
responses:
200:
description: JSON with streams report.
400:
description: JSON with error message.
"""
try:
r = requests.get("http://localhost:9997/v1/paths/list")
j = r.json()
for item in j["items"]:
del j["items"][item]["conf"]
del j["items"][item]["confName"]
return jsonify(j)
except Exception as e:
return _error_message_json(e.message), 500
@app.route("/radio/report")
def radio_report():
"""List radio devices available to the system. """List radio devices available to the system.
--- ---
responses: responses:
@ -52,8 +25,7 @@ def radio_report():
devices = [dict(device) for device in soapy.Device.enumerate()] devices = [dict(device) for device in soapy.Device.enumerate()]
return jsonify(devices) return jsonify(devices)
@app.route('/radio/<radio>/connect')
@app.route("/radio/<radio>/connect")
def connect(radio): def connect(radio):
"""Connect to a radio device, by driver name or serial number. """Connect to a radio device, by driver name or serial number.
--- ---
@ -65,29 +37,29 @@ def connect(radio):
required: true required: true
responses: responses:
200: 200:
description: JSON with message successfully connected to a radio. description: Successfully connected to a radio.
400: 400:
description: JSON with error message No radio device by that name is available. description: No radio device by that name is available.
500: 500:
description: JSON with error message failed to connect to radio. description: Failed to connect to radio.
""" """
if radio in radios: if radio in radios:
return _error_message_json("Radio device already connected"), 400 return "Radio device already connected", 400
devices = [dict(device) for device in soapy.Device.enumerate()] devices = [dict(device) for device in soapy.Device.enumerate()]
for device in devices: for device in devices:
if radio in device.values(): if radio in device.values():
try: try:
radios[radio] = Radio(radio, device) radios[radio] = Radio(radio, device)
return jsonify("message", "successfully connected radio"), 200 return "", 200
except Exception as e: except Exception as e:
radios.pop(radio) radios.pop(radio)
return _error_message_json(e.message), 500 return str(e), 500
return "Radio device not found", 400 return "Radio device not found", 400
@app.route('/radio/<radio>/disconnect')
@app.route("/radio/<radio>/disconnect")
def disconnect(radio): def disconnect(radio):
"""Disconnect from a radio device. """Disconnect from a radio device.
--- ---
@ -107,12 +79,11 @@ def disconnect(radio):
""" """
if radio in radios: if radio in radios:
radios.pop(radio) radios.pop(radio)
return jsonify("message", "succesfully disconnected radio"), 200 return "", 200
else: else:
return _error_message_json("Radio not connected"), 400 return "Radio not connected", 400
@app.route('/radio/<radio>/configure/<frequency>')
@app.route("/radio/<radio>/configure/<frequency>")
def configure(radio, frequency): def configure(radio, frequency):
"""Tune the radio to a frequency. """Tune the radio to a frequency.
You must connect to the radio before attempting to configure it. You must connect to the radio before attempting to configure it.
@ -131,21 +102,20 @@ def configure(radio, frequency):
required: true required: true
responses: responses:
200: 200:
description: JSON with radio configuration. description: JSON
400: 400:
description: The specified radio is not connected. description: The specified radio is not connected.
""" """
if radio in radios: if radio in radios:
return jsonify(radios[radio].configure(frequency)) return jsonify(radios[radio].configure(frequency))
else: else:
return _error_message_json("Radio not connected"), 400 return "Radio not connected", 400
@app.route('/radio/<radio>/start')
@app.route("/radio/<radio>/start")
def start_stream(radio): def start_stream(radio):
"""Start the radio stream. """Start the radio stream.
Once the stream has been started, connect to the stream at: Once the stream has been started, connect to the stream at:
rtsp://[host]:8554/radio/[radio] rtsp://[host]:8554/radio/[radio]/stream
--- ---
parameters: parameters:
- name: radio - name: radio
@ -153,20 +123,14 @@ def start_stream(radio):
in: path in: path
type: string type: string
required: true required: true
responses:
200:
description: JSON with message successful start of radio stream.
400:
description: JSON with error message.
""" """
try: try:
radios[radio].start_stream() radios[radio].start_stream()
return jsonify("message", "successfully started radio stream"), 200 return "", 200
except Exception as e: except Exception as e:
return _error_message_json(e.message), 400 return str(e), 400
@app.route('/radio/<radio>/end')
@app.route("/radio/<radio>/end")
def end_stream(radio): def end_stream(radio):
"""Terminate the radio stream. """Terminate the radio stream.
--- ---
@ -176,21 +140,14 @@ def end_stream(radio):
in: path in: path
type: string type: string
required: true required: true
responses:
200:
description: JSON with message successful termination of radio stream.
400:
description: JSON with error message.
""" """
try: try:
radios[radio].end_stream() radios[radio].end_stream()
return jsonify("message", "successfully ended radio stream"), 200 return "", 200
except Exception as e: except Exception as e:
error_message = {"error_message": e.message} return str(e), 400
return _error_message_json(e.message), 400
@app.route('/radio/<radio>/info')
@app.route("/radio/<radio>/info")
def radio_info(radio): def radio_info(radio):
"""Get information about a radio. """Get information about a radio.
--- ---
@ -202,117 +159,40 @@ def radio_info(radio):
required: true required: true
responses: responses:
200: 200:
description: JSON with radio information. description: JSON
400: 400:
description: JSON with error message. description: The specified radio is not connected.
""" """
try: try:
return jsonify(radios[radio].get_info()) return jsonify(radios[radio].get_info())
except Exception as e: except Exception as e:
return _error_message_json(e.message), 400 return str(e), 400
tubes = {} if __name__ == '__main__':
@app.route("/tuuube/<id>/start")
def start_tuuube_stream(id):
"""Start streaming from a youtube source.
Once the stream has been started, connect to the stream at:
rtsp://[host]:8554/tuuube/[id]
---
parameters:
- name: id
description:
The youtube video ID. That is the part following the `watch?v=` in the URL. For example, `dQw4w9WgXcQ`.\n
Other good options are - \n
`BaW_jenozKc`, yt_dlp package test video.\n
`b2je8uBlgFM`, stereo audio test.\n
`LDU_Txk06tM`, crab rave, a commonly used audio fidelity test.\n
`sPT_epMLkwQ`, Kilsyth CFA major factory fire dispatch radio traffic.
in: path
type: string
required: true
responses:
200:
description: JSON with message successful start of youtube stream.
400:
description: JSON with error message.
"""
if id not in tubes:
tubes[id] = Tuuube(id)
try:
tubes[id].start_stream()
return jsonify("message", "successfully started youtube stream"), 200
except Exception as e:
return _error_message_json(e.message), 400
@app.route("/tuuube/<id>/end")
def end_tuuube_stream(id):
"""Terminate the youtube stream.
---
parameters:
- name: id
description: The youtube video ID.
in: path
type: string
required: true
responses:
200:
description: JSON with message successful termination of youtube stream.
400:
description: JSON with error message.
"""
try:
tubes[id].end_stream()
return jsonify("message", "succesfully ended youtube stream"), 200
except Exception as e:
return _error_message_json(e.message), 400
"""
Helper function to return a JSON error message
parameters: error_message - the error message to return
returns: a JSON object with the error message
"""
def _error_message_json(error_message: str):
error_message = {"error_message": error_message}
return jsonify(error_message)
if __name__ == "__main__":
import subprocess import subprocess
rtsp_relay = subprocess.Popen( rtsp_relay = subprocess.Popen(
["./dependencies/mediamtx/mediamtx", "./mediamtx.yml"], [
'./dependencies/mediamtx/mediamtx',
'./mediamtx.yml'
],
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL
) )
for path, _, files in os.walk("./data/sampleaudio"): app.run(
for file in files: host='0.0.0.0',
name, ext = os.path.splitext(file) threaded=True,
if ext == ".mp3": debug=False
tubes[name] = FileRadio(f"{path}/{file}") )
tubes[name].start_stream()
app.run(host="0.0.0.0", threaded=True, debug=False) print('Stopping any currently streaming radios...')
print("Stopping any currently streaming radios...")
for radio in radios: for radio in radios:
if radios[radio].is_streaming(): if radios[radio].is_streaming():
radios[radio].end_stream() radios[radio].end_stream()
radios = None radios = None
for tube in tubes: print('Killing RTSP relay...')
if tubes[tube].is_streaming():
tubes[tube].end_stream()
tubes = None
print("Killing RTSP relay...")
rtsp_relay.kill() rtsp_relay.kill()
rtsp_relay.wait() # Necessary? rtsp_relay.wait() # Necessary?

161
radio.py
View File

@ -7,19 +7,20 @@ import subprocess
import struct import struct
from soapyhelpers import * from soapyhelpers import *
from samplerates import * from samplerates import *
from streamer import Streamer, is_alive from kafkalogging import *
class Radio(Streamer): class Radio:
REST_PATH = "radio" FORMAT = 'CS16'
FORMAT = "CS16"
SAMPLES = 8192 SAMPLES = 8192
PORT = 8554
def __init__(self, name, device_info): def __init__(self, name, device_info):
super().__init__()
self.name = name self.name = name
self.device_info = device_info self.device_info = device_info
self.run = False
self.thread = None
self.device = soapy.Device(device_info) self.device = soapy.Device(device_info)
if self.device is None: if self.device is None:
raise RuntimeError("Failed to connect to radio device") raise RuntimeError("Failed to connect to radio device")
@ -33,7 +34,7 @@ class Radio(Streamer):
frequency = int(prefixed.Float(frequency)) frequency = int(prefixed.Float(frequency))
bandwidth = 200000 bandwidth = 200000
sample_rates = preferred_sample_rates(self.capabilities["rx"]["sample-rates"]) sample_rates = preferred_sample_rates(self.capabilities['rx']['sample-rates'])
if len(sample_rates) == 0: if len(sample_rates) == 0:
raise RuntimeError("No suitable sample rates are available") raise RuntimeError("No suitable sample rates are available")
self.sample_rate, self.output_rate = sample_rates[0] self.sample_rate, self.output_rate = sample_rates[0]
@ -46,52 +47,72 @@ class Radio(Streamer):
self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True) self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True)
return { return {
"frequency": self.device.getFrequency(soapy.SOAPY_SDR_RX, 0), 'frequency': self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
"sample-rate": self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0), 'sample-rate': self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
"bandwidth": self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0), 'bandwidth': self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
"gain-mode": "auto" 'gain-mode': 'auto' if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) else 'manual',
if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0)
else "manual",
} }
def get_info(self): def get_info(self):
return { return {
"name": self.name, 'name': self.name,
"device": self.device_info, 'device': self.device_info,
"capabilities": self.capabilities, 'capabilities': self.capabilities,
"stream-path": self._stream_path(), 'stream-path': self._stream_path(),
"streaming": self.is_streaming(), 'streaming': self.is_streaming(),
} }
def _get_capabilities(self): def _get_capabilities(self):
def get_direction_capabilities(direction): def get_direction_capabilities(direction):
return { return {
"antennas": self.device.listAntennas(direction, 0), 'antennas': self.device.listAntennas(direction, 0),
"gains": self.device.listGains(direction, 0), 'gains': self.device.listGains(direction, 0),
"frequencies": self.device.listFrequencies(direction, 0), 'frequencies': self.device.listFrequencies(direction, 0),
"sample-rates": self.device.listSampleRates(direction, 0), 'sample-rates': self.device.listSampleRates(direction, 0),
"bandwidths": self.device.listBandwidths(direction, 0), 'bandwidths': self.device.listBandwidths(direction, 0),
"sensors": self.device.listSensors(direction, 0), 'sensors': self.device.listSensors(direction, 0),
"formats": self.device.getStreamFormats(direction, 0), 'formats': self.device.getStreamFormats(direction, 0),
} }
return { return {
"rx": get_direction_capabilities(soapy.SOAPY_SDR_RX), 'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX),
"tx": get_direction_capabilities(soapy.SOAPY_SDR_TX), 'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX),
"clock-sources": self.device.listClockSources(), 'clock-sources': self.device.listClockSources(),
"time-sources": self.device.listTimeSources(), 'time-sources': self.device.listTimeSources(),
"register-interfaces": self.device.listRegisterInterfaces(), 'register-interfaces': self.device.listRegisterInterfaces(),
"gpios": self.device.listGPIOBanks(), 'gpios': self.device.listGPIOBanks(),
"uarts": self.device.listUARTs(), 'uarts': self.device.listUARTs(),
} }
def is_streaming(self):
return True if (self.thread and self.thread.is_alive()) else False
def start_stream(self):
if self.is_streaming():
raise RuntimeError('Stream thread is already running')
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def end_stream(self):
if self.thread is None:
raise RuntimeError('No stream thread to terminate')
self.run = False
self.thread.join()
self.thread = None
def _stream_thread(self): def _stream_thread(self):
self._init_stream() self._init_stream()
def is_alive(subprocess):
return (subprocess.poll() is None)
while self.run: while self.run:
# Check that the child processes are still running # Check that the child processes are still running
if (not is_alive(self.demod)) or (not is_alive(self.playback)): if (not is_alive(self.demod)) or (not is_alive(self.playback)):
print("DSP chain failed, aborting stream.", file=sys.stderr) print('DSP chain failed, aborting stream.', file=sys.stderr)
break break
result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES) result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES)
@ -101,63 +122,46 @@ class Radio(Streamer):
elif result.ret < 0: elif result.ret < 0:
error = SoapyError(result.ret) error = SoapyError(result.ret)
if error is not SoapyError.Timeout: if error is not SoapyError.Timeout:
print( print("Stream read failed, aborting stream:", error, file=sys.stderr)
"Stream read failed, aborting stream:", error, file=sys.stderr
)
break break
continue continue
else: else:
read_size = int(result.ret * 2) read_size = int(result.ret * 2)
self.demod.stdin.write( self.demod.stdin.write(
struct.pack( struct.pack(FORMATS[Radio.FORMAT].packing % read_size,
FORMATS[Radio.FORMAT].packing % read_size, *self.buffer[:read_size])
*self.buffer[:read_size],
)
) )
# handle subprocess logs
check_io(self.demod, f"{self.name}-demod")
check_io(self.playback, f"{self.name}-playback")
self._cleanup_stream() self._cleanup_stream()
def _init_stream(self): def _init_stream(self):
self.playback = subprocess.Popen( self.playback = subprocess.Popen(
[ [
"/usr/bin/ffmpeg", '/usr/bin/ffmpeg', '-f', 's16le', '-ar', str(self.output_rate), '-ac', '2', '-i', '-',
"-f", '-f', 'rtsp', f"rtsp://localhost{self._stream_path()}"
"s16le",
"-ar",
str(self.output_rate),
"-ac",
"2",
"-i",
"-",
"-f",
"rtsp",
self.stream_address(),
], ],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, stderr=subprocess.PIPE
) )
self.demod = subprocess.Popen( self.demod = subprocess.Popen(
[ ['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)],
"/usr/bin/python3",
"fm_demod.py",
"-f",
"CS16",
"-s",
str(self.sample_rate),
"-d",
str(self.output_rate),
],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=self.playback.stdin, stdout=self.playback.stdin,
stderr=subprocess.DEVNULL, stderr=subprocess.PIPE
) )
prep_io(self.demod, f"{self.name}-demod")
prep_io(self.playback, f"{self.name}-playback")
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy) self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
self.stream = self.device.setupStream( self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy)
soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy
)
result = self.device.activateStream(self.stream) result = self.device.activateStream(self.stream)
if result != 0: if result != 0:
@ -181,26 +185,29 @@ class Radio(Streamer):
self.playback.wait() self.playback.wait()
self.playback = None self.playback = None
def _stream_path(self):
return f":{Radio.PORT}/radio/{self.name}"
""" """
Quick and dirty test of the Radio class. Quick and dirty test of the Radio class.
""" """
if __name__ == "__main__": if __name__ == '__main__':
import time import time
sdr = Radio("demo", {"driver": "sdrplay"}) sdr = Radio('demo', {'driver': 'sdrplay'})
print("Configuring...") print('Configuring...')
sdr.configure("105.5M") sdr.configure('105.5M')
print("Configured.") print('Configured.')
print("Starting stream...") print('Starting stream...')
sdr.start_stream() sdr.start_stream()
print("Stream started.") print('Stream started.')
# Let the stream play for a while # Let the stream play for a while
time.sleep(15) time.sleep(15)
print("Ending stream...") print('Ending stream...')
sdr.end_stream() sdr.end_stream()
print("Stream ended.") print('Stream ended.')

View File

@ -3,5 +3,3 @@ flasgger==0.9.5
Flask==2.2.3 Flask==2.2.3
numpy==1.17.4 numpy==1.17.4
prefixed==0.7.0 prefixed==0.7.0
PyYAML==6.0
requests==2.22.0

View File

@ -17,7 +17,6 @@ supported_ouput_rates = [
192000, # Too much. 192000, # Too much.
] ]
def score(pair, target_output=32000, target_ratio=10): def score(pair, target_output=32000, target_ratio=10):
""" """
Heuristic for scoring input & output sample rates. The criteria are: Heuristic for scoring input & output sample rates. The criteria are:
@ -37,38 +36,29 @@ def score(pair, target_output=32000, target_ratio=10):
ratio = pair[0] // pair[1] ratio = pair[0] // pair[1]
return ( return abs(pair[1] - target_output)/2500 \
abs(pair[1] - target_output) / 2500 + max(0, target_output - pair[1])/2500 \
+ max(0, target_output - pair[1]) / 2500 + abs(ratio - target_ratio)**0.8 \
+ abs(ratio - target_ratio) ** 0.8 + max(0, target_ratio - ratio)**2
+ max(0, target_ratio - ratio) ** 2
)
def flatten(l): def flatten(l):
return [item for sublist in l for item in sublist] return [item for sublist in l for item in sublist]
def flatten_dict(d): def flatten_dict(d):
return [(key, value) for key, rates in d.items() for value in rates] return [(key,value) for key,rates in d.items() for value in rates]
def get_pairs(input_rate): def get_pairs(input_rate):
return [ return [
(input_rate, rate) for rate in supported_ouput_rates if (input_rate % rate == 0) (input_rate, rate)
for rate in supported_ouput_rates
if (input_rate % rate == 0)
] ]
def supported_sample_rates(supported_input_rates): def supported_sample_rates(supported_input_rates):
return { return {
in_rate: [ in_rate: [out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0]
out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0
]
for in_rate in supported_input_rates for in_rate in supported_input_rates
} }
def preferred_sample_rates(supported_input_rates): def preferred_sample_rates(supported_input_rates):
return sorted( return sorted(flatten_dict(supported_sample_rates(supported_input_rates)), key=score)
flatten_dict(supported_sample_rates(supported_input_rates)), key=score
)

34
scripts/connect.sh Executable file
View File

@ -0,0 +1,34 @@
#! /bin/bash
set -e
server="localhost:5000"
radio="sdrplay"
frequency="105.5M"
trap deinit INT
deinit() {
curl $server/radio/$radio/end
curl $server/radio/$radio/disconnect
#kill $pid
killall ffplay
echo -n ""
}
init() {
curl $server/radio/$radio/connect
curl $server/radio/$radio/configure/$frequency
curl $server/radio/$radio/start
}
init
echo "Waiting for stream to become active..."
sleep 3
ffplay -nodisp -hide_banner rtsp://localhost:8554/radio/sdrplay &
pid=$!
while true; do
sleep 1
done

View File

@ -1,8 +0,0 @@
#! /bin/bash
for arg in "$@"; do
path="${arg%/*}"
file="${arg##*/}"
filename="${file%%.*}"
ffmpeg -i $path/$file -vn -ar 44100 -ac 2 -b:a 100k $path/$filename.mp3
done

View File

@ -8,10 +8,6 @@ setup() {
sudo xargs apt-get install -y < requirements.apt sudo xargs apt-get install -y < requirements.apt
sudo pip install -r requirements.pip sudo pip install -r requirements.pip
# Requires yt-dlp, but a very recent version (as youtube broke their API for the pip version)
# https://stackoverflow.com/a/75504772
python3 -m pip install --force-reinstall https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz
# Install device driver # Install device driver
./scripts/SDRplay_RSP_API-Linux-3.07.1.run ./scripts/SDRplay_RSP_API-Linux-3.07.1.run

View File

@ -1,7 +1,6 @@
import SoapySDR as soapy import SoapySDR as soapy
from enum import IntEnum from enum import IntEnum
class SoapyError(IntEnum): class SoapyError(IntEnum):
Timeout = soapy.SOAPY_SDR_TIMEOUT Timeout = soapy.SOAPY_SDR_TIMEOUT
StreamError = soapy.SOAPY_SDR_STREAM_ERROR StreamError = soapy.SOAPY_SDR_STREAM_ERROR
@ -11,7 +10,6 @@ class SoapyError(IntEnum):
TimeError = soapy.SOAPY_SDR_TIME_ERROR TimeError = soapy.SOAPY_SDR_TIME_ERROR
Underflow = soapy.SOAPY_SDR_UNDERFLOW Underflow = soapy.SOAPY_SDR_UNDERFLOW
class SoapyFlag(IntEnum): class SoapyFlag(IntEnum):
EndBurst = soapy.SOAPY_SDR_END_BURST EndBurst = soapy.SOAPY_SDR_END_BURST
HasTime = soapy.SOAPY_SDR_HAS_TIME HasTime = soapy.SOAPY_SDR_HAS_TIME

View File

@ -1,51 +0,0 @@
from threading import Thread
import yaml
with open("mediamtx.yml", "r") as config_file:
MEDIASERVER_CONFIG = yaml.safe_load(config_file)
def is_alive(subprocess):
return True if (subprocess and subprocess.poll() is None) else False
class Streamer:
PROTOCOL = "rtsp"
REST_PATH = "stream"
def __init__(self):
self.run = False
self.thread = None
self.name = None
def stream_path(self):
return f"{MEDIASERVER_CONFIG['rtspAddress']}/{type(self).REST_PATH}/{self.name}"
def stream_address(self, host):
return f"{Streamer.PROTOCOL}://{host}{self.stream_path()}"
def is_streaming(self):
return True if (self.thread and self.thread.is_alive()) else False
def start_stream(self):
if self.is_streaming():
raise RuntimeError("Stream thread is already running")
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def end_stream(self):
if self.thread is None:
raise RuntimeError("No stream thread to terminate")
self.run = False
self.thread.join()
self.thread = None
if __name__ == "__main__":
from pprint import pprint
pprint(MEDIASERVER_CONFIG)

View File

@ -1,92 +0,0 @@
#! /usr/bin/env python3
"""
We aren't using either the apt or the pip repositories for the youtube_dl
as there is a known bug affecting those two versions. The youtube API has changed
since their release, causing downloads to fail.
Make sure you use the ./setup.sh script to obtain the latest github release of
yt_dlp, as this version carries the latest fixes.
"""
# import youtube_dl
import yt_dlp as youtube_dl
from streamer import Streamer, is_alive
import subprocess
import time
import sys
import os
class Tuuube(Streamer):
REST_PATH = "tuuube"
def __init__(self, name):
super().__init__()
self.name = name
self.playback = None
def source_path(self):
return f"/tmp/{self.name}.mp3"
def _stream_thread(self):
if not os.path.exists(self.source_path()) or not os.path.isfile(
self.source_path()
):
ydl_opts = {
"format": "bestaudio/best",
"outtmpl": f"/tmp/{self.name}.%(ext)s", # yt_dlp will append %(ext) if not specified,
"postprocessors": [ # resulting in `/tmp/file.mp3.mp3` :/
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "192",
}
],
}
try:
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([f"https://www.youtube.com/watch?v={self.name}"])
except Exception as e:
print(f"File sourcing failed, aborting stream. {e}", file=sys.stderr)
self.run = False
return
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re",
"-stream_loop",
"-1",
"-i",
self.source_path(),
"-c",
"copy",
"-f",
"rtsp",
self.stream_address("localhost"),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while self.run:
if not is_alive(self.playback):
print("Playback failed, aborting stream.", file=sys.stderr)
break
time.sleep(0.1)
self.run = False
self.playback.kill()
self.playback.wait()
self.playback = None
if __name__ == "__main__":
tube = Tuuube("BaW_jenozKc")
tube.start_stream()
while True:
print(tube.is_streaming())
time.sleep(1)