sdrplay-fm-radio/sdrplay_sample

187 lines
6.3 KiB
Python
Executable File

#! /usr/bin/env python3
import SoapySDR as soapy
import numpy as np
import sys
import struct
import argparse
import prefixed
from pprint import pprint
from colorama import Fore, Style
# Tuning defaults, in Hz. BANDWIDTH and SAMPLE_RATE are the fallbacks for the
# -b and -s options; FREQUENCY is not referenced by the code visible here.
FREQUENCY = 105_500_000
BANDWIDTH = 200_000
SAMPLE_RATE = 1_000_000

# Default for -n: 0 means "collect continuously".
SAMPLE_COUNT = 0

# NOTE(review): not referenced by the code visible here.
BLOCK_SIZE = 16_384

# Format names accepted by --format, mapped to the SoapySDR format constants.
FORMATS = dict(
    CS16=soapy.SOAPY_SDR_CS16,
    CF32=soapy.SOAPY_SDR_CF32,
)

# struct templates used to serialise a sample buffer; the %d placeholder is
# filled with the number of buffer elements before packing.
PACKINGS = {
    FORMATS['CS16']: '=%dh',
    FORMATS['CF32']: '=%df',
}

# numpy element type of the receive buffer for each stream format.
TYPES = {
    FORMATS['CS16']: np.int16,
    FORMATS['CF32']: np.float32,
}
def get_capabilities(device):
    """Gather the capability listings reported by a device into one dict.

    Channel 0 is queried in both the RX and the TX direction, followed by
    the device-wide listings.

    Args:
        device: Object exposing the SoapySDR device ``list*`` /
            ``getStreamFormats`` query methods used below.

    Returns:
        A dict with per-direction sub-dicts under ``'rx'`` and ``'tx'``
        (antennas, gains, frequencies, sample-rates, bandwidths, sensors,
        formats) plus the keys ``'clock-sources'``, ``'time-sources'``,
        ``'register-interfaces'``, ``'gpios'`` and ``'uarts'``.
    """
    report = {}

    # Per-direction listings, always for channel 0.
    for label, direction in (
        ('rx', soapy.SOAPY_SDR_RX),
        ('tx', soapy.SOAPY_SDR_TX),
    ):
        report[label] = {
            'antennas': device.listAntennas(direction, 0),
            'gains': device.listGains(direction, 0),
            'frequencies': device.listFrequencies(direction, 0),
            'sample-rates': device.listSampleRates(direction, 0),
            'bandwidths': device.listBandwidths(direction, 0),
            'sensors': device.listSensors(direction, 0),
            'formats': device.getStreamFormats(direction, 0),
        }

    # Device-wide listings that do not depend on a direction or channel.
    report['clock-sources'] = device.listClockSources()
    report['time-sources'] = device.listTimeSources()
    report['register-interfaces'] = device.listRegisterInterfaces()
    report['gpios'] = device.listGPIOBanks()
    report['uarts'] = device.listUARTs()
    return report
# Command-line interface. Numeric options are stored exactly as given (or as
# their integer defaults); no SI/IEC suffix handling happens at parse time.
parser = argparse.ArgumentParser(
    epilog=(
        'For numerical quantities, SI & IEC prefix suffixing is supported, '
        'ie: 1k -> 1000, 1K -> 1024, 1M -> 1000000.'
    ),
)
parser.add_argument(
    '-v', '--verbose',
    action='store_true',
    help='Print additional informational output',
)

# Exactly one of "list the devices" or "use this device" must be given.
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
    '-l', '--list-devices',
    action='store_true',
    help='List available devices and exit',
)
group.add_argument(
    '-d',
    metavar='device',
    help='Specify a device by driver or serial',
)

