sdrplay-fm-radio/radio.py

81 lines
2.4 KiB
Python

from threading import Thread
import SoapySDR as soapy
import prefixed
from formats import *
import sys
class Radio:
FORMAT = 'CS16'
SAMPLES = 8192
def __init__(self, device_info):
self.device_info = device_info
self.stream = None
self.buffer = None
self.port = None
self.run = False
self.thread = None
self.device = soapy.Device(device_info)
if self.device is None:
raise RuntimeError("Failed to connect to radio device")
def configure(self, frequency):
frequency = int(prefixed.Float(frequency))
sample_rate = 384000
bandwidth = 200000
self.device.setFrequency(soapy.SOAPY_SDR_RX, 0, frequency)
self.device.setSampleRate(soapy.SOAPY_SDR_RX, 0, sample_rate)
self.device.setBandwidth(soapy.SOAPY_SDR_RX, 0, bandwidth)
# Set automatic gain
self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True)
return {
'frequency': self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
'sample-rate': self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
'bandwidth': self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
'gain-mode': 'auto' if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) else 'manual',
}
def start_stream(self):
if self.thread:
raise RuntimeError('Stream thread is already running')
self.buffer = np.array([0] * Radio.SAMPLES * 2, TYPES[Radio.FORMAT])
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT])
self.device.activateStream(self.stream)
self.run = True
self.thread = Thread(target=self._stream_thread, daemon=True, args=())
self.thread.start()
def _stream_thread(self):
while self.run:
result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES)
if result.ret < 1:
print('Stream read failed, exiting.', file=sys.stderr)
self.keep_going = False
self._cleanup_stream()
def end_stream(self):
if self.thread is None:
raise RuntimeError('No stream thread to terminate')
self.run = False
self.thread.join()
self.thread = None
def _cleanup_stream(self):
#shutdown the stream
self.device.deactivateStream(self.stream)
self.device.closeStream(self.stream)