Compare commits

..

12 Commits

28 changed files with 724 additions and 506 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

1
data/sampleaudio/SOURCE Normal file
View File

@ -0,0 +1 @@
https://www2.cs.uic.edu/~i101/SoundFiles/

Binary file not shown.

Binary file not shown.

BIN
data/sampleaudio/taunt.mp3 Normal file

Binary file not shown.

View File

@ -1,43 +0,0 @@
version: "3"
services:
kafka:
container_name: kafka
hostname: kafka
image: bitnami/kafka:latest
ports:
# Flip the ports around, external gets default.
- "9092:9094"
- "9094:9092"
networks:
- kafka-internal
volumes:
- "kafka_data:/bitnami"
environment:
# https://hub.docker.com/r/bitnami/kafka/
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
networks:
- kafka-internal
environment:
# https://docs.kafka-ui.provectus.io/configuration/misc-configuration-properties
KAFKA_CLUSTERS_0_NAME: test
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
- kafka
volumes:
kafka_data:
driver: local
networks:
kafka-internal:

55
fileradio.py Normal file
View File

@ -0,0 +1,55 @@
from streamer import Streamer, is_alive
import subprocess
import time
import sys
import os
class FileRadio(Streamer):
REST_PATH = "sample"
def __init__(self, path):
super().__init__()
self.path = path
self.basename = os.path.basename(self.path)
self.name, self.ext = os.path.splitext(self.basename)
def _stream_thread(self):
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
"-stream_loop", # Loop the stream -
"-1", # ...indefinitely
"-i",
self.path,
"-c",
"copy",
"-f",
"rtsp",
self.stream_address("localhost"),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while self.run:
if not is_alive(self.playback):
print("Playback failed, aborting stream.", file=sys.stderr)
break
time.sleep(0.1)
self.run = False
self.playback.kill()
self.playback.wait()
self.playback = None
if __name__ == "__main__":
fr = FileRadio("./data/sampleaudio/taunt.mp3")
fr.start_stream()
while True:
time.sleep(1)

View File

