From 3206a9b1ff4a9a08e1b8b9afe917139dc8e30aec Mon Sep 17 00:00:00 2001 From: Jono Targett Date: Thu, 15 Jun 2023 15:16:24 +0930 Subject: [PATCH] Ran everything through the 'black' formatter --- fileradio.py | 29 +++--- filters.py | 219 +++++++++++++++++++++---------------------- fm_demod.py | 240 ++++++++++++++++++++++++++---------------------- formats.py | 12 ++- microservice.py | 94 +++++++++---------- radio.py | 123 ++++++++++++++----------- samplerates.py | 34 ++++--- soapyhelpers.py | 2 + streamer.py | 16 ++-- tuuube.py | 57 +++++++----- 10 files changed, 444 insertions(+), 382 deletions(-) diff --git a/fileradio.py b/fileradio.py index 66ae818..b597934 100644 --- a/fileradio.py +++ b/fileradio.py @@ -6,7 +6,7 @@ import os class FileRadio(Streamer): - REST_PATH = 'sample' + REST_PATH = "sample" def __init__(self, path): super().__init__() @@ -17,22 +17,26 @@ class FileRadio(Streamer): def _stream_thread(self): self.playback = subprocess.Popen( [ - '/usr/bin/ffmpeg', - '-re', # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag - '-stream_loop', '-1', # Loop the stream indefinitely - '-i', self.path, - '-c', 'copy', - '-f', 'rtsp', - self.stream_address('localhost') + "/usr/bin/ffmpeg", + "-re", # http://trac.ffmpeg.org/wiki/StreamingGuide#The-reflag + "-stream_loop", # Loop the stream - + "-1", # ...indefinitely + "-i", + self.path, + "-c", + "copy", + "-f", + "rtsp", + self.stream_address("localhost"), ], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stderr=subprocess.DEVNULL, ) while self.run: if not is_alive(self.playback): - print('Playback failed, aborting stream.', file=sys.stderr) + print("Playback failed, aborting stream.", file=sys.stderr) break time.sleep(0.1) @@ -43,9 +47,8 @@ class FileRadio(Streamer): self.playback = None - -if __name__ == '__main__': - fr = FileRadio('./data/sampleaudio/taunt.mp3') +if __name__ == "__main__": + fr = FileRadio("./data/sampleaudio/taunt.mp3") fr.start_stream() while True: diff --git a/filters.py b/filters.py index 7d5e974..f0fdd84 100644 --- a/filters.py +++ b/filters.py @@ -10,156 +10,159 @@ import numpy, math, sys, time from numpy import fft + def impulse(mask): - ''' Convert frequency domain mask to time-domain ''' - # Negative side, a mirror of positive side - negatives = mask[1:-1] - negatives.reverse() - mask = mask + negatives - fft_length = len(mask) + """Convert frequency domain mask to time-domain""" + # Negative side, a mirror of positive side + negatives = mask[1:-1] + negatives.reverse() + mask = mask + negatives + fft_length = len(mask) - # Convert FFT filter mask to FIR coefficients - impulse_response = fft.ifft(mask).real.tolist() + # Convert FFT filter mask to FIR coefficients + impulse_response = fft.ifft(mask).real.tolist() - # swap left and right sides - left = impulse_response[:fft_length // 2] - right = impulse_response[fft_length // 2:] - impulse_response = right + left + # swap left and right sides + left = impulse_response[: fft_length // 2] + right = impulse_response[fft_length // 2 :] + impulse_response = right + left - return impulse_response + return impulse_response def lo_mask(sample_rate, tap_count, freq, dboct): - ''' Create a freq domain mask for a lowpass filter ''' - order = dboct / 6 - max_freq = sample_rate / 2.0 - f2s = max_freq / (tap_count / 2.0) - # Convert freq to filter step unit - freq /= f2s - l = tap_count // 2 - mask = [] - for f in range(0, l+1): - H = 1.0 / ( 1 + (f / freq) ** (2 * order) ) ** 0.5 - mask.append(H) - return mask + """Create a freq domain mask for a lowpass filter""" + order = dboct / 6 + max_freq = sample_rate / 2.0 + f2s = max_freq / (tap_count / 2.0) + # Convert freq to filter step unit + freq /= f2s + l = tap_count // 2 + mask = [] + for f in range(0, l + 1): + H = 1.0 / (1 + (f / freq) ** (2 * order)) ** 0.5 + mask.append(H) + return mask def hi_mask(sample_rate, tap_count, freq, dboct): - ''' Create a freq domain mask for a highpass filter ''' - order = dboct / 6 - max_freq = sample_rate / 2.0 - f2s = max_freq / (tap_count / 2.0) - # Convert freq frequency to filter step unit - freq /= f2s - l = tap_count // 2 - mask = [] - for f in range(0, l+1): - H = 1.0 / ( 1 + (freq / (f + 0.0001)) ** (2 * order) ) ** 0.5 - mask.append(H) - return mask + """Create a freq domain mask for a highpass filter""" + order = dboct / 6 + max_freq = sample_rate / 2.0 + f2s = max_freq / (tap_count / 2.0) + # Convert freq frequency to filter step unit + freq /= f2s + l = tap_count // 2 + mask = [] + for f in range(0, l + 1): + H = 1.0 / (1 + (freq / (f + 0.0001)) ** (2 * order)) ** 0.5 + mask.append(H) + return mask def combine_masks(mask1, mask2): - ''' Combine two filter masks ''' - assert len(mask1) == len(mask2) - return [ mask1[i] * mask2[i] for i in range(0, len(mask1)) ] + """Combine two filter masks""" + assert len(mask1) == len(mask2) + return [mask1[i] * mask2[i] for i in range(0, len(mask1))] def taps(sample_rate, freq, dboct, is_highpass): - cutoff_octaves = 60 / dboct + cutoff_octaves = 60 / dboct - if is_highpass: - cutoff = freq / 2 ** cutoff_octaves - else: - cutoff = freq * 2 ** cutoff_octaves - cutoff = min(cutoff, sample_rate / 2) + if is_highpass: + cutoff = freq / 2**cutoff_octaves + else: + cutoff = freq * 2**cutoff_octaves + cutoff = min(cutoff, sample_rate / 2) - transition_band = abs(freq - cutoff) - Bt = transition_band / sample_rate - taps = int(60 / (22 * Bt)) - # print("Freq=%f,%f number of taps: %d" % (freq, cutoff, taps), file=sys.stderr) - return taps + transition_band = abs(freq - cutoff) + Bt = transition_band / sample_rate + taps = int(60 / (22 * Bt)) + # print("Freq=%f,%f number of taps: %d" % (freq, cutoff, taps), file=sys.stderr) + return taps class filter: - def __init__(self, sample_rate, cutoff): - raise "Abstract" + def __init__(self, sample_rate, cutoff): + raise "Abstract" - def feed(self, original): - unfiltered = numpy.concatenate((self.buf, original)) - self.buf = unfiltered[-len(self.coefs):] - filtered = numpy.convolve(unfiltered, self.coefs, mode='valid') - assert len(filtered) == len(original) + 1 - return filtered[1:] + def feed(self, original): + unfiltered = numpy.concatenate((self.buf, original)) + self.buf = unfiltered[-len(self.coefs) :] + filtered = numpy.convolve(unfiltered, self.coefs, mode="valid") + assert len(filtered) == len(original) + 1 + return filtered[1:] class low_pass(filter): - def __init__(self, sample_rate, f, dbo): - tap_count = taps(sample_rate, f, dbo, False) - mask = lo_mask(sample_rate, tap_count, f, dbo) - self.coefs = impulse(mask) - self.buf = [ 0 for n in self.coefs ] + def __init__(self, sample_rate, f, dbo): + tap_count = taps(sample_rate, f, dbo, False) + mask = lo_mask(sample_rate, tap_count, f, dbo) + self.coefs = impulse(mask) + self.buf = [0 for n in self.coefs] class high_pass(filter): - def __init__(self, sample_rate, f, dbo): - tap_count = taps(sample_rate, f, dbo, True) - mask = hi_mask(sample_rate, tap_count, f, dbo) - self.coefs = impulse(mask) - self.buf = [ 0 for n in self.coefs ] + def __init__(self, sample_rate, f, dbo): + tap_count = taps(sample_rate, f, dbo, True) + mask = hi_mask(sample_rate, tap_count, f, dbo) + self.coefs = impulse(mask) + self.buf = [0 for n in self.coefs] class band_pass(filter): - def __init__(self, sample_rate, lo, hi, dbo): - tap_count = max(taps(sample_rate, lo, dbo, True), - taps(sample_rate, hi, dbo, False)) - lomask = lo_mask(sample_rate, tap_count, hi, dbo) - himask = hi_mask(sample_rate, tap_count, lo, dbo) - mask = combine_masks(lomask, himask) - self.coefs = impulse(mask) - self.buf = [ 0 for n in self.coefs ] + def __init__(self, sample_rate, lo, hi, dbo): + tap_count = max( + taps(sample_rate, lo, dbo, True), taps(sample_rate, hi, dbo, False) + ) + lomask = lo_mask(sample_rate, tap_count, hi, dbo) + himask = hi_mask(sample_rate, tap_count, lo, dbo) + mask = combine_masks(lomask, himask) + self.coefs = impulse(mask) + self.buf = [0 for n in self.coefs] class deemphasis(filter): - def __init__(self, sample_rate, us, hi, final_dbo): - # us = RC constant of the hypothetical deemphasis filter - us /= 1000000 - # 0..lo is not deemphasized - lo = 1.0 / (2 * math.pi * us) - # attenuation from lo to hi should be 10dB - octaves = math.log(hi / lo) / math.log(2) - # slope in dB/octave of deemphasis filter - dedbo = 10 / octaves + def __init__(self, sample_rate, us, hi, final_dbo): + # us = RC constant of the hypothetical deemphasis filter + us /= 1000000 + # 0..lo is not deemphasized + lo = 1.0 / (2 * math.pi * us) + # attenuation from lo to hi should be 10dB + octaves = math.log(hi / lo) / math.log(2) + # slope in dB/octave of deemphasis filter + dedbo = 10 / octaves - tap_count = max(taps(sample_rate, lo, dedbo, False), - taps(sample_rate, hi, final_dbo, False)) + tap_count = max( + taps(sample_rate, lo, dedbo, False), taps(sample_rate, hi, final_dbo, False) + ) - # Calculate deemphasis filter - demask = lo_mask(sample_rate, tap_count, lo, dedbo) - # Calculate low-pass filter after deemphasis - fmask = lo_mask(sample_rate, tap_count, hi, final_dbo) + # Calculate deemphasis filter + demask = lo_mask(sample_rate, tap_count, lo, dedbo) + # Calculate low-pass filter after deemphasis + fmask = lo_mask(sample_rate, tap_count, hi, final_dbo) - mask = combine_masks(demask, fmask) - self.coefs = impulse(mask) - self.buf = [ 0 for n in self.coefs ] + mask = combine_masks(demask, fmask) + self.coefs = impulse(mask) + self.buf = [0 for n in self.coefs] class decimator(filter): - def __init__(self, factor): - self.buf2 = [] - self.factor = int(factor) + def __init__(self, factor): + self.buf2 = [] + self.factor = int(factor) - def feed(self, original): - original = numpy.concatenate((self.buf2, original)) + def feed(self, original): + original = numpy.concatenate((self.buf2, original)) - # Gets the last n-th sample of every n (n = factor) - # If e.g. gets 12 samples, gets s[4] and s[9], and - # stoves s[10:] to the next round - ''' + # Gets the last n-th sample of every n (n = factor) + # If e.g. gets 12 samples, gets s[4] and s[9], and + # stoves s[10:] to the next round + """ decimated = [ original[ self.factor * i + self.factor - 1 ] \ for i in range(0, len(original) // self.factor) ] - ''' - decimated = original[(self.factor - 1)::self.factor] - self.buf2 = original[:-len(original) % self.factor] + """ + decimated = original[(self.factor - 1) :: self.factor] + self.buf2 = original[: -len(original) % self.factor] - return decimated + return decimated diff --git a/fm_demod.py b/fm_demod.py index 9e45130..75b5dd1 100755 --- a/fm_demod.py +++ b/fm_demod.py @@ -19,12 +19,24 @@ from formats import FORMATS parser = argparse.ArgumentParser() -parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true') -parser.add_argument('-f', '--format', choices=list(FORMATS.keys()), help='Input sample format', required=True) -parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True) -parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True) +parser.add_argument( + "-v", "--verbose", help="Print additional informational output", action="store_true" +) +parser.add_argument( + "-f", + "--format", + choices=list(FORMATS.keys()), + help="Input sample format", + required=True, +) +parser.add_argument( + "-s", "--sample-rate", metavar="rate", help="Source sample rate (Hz)", required=True +) +parser.add_argument( + "-d", "--demod-rate", metavar="rate", help="Output sample rate (Hz)", required=True +) # TODO JMT: Output to file -#parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.') +# parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.') args = parser.parse_args() INPUT_RATE = int(prefixed.Float(args.sample_rate)) @@ -32,16 +44,19 @@ OUTPUT_RATE = int(prefixed.Float(args.demod_rate)) DECIMATION = INPUT_RATE / OUTPUT_RATE if DECIMATION != math.floor(DECIMATION): - print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr) - sys.exit(1) + print( + f"The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}", + file=sys.stderr, + ) + sys.exit(1) FORMAT = FORMATS[args.format].numpy DT = np.dtype(FORMAT) BYTES_PER_SAMPLE = 2 * DT.itemsize -MAX_DEVIATION = 200000.0 # Hz -FM_BANDWIDTH = 15000 # Hz -STEREO_CARRIER = 38000 # Hz +MAX_DEVIATION = 200000.0 # Hz +FM_BANDWIDTH = 15000 # Hz +STEREO_CARRIER = 38000 # Hz DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2)) pll = math.pi - random.random() * 2 * math.pi @@ -63,137 +78,138 @@ decimate2 = filters.decimator(DECIMATION) lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120) # Band-pass filter for stereo (L-R) modulated audio -hi = filters.band_pass(INPUT_RATE, - STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120) +hi = filters.band_pass( + INPUT_RATE, STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120 +) # Filter to extract pilot signal -pilot = filters.band_pass(INPUT_RATE, - STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120) +pilot = filters.band_pass( + INPUT_RATE, STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120 +) last_angle = 0.0 -remaining_data = b'' +remaining_data = b"" while True: - # Ingest 0.1s worth of data - data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10) - if not data: - break - # TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1 - #data = remaining_data + data + # Ingest 0.1s worth of data + data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10) + if not data: + break + # TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1 + # data = remaining_data + data - if len(data) < 2 * BYTES_PER_SAMPLE: - remaining_data = data - continue + if len(data) < 2 * BYTES_PER_SAMPLE: + remaining_data = data + continue - # Save one sample to next batch, and the odd byte if exists - if len(data) % 2 == 1: - print("Odd byte, that's odd", file=sys.stderr) - remaining_data = data[-3:] - data = data[:-1] - else: - remaining_data = data[-2:] + # Save one sample to next batch, and the odd byte if exists + if len(data) % 2 == 1: + print("Odd byte, that's odd", file=sys.stderr) + remaining_data = data[-3:] + data = data[:-1] + else: + remaining_data = data[-2:] - samples = len(data) // BYTES_PER_SAMPLE + samples = len(data) // BYTES_PER_SAMPLE - # Find angle (phase) of I/Q pairs - iqdata = np.frombuffer(data, dtype=FORMAT) + # Find angle (phase) of I/Q pairs + iqdata = np.frombuffer(data, dtype=FORMAT) - if args.verbose: - print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) + if args.verbose: + print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) - if np.issubdtype(FORMAT, np.integer): - iinfo = np.iinfo(FORMAT) - if np.issubdtype(FORMAT, np.unsignedinteger): - iqdata = iqdata - (iinfo.max / 2.0) - iqdata = iqdata / (iinfo.max / 2.0) - else: - iqdata = iqdata / np.float64(iinfo.max) - else: - iqdata = iqdata.astype(np.float64) + if np.issubdtype(FORMAT, np.integer): + iinfo = np.iinfo(FORMAT) + if np.issubdtype(FORMAT, np.unsignedinteger): + iqdata = iqdata - (iinfo.max / 2.0) + iqdata = iqdata / (iinfo.max / 2.0) + else: + iqdata = iqdata / np.float64(iinfo.max) + else: + iqdata = iqdata.astype(np.float64) - if args.verbose: - print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) + if args.verbose: + print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) - iqdata = iqdata.view(complex) - angles = np.angle(iqdata) + iqdata = iqdata.view(complex) + angles = np.angle(iqdata) - # Determine phase rotation between samples - rotations = np.ediff1d(angles) + # Determine phase rotation between samples + rotations = np.ediff1d(angles) - # Wrap rotations >= +/-180º - rotations = (rotations + np.pi) % (2 * np.pi) - np.pi + # Wrap rotations >= +/-180º + rotations = (rotations + np.pi) % (2 * np.pi) - np.pi - # Convert rotations to baseband signal - output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL) - output_raw = np.clip(output_raw, -0.999, +0.999) + # Convert rotations to baseband signal + output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL) + output_raw = np.clip(output_raw, -0.999, +0.999) - # At this point, output_raw contains two audio signals: - # L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC, - # carrier 38kHz + # At this point, output_raw contains two audio signals: + # L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC, + # carrier 38kHz - # Downsample and low-pass L+R (mono) signal - output_mono = lo.feed(output_raw) - output_mono = decimate1.feed(output_mono) + # Downsample and low-pass L+R (mono) signal + output_mono = lo.feed(output_raw) + output_mono = decimate1.feed(output_mono) - # Filter pilot tone - detected_pilot = pilot.feed(output_raw) + # Filter pilot tone + detected_pilot = pilot.feed(output_raw) - # Separate ultrasonic L-R signal by high-pass filtering - output_jstereo_mod = hi.feed(output_raw) - output_jstereo = [] + # Separate ultrasonic L-R signal by high-pass filtering + output_jstereo_mod = hi.feed(output_raw) + output_jstereo = [] - # Demodulate L-R, which is AM-SC with 53kHz carrier - for n in range(0, len(output_jstereo_mod)): - # Advance carrier - pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau - # Standard demodulation - output_jstereo.append(math.cos(pll) * output_jstereo_mod[n]) + # Demodulate L-R, which is AM-SC with 53kHz carrier + for n in range(0, len(output_jstereo_mod)): + # Advance carrier + pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau + # Standard demodulation + output_jstereo.append(math.cos(pll) * output_jstereo_mod[n]) - # Detect pilot zero-crossing - cur_pilot = detected_pilot[n] - zero_crossed = (cur_pilot * last_pilot) <= 0 - last_pilot = cur_pilot - if not zero_crossed: - continue + # Detect pilot zero-crossing + cur_pilot = detected_pilot[n] + zero_crossed = (cur_pilot * last_pilot) <= 0 + last_pilot = cur_pilot + if not zero_crossed: + continue - # When pilot is at 90º or 270º, carrier should be around 180º - ideal = math.pi - deviation = pll - ideal - if deviation > math.pi: - deviation -= tau - deviation_avg = 0.99 * deviation_avg + 0.01 * deviation - rotation = deviation_avg - last_deviation_avg - last_deviation_avg = deviation_avg + # When pilot is at 90º or 270º, carrier should be around 180º + ideal = math.pi + deviation = pll - ideal + if deviation > math.pi: + deviation -= tau + deviation_avg = 0.99 * deviation_avg + 0.01 * deviation + rotation = deviation_avg - last_deviation_avg + last_deviation_avg = deviation_avg - if abs(deviation_avg) > math.pi / 8: - # big phase deviation, reset PLL - pll = ideal - pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau - deviation_avg = 0.0 - last_deviation_avg = 0.0 + if abs(deviation_avg) > math.pi / 8: + # big phase deviation, reset PLL + pll = ideal + pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau + deviation_avg = 0.0 + last_deviation_avg = 0.0 - # Translate rotation to frequency deviation - STEREO_CARRIER /= (1 + (rotation * 1.05) / tau) + # Translate rotation to frequency deviation + STEREO_CARRIER /= 1 + (rotation * 1.05) / tau + # Downsample, Low-pass/deemphasis demodulated L-R + output_jstereo = lo_r.feed(output_jstereo) + output_jstereo = decimate2.feed(output_jstereo) - # Downsample, Low-pass/deemphasis demodulated L-R - output_jstereo = lo_r.feed(output_jstereo) - output_jstereo = decimate2.feed(output_jstereo) + assert len(output_jstereo) == len(output_mono) - assert len(output_jstereo) == len(output_mono) + # Scale to 16-bit and divide by 2 for channel sum + output_mono = np.multiply(output_mono, 32767 / 2.0) + output_jstereo = np.multiply(output_jstereo, 32767 / 2.0) - # Scale to 16-bit and divide by 2 for channel sum - output_mono = np.multiply(output_mono, 32767 / 2.0) - output_jstereo = np.multiply(output_jstereo, 32767 / 2.0) + # Output stereo by adding or subtracting joint-stereo to mono + output_left = output_mono + output_jstereo + output_right = output_mono - output_jstereo - # Output stereo by adding or subtracting joint-stereo to mono - output_left = output_mono + output_jstereo - output_right = output_mono - output_jstereo + # Interleave L and R samples using np trickery + output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype) + output[0::2] = output_left + output[1::2] = output_right + output = output.astype(int) - # Interleave L and R samples using np trickery - output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype) - output[0::2] = output_left - output[1::2] = output_right - output = output.astype(int) - - sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output)) + sys.stdout.buffer.write(struct.pack("<%dh" % len(output), *output)) diff --git a/formats.py b/formats.py index 7af86fb..acc2301 100644 --- a/formats.py +++ b/formats.py @@ -2,6 +2,7 @@ import numpy as np from SoapySDR import * from dataclasses import dataclass + @dataclass class FormatSpec: name: str @@ -9,9 +10,10 @@ class FormatSpec: numpy: np.dtype packing: str + FORMATS = { - 'CU8': FormatSpec('CU8', SOAPY_SDR_CU8, np.uint8, '=%dB'), - 'CS8': FormatSpec('CS8', SOAPY_SDR_CS8, np.int8, '=%db'), - 'CS16': FormatSpec('CS16', SOAPY_SDR_CS16, np.int16, '=%dh'), - 'CF32': FormatSpec('CF32', SOAPY_SDR_CF32, np.float32, '=%df'), -} \ No newline at end of file + "CU8": FormatSpec("CU8", SOAPY_SDR_CU8, np.uint8, "=%dB"), + "CS8": FormatSpec("CS8", SOAPY_SDR_CS8, np.int8, "=%db"), + "CS16": FormatSpec("CS16", SOAPY_SDR_CS16, np.int16, "=%dh"), + "CF32": FormatSpec("CF32", SOAPY_SDR_CF32, np.float32, "=%df"), +} diff --git a/microservice.py b/microservice.py index 5e3a27a..dda1a89 100755 --- a/microservice.py +++ b/microservice.py @@ -17,7 +17,8 @@ swag = Swagger(app) radios = {} -@app.route('/report') + +@app.route("/report") def report(): """Get streams report from the RTSP relay. --- @@ -28,19 +29,18 @@ def report(): description: JSON with error message. """ try: - r = requests.get("http://localhost:9997/v1/paths/list") - j = r.json() - for item in j['items']: - del j['items'][item]['conf'] - del j['items'][item]['confName'] + r = requests.get("http://localhost:9997/v1/paths/list") + j = r.json() + for item in j["items"]: + del j["items"][item]["conf"] + del j["items"][item]["confName"] - - return jsonify(j) + return jsonify(j) except Exception as e: return _error_message_json(e.message), 500 -@app.route('/radio/report') +@app.route("/radio/report") def radio_report(): """List radio devices available to the system. --- @@ -52,7 +52,8 @@ def radio_report(): devices = [dict(device) for device in soapy.Device.enumerate()] return jsonify(devices) -@app.route('/radio//connect') + +@app.route("/radio//connect") def connect(radio): """Connect to a radio device, by driver name or serial number. --- @@ -78,15 +79,15 @@ def connect(radio): if radio in device.values(): try: radios[radio] = Radio(radio, device) - return jsonify("message","successfully connected radio"), 200 + return jsonify("message", "successfully connected radio"), 200 except Exception as e: radios.pop(radio) return _error_message_json(e.message), 500 - return "Radio device not found", 400 -@app.route('/radio//disconnect') + +@app.route("/radio//disconnect") def disconnect(radio): """Disconnect from a radio device. --- @@ -106,11 +107,12 @@ def disconnect(radio): """ if radio in radios: radios.pop(radio) - return jsonify("message","succesfully disconnected radio"), 200 + return jsonify("message", "succesfully disconnected radio"), 200 else: return _error_message_json("Radio not connected"), 400 -@app.route('/radio//configure/') + +@app.route("/radio//configure/") def configure(radio, frequency): """Tune the radio to a frequency. You must connect to the radio before attempting to configure it. @@ -138,7 +140,8 @@ def configure(radio, frequency): else: return _error_message_json("Radio not connected"), 400 -@app.route('/radio//start') + +@app.route("/radio//start") def start_stream(radio): """Start the radio stream. Once the stream has been started, connect to the stream at: @@ -151,18 +154,19 @@ def start_stream(radio): type: string required: true responses: - 200: + 200: description: JSON with message successful start of radio stream. 400: description: JSON with error message. """ try: radios[radio].start_stream() - return jsonify("message","successfully started radio stream"), 200 + return jsonify("message", "successfully started radio stream"), 200 except Exception as e: return _error_message_json(e.message), 400 -@app.route('/radio//end') + +@app.route("/radio//end") def end_stream(radio): """Terminate the radio stream. --- @@ -180,12 +184,13 @@ def end_stream(radio): """ try: radios[radio].end_stream() - return jsonify("message","successfully ended radio stream"), 200 + return jsonify("message", "successfully ended radio stream"), 200 except Exception as e: error_message = {"error_message": e.message} return _error_message_json(e.message), 400 -@app.route('/radio//info') + +@app.route("/radio//info") def radio_info(radio): """Get information about a radio. --- @@ -209,7 +214,8 @@ def radio_info(radio): tubes = {} -@app.route('/tuuube//start') + +@app.route("/tuuube//start") def start_tuuube_stream(id): """Start streaming from a youtube source. Once the stream has been started, connect to the stream at: @@ -238,11 +244,12 @@ def start_tuuube_stream(id): try: tubes[id].start_stream() - return jsonify("message","successfully started youtube stream"), 200 + return jsonify("message", "successfully started youtube stream"), 200 except Exception as e: return _error_message_json(e.message), 400 -@app.route('/tuuube//end') + +@app.route("/tuuube//end") def end_tuuube_stream(id): """Terminate the youtube stream. --- @@ -260,47 +267,42 @@ def end_tuuube_stream(id): """ try: tubes[id].end_stream() - return jsonify("message","succesfully ended youtube stream"), 200 + return jsonify("message", "succesfully ended youtube stream"), 200 except Exception as e: return _error_message_json(e.message), 400 - """ Helper function to return a JSON error message parameters: error_message - the error message to return returns: a JSON object with the error message """ -def _error_message_json(error_message:str): + + +def _error_message_json(error_message: str): error_message = {"error_message": error_message} return jsonify(error_message) -if __name__ == '__main__': + +if __name__ == "__main__": import subprocess rtsp_relay = subprocess.Popen( - [ - './dependencies/mediamtx/mediamtx', - './mediamtx.yml' - ], + ["./dependencies/mediamtx/mediamtx", "./mediamtx.yml"], stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stderr=subprocess.DEVNULL, ) - for path, _, files in os.walk('./data/sampleaudio'): + for path, _, files in os.walk("./data/sampleaudio"): for file in files: - name,ext = os.path.splitext(file) - if ext == '.mp3': - tubes[name] = FileRadio(f"{path}/{file}") - tubes[name].start_stream() + name, ext = os.path.splitext(file) + if ext == ".mp3": + tubes[name] = FileRadio(f"{path}/{file}") + tubes[name].start_stream() - app.run( - host='0.0.0.0', - threaded=True, - debug=False - ) + app.run(host="0.0.0.0", threaded=True, debug=False) - print('Stopping any currently streaming radios...') + print("Stopping any currently streaming radios...") for radio in radios: if radios[radio].is_streaming(): radios[radio].end_stream() @@ -311,6 +313,6 @@ if __name__ == '__main__': tubes[tube].end_stream() tubes = None - print('Killing RTSP relay...') + print("Killing RTSP relay...") rtsp_relay.kill() - rtsp_relay.wait() # Necessary? + rtsp_relay.wait() # Necessary? diff --git a/radio.py b/radio.py index 24d0578..5f08b10 100644 --- a/radio.py +++ b/radio.py @@ -11,8 +11,8 @@ from streamer import Streamer, is_alive class Radio(Streamer): - REST_PATH = 'radio' - FORMAT = 'CS16' + REST_PATH = "radio" + FORMAT = "CS16" SAMPLES = 8192 def __init__(self, name, device_info): @@ -33,7 +33,7 @@ class Radio(Streamer): frequency = int(prefixed.Float(frequency)) bandwidth = 200000 - sample_rates = preferred_sample_rates(self.capabilities['rx']['sample-rates']) + sample_rates = preferred_sample_rates(self.capabilities["rx"]["sample-rates"]) if len(sample_rates) == 0: raise RuntimeError("No suitable sample rates are available") self.sample_rate, self.output_rate = sample_rates[0] @@ -46,41 +46,43 @@ class Radio(Streamer): self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True) return { - 'frequency': self.device.getFrequency(soapy.SOAPY_SDR_RX, 0), - 'sample-rate': self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0), - 'bandwidth': self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0), - 'gain-mode': 'auto' if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) else 'manual', + "frequency": self.device.getFrequency(soapy.SOAPY_SDR_RX, 0), + "sample-rate": self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0), + "bandwidth": self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0), + "gain-mode": "auto" + if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) + else "manual", } def get_info(self): return { - 'name': self.name, - 'device': self.device_info, - 'capabilities': self.capabilities, - 'stream-path': self._stream_path(), - 'streaming': self.is_streaming(), + "name": self.name, + "device": self.device_info, + "capabilities": self.capabilities, + "stream-path": self._stream_path(), + "streaming": self.is_streaming(), } def _get_capabilities(self): def get_direction_capabilities(direction): return { - 'antennas': self.device.listAntennas(direction, 0), - 'gains': self.device.listGains(direction, 0), - 'frequencies': self.device.listFrequencies(direction, 0), - 'sample-rates': self.device.listSampleRates(direction, 0), - 'bandwidths': self.device.listBandwidths(direction, 0), - 'sensors': self.device.listSensors(direction, 0), - 'formats': self.device.getStreamFormats(direction, 0), + "antennas": self.device.listAntennas(direction, 0), + "gains": self.device.listGains(direction, 0), + "frequencies": self.device.listFrequencies(direction, 0), + "sample-rates": self.device.listSampleRates(direction, 0), + "bandwidths": self.device.listBandwidths(direction, 0), + "sensors": self.device.listSensors(direction, 0), + "formats": self.device.getStreamFormats(direction, 0), } return { - 'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX), - 'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX), - 'clock-sources': self.device.listClockSources(), - 'time-sources': self.device.listTimeSources(), - 'register-interfaces': self.device.listRegisterInterfaces(), - 'gpios': self.device.listGPIOBanks(), - 'uarts': self.device.listUARTs(), + "rx": get_direction_capabilities(soapy.SOAPY_SDR_RX), + "tx": get_direction_capabilities(soapy.SOAPY_SDR_TX), + "clock-sources": self.device.listClockSources(), + "time-sources": self.device.listTimeSources(), + "register-interfaces": self.device.listRegisterInterfaces(), + "gpios": self.device.listGPIOBanks(), + "uarts": self.device.listUARTs(), } def _stream_thread(self): @@ -89,7 +91,7 @@ class Radio(Streamer): while self.run: # Check that the child processes are still running if (not is_alive(self.demod)) or (not is_alive(self.playback)): - print('DSP chain failed, aborting stream.', file=sys.stderr) + print("DSP chain failed, aborting stream.", file=sys.stderr) break result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES) @@ -99,14 +101,18 @@ class Radio(Streamer): elif result.ret < 0: error = SoapyError(result.ret) if error is not SoapyError.Timeout: - print("Stream read failed, aborting stream:", error, file=sys.stderr) + print( + "Stream read failed, aborting stream:", error, file=sys.stderr + ) break continue else: read_size = int(result.ret * 2) self.demod.stdin.write( - struct.pack(FORMATS[Radio.FORMAT].packing % read_size, - *self.buffer[:read_size]) + struct.pack( + FORMATS[Radio.FORMAT].packing % read_size, + *self.buffer[:read_size], + ) ) self._cleanup_stream() @@ -114,32 +120,44 @@ class Radio(Streamer): def _init_stream(self): self.playback = subprocess.Popen( [ - '/usr/bin/ffmpeg', - '-f', 's16le', - '-ar', str(self.output_rate), - '-ac', '2', - '-i', '-', - '-f', 'rtsp', self.stream_address() + "/usr/bin/ffmpeg", + "-f", + "s16le", + "-ar", + str(self.output_rate), + "-ac", + "2", + "-i", + "-", + "-f", + "rtsp", + self.stream_address(), ], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stderr=subprocess.DEVNULL, ) self.demod = subprocess.Popen( [ - '/usr/bin/python3', 'fm_demod.py', - '-f', 'CS16', - '-s', str(self.sample_rate), - '-d', str(self.output_rate) + "/usr/bin/python3", + "fm_demod.py", + "-f", + "CS16", + "-s", + str(self.sample_rate), + "-d", + str(self.output_rate), ], stdin=subprocess.PIPE, stdout=self.playback.stdin, - stderr=subprocess.DEVNULL + stderr=subprocess.DEVNULL, ) self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy) - self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy) + self.stream = self.device.setupStream( + soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy + ) result = self.device.activateStream(self.stream) if result != 0: @@ -164,26 +182,25 @@ class Radio(Streamer): self.playback = None - """ Quick and dirty test of the Radio class. """ -if __name__ == '__main__': +if __name__ == "__main__": import time - sdr = Radio('demo', {'driver': 'sdrplay'}) + sdr = Radio("demo", {"driver": "sdrplay"}) - print('Configuring...') - sdr.configure('105.5M') - print('Configured.') + print("Configuring...") + sdr.configure("105.5M") + print("Configured.") - print('Starting stream...') + print("Starting stream...") sdr.start_stream() - print('Stream started.') + print("Stream started.") # Let the stream play for a while time.sleep(15) - print('Ending stream...') + print("Ending stream...") sdr.end_stream() - print('Stream ended.') + print("Stream ended.") diff --git a/samplerates.py b/samplerates.py index 7ec7ad1..6d8e7eb 100644 --- a/samplerates.py +++ b/samplerates.py @@ -6,7 +6,7 @@ Sample rates stolen from: https://en.wikipedia.org/wiki/Sampling_(signal_processing) """ supported_ouput_rates = [ - 8000, # Telephone, P25 audio (sufficient for speech, some consonants unintelligble) + 8000, # Telephone, P25 audio (sufficient for speech, some consonants unintelligble) 16000, # Modern telephone/VoIP, good quality speech 32000, # FM radio 44100, # CD audio quality @@ -14,9 +14,10 @@ supported_ouput_rates = [ 50000, # Uncommon but supported 88200, 96000, # DVD/Blu-ray audio - 192000, # Too much. + 192000, # Too much. ] + def score(pair, target_output=32000, target_ratio=10): """ Heuristic for scoring input & output sample rates. The criteria are: @@ -36,29 +37,38 @@ def score(pair, target_output=32000, target_ratio=10): ratio = pair[0] // pair[1] - return abs(pair[1] - target_output)/2500 \ - + max(0, target_output - pair[1])/2500 \ - + abs(ratio - target_ratio)**0.8 \ - + max(0, target_ratio - ratio)**2 + return ( + abs(pair[1] - target_output) / 2500 + + max(0, target_output - pair[1]) / 2500 + + abs(ratio - target_ratio) ** 0.8 + + max(0, target_ratio - ratio) ** 2 + ) + def flatten(l): return [item for sublist in l for item in sublist] + def flatten_dict(d): - return [(key,value) for key,rates in d.items() for value in rates] + return [(key, value) for key, rates in d.items() for value in rates] + def get_pairs(input_rate): return [ - (input_rate, rate) - for rate in supported_ouput_rates - if (input_rate % rate == 0) + (input_rate, rate) for rate in supported_ouput_rates if (input_rate % rate == 0) ] + def supported_sample_rates(supported_input_rates): return { - in_rate: [out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0] + in_rate: [ + out_rate for out_rate in supported_ouput_rates if in_rate % out_rate == 0 + ] for in_rate in supported_input_rates } + def preferred_sample_rates(supported_input_rates): - return sorted(flatten_dict(supported_sample_rates(supported_input_rates)), key=score) + return sorted( + flatten_dict(supported_sample_rates(supported_input_rates)), key=score + ) diff --git a/soapyhelpers.py b/soapyhelpers.py index 4721de6..6bd8578 100644 --- a/soapyhelpers.py +++ b/soapyhelpers.py @@ -1,6 +1,7 @@ import SoapySDR as soapy from enum import IntEnum + class SoapyError(IntEnum): Timeout = soapy.SOAPY_SDR_TIMEOUT StreamError = soapy.SOAPY_SDR_STREAM_ERROR @@ -10,6 +11,7 @@ class SoapyError(IntEnum): TimeError = soapy.SOAPY_SDR_TIME_ERROR Underflow = soapy.SOAPY_SDR_UNDERFLOW + class SoapyFlag(IntEnum): EndBurst = soapy.SOAPY_SDR_END_BURST HasTime = soapy.SOAPY_SDR_HAS_TIME diff --git a/streamer.py b/streamer.py index 865542a..0818705 100644 --- a/streamer.py +++ b/streamer.py @@ -2,7 +2,7 @@ from threading import Thread import yaml -with open('mediamtx.yml', 'r') as config_file: +with open("mediamtx.yml", "r") as config_file: MEDIASERVER_CONFIG = yaml.safe_load(config_file) @@ -11,8 +11,8 @@ def is_alive(subprocess): class Streamer: - PROTOCOL = 'rtsp' - REST_PATH = 'stream' + PROTOCOL = "rtsp" + REST_PATH = "stream" def __init__(self): self.run = False @@ -30,7 +30,7 @@ class Streamer: def start_stream(self): if self.is_streaming(): - raise RuntimeError('Stream thread is already running') + raise RuntimeError("Stream thread is already running") self.run = True self.thread = Thread(target=self._stream_thread, daemon=True, args=()) @@ -38,14 +38,14 @@ class Streamer: def end_stream(self): if self.thread is None: - raise RuntimeError('No stream thread to terminate') + raise RuntimeError("No stream thread to terminate") self.run = False self.thread.join() self.thread = None - -if __name__ == '__main__': +if __name__ == "__main__": from pprint import pprint - pprint(MEDIASERVER_CONFIG) \ No newline at end of file + + pprint(MEDIASERVER_CONFIG) diff --git a/tuuube.py b/tuuube.py index 1fd94d0..5a2460a 100755 --- a/tuuube.py +++ b/tuuube.py @@ -7,7 +7,7 @@ since their release, causing downloads to fail. Make sure you use the ./setup.sh script to obtain the latest github release of yt_dlp, as this version carries the latest fixes. """ -#import youtube_dl +# import youtube_dl import yt_dlp as youtube_dl from streamer import Streamer, is_alive import subprocess @@ -17,7 +17,7 @@ import os class Tuuube(Streamer): - REST_PATH = 'tuuube' + REST_PATH = "tuuube" def __init__(self, name): super().__init__() @@ -28,43 +28,51 @@ class Tuuube(Streamer): return f"/tmp/{self.name}.mp3" def _stream_thread(self): - if not os.path.exists(self.source_path()) or not os.path.isfile(self.source_path()): + if not os.path.exists(self.source_path()) or not os.path.isfile( + self.source_path() + ): ydl_opts = { - 'format': 'bestaudio/best', - 'outtmpl': f'/tmp/{self.name}.%(ext)s', # yt_dlp will append %(ext) if not specified, - 'postprocessors': [{ # resulting in `/tmp/file.mp3.mp3` :/ - 'key': 'FFmpegExtractAudio', - 'preferredcodec': 'mp3', - 'preferredquality': '192', - }], + "format": "bestaudio/best", + "outtmpl": f"/tmp/{self.name}.%(ext)s", # yt_dlp will append %(ext) if not specified, + "postprocessors": [ # resulting in `/tmp/file.mp3.mp3` :/ + { + "key": "FFmpegExtractAudio", + "preferredcodec": "mp3", + "preferredquality": "192", + } + ], } try: with youtube_dl.YoutubeDL(ydl_opts) as ydl: - ydl.download([f'https://www.youtube.com/watch?v={self.name}']) + ydl.download([f"https://www.youtube.com/watch?v={self.name}"]) except Exception as e: - print(f'File sourcing failed, aborting stream. {e}', file=sys.stderr) + print(f"File sourcing failed, aborting stream. {e}", file=sys.stderr) self.run = False return self.playback = subprocess.Popen( [ - '/usr/bin/ffmpeg', - '-re', - '-stream_loop', '-1', - '-i', self.source_path(), - '-c', 'copy', - '-f', 'rtsp', - self.stream_address('localhost') + "/usr/bin/ffmpeg", + "-re", + "-stream_loop", + "-1", + "-i", + self.source_path(), + "-c", + "copy", + "-f", + "rtsp", + self.stream_address("localhost"), ], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stderr=subprocess.DEVNULL, ) while self.run: if not is_alive(self.playback): - print('Playback failed, aborting stream.', file=sys.stderr) + print("Playback failed, aborting stream.", file=sys.stderr) break time.sleep(0.1) @@ -75,11 +83,10 @@ class Tuuube(Streamer): self.playback = None - -if __name__ == '__main__': - tube = Tuuube('BaW_jenozKc') +if __name__ == "__main__": + tube = Tuuube("BaW_jenozKc") tube.start_stream() while True: print(tube.is_streaming()) - time.sleep(1) \ No newline at end of file + time.sleep(1)