# -f is not marked required at the parser level, so --list-devices can be
# used without supplying a frequency.
parser.add_argument(
    '-f',
    metavar='frequency',
    help='Frequency to tune to (Hz)',
)
parser.add_argument(
    '-b',
    metavar='bandwidth',
    default=BANDWIDTH,
    help='Sampling bandwidth (Hz)',
)
parser.add_argument(
    '-s',
    metavar='rate',
    default=SAMPLE_RATE,
    help='Sample rate (Hz)',
)
parser.add_argument(
    '-g',
    metavar='gain',
    help='Signal gain (dB). Omit for automatic gain setting.',
)
parser.add_argument(
    '-n',
    metavar='count',
    default=SAMPLE_COUNT,
    help='Specify a number of samples to collect, or 0 for continuous.',
)
parser.add_argument(
    '-o',
    metavar='file',
    help="Specify an output file for received samples. For stdout use '-'.",
)
parser.add_argument(
    '--format',
    choices=FORMATS.keys(),
    help=(
        'Specify the binary format of each sample. '
        'Defaults to the first reported supported stream format for the device. '
        'Use `-v --list-devices` to find supported output formats.'
    ),
)

args = parser.parse_args()
# --list-devices: print every enumerated device, then exit. The exit status
# is 0 when at least one device was found and 1 otherwise.
if args.list_devices:
    found = [dict(entry) for entry in soapy.Device.enumerate()]
    for position, entry in enumerate(found):
        print(f"{Fore.RED}{position}: {entry['label']}{Style.RESET_ALL}")
        if args.verbose:
            # Verbose mode opens the device and merges its capability
            # listings into the enumeration record before it is printed.
            handle = soapy.Device(entry)
            entry.update(get_capabilities(handle))
        # NOTE(review): the original nesting was lost; this assumes the
        # record is pretty-printed for every device, not only with -v.
        pprint(entry)
    sys.exit(0 if found else 1)
# Open the first enumerated device that has the -d argument as one of its
# enumeration values; `sdr` stays None when nothing matches.
sdr = None
if args.d:
    if args.verbose:
        print(f"Searching for device '{args.d}'...", file=sys.stderr)
    for candidate in map(dict, soapy.Device.enumerate()):
        if args.d not in candidate.values():
            continue
        if args.verbose:
            print(f"Found device '{args.d}' as '{candidate['label']}'.", file=sys.stderr)
        sdr = soapy.Device(candidate)
        break

if sdr is None:
    print(f"Unable to connect to device '{args.d}'.", file=sys.stderr)
    sys.exit(1)

# The parser does not enforce -f, so a missing frequency is rejected here.
if args.f is None:
    print('No frequency specified, exiting.', file=sys.stderr)
    sys.exit(1)
# Convert the numeric options first, so that a bad value aborts the script
# before the output file is created or truncated. prefixed.Float handles the
# SI/IEC suffixes advertised in the --help epilog.
frequency = int(prefixed.Float(args.f))
bandwidth = int(prefixed.Float(args.b))
sample_rate = int(prefixed.Float(args.s))
sample_count = int(prefixed.Float(args.n))
gain = float(prefixed.Float(args.g)) if args.g else None  # None -> automatic gain

# Resolve the stream format. (The name `format` shadows the builtin but is
# kept because the rest of the script refers to it.)
format = args.format
if format is None:
    # Use the first format reported by the device that this script can
    # serialise. Indexing [0] blindly raised KeyError (or IndexError) when the
    # device listed an unsupported format first, or none at all.
    reported = list(sdr.getStreamFormats(soapy.SOAPY_SDR_RX, 0))
    format = next((name for name in reported if name in FORMATS), None)
    if format is None:
        print(
            f"Device reports no supported stream format "
            f"(reported: {', '.join(reported) or 'none'}; "
            f"supported: {', '.join(FORMATS)}).",
            file=sys.stderr,
        )
        sys.exit(1)
format = FORMATS[format]

# Open the sample sink last: '-' selects binary stdout, anything else is a
# file path. `outfile` stays None when -o was not given.
outfile = None
if args.o:
    if args.o == '-':
        outfile = sys.stdout.buffer
    else:
        outfile = open(args.o, 'wb+')