@ -10,156 +10,159 @@
import numpy, math, sys, time import numpy, math, sys, time
from numpy import fft from numpy import fft
def impulse(mask): def impulse(mask):
''' Convert frequency domain mask to time-domain ''' """Convert frequency domain mask to time-domain"""
# Negative side, a mirror of positive side # Negative side, a mirror of positive side
negatives = mask[1:-1] negatives = mask[1:-1]
negatives.reverse() negatives.reverse()
mask = mask + negatives mask = mask + negatives
fft_length = len(mask) fft_length = len(mask)
# Convert FFT filter mask to FIR coefficients # Convert FFT filter mask to FIR coefficients
impulse_response = fft.ifft(mask).real.tolist() impulse_response = fft.ifft(mask).real.tolist()
# swap left and right sides # swap left and right sides
left = impulse_response[:fft_length // 2] left = impulse_response[: fft_length // 2]
right = impulse_response[fft_length // 2:] right = impulse_response[fft_length // 2 :]
impulse_response = right + left impulse_response = right + left
return impulse_response return impulse_response
def lo_mask(sample_rate, tap_count, freq, dboct): def lo_mask(sample_rate, tap_count, freq, dboct):
''' Create a freq domain mask for a lowpass filter ''' """Create a freq domain mask for a lowpass filter"""
order = dboct / 6 order = dboct / 6
max_freq = sample_rate / 2.0 max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0) f2s = max_freq / (tap_count / 2.0)
# Convert freq to filter step unit # Convert freq to filter step unit
freq /= f2s freq /= f2s
l = tap_count // 2 l = tap_count // 2
mask = [] mask = []
for f in range(0, l+1): for f in range(0, l + 1):
H = 1.0 / ( 1 + (f / freq) ** (2 * order) ) ** 0.5 H = 1.0 / (1 + (f / freq) ** (2 * order)) ** 0.5
mask.append(H) mask.append(H)
return mask return mask
def hi_mask(sample_rate, tap_count, freq, dboct): def hi_mask(sample_rate, tap_count, freq, dboct):
''' Create a freq domain mask for a highpass filter ''' """Create a freq domain mask for a highpass filter"""
order = dboct / 6 order = dboct / 6
max_freq = sample_rate / 2.0 max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0) f2s = max_freq / (tap_count / 2.0)
# Convert freq frequency to filter step unit # Convert freq frequency to filter step unit
freq /= f2s freq /= f2s
l = tap_count // 2 l = tap_count // 2
mask = [] mask = []
for f in range(0, l+1): for f in range(0, l + 1):
H = 1.0 / ( 1 + (freq / (f + 0.0001)) ** (2 * order) ) ** 0.5 H = 1.0 / (1 + (freq / (f + 0.0001)) ** (2 * order)) ** 0.5
mask.append(H) mask.append(H)
return mask return mask
def combine_masks(mask1, mask2): def combine_masks(mask1, mask2):
''' Combine two filter masks ''' """Combine two filter masks"""
assert len(mask1) == len(mask2) assert len(mask1) == len(mask2)
return [ mask1[i] * mask2[i] for i in range(0, len(mask1)) ] return [mask1[i] * mask2[i] for i in range(0, len(mask1))]
def taps(sample_rate, freq, dboct, is_highpass): def taps(sample_rate, freq, dboct, is_highpass):
cutoff_octaves = 60 / dboct cutoff_octaves = 60 / dboct
if is_highpass: if is_highpass:
cutoff = freq / 2 ** cutoff_octaves cutoff = freq / 2**cutoff_octaves
else: else:
cutoff = freq * 2 ** cutoff_octaves cutoff = freq * 2**cutoff_octaves
cutoff = min(cutoff, sample_rate / 2) cutoff = min(cutoff, sample_rate / 2)
transition_band = abs(freq - cutoff) transition_band = abs(freq - cutoff)
Bt = transition_band / sample_rate Bt = transition_band / sample_rate
taps = int(60 / (22 * Bt)) taps = int(60 / (22 * Bt))
# print("Freq=%f,%f number of taps: %d" % (freq, cutoff, taps), file=sys.stderr) # print("Freq=%f,%f number of taps: %d" % (freq, cutoff, taps), file=sys.stderr)
return taps return taps
class filter: class filter:
def __init__(self, sample_rate, cutoff): def __init__(self, sample_rate, cutoff):
raise "Abstract" raise "Abstract"
def feed(self, original): def feed(self, original):
unfiltered = numpy.concatenate((self.buf, original)) unfiltered = numpy.concatenate((self.buf, original))
self.buf = unfiltered[-len(self.coefs):] self.buf = unfiltered[-len(self.coefs) :]
filtered = numpy.convolve(unfiltered, self.coefs, mode='valid') filtered = numpy.convolve(unfiltered, self.coefs, mode="valid")
assert len(filtered) == len(original) + 1 assert len(filtered) == len(original) + 1
return filtered[1:] return filtered[1:]
class low_pass(filter): class low_pass(filter):
def __init__(self, sample_rate, f, dbo): def __init__(self, sample_rate, f, dbo):
tap_count = taps(sample_rate, f, dbo, False) tap_count = taps(sample_rate, f, dbo, False)
mask = lo_mask(sample_rate, tap_count, f, dbo) mask = lo_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask) self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ] self.buf = [0 for n in self.coefs]
class high_pass(filter): class high_pass(filter):
def __init__(self, sample_rate, f, dbo): def __init__(self, sample_rate, f, dbo):
tap_count = taps(sample_rate, f, dbo, True) tap_count = taps(sample_rate, f, dbo, True)
mask = hi_mask(sample_rate, tap_count, f, dbo) mask = hi_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask) self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ] self.buf = [0 for n in self.coefs]
class band_pass(filter): class band_pass(filter):
def __init__(self, sample_rate, lo, hi, dbo): def __init__(self, sample_rate, lo, hi, dbo):
tap_count = max(taps(sample_rate, lo, dbo, True), tap_count = max(
taps(sample_rate, hi, dbo, False)) taps(sample_rate, lo, dbo, True), taps(sample_rate, hi, dbo, False)
lomask = lo_mask(sample_rate, tap_count, hi, dbo) )
himask = hi_mask(sample_rate, tap_count, lo, dbo) lomask = lo_mask(sample_rate, tap_count, hi, dbo)
mask = combine_masks(lomask, himask) himask = hi_mask(sample_rate, tap_count, lo, dbo)
self.coefs = impulse(mask) mask = combine_masks(lomask, himask)
self.buf = [ 0 for n in self.coefs ] self.coefs = impulse(mask)
self.buf = [0 for n in self.coefs]
class deemphasis(filter): class deemphasis(filter):
def __init__(self, sample_rate, us, hi, final_dbo): def __init__(self, sample_rate, us, hi, final_dbo):
# us = RC constant of the hypothetical deemphasis filter # us = RC constant of the hypothetical deemphasis filter
us /= 1000000 us /= 1000000
# 0..lo is not deemphasized # 0..lo is not deemphasized
lo = 1.0 / (2 * math.pi * us) lo = 1.0 / (2 * math.pi * us)
# attenuation from lo to hi should be 10dB # attenuation from lo to hi should be 10dB
octaves = math.log(hi / lo) / math.log(2) octaves = math.log(hi / lo) / math.log(2)
# slope in dB/octave of deemphasis filter # slope in dB/octave of deemphasis filter
dedbo = 10 / octaves dedbo = 10 / octaves
tap_count = max(taps(sample_rate, lo, dedbo, False), tap_count = max(
taps(sample_rate, hi, final_dbo, False)) taps(sample_rate, lo, dedbo, False), taps(sample_rate, hi, final_dbo, False)
)
# Calculate deemphasis filter # Calculate deemphasis filter
demask = lo_mask(sample_rate, tap_count, lo, dedbo) demask = lo_mask(sample_rate, tap_count, lo, dedbo)
# Calculate low-pass filter after deemphasis # Calculate low-pass filter after deemphasis
fmask = lo_mask(sample_rate, tap_count, hi, final_dbo) fmask = lo_mask(sample_rate, tap_count, hi, final_dbo)
mask = combine_masks(demask, fmask) mask = combine_masks(demask, fmask)
self.coefs = impulse(mask) self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ] self.buf = [0 for n in self.coefs]
class decimator(filter): class decimator(filter):
def __init__(self, factor): def __init__(self, factor):
self.buf2 = [] self.buf2 = []
self.factor = int(factor) self.factor = int(factor)
def feed(self, original): def feed(self, original):
original = numpy.concatenate((self.buf2, original)) original = numpy.concatenate((self.buf2, original))
# Gets the last n-th sample of every n (n = factor) # Gets the last n-th sample of every n (n = factor)
# If e.g. gets 12 samples, gets s[4] and s[9], and # If e.g. gets 12 samples, gets s[4] and s[9], and
# stoves s[10:] to the next round # stoves s[10:] to the next round
''' """
decimated = [ original[ self.factor * i + self.factor - 1 ] \ decimated = [ original[ self.factor * i + self.factor - 1 ] \
for i in range(0, len(original) // self.factor) ] for i in range(0, len(original) // self.factor) ]
''' """
decimated = original[(self.factor - 1)::self.factor] decimated = original[(self.factor - 1) :: self.factor]
self.buf2 = original[:-len(original) % self.factor] self.buf2 = original[: -len(original) % self.factor]
return decimated return decimated

View File

@ -19,12 +19,24 @@ from formats import FORMATS
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true') parser.add_argument(
parser.add_argument('-f', '--format', choices=list(FORMATS.keys()), help='Input sample format', required=True) "-v", "--verbose", help="Print additional informational output", action="store_true"
parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True) )
parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True) parser.add_argument(
"-f",
"--format",
choices=list(FORMATS.keys()),
help="Input sample format",
required=True,
)
parser.add_argument(
"-s", "--sample-rate", metavar="rate", help="Source sample rate (Hz)", required=True
)
parser.add_argument(
"-d", "--demod-rate", metavar="rate", help="Output sample rate (Hz)", required=True
)
# TODO JMT: Output to file # TODO JMT: Output to file
#parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.') # parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.')
args = parser.parse_args() args = parser.parse_args()
INPUT_RATE = int(prefixed.Float(args.sample_rate)) INPUT_RATE = int(prefixed.Float(args.sample_rate))
@ -32,16 +44,19 @@ OUTPUT_RATE = int(prefixed.Float(args.demod_rate))
DECIMATION = INPUT_RATE / OUTPUT_RATE DECIMATION = INPUT_RATE / OUTPUT_RATE
if DECIMATION != math.floor(DECIMATION): if DECIMATION != math.floor(DECIMATION):
print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr) print(
sys.exit(1) f"The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}",
file=sys.stderr,
)
sys.exit(1)
FORMAT = FORMATS[args.format].numpy FORMAT = FORMATS[args.format].numpy
DT = np.dtype(FORMAT) DT = np.dtype(FORMAT)
BYTES_PER_SAMPLE = 2 * DT.itemsize BYTES_PER_SAMPLE = 2 * DT.itemsize
MAX_DEVIATION = 200000.0 # Hz MAX_DEVIATION = 200000.0 # Hz
FM_BANDWIDTH = 15000 # Hz FM_BANDWIDTH = 15000 # Hz
STEREO_CARRIER = 38000 # Hz STEREO_CARRIER = 38000 # Hz
DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2)) DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2))
pll = math.pi - random.random() * 2 * math.pi pll = math.pi - random.random() * 2 * math.pi
@ -63,137 +78,138 @@ decimate2 = filters.decimator(DECIMATION)
lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120) lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
# Band-pass filter for stereo (L-R) modulated audio # Band-pass filter for stereo (L-R) modulated audio
hi = filters.band_pass(INPUT_RATE, hi = filters.band_pass(
STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120) INPUT_RATE, STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120
)
# Filter to extract pilot signal # Filter to extract pilot signal
pilot = filters.band_pass(INPUT_RATE, pilot = filters.band_pass(
STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120) INPUT_RATE, STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120
)
last_angle = 0.0 last_angle = 0.0
remaining_data = b'' remaining_data = b""
while True: while True:
# Ingest 0.1s worth of data # Ingest 0.1s worth of data
data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10) data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10)
if not data: if not data:
break break
# TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1 # TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1
#data = remaining_data + data # data = remaining_data + data
if len(data) < 2 * BYTES_PER_SAMPLE: if len(data) < 2 * BYTES_PER_SAMPLE:
remaining_data = data remaining_data = data
continue continue
# Save one sample to next batch, and the odd byte if exists # Save one sample to next batch, and the odd byte if exists
if len(data) % 2 == 1: if len(data) % 2 == 1:
print("Odd byte, that's odd", file=sys.stderr) print("Odd byte, that's odd", file=sys.stderr)
remaining_data = data[-3:] remaining_data = data[-3:]
data = data[:-1] data = data[:-1]
else: else:
remaining_data = data[-2:] remaining_data = data[-2:]
samples = len(data) // BYTES_PER_SAMPLE samples = len(data) // BYTES_PER_SAMPLE
# Find angle (phase) of I/Q pairs # Find angle (phase) of I/Q pairs
iqdata = np.frombuffer(data, dtype=FORMAT) iqdata = np.frombuffer(data, dtype=FORMAT)
if args.verbose: if args.verbose:
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
if np.issubdtype(FORMAT, np.integer): if np.issubdtype(FORMAT, np.integer):
iinfo = np.iinfo(FORMAT) iinfo = np.iinfo(FORMAT)
if np.issubdtype(FORMAT, np.unsignedinteger): if np.issubdtype(FORMAT, np.unsignedinteger):
iqdata = iqdata - (iinfo.max / 2.0) iqdata = iqdata - (iinfo.max / 2.0)
iqdata = iqdata / (iinfo.max / 2.0) iqdata = iqdata / (iinfo.max / 2.0)
else: else:
iqdata = iqdata / np.float64(iinfo.max) iqdata = iqdata / np.float64(iinfo.max)
else: else:
iqdata = iqdata.astype(np.float64) iqdata = iqdata.astype(np.float64)
if args.verbose: if args.verbose:
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
iqdata = iqdata.view(complex) iqdata = iqdata.view(complex)
angles = np.angle(iqdata) angles = np.angle(iqdata)
# Determine phase rotation between samples # Determine phase rotation between samples
rotations = np.ediff1d(angles) rotations = np.ediff1d(angles)
# Wrap rotations >= +/-180º # Wrap rotations >= +/-180º
rotations = (rotations + np.pi) % (2 * np.pi) - np.pi rotations = (rotations + np.pi) % (2 * np.pi) - np.pi
# Convert rotations to baseband signal # Convert rotations to baseband signal
output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL) output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL)
output_raw = np.clip(output_raw, -0.999, +0.999) output_raw = np.clip(output_raw, -0.999, +0.999)
# At this point, output_raw contains two audio signals: # At this point, output_raw contains two audio signals:
# L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC, # L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC,
# carrier 38kHz # carrier 38kHz
# Downsample and low-pass L+R (mono) signal # Downsample and low-pass L+R (mono) signal
output_mono = lo.feed(output_raw) output_mono = lo.feed(output_raw)
output_mono = decimate1.feed(output_mono) output_mono = decimate1.feed(output_mono)
# Filter pilot tone # Filter pilot tone
detected_pilot = pilot.feed(output_raw) detected_pilot = pilot.feed(output_raw)
# Separate ultrasonic L-R signal by high-pass filtering # Separate ultrasonic L-R signal by high-pass filtering
output_jstereo_mod = hi.feed(output_raw) output_jstereo_mod = hi.feed(output_raw)
output_jstereo = [] output_jstereo = []
# Demodulate L-R, which is AM-SC with 53kHz carrier # Demodulate L-R, which is AM-SC with 53kHz carrier
for n in range(0, len(output_jstereo_mod)): for n in range(0, len(output_jstereo_mod)):
# Advance carrier # Advance carrier
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
# Standard demodulation # Standard demodulation
output_jstereo.append(math.cos(pll) * output_jstereo_mod[n]) output_jstereo.append(math.cos(pll) * output_jstereo_mod[n])
# Detect pilot zero-crossing # Detect pilot zero-crossing
cur_pilot = detected_pilot[n] cur_pilot = detected_pilot[n]
zero_crossed = (cur_pilot * last_pilot) <= 0 zero_crossed = (cur_pilot * last_pilot) <= 0
last_pilot = cur_pilot last_pilot = cur_pilot
if not zero_crossed: if not zero_crossed:
continue continue
# When pilot is at 90º or 270º, carrier should be around 180º # When pilot is at 90º or 270º, carrier should be around 180º
ideal = math.pi ideal = math.pi
deviation = pll - ideal deviation = pll - ideal
if deviation > math.pi: if deviation > math.pi:
deviation -= tau deviation -= tau
deviation_avg = 0.99 * deviation_avg + 0.01 * deviation deviation_avg = 0.99 * deviation_avg + 0.01 * deviation
rotation = deviation_avg - last_deviation_avg rotation = deviation_avg - last_deviation_avg
last_deviation_avg = deviation_avg last_deviation_avg = deviation_avg
if abs(deviation_avg) > math.pi / 8: if abs(deviation_avg) > math.pi / 8:
# big phase deviation, reset PLL # big phase deviation, reset PLL
pll = ideal pll = ideal
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
deviation_avg = 0.0 deviation_avg = 0.0
last_deviation_avg = 0.0 last_deviation_avg = 0.0
# Translate rotation to frequency deviation # Translate rotation to frequency deviation
STEREO_CARRIER /= (1 + (rotation * 1.05) / tau) STEREO_CARRIER /= 1 + (rotation * 1.05) / tau
# Downsample, Low-pass/deemphasis demodulated L-R
output_jstereo = lo_r.feed(output_jstereo)
output_jstereo = decimate2.feed(output_jstereo)
# Downsample, Low-pass/deemphasis demodulated L-R assert len(output_jstereo) == len(output_mono)
output_jstereo = lo_r.feed(output_jstereo)
output_jstereo = decimate2.feed(output_jstereo)
assert len(output_jstereo) == len(output_mono) # Scale to 16-bit and divide by 2 for channel sum
output_mono = np.multiply(output_mono, 32767 / 2.0)
output_jstereo = np.multiply(output_jstereo, 32767 / 2.0)
# Scale to 16-bit and divide by 2 for channel sum # Output stereo by adding or subtracting joint-stereo to mono
output_mono = np.multiply(output_mono, 32767 / 2.0) output_left = output_mono + output_jstereo
output_jstereo = np.multiply(output_jstereo, 32767 / 2.0) output_right = output_mono - output_jstereo
# Output stereo by adding or subtracting joint-stereo to mono # Interleave L and R samples using np trickery
output_left = output_mono + output_jstereo output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype)
output_right = output_mono - output_jstereo output[0::2] = output_left
output[1::2] = output_right
output = output.astype(int)
# Interleave L and R samples using np trickery sys.stdout.buffer.write(struct.pack("<%dh" % len(output), *output))
output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype)
output[0::2] = output_left
output[1::2] = output_right
output = output.astype(int)
sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output))

View File

@ -2,6 +2,7 @@ import numpy as np
from SoapySDR import * from SoapySDR import *
from dataclasses import dataclass from dataclasses import dataclass
@dataclass @dataclass
class FormatSpec: class FormatSpec:
name: str name: str
@ -9,9 +10,10 @@ class FormatSpec:
numpy: np.dtype numpy: np.dtype
packing: str packing: str
FORMATS = { FORMATS = {
'CU8': FormatSpec('CU8', SOAPY_SDR_CU8, np.uint8, '=%dB'), "CU8": FormatSpec("CU8", SOAPY_SDR_CU8, np.uint8, "=%dB"),
'CS8': FormatSpec('CS8', SOAPY_SDR_CS8, np.int8, '=%db'), "CS8": FormatSpec("CS8", SOAPY_SDR_CS8, np.int8, "=%db"),
'CS16': FormatSpec('CS16', SOAPY_SDR_CS16, np.int16, '=%dh'), "CS16": FormatSpec("CS16", SOAPY_SDR_CS16, np.int16, "=%dh"),
'CF32': FormatSpec('CF32', SOAPY_SDR_CF32, np.float32, '=%df'), "CF32": FormatSpec("CF32", SOAPY_SDR_CF32, np.float32, "=%df"),
} }

View File

@ -1,64 +0,0 @@
from kafka import KafkaProducer
import subprocess
import os
kafka_servers = ['localhost:9092']
producers = {}
# Do anything that might take more than a few ms early, otherwise you hit overflow errors
def prep_io(process, process_name=None):
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
if pipe is None:
continue
if os.get_blocking(pipe.fileno()):
os.set_blocking(pipe.fileno(), False)
if process.pid not in producers:
producers[process.pid] = {}
if pipe_name not in producers[process.pid]:
producers[process.pid][pipe_name] = KafkaProducer(
bootstrap_servers=kafka_servers,
max_block_ms=200
)
# Inspired by gist:
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
def check_io(process, process_name=None):
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
if pipe is None:
continue
topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"
while True:
# The python official docs recommend using Popen.communicate() instead of reading
# from the pipes directly. Using Popen.communicate is a blocking call though,
# which would stall threads for processes producing no output.
output = pipe.readline()
if output:
producers[process.pid][pipe_name].send(topic_name, value=output)
else:
break
producers[process.pid][pipe_name].flush()
if __name__ == '__main__':
import time
ping = subprocess.Popen(
[
'/usr/bin/ping',
'1.1.1.1',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
while True:
check_io(ping, 'ping')
time.sleep(0.1)

View File

@ -38,10 +38,10 @@ udpMaxPayloadSize: 1472
externalAuthenticationURL: externalAuthenticationURL:
# Enable the HTTP API. # Enable the HTTP API.
api: no api: yes
# Enable Prometheus-compatible metrics. # Enable Prometheus-compatible metrics.
metrics: no metrics: yes
# Enable pprof-compatible endpoint to monitor performances. # Enable pprof-compatible endpoint to monitor performances.
pprof: no pprof: no

View File

@ -1,7 +1,12 @@
#! /usr/bin/env python3 #! /usr/bin/env python3
import os
import sys
import requests
import SoapySDR as soapy import SoapySDR as soapy
from radio import Radio from radio import Radio
from tuuube import Tuuube
from fileradio import FileRadio
from flask import Flask, jsonify from flask import Flask, jsonify
from flasgger import Swagger from flasgger import Swagger
@ -13,8 +18,30 @@ swag = Swagger(app)
radios = {} radios = {}
@app.route('/radio/report') @app.route("/report")
def report(): def report():
"""Get streams report from the RTSP relay.
---
responses:
200:
description: JSON with streams report.
400:
description: JSON with error message.
"""
try:
r = requests.get("http://localhost:9997/v1/paths/list")
j = r.json()
for item in j["items"]:
del j["items"][item]["conf"]
del j["items"][item]["confName"]
return jsonify(j)
except Exception as e:
return _error_message_json(e.message), 500
@app.route("/radio/report")
def radio_report():
"""List radio devices available to the system. """List radio devices available to the system.
--- ---
responses: responses:
@ -25,7 +52,8 @@ def report():
devices = [dict(device) for device in soapy.Device.enumerate()] devices = [dict(device) for device in soapy.Device.enumerate()]
return jsonify(devices) return jsonify(devices)
@app.route('/radio/<radio>/connect')
@app.route("/radio/<radio>/connect")
def connect(radio): def connect(radio):
"""Connect to a radio device, by driver name or serial number. """Connect to a radio device, by driver name or serial number.
--- ---
@ -37,29 +65,29 @@ def connect(radio):
required: true required: true
responses: responses:
200: 200:
description: Successfully connected to a radio. description: JSON with message successfully connected to a radio.
400: 400:
description: No radio device by that name is available. description: JSON with error message No radio device by that name is available.
500: 500:
description: Failed to connect to radio. description: JSON with error message failed to connect to radio.
""" """
if radio in radios: if radio in radios:
return "Radio device already connected", 400 return _error_message_json("Radio device already connected"), 400
devices = [dict(device) for device in soapy.Device.enumerate()] devices = [dict(device) for device in soapy.Device.enumerate()]
for device in devices: for device in devices:
if radio in device.values(): if radio in device.values():
try: try:
radios[radio] = Radio(radio, device) radios[radio] = Radio(radio, device)
return "", 200 return jsonify("message", "successfully connected radio"), 200
except Exception as e: except Exception as e:
radios.pop(radio) radios.pop(radio)
return str(e), 500 return _error_message_json(e.message), 500
return "Radio device not found", 400 return "Radio device not found", 400
@app.route('/radio/<radio>/disconnect')
@app.route("/radio/<radio>/disconnect")
def disconnect(radio): def disconnect(radio):
"""Disconnect from a radio device. """Disconnect from a radio device.
--- ---
@ -79,11 +107,12 @@ def disconnect(radio):
""" """
if radio in radios: if radio in radios:
radios.pop(radio) radios.pop(radio)
return "", 200 return jsonify("message", "succesfully disconnected radio"), 200
else: else:
return "Radio not connected", 400 return _error_message_json("Radio not connected"), 400
@app.route('/radio/<radio>/configure/<frequency>')
@app.route("/radio/<radio>/configure/<frequency>")
def configure(radio, frequency): def configure(radio, frequency):
"""Tune the radio to a frequency. """Tune the radio to a frequency.
You must connect to the radio before attempting to configure it. You must connect to the radio before attempting to configure it.
@ -102,20 +131,21 @@ def configure(radio, frequency):
required: true required: true
responses: responses:
200: 200:
description: JSON description: JSON with radio configuration.
400: 400:
description: The specified radio is not connected. description: The specified radio is not connected.
""" """
if radio in radios: if radio in radios:
return jsonify(radios[radio].configure(frequency)) return jsonify(radios[radio].configure(frequency))
else: else:
return "Radio not connected", 400 return _error_message_json("Radio not connected"), 400
@app.route('/radio/<radio>/start')
@app.route("/radio/<radio>/start")
def start_stream(radio): def start_stream(radio):
"""Start the radio stream. """Start the radio stream.
Once the stream has been started, connect to the stream at: Once the stream has been started, connect to the stream at:
rtsp://[host]:8554/radio/[radio]/stream rtsp://[host]:8554/radio/[radio]
--- ---
parameters: parameters:
- name: radio - name: radio
@ -123,14 +153,20 @@ def start_stream(radio):
in: path in: path
type: string type: string
required: true required: true
responses:
200:
description: JSON with message successful start of radio stream.
400:
description: JSON with error message.
""" """
try: try:
radios[radio].start_stream() radios[radio].start_stream()
return "", 200 return jsonify("message", "successfully started radio stream"), 200
except Exception as e: except Exception as e:
return str(e), 400 return _error_message_json(e.message), 400
@app.route('/radio/<radio>/end')
@app.route("/radio/<radio>/end")
def end_stream(radio): def end_stream(radio):
"""Terminate the radio stream. """Terminate the radio stream.
--- ---
@ -140,14 +176,21 @@ def end_stream(radio):
in: path in: path
type: string type: string
required: true required: true
responses:
200:
description: JSON with message successful termination of radio stream.
400:
description: JSON with error message.
""" """
try: try:
radios[radio].end_stream() radios[radio].end_stream()
return "", 200 return jsonify("message", "successfully ended radio stream"), 200
except Exception as e: except Exception as e:
return str(e), 400 error_message = {"error_message": e.message}
return _error_message_json(e.message), 400
@app.route('/radio/<radio>/info')
@app.route("/radio/<radio>/info")
def radio_info(radio): def radio_info(radio):
"""Get information about a radio. """Get information about a radio.
--- ---
@ -159,40 +202,117 @@ def radio_info(radio):
required: true required: true
responses: responses:
200: 200:
description: JSON description: JSON with radio information.
400: 400:
description: The specified radio is not connected. description: JSON with error message.
""" """
try: try:
return jsonify(radios[radio].get_info()) return jsonify(radios[radio].get_info())
except Exception as e: except Exception as e:
return str(e), 400 return _error_message_json(e.message), 400
if __name__ == '__main__': tubes = {}
@app.route("/tuuube/<id>/start")
def start_tuuube_stream(id):
"""Start streaming from a youtube source.
Once the stream has been started, connect to the stream at:
rtsp://[host]:8554/tuuube/[id]
---
parameters:
- name: id
description:
The youtube video ID. That is the part following the `watch?v=` in the URL. For example, `dQw4w9WgXcQ`.\n
Other good options are - \n
`BaW_jenozKc`, yt_dlp package test video.\n
`b2je8uBlgFM`, stereo audio test.\n
`LDU_Txk06tM`, crab rave, a commonly used audio fidelity test.\n
`sPT_epMLkwQ`, Kilsyth CFA major factory fire dispatch radio traffic.
in: path
type: string
required: true
responses:
200:
description: JSON with message successful start of youtube stream.
400:
description: JSON with error message.
"""
if id not in tubes:
tubes[id] = Tuuube(id)
try:
tubes[id].start_stream()
return jsonify("message", "successfully started youtube stream"), 200
except Exception as e:
return _error_message_json(e.message), 400
@app.route("/tuuube/<id>/end")
def end_tuuube_stream(id):
"""Terminate the youtube stream.
---
parameters:
- name: id
description: The youtube video ID.
in: path
type: string
required: true
responses:
200:
description: JSON with message successful termination of youtube stream.
400:
description: JSON with error message.
"""
try:
tubes[id].end_stream()
return jsonify("message", "succesfully ended youtube stream"), 200
except Exception as e:
return _error_message_json(e.message), 400
"""
Helper function to return a JSON error message
parameters: error_message - the error message to return
returns: a JSON object with the error message
"""
def _error_message_json(error_message: str):
error_message = {"error_message": error_message}
return jsonify(error_message)
if __name__ == "__main__":
import subprocess import subprocess
rtsp_relay = subprocess.Popen( rtsp_relay = subprocess.Popen(
[ ["./dependencies/mediamtx/mediamtx", "./mediamtx.yml"],
'./dependencies/mediamtx/mediamtx',
'./mediamtx.yml'
],
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL stderr=subprocess.DEVNULL,
) )
app.run( for path, _, files in os.walk("./data/sampleaudio"):
host='0.0.0.0', for file in files:
threaded=True, name, ext = os.path.splitext(file)
debug=False if ext == ".mp3":
) tubes[name] = FileRadio(f"{path}/{file}")
tubes[name].start_stream()
print('Stopping any currently streaming radios...') app.run(host="0.0.0.0", threaded=True, debug=False)
print("Stopping any currently streaming radios...")
for radio in radios: for radio in radios:
if radios[radio].is_streaming(): if radios[radio].is_streaming():
radios[radio].end_stream() radios[radio].end_stream()
radios = None radios = None
print('Killing RTSP relay...') for tube in tubes:
if tubes[tube].is_streaming():
tubes[tube].end_stream()
tubes = None
print("Killing RTSP relay...")
rtsp_relay.kill() rtsp_relay.kill()
rtsp_relay.wait() # Necessary? rtsp_relay.wait() # Necessary?

161
radio.py
View File

@ -7,20 +7,19 @@ import subprocess
import struct import struct
from soapyhelpers import * from soapyhelpers import *
from samplerates import * from samplerates import *
from kafkalogging import * from streamer import Streamer, is_alive
class Radio: class Radio(Streamer):
FORMAT = 'CS16' REST_PATH = "radio"
FORMAT = "CS16"
SAMPLES = 8192 SAMPLES = 8192
PORT = 8554
def __init__(self, name, device_info): def __init__(self, name, device_info):
super().__init__()
self.name = name self.name = name
self.device_info = device_info self.device_info = device_info
self.run = False
self.thread = None
self.device = soapy.Device(device_info) self.device = soapy.Device(device_info)
if self.device is None: if self.device is None:
raise RuntimeError("Failed to connect to radio device") raise RuntimeError("Failed to connect to radio device")
@ -34,7 +33,7 @@ class Radio:
frequency = int(prefixed.Float(frequency)) frequency = int(prefixed.Float(frequency))
bandwidth = 200000 bandwidth = 200000
sample_rates = preferred_sample_rates(self.capabilities['rx']['sample-rates']) sample_rates = preferred_sample_rates(self.capabilities["rx"]["sample-rates"])
if len(sample_rates) == 0: if len(sample_rates) == 0:
raise RuntimeError("No suitable sample rates are available") raise RuntimeError("No suitable sample rates are available")
self.sample_rate, self.output_rate = sample_rates[0] self.sample_rate, self.output_rate = sample_rates[0]
@ -47,72 +46,52 @@ class Radio:
self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True) self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True)
return { return {
'frequency': self.device.getFrequency(soapy.SOAPY_SDR_RX, 0), "frequency": self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
'sample-rate': self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0), "sample-rate": self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
'bandwidth': self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0), "bandwidth": self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
'gain-mode': 'auto' if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) else 'manual', "gain-mode": "auto"
if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0)
else "manual",
} }
def get_info(self): def get_info(self):
return { return {
'name': self.name, "name": self.name,
'device': self.device_info, "device": self.device_info,
'capabilities': self.capabilities, "capabilities": self.capabilities,
'stream-path': self._stream_path(), "stream-path": self._stream_path(),
'streaming': self.is_streaming(), "streaming": self.is_streaming(),
} }
def _get_capabilities(self): def _get_capabilities(self):
def get_direction_capabilities(direction): def get_direction_capabilities(direction):
return { return {
'antennas': self.device.listAntennas(direction, 0), "antennas": self.device.listAntennas(direction, 0),
'gains': self.device.listGains(direction, 0), "gains": self.device.listGains(direction, 0),
'frequencies': self.device.listFrequencies(direction, 0), "frequencies": self.device.listFrequencies(direction, 0),
'sample-rates': self.device.listSampleRates(direction, 0), "sample-rates": self.device.listSampleRates(direction, 0),
'bandwidths': self.device.listBandwidths(direction, 0), "bandwidths": self.device.listBandwidths(direction, 0),
'sensors': self.device.listSensors(direction, 0), "sensors": self.device.listSensors(direction, 0),
'formats': self.device.getStreamFormats(direction, 0), "formats": self.device.getStreamFormats(direction, 0),
} }
return { return {
'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX), "rx": get_direction_capabilities(soapy.SOAPY_SDR_RX),
'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX), "tx": get_direction_capabilities(soapy.SOAPY_SDR_TX),
'clock-sources': self.device.listClockSources(), "clock-sources": self.device.listClockSources(),
'time-sources': self.device.listTimeSources(), "time-sources": self.device.listTimeSources(),
'register-interfaces': self.device.listRegisterInterfaces(), "register-interfaces": self.device.listRegisterInterfaces(),
'gpios': self.device.listGPIOBanks(), "gpios": self.device.listGPIOBanks(),
'uarts': self.device.listUARTs(), "uarts": self.device.listUARTs(),
} }
def is_streaming(self):
return True if (self.thread and self.thread.is_alive()) else False
def start_stream(self):
if self.is_streaming():
raise RuntimeError('Stream thread is already running')
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def end_stream(self):
if self.thread is None:
raise RuntimeError('No stream thread to terminate')
self.run = False
self.thread.join()
self.thread = None
def _stream_thread(self): def _stream_thread(self):
self._init_stream() self._init_stream()
def is_alive(subprocess):
return (subprocess.poll() is None)
while self.run: while self.run:
# Check that the child processes are still running # Check that the child processes are still running
if (not is_alive(self.demod)) or (not is_alive(self.playback)): if (not is_alive(self.demod)) or (not is_alive(self.playback)):
print('DSP chain failed, aborting stream.', file=sys.stderr) print("DSP chain failed, aborting stream.", file=sys.stderr)
break break
result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES) result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES)
@ -122,46 +101,63 @@ class Radio:
elif result.ret < 0: elif result.ret < 0:
error = SoapyError(result.ret) error = SoapyError(result.ret)
if error is not SoapyError.Timeout: if error is not SoapyError.Timeout:
print("Stream read failed, aborting stream:", error, file=sys.stderr) print(
"Stream read failed, aborting stream:", error, file=sys.stderr
)
break break
continue continue
else: else:
read_size = int(result.ret * 2) read_size = int(result.ret * 2)
self.demod.stdin.write( self.demod.stdin.write(
struct.pack(FORMATS[Radio.FORMAT].packing % read_size, struct.pack(
*self.buffer[:read_size]) FORMATS[Radio.FORMAT].packing % read_size,
*self.buffer[:read_size],
)
) )
# handle subprocess logs
check_io(self.demod, f"{self.name}-demod")
check_io(self.playback, f"{self.name}-playback")
self._cleanup_stream() self._cleanup_stream()
def _init_stream(self): def _init_stream(self):
self.playback = subprocess.Popen( self.playback = subprocess.Popen(
[ [
'/usr/bin/ffmpeg', '-f', 's16le', '-ar', str(self.output_rate), '-ac', '2', '-i', '-', "/usr/bin/ffmpeg",
'-f', 'rtsp', f"rtsp://localhost{self._stream_path()}" "-f",
"s16le",
"-ar",
str(self.output_rate),
"-ac",
"2",
"-i",
"-",
"-f",
"rtsp",
self.stream_address(),
], ],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE stderr=subprocess.DEVNULL,
) )
self.demod = subprocess.Popen( self.demod = subprocess.Popen(
['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)], [
"/usr/bin/python3",
"fm_demod.py",
"-f",
"CS16",
"-s",
str(self.sample_rate),
"-d",
str(self.output_rate),
],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=self.playback.stdin, stdout=self.playback.stdin,
stderr=subprocess.PIPE stderr=subprocess.DEVNULL,
) )
prep_io(self.demod, f"{self.name}-demod")
prep_io(self.playback, f"{self.name}-playback")
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy) self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy) self.stream = self.device.setupStream(
soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy
)
result = self.device.activateStream(self.stream) result = self.device.activateStream(self.stream)
if result != 0: if result != 0:
@ -185,29 +181,26 @@ class Radio:
self.playback.wait() self.playback.wait()
self.playback = None self.playback = None
def _stream_path(self):
return f":{Radio.PORT}/radio/{self.name}"
""" """
Quick and dirty test of the Radio class. Quick and dirty test of the Radio class.
""" """
if __name__ == '__main__': if __name__ == "__main__":
import time import time
sdr = Radio('demo', {'driver': 'sdrplay'}) sdr = Radio("demo", {"driver": "sdrplay"})
print('Configuring...') print("Configuring...")
sdr.configure('105.5M') sdr.configure("105.5M")
print('Configured.') print("Configured.")
print('Starting stream...') print("Starting stream...")
sdr.start_stream() sdr.start_stream()
print('Stream started.') print("Stream started.")
# Let the stream play for a while # Let the stream play for a while
time.sleep(15) time.sleep(15)
print('Ending stream...') print("Ending stream...")
sdr.end_stream() sdr.end_stream()
print('Stream ended.') print("Stream ended.")

View File

@ -3,3 +3,5 @@ flasgger==0.9.5
Flask==2.2.3 Flask==2.2.3
numpy==1.17.4 numpy==1.17.4
prefixed==0.7.0 prefixed==0.7.0
PyYAML==6.0
requests==2.22.0

View File

@ -6,7 +6,7 @@ Sample rates stolen from:
https://en.wikipedia.org/wiki/Sampling_(signal_processing) https://en.wikipedia.org/wiki/Sampling_(signal_processing)
""" """
supported_ouput_rates = [ supported_ouput_rates = [
8000, # Telephone, P25 audio (sufficient for speech, some consonants unintelligble) 8000, # Telephone, P25 audio (sufficient for speech, some consonants unintelligble)
16000, # Modern telephone/VoIP, good quality speech 16000, # Modern telephone/VoIP, good quality speech
32000, # FM radio 32000, # FM radio
44100, # CD audio quality 44100, # CD audio quality
@ -14,9 +14,10 @@ supported_ouput_rates = [
50000, # Uncommon but supported 50000, # Uncommon but supported
88200, 88200,
96000, # DVD/Blu-ray audio 96000, # DVD/Blu-ray audio
192000, # Too much. 192000, # Too much.
] ]
def score(pair, target_output=32000, target_ratio=10): def score(pair, target_output=32000, target_ratio=10):
""" """
Heuristic for scoring input & output sample rates. The criteria are: Heuristic for scoring input & output sample rates. The criteria are:
@ -36,29 +37,38 @@ def score(pair, target_output=32000, target_ratio=10):
ratio = pair[0] // pair[1] ratio = pair[0] // pair[1]
return abs(pair[1] - target_output)/2500 \ return (
+ max(0, target_output - pair[1])/2500 \ abs(pair[1] - target_output) / 2500
+ abs(ratio - target_ratio)**0.8 \ + max(0, target_output - pair[1]) / 2500
+ max(0, target_ratio - ratio)**2 + abs(ratio - target_ratio) ** 0.8
+ max(0, target_ratio - ratio) ** 2
)
def flatten(l): def flatten(l):
return [item for sublist in l for item in sublist] return [item for sublist in l for item in sublist]
def flatten_dict(d): def flatten_dict(d):
return [(key,value) for key,rates in d.items() for value in rates] return [(key, value) for key, rates in d.items() for value in rates]
def get_pairs(input_rate): def get_pairs(input_rate):
return [ return [
(input_rate, rate) (input_rate, rate) for rate in supported_ouput_rates if (input_rate % rate == 0)
for rate in supported_ouput_rates
if (input_rate % rate == 0)
] ]
def supported_sample_rates(supported_input_rates): def supported_sample_rates(supported_input_rates):
return { return {
in_rate: [out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0] in_rate: [
out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0
]
for in_rate in supported_input_rates for in_rate in supported_input_rates
} }
def preferred_sample_rates(supported_input_rates): def preferred_sample_rates(supported_input_rates):
return sorted(flatten_dict(supported_sample_rates(supported_input_rates)), key=score) return sorted(
flatten_dict(supported_sample_rates(supported_input_rates)), key=score
)

View File

@ -1,34 +0,0 @@
#! /bin/bash
set -e
server="localhost:5000"
radio="sdrplay"
frequency="105.5M"
trap deinit INT
deinit() {
curl $server/radio/$radio/end
curl $server/radio/$radio/disconnect
#kill $pid
killall ffplay
echo -n ""
}
init() {
curl $server/radio/$radio/connect
curl $server/radio/$radio/configure/$frequency
curl $server/radio/$radio/start
}
init
echo "Waiting for stream to become active..."
sleep 3
ffplay -nodisp -hide_banner rtsp://localhost:8554/radio/sdrplay &
pid=$!
while true; do
sleep 1
done

8
scripts/wav2mp3.sh Executable file
View File

@ -0,0 +1,8 @@
#! /bin/bash
for arg in "$@"; do
path="${arg%/*}"
file="${arg##*/}"
filename="${file%%.*}"
ffmpeg -i $path/$file -vn -ar 44100 -ac 2 -b:a 100k $path/$filename.mp3
done

View File

@ -8,6 +8,10 @@ setup() {
sudo xargs apt-get install -y < requirements.apt sudo xargs apt-get install -y < requirements.apt
sudo pip install -r requirements.pip sudo pip install -r requirements.pip
# Requires yt-dlp, but a very recent version (as youtube broke their API for the pip version)
# https://stackoverflow.com/a/75504772
python3 -m pip install --force-reinstall https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz
# Install device driver # Install device driver
./scripts/SDRplay_RSP_API-Linux-3.07.1.run ./scripts/SDRplay_RSP_API-Linux-3.07.1.run

View File

@ -1,6 +1,7 @@
import SoapySDR as soapy import SoapySDR as soapy
from enum import IntEnum from enum import IntEnum
class SoapyError(IntEnum): class SoapyError(IntEnum):
Timeout = soapy.SOAPY_SDR_TIMEOUT Timeout = soapy.SOAPY_SDR_TIMEOUT
StreamError = soapy.SOAPY_SDR_STREAM_ERROR StreamError = soapy.SOAPY_SDR_STREAM_ERROR
@ -10,6 +11,7 @@ class SoapyError(IntEnum):
TimeError = soapy.SOAPY_SDR_TIME_ERROR TimeError = soapy.SOAPY_SDR_TIME_ERROR
Underflow = soapy.SOAPY_SDR_UNDERFLOW Underflow = soapy.SOAPY_SDR_UNDERFLOW
class SoapyFlag(IntEnum): class SoapyFlag(IntEnum):
EndBurst = soapy.SOAPY_SDR_END_BURST EndBurst = soapy.SOAPY_SDR_END_BURST
HasTime = soapy.SOAPY_SDR_HAS_TIME HasTime = soapy.SOAPY_SDR_HAS_TIME

51
streamer.py Normal file
View File

@ -0,0 +1,51 @@
from threading import Thread
import yaml
with open("mediamtx.yml", "r") as config_file:
MEDIASERVER_CONFIG = yaml.safe_load(config_file)
def is_alive(subprocess):
return True if (subprocess and subprocess.poll() is None) else False
class Streamer:
PROTOCOL = "rtsp"
REST_PATH = "stream"
def __init__(self):
self.run = False
self.thread = None
self.name = None
def stream_path(self):
return f"{MEDIASERVER_CONFIG['rtspAddress']}/{type(self).REST_PATH}/{self.name}"
def stream_address(self, host):
return f"{Streamer.PROTOCOL}://{host}{self.stream_path()}"
def is_streaming(self):
return True if (self.thread and self.thread.is_alive()) else False
def start_stream(self):
if self.is_streaming():
raise RuntimeError("Stream thread is already running")
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def end_stream(self):
if self.thread is None:
raise RuntimeError("No stream thread to terminate")
self.run = False
self.thread.join()
self.thread = None
if __name__ == "__main__":
from pprint import pprint
pprint(MEDIASERVER_CONFIG)

92
tuuube.py Executable file
View File

@ -0,0 +1,92 @@
#! /usr/bin/env python3
"""
We aren't using either the apt or the pip repositories for the youtube_dl
as there is a known bug affecting those two versions. The youtube API has changed
since their release, causing downloads to fail.
Make sure you use the ./setup.sh script to obtain the latest github release of
yt_dlp, as this version carries the latest fixes.
"""
# import youtube_dl
import yt_dlp as youtube_dl
from streamer import Streamer, is_alive
import subprocess
import time
import sys
import os
class Tuuube(Streamer):
REST_PATH = "tuuube"
def __init__(self, name):
super().__init__()
self.name = name
self.playback = None
def source_path(self):
return f"/tmp/{self.name}.mp3"
def _stream_thread(self):
if not os.path.exists(self.source_path()) or not os.path.isfile(
self.source_path()
):
ydl_opts = {
"format": "bestaudio/best",
"outtmpl": f"/tmp/{self.name}.%(ext)s", # yt_dlp will append %(ext) if not specified,
"postprocessors": [ # resulting in `/tmp/file.mp3.mp3` :/
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "192",
}
],
}
try:
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
ydl.download([f"https://www.youtube.com/watch?v={self.name}"])
except Exception as e:
print(f"File sourcing failed, aborting stream. {e}", file=sys.stderr)
self.run = False
return
self.playback = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-re",
"-stream_loop",
"-1",
"-i",
self.source_path(),
"-c",
"copy",
"-f",
"rtsp",
self.stream_address("localhost"),
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while self.run:
if not is_alive(self.playback):
print("Playback failed, aborting stream.", file=sys.stderr)
break
time.sleep(0.1)
self.run = False
self.playback.kill()
self.playback.wait()
self.playback = None
if __name__ == "__main__":
tube = Tuuube("BaW_jenozKc")
tube.start_stream()
while True:
print(tube.is_streaming())
time.sleep(1)