Compare commits
1 Commits
main
...
9132-kafka
| Author | SHA1 | Date | |
|---|---|---|---|
| d84268847c |
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1 +0,0 @@
|
|||||||
https://www2.cs.uic.edu/~i101/SoundFiles/
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
43
docker-compose.yml
Normal file
43
docker-compose.yml
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
version: "3"
|
||||||
|
|
||||||
|
services:
|
||||||
|
kafka:
|
||||||
|
container_name: kafka
|
||||||
|
hostname: kafka
|
||||||
|
image: bitnami/kafka:latest
|
||||||
|
ports:
|
||||||
|
# Flip the ports around, external gets default.
|
||||||
|
- "9092:9094"
|
||||||
|
- "9094:9092"
|
||||||
|
networks:
|
||||||
|
- kafka-internal
|
||||||
|
volumes:
|
||||||
|
- "kafka_data:/bitnami"
|
||||||
|
environment:
|
||||||
|
# https://hub.docker.com/r/bitnami/kafka/
|
||||||
|
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||||
|
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
|
||||||
|
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
|
||||||
|
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
||||||
|
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
|
||||||
|
|
||||||
|
kafka-ui:
|
||||||
|
container_name: kafka-ui
|
||||||
|
image: provectuslabs/kafka-ui:latest
|
||||||
|
ports:
|
||||||
|
- 8080:8080
|
||||||
|
networks:
|
||||||
|
- kafka-internal
|
||||||
|
environment:
|
||||||
|
# https://docs.kafka-ui.provectus.io/configuration/misc-configuration-properties
|
||||||
|
KAFKA_CLUSTERS_0_NAME: test
|
||||||
|
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
|
||||||
|
depends_on:
|
||||||
|
- kafka
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
kafka_data:
|
||||||
|
driver: local
|
||||||
|
|
||||||
|
networks:
|
||||||
|
kafka-internal:
|
||||||
55
fileradio.py
55
fileradio.py
@ -1,55 +0,0 @@
|
|||||||
from streamer import Streamer, is_alive
|
|
||||||
import subprocess
|
|
||||||
import time
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
class FileRadio(Streamer):
|
|
||||||
REST_PATH = "sample"
|
|
||||||
|
|
||||||
def __init__(self, path):
|
|
||||||
super().__init__()
|
|
||||||
self.path = path
|
|
||||||
self.basename = os.path.basename(self.path)
|
|
||||||
self.name, self.ext = os.path.splitext(self.basename)
|
|
||||||
|
|
||||||
def _stream_thread(self):
|
|
||||||
self.playback = subprocess.Popen(
|
|
||||||
[
|
|
||||||
"/usr/bin/ffmpeg",
|
|
||||||
"-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag
|
|
||||||
"-stream_loop", # Loop the stream -
|
|
||||||
"-1", # ...indefinitely
|
|
||||||
"-i",
|
|
||||||
self.path,
|
|
||||||
"-c",
|
|
||||||
"copy",
|
|
||||||
"-f",
|
|
||||||
"rtsp",
|
|
||||||
self.stream_address("localhost"),
|
|
||||||
],
|
|
||||||
stdin=subprocess.PIPE,
|
|
||||||
stdout=subprocess.DEVNULL,
|
|
||||||
stderr=subprocess.DEVNULL,
|
|
||||||
)
|
|
||||||
|
|
||||||
while self.run:
|
|
||||||
if not is_alive(self.playback):
|
|
||||||
print("Playback failed, aborting stream.", file=sys.stderr)
|
|
||||||
break
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
self.run = False
|
|
||||||
|
|
||||||
self.playback.kill()
|
|
||||||
self.playback.wait()
|
|
||||||
self.playback = None
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
fr = FileRadio("./data/sampleaudio/taunt.mp3")
|
|
||||||
fr.start_stream()
|
|
||||||
|
|
||||||
while True:
|
|
||||||
time.sleep(1)
|
|
||||||
219
filters.py
219
filters.py
@ -10,159 +10,156 @@
|
|||||||
import numpy, math, sys, time
|
import numpy, math, sys, time
|
||||||
from numpy import fft
|
from numpy import fft
|
||||||
|
|
||||||
|
|
||||||
def impulse(mask):
|
def impulse(mask):
|
||||||
"""Convert frequency domain mask to time-domain"""
|
''' Convert frequency domain mask to time-domain '''
|
||||||
# Negative side, a mirror of positive side
|
# Negative side, a mirror of positive side
|
||||||
negatives = mask[1:-1]
|
negatives = mask[1:-1]
|
||||||
negatives.reverse()
|
negatives.reverse()
|
||||||
mask = mask + negatives
|
mask = mask + negatives
|
||||||
fft_length = len(mask)
|
fft_length = len(mask)
|
||||||
|
|
||||||
# Convert FFT filter mask to FIR coefficients
|
# Convert FFT filter mask to FIR coefficients
|
||||||
impulse_response = fft.ifft(mask).real.tolist()
|
impulse_response = fft.ifft(mask).real.tolist()
|
||||||
|
|
||||||
# swap left and right sides
|
# swap left and right sides
|
||||||
left = impulse_response[: fft_length // 2]
|
left = impulse_response[:fft_length // 2]
|
||||||
right = impulse_response[fft_length // 2 :]
|
right = impulse_response[fft_length // 2:]
|
||||||
impulse_response = right + left
|
impulse_response = right + left
|
||||||
|
|
||||||
return impulse_response
|
return impulse_response
|
||||||
|
|
||||||
|
|
||||||
def lo_mask(sample_rate, tap_count, freq, dboct):
|
def lo_mask(sample_rate, tap_count, freq, dboct):
|
||||||
"""Create a freq domain mask for a lowpass filter"""
|
''' Create a freq domain mask for a lowpass filter '''
|
||||||
order = dboct / 6
|
order = dboct / 6
|
||||||
max_freq = sample_rate / 2.0
|
max_freq = sample_rate / 2.0
|
||||||
f2s = max_freq / (tap_count / 2.0)
|
f2s = max_freq / (tap_count / 2.0)
|
||||||
# Convert freq to filter step unit
|
# Convert freq to filter step unit
|
||||||
freq /= f2s
|
freq /= f2s
|
||||||
l = tap_count // 2
|
l = tap_count // 2
|
||||||
mask = []
|
mask = []
|
||||||
for f in range(0, l + 1):
|
for f in range(0, l+1):
|
||||||
H = 1.0 / (1 + (f / freq) ** (2 * order)) ** 0.5
|
H = 1.0 / ( 1 + (f / freq) ** (2 * order) ) ** 0.5
|
||||||
mask.append(H)
|
mask.append(H)
|
||||||
return mask
|
return mask
|
||||||
|
|
||||||
|
|
||||||
def hi_mask(sample_rate, tap_count, freq, dboct):
|
def hi_mask(sample_rate, tap_count, freq, dboct):
|
||||||
"""Create a freq domain mask for a highpass filter"""
|
''' Create a freq domain mask for a highpass filter '''
|
||||||
order = dboct / 6
|
order = dboct / 6
|
||||||
max_freq = sample_rate / 2.0
|
max_freq = sample_rate / 2.0
|
||||||
f2s = max_freq / (tap_count / 2.0)
|
f2s = max_freq / (tap_count / 2.0)
|
||||||
# Convert freq frequency to filter step unit
|
# Convert freq frequency to filter step unit
|
||||||
freq /= f2s
|
freq /= f2s
|
||||||
l = tap_count // 2
|
l = tap_count // 2
|
||||||
mask = []
|
mask = []
|
||||||
for f in range(0, l + 1):
|
for f in range(0, l+1):
|
||||||
H = 1.0 / (1 + (freq / (f + 0.0001)) ** (2 * order)) ** 0.5
|
H = 1.0 / ( 1 + (freq / (f + 0.0001)) ** (2 * order) ) ** 0.5
|
||||||
mask.append(H)
|
mask.append(H)
|
||||||
return mask
|
return mask
|
||||||
|
|
||||||
|
|
||||||
def combine_masks(mask1, mask2):
|
def combine_masks(mask1, mask2):
|
||||||
"""Combine two filter masks"""
|
''' Combine two filter masks '''
|
||||||
assert len(mask1) == len(mask2)
|
assert len(mask1) == len(mask2)
|
||||||
return [mask1[i] * mask2[i] for i in range(0, len(mask1))]
|
return [ mask1[i] * mask2[i] for i in range(0, len(mask1)) ]
|
||||||
|
|
||||||
|
|
||||||
def taps(sample_rate, freq, dboct, is_highpass):
|
def taps(sample_rate, freq, dboct, is_highpass):
|
||||||
cutoff_octaves = 60 / dboct
|
cutoff_octaves = 60 / dboct
|
||||||
|
|
||||||
if is_highpass:
|
if is_highpass:
|
||||||
cutoff = freq / 2**cutoff_octaves
|
cutoff = freq / 2 ** cutoff_octaves
|
||||||
else:
|
else:
|
||||||
cutoff = freq * 2**cutoff_octaves
|
cutoff = freq * 2 ** cutoff_octaves
|
||||||
cutoff = min(cutoff, sample_rate / 2)
|
cutoff = min(cutoff, sample_rate / 2)
|
||||||
|
|
||||||
transition_band = abs(freq - cutoff)
|
transition_band = abs(freq - cutoff)
|
||||||
Bt = transition_band / sample_rate
|
Bt = transition_band / sample_rate
|
||||||
taps = int(60 / (22 * Bt))
|
taps = int(60 / (22 * Bt))
|
||||||
# print("Freq=%f,%f number of taps: %d" % (freq, cutoff, taps), file=sys.stderr)
|
# print("Freq=%f,%f number of taps: %d" % (freq, cutoff, taps), file=sys.stderr)
|
||||||
return taps
|
return taps
|
||||||
|
|
||||||
|
|
||||||
class filter:
|
class filter:
|
||||||
def __init__(self, sample_rate, cutoff):
|
def __init__(self, sample_rate, cutoff):
|
||||||
raise "Abstract"
|
raise "Abstract"
|
||||||
|
|
||||||
def feed(self, original):
|
def feed(self, original):
|
||||||
unfiltered = numpy.concatenate((self.buf, original))
|
unfiltered = numpy.concatenate((self.buf, original))
|
||||||
self.buf = unfiltered[-len(self.coefs) :]
|
self.buf = unfiltered[-len(self.coefs):]
|
||||||
filtered = numpy.convolve(unfiltered, self.coefs, mode="valid")
|
filtered = numpy.convolve(unfiltered, self.coefs, mode='valid')
|
||||||
assert len(filtered) == len(original) + 1
|
assert len(filtered) == len(original) + 1
|
||||||
return filtered[1:]
|
return filtered[1:]
|
||||||
|
|
||||||
|
|
||||||
class low_pass(filter):
|
class low_pass(filter):
|
||||||
def __init__(self, sample_rate, f, dbo):
|
def __init__(self, sample_rate, f, dbo):
|
||||||
tap_count = taps(sample_rate, f, dbo, False)
|
tap_count = taps(sample_rate, f, dbo, False)
|
||||||
mask = lo_mask(sample_rate, tap_count, f, dbo)
|
mask = lo_mask(sample_rate, tap_count, f, dbo)
|
||||||
self.coefs = impulse(mask)
|
self.coefs = impulse(mask)
|
||||||
self.buf = [0 for n in self.coefs]
|
self.buf = [ 0 for n in self.coefs ]
|
||||||
|
|
||||||
|
|
||||||
class high_pass(filter):
|
class high_pass(filter):
|
||||||
def __init__(self, sample_rate, f, dbo):
|
def __init__(self, sample_rate, f, dbo):
|
||||||
tap_count = taps(sample_rate, f, dbo, True)
|
tap_count = taps(sample_rate, f, dbo, True)
|
||||||
mask = hi_mask(sample_rate, tap_count, f, dbo)
|
mask = hi_mask(sample_rate, tap_count, f, dbo)
|
||||||
self.coefs = impulse(mask)
|
self.coefs = impulse(mask)
|
||||||
self.buf = [0 for n in self.coefs]
|
self.buf = [ 0 for n in self.coefs ]
|
||||||
|
|
||||||
|
|
||||||
class band_pass(filter):
|
class band_pass(filter):
|
||||||
def __init__(self, sample_rate, lo, hi, dbo):
|
def __init__(self, sample_rate, lo, hi, dbo):
|
||||||
tap_count = max(
|
tap_count = max(taps(sample_rate, lo, dbo, True),
|
||||||
taps(sample_rate, lo, dbo, True), taps(sample_rate, hi, dbo, False)
|
taps(sample_rate, hi, dbo, False))
|
||||||
)
|
lomask = lo_mask(sample_rate, tap_count, hi, dbo)
|
||||||
lomask = lo_mask(sample_rate, tap_count, hi, dbo)
|
himask = hi_mask(sample_rate, tap_count, lo, dbo)
|
||||||
himask = hi_mask(sample_rate, tap_count, lo, dbo)
|
mask = combine_masks(lomask, himask)
|
||||||
mask = combine_masks(lomask, himask)
|
self.coefs = impulse(mask)
|
||||||
self.coefs = impulse(mask)
|
self.buf = [ 0 for n in self.coefs ]
|
||||||
self.buf = [0 for n in self.coefs]
|
|
||||||
|
|
||||||
|
|
||||||
class deemphasis(filter):
|
class deemphasis(filter):
|
||||||
def __init__(self, sample_rate, us, hi, final_dbo):
|
def __init__(self, sample_rate, us, hi, final_dbo):
|
||||||
# us = RC constant of the hypothetical deemphasis filter
|
# us = RC constant of the hypothetical deemphasis filter
|
||||||
us /= 1000000
|
us /= 1000000
|
||||||
# 0..lo is not deemphasized
|
# 0..lo is not deemphasized
|
||||||
lo = 1.0 / (2 * math.pi * us)
|
lo = 1.0 / (2 * math.pi * us)
|
||||||
# attenuation from lo to hi should be 10dB
|
# attenuation from lo to hi should be 10dB
|
||||||
octaves = math.log(hi / lo) / math.log(2)
|
octaves = math.log(hi / lo) / math.log(2)
|
||||||
# slope in dB/octave of deemphasis filter
|
# slope in dB/octave of deemphasis filter
|
||||||
dedbo = 10 / octaves
|
dedbo = 10 / octaves
|
||||||
|
|
||||||
tap_count = max(
|
tap_count = max(taps(sample_rate, lo, dedbo, False),
|
||||||
taps(sample_rate, lo, dedbo, False), taps(sample_rate, hi, final_dbo, False)
|
taps(sample_rate, hi, final_dbo, False))
|
||||||
)
|
|
||||||
|
|
||||||
# Calculate deemphasis filter
|
# Calculate deemphasis filter
|
||||||
demask = lo_mask(sample_rate, tap_count, lo, dedbo)
|
demask = lo_mask(sample_rate, tap_count, lo, dedbo)
|
||||||
# Calculate low-pass filter after deemphasis
|
# Calculate low-pass filter after deemphasis
|
||||||
fmask = lo_mask(sample_rate, tap_count, hi, final_dbo)
|
fmask = lo_mask(sample_rate, tap_count, hi, final_dbo)
|
||||||
|
|
||||||
mask = combine_masks(demask, fmask)
|
mask = combine_masks(demask, fmask)
|
||||||
self.coefs = impulse(mask)
|
self.coefs = impulse(mask)
|
||||||
self.buf = [0 for n in self.coefs]
|
self.buf = [ 0 for n in self.coefs ]
|
||||||
|
|
||||||
|
|
||||||
class decimator(filter):
|
class decimator(filter):
|
||||||
def __init__(self, factor):
|
def __init__(self, factor):
|
||||||
self.buf2 = []
|
self.buf2 = []
|
||||||
self.factor = int(factor)
|
self.factor = int(factor)
|
||||||
|
|
||||||
def feed(self, original):
|
def feed(self, original):
|
||||||
original = numpy.concatenate((self.buf2, original))
|
original = numpy.concatenate((self.buf2, original))
|
||||||
|
|
||||||
# Gets the last n-th sample of every n (n = factor)
|
# Gets the last n-th sample of every n (n = factor)
|
||||||
# If e.g. gets 12 samples, gets s[4] and s[9], and
|
# If e.g. gets 12 samples, gets s[4] and s[9], and
|
||||||
# stoves s[10:] to the next round
|
# stoves s[10:] to the next round
|
||||||
"""
|
'''
|
||||||
decimated = [ original[ self.factor * i + self.factor - 1 ] \
|
decimated = [ original[ self.factor * i + self.factor - 1 ] \
|
||||||
for i in range(0, len(original) // self.factor) ]
|
for i in range(0, len(original) // self.factor) ]
|
||||||
"""
|
'''
|
||||||
decimated = original[(self.factor - 1) :: self.factor]
|
decimated = original[(self.factor - 1)::self.factor]
|
||||||
self.buf2 = original[: -len(original) % self.factor]
|
self.buf2 = original[:-len(original) % self.factor]
|
||||||
|
|
||||||
return decimated
|
return decimated
|
||||||
|
|||||||
240
fm_demod.py
240
fm_demod.py
@ -19,24 +19,12 @@ from formats import FORMATS
|
|||||||
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument(
|
parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true')
|
||||||
"-v", "--verbose", help="Print additional informational output", action="store_true"
|
parser.add_argument('-f', '--format', choices=list(FORMATS.keys()), help='Input sample format', required=True)
|
||||||
)
|
parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True)
|
||||||
parser.add_argument(
|
parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True)
|
||||||
"-f",
|
|
||||||
"--format",
|
|
||||||
choices=list(FORMATS.keys()),
|
|
||||||
help="Input sample format",
|
|
||||||
required=True,
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-s", "--sample-rate", metavar="rate", help="Source sample rate (Hz)", required=True
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"-d", "--demod-rate", metavar="rate", help="Output sample rate (Hz)", required=True
|
|
||||||
)
|
|
||||||
# TODO JMT: Output to file
|
# TODO JMT: Output to file
|
||||||
# parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.')
|
#parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
INPUT_RATE = int(prefixed.Float(args.sample_rate))
|
INPUT_RATE = int(prefixed.Float(args.sample_rate))
|
||||||
@ -44,19 +32,16 @@ OUTPUT_RATE = int(prefixed.Float(args.demod_rate))
|
|||||||
DECIMATION = INPUT_RATE / OUTPUT_RATE
|
DECIMATION = INPUT_RATE / OUTPUT_RATE
|
||||||
|
|
||||||
if DECIMATION != math.floor(DECIMATION):
|
if DECIMATION != math.floor(DECIMATION):
|
||||||
print(
|
print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr)
|
||||||
f"The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}",
|
sys.exit(1)
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
FORMAT = FORMATS[args.format].numpy
|
FORMAT = FORMATS[args.format].numpy
|
||||||
DT = np.dtype(FORMAT)
|
DT = np.dtype(FORMAT)
|
||||||
BYTES_PER_SAMPLE = 2 * DT.itemsize
|
BYTES_PER_SAMPLE = 2 * DT.itemsize
|
||||||
|
|
||||||
MAX_DEVIATION = 200000.0 # Hz
|
MAX_DEVIATION = 200000.0 # Hz
|
||||||
FM_BANDWIDTH = 15000 # Hz
|
FM_BANDWIDTH = 15000 # Hz
|
||||||
STEREO_CARRIER = 38000 # Hz
|
STEREO_CARRIER = 38000 # Hz
|
||||||
DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2))
|
DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2))
|
||||||
|
|
||||||
pll = math.pi - random.random() * 2 * math.pi
|
pll = math.pi - random.random() * 2 * math.pi
|
||||||
@ -78,138 +63,137 @@ decimate2 = filters.decimator(DECIMATION)
|
|||||||
lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
|
lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
|
||||||
|
|
||||||
# Band-pass filter for stereo (L-R) modulated audio
|
# Band-pass filter for stereo (L-R) modulated audio
|
||||||
hi = filters.band_pass(
|
hi = filters.band_pass(INPUT_RATE,
|
||||||
INPUT_RATE, STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120
|
STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120)
|
||||||
)
|
|
||||||
|
|
||||||
# Filter to extract pilot signal
|
# Filter to extract pilot signal
|
||||||
pilot = filters.band_pass(
|
pilot = filters.band_pass(INPUT_RATE,
|
||||||
INPUT_RATE, STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120
|
STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120)
|
||||||
)
|
|
||||||
|
|
||||||
last_angle = 0.0
|
last_angle = 0.0
|
||||||
remaining_data = b""
|
remaining_data = b''
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# Ingest 0.1s worth of data
|
# Ingest 0.1s worth of data
|
||||||
data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10)
|
data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10)
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
# TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1
|
# TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1
|
||||||
# data = remaining_data + data
|
#data = remaining_data + data
|
||||||
|
|
||||||
if len(data) < 2 * BYTES_PER_SAMPLE:
|
if len(data) < 2 * BYTES_PER_SAMPLE:
|
||||||
remaining_data = data
|
remaining_data = data
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Save one sample to next batch, and the odd byte if exists
|
# Save one sample to next batch, and the odd byte if exists
|
||||||
if len(data) % 2 == 1:
|
if len(data) % 2 == 1:
|
||||||
print("Odd byte, that's odd", file=sys.stderr)
|
print("Odd byte, that's odd", file=sys.stderr)
|
||||||
remaining_data = data[-3:]
|
remaining_data = data[-3:]
|
||||||
data = data[:-1]
|
data = data[:-1]
|
||||||
else:
|
else:
|
||||||
remaining_data = data[-2:]
|
remaining_data = data[-2:]
|
||||||
|
|
||||||
samples = len(data) // BYTES_PER_SAMPLE
|
samples = len(data) // BYTES_PER_SAMPLE
|
||||||
|
|
||||||
# Find angle (phase) of I/Q pairs
|
# Find angle (phase) of I/Q pairs
|
||||||
iqdata = np.frombuffer(data, dtype=FORMAT)
|
iqdata = np.frombuffer(data, dtype=FORMAT)
|
||||||
|
|
||||||
if args.verbose:
|
if args.verbose:
|
||||||
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
|
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
|
||||||
|
|
||||||
if np.issubdtype(FORMAT, np.integer):
|
if np.issubdtype(FORMAT, np.integer):
|
||||||
iinfo = np.iinfo(FORMAT)
|
iinfo = np.iinfo(FORMAT)
|
||||||
if np.issubdtype(FORMAT, np.unsignedinteger):
|
if np.issubdtype(FORMAT, np.unsignedinteger):
|
||||||
iqdata = iqdata - (iinfo.max / 2.0)
|
iqdata = iqdata - (iinfo.max / 2.0)
|
||||||
iqdata = iqdata / (iinfo.max / 2.0)
|
iqdata = iqdata / (iinfo.max / 2.0)
|
||||||
else:
|
else:
|
||||||
iqdata = iqdata / np.float64(iinfo.max)
|
iqdata = iqdata / np.float64(iinfo.max)
|
||||||
else:
|
else:
|
||||||
iqdata = iqdata.astype(np.float64)
|
iqdata = iqdata.astype(np.float64)
|
||||||
|
|
||||||
if args.verbose:
|
if args.verbose:
|
||||||
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
|
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
|
||||||
|
|
||||||
iqdata = iqdata.view(complex)
|
iqdata = iqdata.view(complex)
|
||||||
angles = np.angle(iqdata)
|
angles = np.angle(iqdata)
|
||||||
|
|
||||||
# Determine phase rotation between samples
|
# Determine phase rotation between samples
|
||||||
rotations = np.ediff1d(angles)
|
rotations = np.ediff1d(angles)
|
||||||
|
|
||||||
# Wrap rotations >= +/-180º
|
# Wrap rotations >= +/-180º
|
||||||
rotations = (rotations + np.pi) % (2 * np.pi) - np.pi
|
rotations = (rotations + np.pi) % (2 * np.pi) - np.pi
|
||||||
|
|
||||||
# Convert rotations to baseband signal
|
# Convert rotations to baseband signal
|
||||||
output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL)
|
output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL)
|
||||||
output_raw = np.clip(output_raw, -0.999, +0.999)
|
output_raw = np.clip(output_raw, -0.999, +0.999)
|
||||||
|
|
||||||
# At this point, output_raw contains two audio signals:
|
# At this point, output_raw contains two audio signals:
|
||||||
# L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC,
|
# L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC,
|
||||||
# carrier 38kHz
|
# carrier 38kHz
|
||||||
|
|
||||||
# Downsample and low-pass L+R (mono) signal
|
# Downsample and low-pass L+R (mono) signal
|
||||||
output_mono = lo.feed(output_raw)
|
output_mono = lo.feed(output_raw)
|
||||||
output_mono = decimate1.feed(output_mono)
|
output_mono = decimate1.feed(output_mono)
|
||||||
|
|
||||||
# Filter pilot tone
|
# Filter pilot tone
|
||||||
detected_pilot = pilot.feed(output_raw)
|
detected_pilot = pilot.feed(output_raw)
|
||||||
|
|
||||||
# Separate ultrasonic L-R signal by high-pass filtering
|
# Separate ultrasonic L-R signal by high-pass filtering
|
||||||
output_jstereo_mod = hi.feed(output_raw)
|
output_jstereo_mod = hi.feed(output_raw)
|
||||||
output_jstereo = []
|
output_jstereo = []
|
||||||
|
|
||||||
# Demodulate L-R, which is AM-SC with 53kHz carrier
|
# Demodulate L-R, which is AM-SC with 53kHz carrier
|
||||||
for n in range(0, len(output_jstereo_mod)):
|
for n in range(0, len(output_jstereo_mod)):
|
||||||
# Advance carrier
|
# Advance carrier
|
||||||
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
|
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
|
||||||
# Standard demodulation
|
# Standard demodulation
|
||||||
output_jstereo.append(math.cos(pll) * output_jstereo_mod[n])
|
output_jstereo.append(math.cos(pll) * output_jstereo_mod[n])
|
||||||
|
|
||||||
# Detect pilot zero-crossing
|
# Detect pilot zero-crossing
|
||||||
cur_pilot = detected_pilot[n]
|
cur_pilot = detected_pilot[n]
|
||||||
zero_crossed = (cur_pilot * last_pilot) <= 0
|
zero_crossed = (cur_pilot * last_pilot) <= 0
|
||||||
last_pilot = cur_pilot
|
last_pilot = cur_pilot
|
||||||
if not zero_crossed:
|
if not zero_crossed:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# When pilot is at 90º or 270º, carrier should be around 180º
|
# When pilot is at 90º or 270º, carrier should be around 180º
|
||||||
ideal = math.pi
|
ideal = math.pi
|
||||||
deviation = pll - ideal
|
deviation = pll - ideal
|
||||||
if deviation > math.pi:
|
if deviation > math.pi:
|
||||||
deviation -= tau
|
deviation -= tau
|
||||||
deviation_avg = 0.99 * deviation_avg + 0.01 * deviation
|
deviation_avg = 0.99 * deviation_avg + 0.01 * deviation
|
||||||
rotation = deviation_avg - last_deviation_avg
|
rotation = deviation_avg - last_deviation_avg
|
||||||
last_deviation_avg = deviation_avg
|
last_deviation_avg = deviation_avg
|
||||||
|
|
||||||
if abs(deviation_avg) > math.pi / 8:
|
if abs(deviation_avg) > math.pi / 8:
|
||||||
# big phase deviation, reset PLL
|
# big phase deviation, reset PLL
|
||||||
pll = ideal
|
pll = ideal
|
||||||
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
|
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
|
||||||
deviation_avg = 0.0
|
deviation_avg = 0.0
|
||||||
last_deviation_avg = 0.0
|
last_deviation_avg = 0.0
|
||||||
|
|
||||||
# Translate rotation to frequency deviation
|
# Translate rotation to frequency deviation
|
||||||
STEREO_CARRIER /= 1 + (rotation * 1.05) / tau
|
STEREO_CARRIER /= (1 + (rotation * 1.05) / tau)
|
||||||
|
|
||||||
# Downsample, Low-pass/deemphasis demodulated L-R
|
|
||||||
output_jstereo = lo_r.feed(output_jstereo)
|
|
||||||
output_jstereo = decimate2.feed(output_jstereo)
|
|
||||||
|
|
||||||
assert len(output_jstereo) == len(output_mono)
|
# Downsample, Low-pass/deemphasis demodulated L-R
|
||||||
|
output_jstereo = lo_r.feed(output_jstereo)
|
||||||
|
output_jstereo = decimate2.feed(output_jstereo)
|
||||||
|
|
||||||
# Scale to 16-bit and divide by 2 for channel sum
|
assert len(output_jstereo) == len(output_mono)
|
||||||
output_mono = np.multiply(output_mono, 32767 / 2.0)
|
|
||||||
output_jstereo = np.multiply(output_jstereo, 32767 / 2.0)
|
|
||||||
|
|
||||||
# Output stereo by adding or subtracting joint-stereo to mono
|
# Scale to 16-bit and divide by 2 for channel sum
|
||||||
output_left = output_mono + output_jstereo
|
output_mono = np.multiply(output_mono, 32767 / 2.0)
|
||||||
output_right = output_mono - output_jstereo
|
output_jstereo = np.multiply(output_jstereo, 32767 / 2.0)
|
||||||
|
|
||||||
# Interleave L and R samples using np trickery
|
# Output stereo by adding or subtracting joint-stereo to mono
|
||||||
output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype)
|
output_left = output_mono + output_jstereo
|
||||||
output[0::2] = output_left
|
output_right = output_mono - output_jstereo
|
||||||
output[1::2] = output_right
|
|
||||||
output = output.astype(int)
|
|
||||||
|
|
||||||
sys.stdout.buffer.write(struct.pack("<%dh" % len(output), *output))
|
# Interleave L and R samples using np trickery
|
||||||
|
output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype)
|
||||||
|
output[0::2] = output_left
|
||||||
|
output[1::2] = output_right
|
||||||
|
output = output.astype(int)
|
||||||
|
|
||||||
|
sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output))
|
||||||
|
|||||||
12
formats.py
12
formats.py
@ -2,7 +2,6 @@ import numpy as np
|
|||||||
from SoapySDR import *
|
from SoapySDR import *
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class FormatSpec:
|
class FormatSpec:
|
||||||
name: str
|
name: str
|
||||||
@ -10,10 +9,9 @@ class FormatSpec:
|
|||||||
numpy: np.dtype
|
numpy: np.dtype
|
||||||
packing: str
|
packing: str
|
||||||
|
|
||||||
|
|
||||||
FORMATS = {
|
FORMATS = {
|
||||||
"CU8": FormatSpec("CU8", SOAPY_SDR_CU8, np.uint8, "=%dB"),
|
'CU8': FormatSpec('CU8', SOAPY_SDR_CU8, np.uint8, '=%dB'),
|
||||||
"CS8": FormatSpec("CS8", SOAPY_SDR_CS8, np.int8, "=%db"),
|
'CS8': FormatSpec('CS8', SOAPY_SDR_CS8, np.int8, '=%db'),
|
||||||
"CS16": FormatSpec("CS16", SOAPY_SDR_CS16, np.int16, "=%dh"),
|
'CS16': FormatSpec('CS16', SOAPY_SDR_CS16, np.int16, '=%dh'),
|
||||||
"CF32": FormatSpec("CF32", SOAPY_SDR_CF32, np.float32, "=%df"),
|
'CF32': FormatSpec('CF32', SOAPY_SDR_CF32, np.float32, '=%df'),
|
||||||
}
|
}
|
||||||
64
kafkalogging.py
Normal file
64
kafkalogging.py
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
from kafka import KafkaProducer
|
||||||
|
import subprocess
|
||||||
|
import os
|
||||||
|
|
||||||
|
kafka_servers = ['localhost:9092']
|
||||||
|
producers = {}
|
||||||
|
|
||||||
|
|
||||||
|
# Do anything that might take more than a few ms early, otherwise you hit overflow errors
|
||||||
|
def prep_io(process, process_name=None):
|
||||||
|
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
|
||||||
|
if pipe is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if os.get_blocking(pipe.fileno()):
|
||||||
|
os.set_blocking(pipe.fileno(), False)
|
||||||
|
|
||||||
|
if process.pid not in producers:
|
||||||
|
producers[process.pid] = {}
|
||||||
|
if pipe_name not in producers[process.pid]:
|
||||||
|
producers[process.pid][pipe_name] = KafkaProducer(
|
||||||
|
bootstrap_servers=kafka_servers,
|
||||||
|
max_block_ms=200
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Inspired by gist:
|
||||||
|
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
|
||||||
|
def check_io(process, process_name=None):
|
||||||
|
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
|
||||||
|
if pipe is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# The python official docs recommend using Popen.communicate() instead of reading
|
||||||
|
# from the pipes directly. Using Popen.communicate is a blocking call though,
|
||||||
|
# which would stall threads for processes producing no output.
|
||||||
|
output = pipe.readline()
|
||||||
|
if output:
|
||||||
|
producers[process.pid][pipe_name].send(topic_name, value=output)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
producers[process.pid][pipe_name].flush()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import time
|
||||||
|
|
||||||
|
ping = subprocess.Popen(
|
||||||
|
[
|
||||||
|
'/usr/bin/ping',
|
||||||
|
'1.1.1.1',
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE
|
||||||
|
)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
check_io(ping, 'ping')
|
||||||
|
time.sleep(0.1)
|
||||||
@ -38,10 +38,10 @@ udpMaxPayloadSize: 1472
|
|||||||
externalAuthenticationURL:
|
externalAuthenticationURL:
|
||||||
|
|
||||||
# Enable the HTTP API.
|
# Enable the HTTP API.
|
||||||
api: yes
|
api: no
|
||||||
|
|
||||||
# Enable Prometheus-compatible metrics.
|
# Enable Prometheus-compatible metrics.
|
||||||
metrics: yes
|
metrics: no
|
||||||
|
|
||||||
# Enable pprof-compatible endpoint to monitor performances.
|
# Enable pprof-compatible endpoint to monitor performances.
|
||||||
pprof: no
|
pprof: no
|
||||||
|
|||||||
204
microservice.py
204
microservice.py
@ -1,12 +1,7 @@
|
|||||||
#! /usr/bin/env python3
|
#! /usr/bin/env python3
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import requests
|
|
||||||
import SoapySDR as soapy
|
import SoapySDR as soapy
|
||||||
from radio import Radio
|
from radio import Radio
|
||||||
from tuuube import Tuuube
|
|
||||||
from fileradio import FileRadio
|
|
||||||
|
|
||||||
from flask import Flask, jsonify
|
from flask import Flask, jsonify
|
||||||
from flasgger import Swagger
|
from flasgger import Swagger
|
||||||
@ -18,30 +13,8 @@ swag = Swagger(app)
|
|||||||
radios = {}
|
radios = {}
|
||||||
|
|
||||||
|
|
||||||
@app.route("/report")
|
@app.route('/radio/report')
|
||||||
def report():
|
def report():
|
||||||
"""Get streams report from the RTSP relay.
|
|
||||||
---
|
|
||||||
responses:
|
|
||||||
200:
|
|
||||||
description: JSON with streams report.
|
|
||||||
400:
|
|
||||||
description: JSON with error message.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
r = requests.get("http://localhost:9997/v1/paths/list")
|
|
||||||
j = r.json()
|
|
||||||
for item in j["items"]:
|
|
||||||
del j["items"][item]["conf"]
|
|
||||||
del j["items"][item]["confName"]
|
|
||||||
|
|
||||||
return jsonify(j)
|
|
||||||
except Exception as e:
|
|
||||||
return _error_message_json(e.message), 500
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/radio/report")
|
|
||||||
def radio_report():
|
|
||||||
"""List radio devices available to the system.
|
"""List radio devices available to the system.
|
||||||
---
|
---
|
||||||
responses:
|
responses:
|
||||||
@ -52,8 +25,7 @@ def radio_report():
|
|||||||
devices = [dict(device) for device in soapy.Device.enumerate()]
|
devices = [dict(device) for device in soapy.Device.enumerate()]
|
||||||
return jsonify(devices)
|
return jsonify(devices)
|
||||||
|
|
||||||
|
@app.route('/radio/<radio>/connect')
|
||||||
@app.route("/radio/<radio>/connect")
|
|
||||||
def connect(radio):
|
def connect(radio):
|
||||||
"""Connect to a radio device, by driver name or serial number.
|
"""Connect to a radio device, by driver name or serial number.
|
||||||
---
|
---
|
||||||
@ -65,29 +37,29 @@ def connect(radio):
|
|||||||
required: true
|
required: true
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
description: JSON with message successfully connected to a radio.
|
description: Successfully connected to a radio.
|
||||||
400:
|
400:
|
||||||
description: JSON with error message No radio device by that name is available.
|
description: No radio device by that name is available.
|
||||||
500:
|
500:
|
||||||
description: JSON with error message failed to connect to radio.
|
description: Failed to connect to radio.
|
||||||
"""
|
"""
|
||||||
if radio in radios:
|
if radio in radios:
|
||||||
return _error_message_json("Radio device already connected"), 400
|
return "Radio device already connected", 400
|
||||||
|
|
||||||
devices = [dict(device) for device in soapy.Device.enumerate()]
|
devices = [dict(device) for device in soapy.Device.enumerate()]
|
||||||
for device in devices:
|
for device in devices:
|
||||||
if radio in device.values():
|
if radio in device.values():
|
||||||
try:
|
try:
|
||||||
radios[radio] = Radio(radio, device)
|
radios[radio] = Radio(radio, device)
|
||||||
return jsonify("message", "successfully connected radio"), 200
|
return "", 200
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
radios.pop(radio)
|
radios.pop(radio)
|
||||||
return _error_message_json(e.message), 500
|
return str(e), 500
|
||||||
|
|
||||||
|
|
||||||
return "Radio device not found", 400
|
return "Radio device not found", 400
|
||||||
|
|
||||||
|
@app.route('/radio/<radio>/disconnect')
|
||||||
@app.route("/radio/<radio>/disconnect")
|
|
||||||
def disconnect(radio):
|
def disconnect(radio):
|
||||||
"""Disconnect from a radio device.
|
"""Disconnect from a radio device.
|
||||||
---
|
---
|
||||||
@ -107,12 +79,11 @@ def disconnect(radio):
|
|||||||
"""
|
"""
|
||||||
if radio in radios:
|
if radio in radios:
|
||||||
radios.pop(radio)
|
radios.pop(radio)
|
||||||
return jsonify("message", "succesfully disconnected radio"), 200
|
return "", 200
|
||||||
else:
|
else:
|
||||||
return _error_message_json("Radio not connected"), 400
|
return "Radio not connected", 400
|
||||||
|
|
||||||
|
@app.route('/radio/<radio>/configure/<frequency>')
|
||||||
@app.route("/radio/<radio>/configure/<frequency>")
|
|
||||||
def configure(radio, frequency):
|
def configure(radio, frequency):
|
||||||
"""Tune the radio to a frequency.
|
"""Tune the radio to a frequency.
|
||||||
You must connect to the radio before attempting to configure it.
|
You must connect to the radio before attempting to configure it.
|
||||||
@ -131,21 +102,20 @@ def configure(radio, frequency):
|
|||||||
required: true
|
required: true
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
description: JSON with radio configuration.
|
description: JSON
|
||||||
400:
|
400:
|
||||||
description: The specified radio is not connected.
|
description: The specified radio is not connected.
|
||||||
"""
|
"""
|
||||||
if radio in radios:
|
if radio in radios:
|
||||||
return jsonify(radios[radio].configure(frequency))
|
return jsonify(radios[radio].configure(frequency))
|
||||||
else:
|
else:
|
||||||
return _error_message_json("Radio not connected"), 400
|
return "Radio not connected", 400
|
||||||
|
|
||||||
|
@app.route('/radio/<radio>/start')
|
||||||
@app.route("/radio/<radio>/start")
|
|
||||||
def start_stream(radio):
|
def start_stream(radio):
|
||||||
"""Start the radio stream.
|
"""Start the radio stream.
|
||||||
Once the stream has been started, connect to the stream at:
|
Once the stream has been started, connect to the stream at:
|
||||||
rtsp://[host]:8554/radio/[radio]
|
rtsp://[host]:8554/radio/[radio]/stream
|
||||||
---
|
---
|
||||||
parameters:
|
parameters:
|
||||||
- name: radio
|
- name: radio
|
||||||
@ -153,20 +123,14 @@ def start_stream(radio):
|
|||||||
in: path
|
in: path
|
||||||
type: string
|
type: string
|
||||||
required: true
|
required: true
|
||||||
responses:
|
|
||||||
200:
|
|
||||||
description: JSON with message successful start of radio stream.
|
|
||||||
400:
|
|
||||||
description: JSON with error message.
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
radios[radio].start_stream()
|
radios[radio].start_stream()
|
||||||
return jsonify("message", "successfully started radio stream"), 200
|
return "", 200
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return _error_message_json(e.message), 400
|
return str(e), 400
|
||||||
|
|
||||||
|
@app.route('/radio/<radio>/end')
|
||||||
@app.route("/radio/<radio>/end")
|
|
||||||
def end_stream(radio):
|
def end_stream(radio):
|
||||||
"""Terminate the radio stream.
|
"""Terminate the radio stream.
|
||||||
---
|
---
|
||||||
@ -176,21 +140,14 @@ def end_stream(radio):
|
|||||||
in: path
|
in: path
|
||||||
type: string
|
type: string
|
||||||
required: true
|
required: true
|
||||||
responses:
|
|
||||||
200:
|
|
||||||
description: JSON with message successful termination of radio stream.
|
|
||||||
400:
|
|
||||||
description: JSON with error message.
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
radios[radio].end_stream()
|
radios[radio].end_stream()
|
||||||
return jsonify("message", "successfully ended radio stream"), 200
|
return "", 200
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = {"error_message": e.message}
|
return str(e), 400
|
||||||
return _error_message_json(e.message), 400
|
|
||||||
|
|
||||||
|
@app.route('/radio/<radio>/info')
|
||||||
@app.route("/radio/<radio>/info")
|
|
||||||
def radio_info(radio):
|
def radio_info(radio):
|
||||||
"""Get information about a radio.
|
"""Get information about a radio.
|
||||||
---
|
---
|
||||||
@ -202,117 +159,40 @@ def radio_info(radio):
|
|||||||
required: true
|
required: true
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
description: JSON with radio information.
|
description: JSON
|
||||||
400:
|
400:
|
||||||
description: JSON with error message.
|
description: The specified radio is not connected.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
return jsonify(radios[radio].get_info())
|
return jsonify(radios[radio].get_info())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return _error_message_json(e.message), 400
|
return str(e), 400
|
||||||
|
|
||||||
|
|
||||||
tubes = {}
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
|
||||||
@app.route("/tuuube/<id>/start")
|
|
||||||
def start_tuuube_stream(id):
|
|
||||||
"""Start streaming from a youtube source.
|
|
||||||
Once the stream has been started, connect to the stream at:
|
|
||||||
rtsp://[host]:8554/tuuube/[id]
|
|
||||||
---
|
|
||||||
parameters:
|
|
||||||
- name: id
|
|
||||||
description:
|
|
||||||
The youtube video ID. That is the part following the `watch?v=` in the URL. For example, `dQw4w9WgXcQ`.\n
|
|
||||||
Other good options are - \n
|
|
||||||
`BaW_jenozKc`, yt_dlp package test video.\n
|
|
||||||
`b2je8uBlgFM`, stereo audio test.\n
|
|
||||||
`LDU_Txk06tM`, crab rave, a commonly used audio fidelity test.\n
|
|
||||||
`sPT_epMLkwQ`, Kilsyth CFA major factory fire dispatch radio traffic.
|
|
||||||
in: path
|
|
||||||
type: string
|
|
||||||
required: true
|
|
||||||
responses:
|
|
||||||
200:
|
|
||||||
description: JSON with message successful start of youtube stream.
|
|
||||||
400:
|
|
||||||
description: JSON with error message.
|
|
||||||
"""
|
|
||||||
if id not in tubes:
|
|
||||||
tubes[id] = Tuuube(id)
|
|
||||||
|
|
||||||
try:
|
|
||||||
tubes[id].start_stream()
|
|
||||||
return jsonify("message", "successfully started youtube stream"), 200
|
|
||||||
except Exception as e:
|
|
||||||
return _error_message_json(e.message), 400
|
|
||||||
|
|
||||||
|
|
||||||
@app.route("/tuuube/<id>/end")
|
|
||||||
def end_tuuube_stream(id):
|
|
||||||
"""Terminate the youtube stream.
|
|
||||||
---
|
|
||||||
parameters:
|
|
||||||
- name: id
|
|
||||||
description: The youtube video ID.
|
|
||||||
in: path
|
|
||||||
type: string
|
|
||||||
required: true
|
|
||||||
responses:
|
|
||||||
200:
|
|
||||||
description: JSON with message successful termination of youtube stream.
|
|
||||||
400:
|
|
||||||
description: JSON with error message.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
tubes[id].end_stream()
|
|
||||||
return jsonify("message", "succesfully ended youtube stream"), 200
|
|
||||||
except Exception as e:
|
|
||||||
return _error_message_json(e.message), 400
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
|
||||||
Helper function to return a JSON error message
|
|
||||||
parameters: error_message - the error message to return
|
|
||||||
returns: a JSON object with the error message
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
def _error_message_json(error_message: str):
|
|
||||||
error_message = {"error_message": error_message}
|
|
||||||
return jsonify(error_message)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
rtsp_relay = subprocess.Popen(
|
rtsp_relay = subprocess.Popen(
|
||||||
["./dependencies/mediamtx/mediamtx", "./mediamtx.yml"],
|
[
|
||||||
|
'./dependencies/mediamtx/mediamtx',
|
||||||
|
'./mediamtx.yml'
|
||||||
|
],
|
||||||
stdout=subprocess.DEVNULL,
|
stdout=subprocess.DEVNULL,
|
||||||
stderr=subprocess.DEVNULL,
|
stderr=subprocess.DEVNULL
|
||||||
)
|
)
|
||||||
|
|
||||||
for path, _, files in os.walk("./data/sampleaudio"):
|
app.run(
|
||||||
for file in files:
|
host='0.0.0.0',
|
||||||
name, ext = os.path.splitext(file)
|
threaded=True,
|
||||||
if ext == ".mp3":
|
debug=False
|
||||||
tubes[name] = FileRadio(f"{path}/{file}")
|
)
|
||||||
tubes[name].start_stream()
|
|
||||||
|
|
||||||
app.run(host="0.0.0.0", threaded=True, debug=False)
|
print('Stopping any currently streaming radios...')
|
||||||
|
|
||||||
print("Stopping any currently streaming radios...")
|
|
||||||
for radio in radios:
|
for radio in radios:
|
||||||
if radios[radio].is_streaming():
|
if radios[radio].is_streaming():
|
||||||
radios[radio].end_stream()
|
radios[radio].end_stream()
|
||||||
radios = None
|
radios = None
|
||||||
|
|
||||||
for tube in tubes:
|
print('Killing RTSP relay...')
|
||||||
if tubes[tube].is_streaming():
|
|
||||||
tubes[tube].end_stream()
|
|
||||||
tubes = None
|
|
||||||
|
|
||||||
print("Killing RTSP relay...")
|
|
||||||
rtsp_relay.kill()
|
rtsp_relay.kill()
|
||||||
rtsp_relay.wait() # Necessary?
|
rtsp_relay.wait() # Necessary?
|
||||||
|
|||||||
161
radio.py
161
radio.py
@ -7,19 +7,20 @@ import subprocess
|
|||||||
import struct
|
import struct
|
||||||
from soapyhelpers import *
|
from soapyhelpers import *
|
||||||
from samplerates import *
|
from samplerates import *
|
||||||
from streamer import Streamer, is_alive
|
from kafkalogging import *
|
||||||
|
|
||||||
|
|
||||||
class Radio(Streamer):
|
class Radio:
|
||||||
REST_PATH = "radio"
|
FORMAT = 'CS16'
|
||||||
FORMAT = "CS16"
|
|
||||||
SAMPLES = 8192
|
SAMPLES = 8192
|
||||||
|
PORT = 8554
|
||||||
|
|
||||||
def __init__(self, name, device_info):
|
def __init__(self, name, device_info):
|
||||||
super().__init__()
|
|
||||||
|
|
||||||
self.name = name
|
self.name = name
|
||||||
self.device_info = device_info
|
self.device_info = device_info
|
||||||
|
self.run = False
|
||||||
|
self.thread = None
|
||||||
|
|
||||||
self.device = soapy.Device(device_info)
|
self.device = soapy.Device(device_info)
|
||||||
if self.device is None:
|
if self.device is None:
|
||||||
raise RuntimeError("Failed to connect to radio device")
|
raise RuntimeError("Failed to connect to radio device")
|
||||||
@ -33,7 +34,7 @@ class Radio(Streamer):
|
|||||||
frequency = int(prefixed.Float(frequency))
|
frequency = int(prefixed.Float(frequency))
|
||||||
bandwidth = 200000
|
bandwidth = 200000
|
||||||
|
|
||||||
sample_rates = preferred_sample_rates(self.capabilities["rx"]["sample-rates"])
|
sample_rates = preferred_sample_rates(self.capabilities['rx']['sample-rates'])
|
||||||
if len(sample_rates) == 0:
|
if len(sample_rates) == 0:
|
||||||
raise RuntimeError("No suitable sample rates are available")
|
raise RuntimeError("No suitable sample rates are available")
|
||||||
self.sample_rate, self.output_rate = sample_rates[0]
|
self.sample_rate, self.output_rate = sample_rates[0]
|
||||||
@ -46,52 +47,72 @@ class Radio(Streamer):
|
|||||||
self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True)
|
self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"frequency": self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
|
'frequency': self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
|
||||||
"sample-rate": self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
|
'sample-rate': self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
|
||||||
"bandwidth": self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
|
'bandwidth': self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
|
||||||
"gain-mode": "auto"
|
'gain-mode': 'auto' if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) else 'manual',
|
||||||
if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0)
|
|
||||||
else "manual",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_info(self):
|
def get_info(self):
|
||||||
return {
|
return {
|
||||||
"name": self.name,
|
'name': self.name,
|
||||||
"device": self.device_info,
|
'device': self.device_info,
|
||||||
"capabilities": self.capabilities,
|
'capabilities': self.capabilities,
|
||||||
"stream-path": self._stream_path(),
|
'stream-path': self._stream_path(),
|
||||||
"streaming": self.is_streaming(),
|
'streaming': self.is_streaming(),
|
||||||
}
|
}
|
||||||
|
|
||||||
def _get_capabilities(self):
|
def _get_capabilities(self):
|
||||||
def get_direction_capabilities(direction):
|
def get_direction_capabilities(direction):
|
||||||
return {
|
return {
|
||||||
"antennas": self.device.listAntennas(direction, 0),
|
'antennas': self.device.listAntennas(direction, 0),
|
||||||
"gains": self.device.listGains(direction, 0),
|
'gains': self.device.listGains(direction, 0),
|
||||||
"frequencies": self.device.listFrequencies(direction, 0),
|
'frequencies': self.device.listFrequencies(direction, 0),
|
||||||
"sample-rates": self.device.listSampleRates(direction, 0),
|
'sample-rates': self.device.listSampleRates(direction, 0),
|
||||||
"bandwidths": self.device.listBandwidths(direction, 0),
|
'bandwidths': self.device.listBandwidths(direction, 0),
|
||||||
"sensors": self.device.listSensors(direction, 0),
|
'sensors': self.device.listSensors(direction, 0),
|
||||||
"formats": self.device.getStreamFormats(direction, 0),
|
'formats': self.device.getStreamFormats(direction, 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"rx": get_direction_capabilities(soapy.SOAPY_SDR_RX),
|
'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX),
|
||||||
"tx": get_direction_capabilities(soapy.SOAPY_SDR_TX),
|
'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX),
|
||||||
"clock-sources": self.device.listClockSources(),
|
'clock-sources': self.device.listClockSources(),
|
||||||
"time-sources": self.device.listTimeSources(),
|
'time-sources': self.device.listTimeSources(),
|
||||||
"register-interfaces": self.device.listRegisterInterfaces(),
|
'register-interfaces': self.device.listRegisterInterfaces(),
|
||||||
"gpios": self.device.listGPIOBanks(),
|
'gpios': self.device.listGPIOBanks(),
|
||||||
"uarts": self.device.listUARTs(),
|
'uarts': self.device.listUARTs(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def is_streaming(self):
|
||||||
|
return True if (self.thread and self.thread.is_alive()) else False
|
||||||
|
|
||||||
|
def start_stream(self):
|
||||||
|
if self.is_streaming():
|
||||||
|
raise RuntimeError('Stream thread is already running')
|
||||||
|
|
||||||
|
self.run = True
|
||||||
|
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def end_stream(self):
|
||||||
|
if self.thread is None:
|
||||||
|
raise RuntimeError('No stream thread to terminate')
|
||||||
|
|
||||||
|
self.run = False
|
||||||
|
self.thread.join()
|
||||||
|
self.thread = None
|
||||||
|
|
||||||
def _stream_thread(self):
|
def _stream_thread(self):
|
||||||
self._init_stream()
|
self._init_stream()
|
||||||
|
|
||||||
|
def is_alive(subprocess):
|
||||||
|
return (subprocess.poll() is None)
|
||||||
|
|
||||||
while self.run:
|
while self.run:
|
||||||
# Check that the child processes are still running
|
# Check that the child processes are still running
|
||||||
if (not is_alive(self.demod)) or (not is_alive(self.playback)):
|
if (not is_alive(self.demod)) or (not is_alive(self.playback)):
|
||||||
print("DSP chain failed, aborting stream.", file=sys.stderr)
|
print('DSP chain failed, aborting stream.', file=sys.stderr)
|
||||||
break
|
break
|
||||||
|
|
||||||
result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES)
|
result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES)
|
||||||
@ -101,63 +122,46 @@ class Radio(Streamer):
|
|||||||
elif result.ret < 0:
|
elif result.ret < 0:
|
||||||
error = SoapyError(result.ret)
|
error = SoapyError(result.ret)
|
||||||
if error is not SoapyError.Timeout:
|
if error is not SoapyError.Timeout:
|
||||||
print(
|
print("Stream read failed, aborting stream:", error, file=sys.stderr)
|
||||||
"Stream read failed, aborting stream:", error, file=sys.stderr
|
|
||||||
)
|
|
||||||
break
|
break
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
read_size = int(result.ret * 2)
|
read_size = int(result.ret * 2)
|
||||||
self.demod.stdin.write(
|
self.demod.stdin.write(
|
||||||
struct.pack(
|
struct.pack(FORMATS[Radio.FORMAT].packing % read_size,
|
||||||
FORMATS[Radio.FORMAT].packing % read_size,
|
*self.buffer[:read_size])
|
||||||
*self.buffer[:read_size],
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# handle subprocess logs
|
||||||
|
check_io(self.demod, f"{self.name}-demod")
|
||||||
|
check_io(self.playback, f"{self.name}-playback")
|
||||||
|
|
||||||
|
|
||||||
self._cleanup_stream()
|
self._cleanup_stream()
|
||||||
|
|
||||||
def _init_stream(self):
|
def _init_stream(self):
|
||||||
self.playback = subprocess.Popen(
|
self.playback = subprocess.Popen(
|
||||||
[
|
[
|
||||||
"/usr/bin/ffmpeg",
|
'/usr/bin/ffmpeg', '-f', 's16le', '-ar', str(self.output_rate), '-ac', '2', '-i', '-',
|
||||||
"-f",
|
'-f', 'rtsp', f"rtsp://localhost{self._stream_path()}"
|
||||||
"s16le",
|
|
||||||
"-ar",
|
|
||||||
str(self.output_rate),
|
|
||||||
"-ac",
|
|
||||||
"2",
|
|
||||||
"-i",
|
|
||||||
"-",
|
|
||||||
"-f",
|
|
||||||
"rtsp",
|
|
||||||
self.stream_address(),
|
|
||||||
],
|
],
|
||||||
stdin=subprocess.PIPE,
|
stdin=subprocess.PIPE,
|
||||||
stdout=subprocess.DEVNULL,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.DEVNULL,
|
stderr=subprocess.PIPE
|
||||||
)
|
)
|
||||||
|
|
||||||
self.demod = subprocess.Popen(
|
self.demod = subprocess.Popen(
|
||||||
[
|
['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)],
|
||||||
"/usr/bin/python3",
|
|
||||||
"fm_demod.py",
|
|
||||||
"-f",
|
|
||||||
"CS16",
|
|
||||||
"-s",
|
|
||||||
str(self.sample_rate),
|
|
||||||
"-d",
|
|
||||||
str(self.output_rate),
|
|
||||||
],
|
|
||||||
stdin=subprocess.PIPE,
|
stdin=subprocess.PIPE,
|
||||||
stdout=self.playback.stdin,
|
stdout=self.playback.stdin,
|
||||||
stderr=subprocess.DEVNULL,
|
stderr=subprocess.PIPE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
prep_io(self.demod, f"{self.name}-demod")
|
||||||
|
prep_io(self.playback, f"{self.name}-playback")
|
||||||
|
|
||||||
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
|
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
|
||||||
self.stream = self.device.setupStream(
|
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy)
|
||||||
soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy
|
|
||||||
)
|
|
||||||
result = self.device.activateStream(self.stream)
|
result = self.device.activateStream(self.stream)
|
||||||
|
|
||||||
if result != 0:
|
if result != 0:
|
||||||
@ -181,26 +185,29 @@ class Radio(Streamer):
|
|||||||
self.playback.wait()
|
self.playback.wait()
|
||||||
self.playback = None
|
self.playback = None
|
||||||
|
|
||||||
|
def _stream_path(self):
|
||||||
|
return f":{Radio.PORT}/radio/{self.name}"
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Quick and dirty test of the Radio class.
|
Quick and dirty test of the Radio class.
|
||||||
"""
|
"""
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
import time
|
import time
|
||||||
|
|
||||||
sdr = Radio("demo", {"driver": "sdrplay"})
|
sdr = Radio('demo', {'driver': 'sdrplay'})
|
||||||
|
|
||||||
print("Configuring...")
|
print('Configuring...')
|
||||||
sdr.configure("105.5M")
|
sdr.configure('105.5M')
|
||||||
print("Configured.")
|
print('Configured.')
|
||||||
|
|
||||||
print("Starting stream...")
|
print('Starting stream...')
|
||||||
sdr.start_stream()
|
sdr.start_stream()
|
||||||
print("Stream started.")
|
print('Stream started.')
|
||||||
|
|
||||||
# Let the stream play for a while
|
# Let the stream play for a while
|
||||||
time.sleep(15)
|
time.sleep(15)
|
||||||
|
|
||||||
print("Ending stream...")
|
print('Ending stream...')
|
||||||
sdr.end_stream()
|
sdr.end_stream()
|
||||||
print("Stream ended.")
|
print('Stream ended.')
|
||||||
@ -3,5 +3,3 @@ flasgger==0.9.5
|
|||||||
Flask==2.2.3
|
Flask==2.2.3
|
||||||
numpy==1.17.4
|
numpy==1.17.4
|
||||||
prefixed==0.7.0
|
prefixed==0.7.0
|
||||||
PyYAML==6.0
|
|
||||||
requests==2.22.0
|
|
||||||
|
|||||||
@ -6,7 +6,7 @@ Sample rates stolen from:
|
|||||||
https://en.wikipedia.org/wiki/Sampling_(signal_processing)
|
https://en.wikipedia.org/wiki/Sampling_(signal_processing)
|
||||||
"""
|
"""
|
||||||
supported_ouput_rates = [
|
supported_ouput_rates = [
|
||||||
8000, # Telephone, P25 audio (sufficient for speech, some consonants unintelligble)
|
8000, # Telephone, P25 audio (sufficient for speech, some consonants unintelligble)
|
||||||
16000, # Modern telephone/VoIP, good quality speech
|
16000, # Modern telephone/VoIP, good quality speech
|
||||||
32000, # FM radio
|
32000, # FM radio
|
||||||
44100, # CD audio quality
|
44100, # CD audio quality
|
||||||
@ -14,10 +14,9 @@ supported_ouput_rates = [
|
|||||||
50000, # Uncommon but supported
|
50000, # Uncommon but supported
|
||||||
88200,
|
88200,
|
||||||
96000, # DVD/Blu-ray audio
|
96000, # DVD/Blu-ray audio
|
||||||
192000, # Too much.
|
192000, # Too much.
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def score(pair, target_output=32000, target_ratio=10):
|
def score(pair, target_output=32000, target_ratio=10):
|
||||||
"""
|
"""
|
||||||
Heuristic for scoring input & output sample rates. The criteria are:
|
Heuristic for scoring input & output sample rates. The criteria are:
|
||||||
@ -37,38 +36,29 @@ def score(pair, target_output=32000, target_ratio=10):
|
|||||||
|
|
||||||
ratio = pair[0] // pair[1]
|
ratio = pair[0] // pair[1]
|
||||||
|
|
||||||
return (
|
return abs(pair[1] - target_output)/2500 \
|
||||||
abs(pair[1] - target_output) / 2500
|
+ max(0, target_output - pair[1])/2500 \
|
||||||
+ max(0, target_output - pair[1]) / 2500
|
+ abs(ratio - target_ratio)**0.8 \
|
||||||
+ abs(ratio - target_ratio) ** 0.8
|
+ max(0, target_ratio - ratio)**2
|
||||||
+ max(0, target_ratio - ratio) ** 2
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def flatten(l):
|
def flatten(l):
|
||||||
return [item for sublist in l for item in sublist]
|
return [item for sublist in l for item in sublist]
|
||||||
|
|
||||||
|
|
||||||
def flatten_dict(d):
|
def flatten_dict(d):
|
||||||
return [(key, value) for key, rates in d.items() for value in rates]
|
return [(key,value) for key,rates in d.items() for value in rates]
|
||||||
|
|
||||||
|
|
||||||
def get_pairs(input_rate):
|
def get_pairs(input_rate):
|
||||||
return [
|
return [
|
||||||
(input_rate, rate) for rate in supported_ouput_rates if (input_rate % rate == 0)
|
(input_rate, rate)
|
||||||
|
for rate in supported_ouput_rates
|
||||||
|
if (input_rate % rate == 0)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def supported_sample_rates(supported_input_rates):
|
def supported_sample_rates(supported_input_rates):
|
||||||
return {
|
return {
|
||||||
in_rate: [
|
in_rate: [out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0]
|
||||||
out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0
|
|
||||||
]
|
|
||||||
for in_rate in supported_input_rates
|
for in_rate in supported_input_rates
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def preferred_sample_rates(supported_input_rates):
|
def preferred_sample_rates(supported_input_rates):
|
||||||
return sorted(
|
return sorted(flatten_dict(supported_sample_rates(supported_input_rates)), key=score)
|
||||||
flatten_dict(supported_sample_rates(supported_input_rates)), key=score
|
|
||||||
)
|
|
||||||
|
|||||||
34
scripts/connect.sh
Executable file
34
scripts/connect.sh
Executable file
@ -0,0 +1,34 @@
|
|||||||
|
#! /bin/bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
|
||||||
|
server="localhost:5000"
|
||||||
|
radio="sdrplay"
|
||||||
|
frequency="105.5M"
|
||||||
|
|
||||||
|
trap deinit INT
|
||||||
|
|
||||||
|
deinit() {
|
||||||
|
curl $server/radio/$radio/end
|
||||||
|
curl $server/radio/$radio/disconnect
|
||||||
|
#kill $pid
|
||||||
|
killall ffplay
|
||||||
|
echo -n ""
|
||||||
|
}
|
||||||
|
|
||||||
|
init() {
|
||||||
|
curl $server/radio/$radio/connect
|
||||||
|
curl $server/radio/$radio/configure/$frequency
|
||||||
|
curl $server/radio/$radio/start
|
||||||
|
}
|
||||||
|
|
||||||
|
init
|
||||||
|
echo "Waiting for stream to become active..."
|
||||||
|
sleep 3
|
||||||
|
ffplay -nodisp -hide_banner rtsp://localhost:8554/radio/sdrplay &
|
||||||
|
pid=$!
|
||||||
|
|
||||||
|
while true; do
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
@ -1,8 +0,0 @@
|
|||||||
#! /bin/bash
|
|
||||||
|
|
||||||
for arg in "$@"; do
|
|
||||||
path="${arg%/*}"
|
|
||||||
file="${arg##*/}"
|
|
||||||
filename="${file%%.*}"
|
|
||||||
ffmpeg -i $path/$file -vn -ar 44100 -ac 2 -b:a 100k $path/$filename.mp3
|
|
||||||
done
|
|
||||||
4
setup.sh
4
setup.sh
@ -8,10 +8,6 @@ setup() {
|
|||||||
sudo xargs apt-get install -y < requirements.apt
|
sudo xargs apt-get install -y < requirements.apt
|
||||||
sudo pip install -r requirements.pip
|
sudo pip install -r requirements.pip
|
||||||
|
|
||||||
# Requires yt-dlp, but a very recent version (as youtube broke their API for the pip version)
|
|
||||||
# https://stackoverflow.com/a/75504772
|
|
||||||
python3 -m pip install --force-reinstall https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz
|
|
||||||
|
|
||||||
# Install device driver
|
# Install device driver
|
||||||
./scripts/SDRplay_RSP_API-Linux-3.07.1.run
|
./scripts/SDRplay_RSP_API-Linux-3.07.1.run
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import SoapySDR as soapy
|
import SoapySDR as soapy
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
|
|
||||||
|
|
||||||
class SoapyError(IntEnum):
|
class SoapyError(IntEnum):
|
||||||
Timeout = soapy.SOAPY_SDR_TIMEOUT
|
Timeout = soapy.SOAPY_SDR_TIMEOUT
|
||||||
StreamError = soapy.SOAPY_SDR_STREAM_ERROR
|
StreamError = soapy.SOAPY_SDR_STREAM_ERROR
|
||||||
@ -11,7 +10,6 @@ class SoapyError(IntEnum):
|
|||||||
TimeError = soapy.SOAPY_SDR_TIME_ERROR
|
TimeError = soapy.SOAPY_SDR_TIME_ERROR
|
||||||
Underflow = soapy.SOAPY_SDR_UNDERFLOW
|
Underflow = soapy.SOAPY_SDR_UNDERFLOW
|
||||||
|
|
||||||
|
|
||||||
class SoapyFlag(IntEnum):
|
class SoapyFlag(IntEnum):
|
||||||
EndBurst = soapy.SOAPY_SDR_END_BURST
|
EndBurst = soapy.SOAPY_SDR_END_BURST
|
||||||
HasTime = soapy.SOAPY_SDR_HAS_TIME
|
HasTime = soapy.SOAPY_SDR_HAS_TIME
|
||||||
|
|||||||
51
streamer.py
51
streamer.py
@ -1,51 +0,0 @@
|
|||||||
from threading import Thread
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
|
|
||||||
with open("mediamtx.yml", "r") as config_file:
|
|
||||||
MEDIASERVER_CONFIG = yaml.safe_load(config_file)
|
|
||||||
|
|
||||||
|
|
||||||
def is_alive(subprocess):
|
|
||||||
return True if (subprocess and subprocess.poll() is None) else False
|
|
||||||
|
|
||||||
|
|
||||||
class Streamer:
|
|
||||||
PROTOCOL = "rtsp"
|
|
||||||
REST_PATH = "stream"
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.run = False
|
|
||||||
self.thread = None
|
|
||||||
self.name = None
|
|
||||||
|
|
||||||
def stream_path(self):
|
|
||||||
return f"{MEDIASERVER_CONFIG['rtspAddress']}/{type(self).REST_PATH}/{self.name}"
|
|
||||||
|
|
||||||
def stream_address(self, host):
|
|
||||||
return f"{Streamer.PROTOCOL}://{host}{self.stream_path()}"
|
|
||||||
|
|
||||||
def is_streaming(self):
|
|
||||||
return True if (self.thread and self.thread.is_alive()) else False
|
|
||||||
|
|
||||||
def start_stream(self):
|
|
||||||
if self.is_streaming():
|
|
||||||
raise RuntimeError("Stream thread is already running")
|
|
||||||
|
|
||||||
self.run = True
|
|
||||||
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
|
|
||||||
self.thread.start()
|
|
||||||
|
|
||||||
def end_stream(self):
|
|
||||||
if self.thread is None:
|
|
||||||
raise RuntimeError("No stream thread to terminate")
|
|
||||||
|
|
||||||
self.run = False
|
|
||||||
self.thread.join()
|
|
||||||
self.thread = None
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
from pprint import pprint
|
|
||||||
|
|
||||||
pprint(MEDIASERVER_CONFIG)
|
|
||||||
92
tuuube.py
92
tuuube.py
@ -1,92 +0,0 @@
|
|||||||
#! /usr/bin/env python3
|
|
||||||
|
|
||||||
"""
|
|
||||||
We aren't using either the apt or the pip repositories for the youtube_dl
|
|
||||||
as there is a known bug affecting those two versions. The youtube API has changed
|
|
||||||
since their release, causing downloads to fail.
|
|
||||||
Make sure you use the ./setup.sh script to obtain the latest github release of
|
|
||||||
yt_dlp, as this version carries the latest fixes.
|
|
||||||
"""
|
|
||||||
# import youtube_dl
|
|
||||||
import yt_dlp as youtube_dl
|
|
||||||
from streamer import Streamer, is_alive
|
|
||||||
import subprocess
|
|
||||||
import time
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
class Tuuube(Streamer):
|
|
||||||
REST_PATH = "tuuube"
|
|
||||||
|
|
||||||
def __init__(self, name):
|
|
||||||
super().__init__()
|
|
||||||
self.name = name
|
|
||||||
self.playback = None
|
|
||||||
|
|
||||||
def source_path(self):
|
|
||||||
return f"/tmp/{self.name}.mp3"
|
|
||||||
|
|
||||||
def _stream_thread(self):
|
|
||||||
if not os.path.exists(self.source_path()) or not os.path.isfile(
|
|
||||||
self.source_path()
|
|
||||||
):
|
|
||||||
ydl_opts = {
|
|
||||||
"format": "bestaudio/best",
|
|
||||||
"outtmpl": f"/tmp/{self.name}.%(ext)s", # yt_dlp will append %(ext) if not specified,
|
|
||||||
"postprocessors": [ # resulting in `/tmp/file.mp3.mp3` :/
|
|
||||||
{
|
|
||||||
"key": "FFmpegExtractAudio",
|
|
||||||
"preferredcodec": "mp3",
|
|
||||||
"preferredquality": "192",
|
|
||||||
}
|
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
|
||||||
ydl.download([f"https://www.youtube.com/watch?v={self.name}"])
|
|
||||||
except Exception as e:
|
|
||||||
print(f"File sourcing failed, aborting stream. {e}", file=sys.stderr)
|
|
||||||
self.run = False
|
|
||||||
return
|
|
||||||
|
|
||||||
self.playback = subprocess.Popen(
|
|
||||||
[
|
|
||||||
"/usr/bin/ffmpeg",
|
|
||||||
"-re",
|
|
||||||
"-stream_loop",
|
|
||||||
"-1",
|
|
||||||
"-i",
|
|
||||||
self.source_path(),
|
|
||||||
"-c",
|
|
||||||
"copy",
|
|
||||||
"-f",
|
|
||||||
"rtsp",
|
|
||||||
self.stream_address("localhost"),
|
|
||||||
],
|
|
||||||
stdin=subprocess.PIPE,
|
|
||||||
stdout=subprocess.DEVNULL,
|
|
||||||
stderr=subprocess.DEVNULL,
|
|
||||||
)
|
|
||||||
|
|
||||||
while self.run:
|
|
||||||
if not is_alive(self.playback):
|
|
||||||
print("Playback failed, aborting stream.", file=sys.stderr)
|
|
||||||
break
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
self.run = False
|
|
||||||
|
|
||||||
self.playback.kill()
|
|
||||||
self.playback.wait()
|
|
||||||
self.playback = None
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
tube = Tuuube("BaW_jenozKc")
|
|
||||||
tube.start_stream()
|
|
||||||
|
|
||||||
while True:
|
|
||||||
print(tube.is_streaming())
|
|
||||||
time.sleep(1)
|
|
||||||
Loading…
Reference in New Issue
Block a user