213 lines
7.0 KiB
Python
213 lines
7.0 KiB
Python
from threading import Thread
|
|
import SoapySDR as soapy
|
|
import prefixed
|
|
from formats import *
|
|
import sys
|
|
import subprocess
|
|
import struct
|
|
from soapyhelpers import *
|
|
from samplerates import *
|
|
from kafkalogging import *
|
|
|
|
|
|
class Radio:
    """One SoapySDR receiver whose samples are demodulated and published over RTSP.

    Samples read from the device are packed and written to the stdin of an
    ``fm_demod.py`` child process; that process writes into an ``ffmpeg``
    child process which publishes to ``rtsp://localhost:<PORT>/radio/<name>``.

    Typical use is ``configure()`` followed by ``start_stream()`` and, later,
    ``end_stream()``.
    """

    # Key into FORMATS selecting the sample format used for the device stream.
    FORMAT = 'CS16'
    # Number of samples requested from the device per readStream() call.
    SAMPLES = 8192
    # Port used when building the RTSP stream path.
    PORT = 8554

    def __init__(self, name, device_info):
        """Open the SoapySDR device and record its capabilities.

        Args:
            name: Identifier for this radio; it is used in the stream path and
                in the labels passed to the subprocess logging helpers.
            device_info: Arguments handed unchanged to ``soapy.Device``.

        Raises:
            RuntimeError: If the device object comes back as ``None``.
        """
        self.name = name
        self.device_info = device_info
        self.run = False
        self.thread = None

        # Set by configure(); None means the radio has not been configured yet.
        self.sample_rate = None
        self.output_rate = None

        # Set by _init_stream() and reset by _cleanup_stream(). Initialised
        # here so cleanup is safe even if stream setup fails part-way through.
        self.stream = None
        self.buffer = None
        self.demod = None
        self.playback = None

        self.device = soapy.Device(device_info)
        if self.device is None:
            raise RuntimeError("Failed to connect to radio device")

        self.capabilities = self._get_capabilities()

    def configure(self, frequency):
        """Tune RX channel 0 and choose the sample rates used for streaming.

        Args:
            frequency: Centre frequency; parsed with ``prefixed.Float`` so
                SI-prefixed strings such as ``'105.5M'`` are accepted.

        Returns:
            dict: The frequency, sample rate, bandwidth and gain mode read
            back from the device after applying the settings.

        Raises:
            RuntimeError: If a stream is active, or if
                ``preferred_sample_rates`` yields no candidates.
        """
        if self.is_streaming():
            raise RuntimeError("Cannot configure radio while a stream is active")

        frequency = int(prefixed.Float(frequency))
        bandwidth = 200000

        sample_rates = preferred_sample_rates(self.capabilities['rx']['sample-rates'])
        if len(sample_rates) == 0:
            raise RuntimeError("No suitable sample rates are available")
        # The first candidate is used; each entry is a (device rate, output rate) pair.
        self.sample_rate, self.output_rate = sample_rates[0]

        rx = soapy.SOAPY_SDR_RX
        self.device.setFrequency(rx, 0, frequency)
        self.device.setSampleRate(rx, 0, self.sample_rate)
        self.device.setBandwidth(rx, 0, bandwidth)

        # Set automatic gain
        self.device.setGainMode(rx, 0, True)

        # Report what the device actually accepted rather than what was requested.
        return {
            'frequency': self.device.getFrequency(rx, 0),
            'sample-rate': self.device.getSampleRate(rx, 0),
            'bandwidth': self.device.getBandwidth(rx, 0),
            'gain-mode': 'auto' if self.device.getGainMode(rx, 0) else 'manual',
        }

    def get_info(self):
        """Return a dict describing this radio and whether it is streaming."""
        return {
            'name': self.name,
            'device': self.device_info,
            'capabilities': self.capabilities,
            'stream-path': self._stream_path(),
            'streaming': self.is_streaming(),
        }

    def _get_capabilities(self):
        """Query the device for its per-direction and device-wide capabilities.

        Returns:
            dict: ``'rx'`` and ``'tx'`` entries (each queried on channel 0)
            plus the device-wide clock/time sources, register interfaces,
            GPIO banks and UARTs.
        """
        def get_direction_capabilities(direction):
            return {
                'antennas': self.device.listAntennas(direction, 0),
                'gains': self.device.listGains(direction, 0),
                'frequencies': self.device.listFrequencies(direction, 0),
                'sample-rates': self.device.listSampleRates(direction, 0),
                'bandwidths': self.device.listBandwidths(direction, 0),
                'sensors': self.device.listSensors(direction, 0),
                'formats': self.device.getStreamFormats(direction, 0),
            }

        return {
            'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX),
            'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX),
            'clock-sources': self.device.listClockSources(),
            'time-sources': self.device.listTimeSources(),
            'register-interfaces': self.device.listRegisterInterfaces(),
            'gpios': self.device.listGPIOBanks(),
            'uarts': self.device.listUARTs(),
        }

    def is_streaming(self):
        """Return True while the stream thread exists and is still alive."""
        return self.thread is not None and self.thread.is_alive()

    def start_stream(self):
        """Start the background thread that runs the streaming loop.

        Raises:
            RuntimeError: If a stream thread is already running, or if
                ``configure()`` has not yet selected the sample rates.
        """
        if self.is_streaming():
            raise RuntimeError('Stream thread is already running')

        # Fail here, in the caller's thread, instead of letting the worker
        # thread die on missing rates while start_stream() appears to succeed.
        if self.sample_rate is None or self.output_rate is None:
            raise RuntimeError('Radio must be configured before starting a stream')

        self.run = True
        self.thread = Thread(target=self._stream_thread, daemon=True, args=())
        self.thread.start()

    def end_stream(self):
        """Ask the stream thread to stop and wait for it to finish.

        Raises:
            RuntimeError: If no stream thread has been started.
        """
        if self.thread is None:
            raise RuntimeError('No stream thread to terminate')

        self.run = False
        self.thread.join()
        self.thread = None

    def _stream_thread(self):
        """Thread body: set up the stream, pump samples, then always clean up.

        The loop ends when ``self.run`` is cleared, when either child process
        exits, or when a read fails with anything other than a timeout.
        """
        def is_alive(proc):
            return proc.poll() is None

        try:
            self._init_stream()

            while self.run:
                # Check that the child processes are still running
                if not (is_alive(self.demod) and is_alive(self.playback)):
                    print('DSP chain failed, aborting stream.', file=sys.stderr)
                    break

                result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES)

                # readStream() reports its sample count / error code in .ret;
                # comparing the result object itself to 0 never matched.
                if result.ret == 0:
                    continue

                if result.ret < 0:
                    error = SoapyError(result.ret)
                    if error is not SoapyError.Timeout:
                        print("Stream read failed, aborting stream:", error, file=sys.stderr)
                        break
                    # Timeouts are not fatal; just try again.
                    continue

                # Two buffer elements are forwarded per sample read
                # (presumably interleaved I and Q values -- confirm against FORMATS).
                read_size = int(result.ret * 2)
                self.demod.stdin.write(
                    struct.pack(FORMATS[Radio.FORMAT].packing % read_size,
                                *self.buffer[:read_size])
                )

                # handle subprocess logs
                check_io(self.demod, f"{self.name}-demod")
                check_io(self.playback, f"{self.name}-playback")
        finally:
            # Runs on normal exit and on any exception (including a failed
            # _init_stream) so the device stream and children never leak.
            self._cleanup_stream()

    def _init_stream(self):
        """Spawn the demod/playback processes and activate the device stream.

        Raises:
            RuntimeError: If ``activateStream`` returns a non-zero code.
        """
        # Imported locally so the buffer allocation does not depend on one of
        # the file's star imports happening to expose ``np``.
        import numpy as np

        self.playback = subprocess.Popen(
            [
                '/usr/bin/ffmpeg', '-f', 's16le', '-ar', str(self.output_rate), '-ac', '2', '-i', '-',
                '-f', 'rtsp', f"rtsp://localhost{self._stream_path()}"
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # The demodulator writes straight into ffmpeg's stdin.
        self.demod = subprocess.Popen(
            ['/usr/bin/python3', 'fm_demod.py', '-f', Radio.FORMAT,
             '-s', str(self.sample_rate), '-d', str(self.output_rate)],
            stdin=subprocess.PIPE,
            stdout=self.playback.stdin,
            stderr=subprocess.PIPE
        )

        prep_io(self.demod, f"{self.name}-demod")
        prep_io(self.playback, f"{self.name}-playback")

        sample_format = FORMATS[Radio.FORMAT]
        # Two buffer elements are reserved per requested sample.
        self.buffer = np.zeros(Radio.SAMPLES * 2, sample_format.numpy)
        self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, sample_format.soapy)
        result = self.device.activateStream(self.stream)

        if result != 0:
            raise RuntimeError(f"Error activating stream: {result}")

    def _cleanup_stream(self):
        """Tear down the device stream and reap both child processes.

        Safe to call when setup only partly completed: anything still
        ``None`` is skipped.
        """
        self.run = False

        # Cleanup the streaming objects in the reverse order to the _init method.
        try:
            if self.stream is not None:
                self.device.deactivateStream(self.stream)
                self.device.closeStream(self.stream)
        finally:
            self.stream = None
            self.buffer = None

            # .terminate()/.kill() followed by .wait() is required to properly clear
            # killed zombie processes from the process table.
            # https://stackoverflow.com/a/41961462
            for attr in ('demod', 'playback'):
                proc = getattr(self, attr)
                if proc is not None:
                    proc.kill()
                    proc.wait()
                    setattr(self, attr, None)

    def _stream_path(self):
        """Return the ``:<port>/radio/<name>`` suffix of this radio's RTSP URL."""
        return f":{Radio.PORT}/radio/{self.name}"
|
|
|
|
|
|
"""
|
|
Quick and dirty test of the Radio class.
|
|
"""
|
|
if __name__ == '__main__':
    # Manual smoke test: tune to a fixed frequency, stream for a short while,
    # then shut the stream down again.
    import time

    sdr = Radio('demo', {'driver': 'sdrplay'})

    print('Configuring...')
    sdr.configure('105.5M')
    print('Configured.')

    print('Starting stream...')
    sdr.start_stream()
    print('Stream started.')

    try:
        # Let the stream play for a while
        time.sleep(15)
    finally:
        # Also runs on Ctrl-C, so the stream thread is always asked to stop
        # and joined before the script exits.
        print('Ending stream...')
        sdr.end_stream()
        print('Stream ended.')