# Verbose summary of the effective settings, written to stderr so that it
# never mixes with sample data sent to stdout.
if args.verbose:
    print(f"Frequency: {frequency} Hz", file=sys.stderr)
    print(f"Bandwidth: {bandwidth} Hz", file=sys.stderr)
    print(f"Sample rate: {sample_rate} Hz", file=sys.stderr)
    print(f"Sample count: {sample_count if sample_count > 0 else 'Continuous'}", file=sys.stderr)
    # Compare against None explicitly: an explicit gain of 0 dB is falsy and
    # was previously reported as "Automatic" although manual gain was applied.
    print(f"Gain: {gain} dB" if gain is not None else 'Gain: Automatic', file=sys.stderr)
    print(f"Format: {format}", file=sys.stderr)
# Apply the requested settings to RX channel 0.
sdr.setFrequency(soapy.SOAPY_SDR_RX, 0, frequency)
sdr.setSampleRate(soapy.SOAPY_SDR_RX, 0, sample_rate)
sdr.setBandwidth(soapy.SOAPY_SDR_RX, 0, bandwidth)

# Automatic gain mode is enabled exactly when no gain was requested; a
# requested gain is applied after automatic mode has been switched off.
automatic_gain = gain is None
sdr.setGainMode(soapy.SOAPY_SDR_RX, 0, automatic_gain)
if not automatic_gain:
    sdr.setGain(soapy.SOAPY_SDR_RX, 0, gain)

# Set up the receive stream. The buffer holds two elements per sample, for
# up to `samples` samples per read.
samples = 1000
buffer = np.zeros(2 * samples, dtype=TYPES[format])
stream = sdr.setupStream(soapy.SOAPY_SDR_RX, format)
sdr.activateStream(stream)
# Capture loop: read blocks of up to `samples` samples and write them to
# `outfile` until `sample_count` samples were collected (or forever when
# sample_count is 0), then shut the stream down.
#
# Fixes over the previous version:
#   * a final block smaller than `samples` is accepted; it used to be
#     retried forever because only full-size reads were ever processed;
#   * failed reads are retried a bounded number of times and then abort;
#     the failure check used to be unreachable;
#   * only the samples actually received are written, not the whole buffer;
#   * the file and the stream are released on Ctrl-C or on an error as well.
total_samples = 0
failed_reads = 0
max_failed_reads = 10  # consecutive empty/failed reads tolerated before giving up

try:
    while sample_count <= 0 or total_samples < sample_count:
        collect_samples = samples
        if sample_count > 0:
            # Never request more than what is still missing.
            collect_samples = min(samples, sample_count - total_samples)

        result = sdr.readStream(stream, [buffer], collect_samples)
        if args.verbose:
            print('Rx:', result.ret, result.flags, result.timeNs, file=sys.stderr)

        if result.ret < 1:
            # Nothing usable was read (error code or no samples). Retry a few
            # times, since such failures can be transient, but never forever.
            failed_reads += 1
            if failed_reads >= max_failed_reads:
                print('Stream read failed, exiting.', file=sys.stderr)
                break
            continue
        failed_reads = 0

        if outfile:
            # Two buffer elements per sample. tobytes() emits the same
            # native-order bytes that struct.pack('=%dh' / '=%df') produced,
            # without unpacking every element into a Python object.
            outfile.write(buffer[:result.ret * 2].tobytes())
        total_samples += result.ret
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop a continuous capture.
    if args.verbose:
        print('Interrupted, stopping.', file=sys.stderr)
finally:
    try:
        # stdout is left open; the interpreter owns it.
        if outfile and outfile is not sys.stdout.buffer:
            outfile.close()
    finally:
        #shutdown the stream
        sdr.deactivateStream(stream)
        sdr.closeStream(stream)