sdrplay-fm-radio/sdrplay_sample

171 lines
6.1 KiB
Python
Executable File

#! /usr/bin/env python3
import SoapySDR as soapy
import numpy as np
import sys
import struct
import argparse
import prefixed
from pprint import pprint
from colorama import Fore, Style
from formats import FORMATS, PACKINGS, TYPES
# Built-in defaults for the command-line options defined further down.
# Digit separators are used purely for readability; the values are unchanged.
FREQUENCY = 105_500_000  # 105.5e6; not referenced by the code visible in this file
BANDWIDTH = 200_000      # default for -b (Hz)
SAMPLE_RATE = 1_000_000  # default for -s (Hz)
SAMPLE_COUNT = 0         # default for -n; 0 selects continuous capture
BLOCK_SIZE = 16_384      # 16 * 1024; not referenced by the code visible in this file
def get_capabilities(device):
    """Gather the capability lists a device reports about itself.

    Args:
        device: An object exposing the SoapySDR device query methods used
            below (``listAntennas``, ``listGains``, ... ``listUARTs``).

    Returns:
        A dict with:
          * ``'rx'`` / ``'tx'``: per-direction dicts (antennas, gains,
            frequencies, sample-rates, bandwidths, sensors, formats), each
            queried for channel 0 only;
          * ``'clock-sources'``, ``'time-sources'``, ``'register-interfaces'``,
            ``'gpios'``, ``'uarts'``: device-wide lists.
    """
    channel = 0

    # (result key, device method name) pairs, in the order they are queried.
    direction_queries = (
        ('antennas', 'listAntennas'),
        ('gains', 'listGains'),
        ('frequencies', 'listFrequencies'),
        ('sample-rates', 'listSampleRates'),
        ('bandwidths', 'listBandwidths'),
        ('sensors', 'listSensors'),
        ('formats', 'getStreamFormats'),
    )

    def query_direction(direction):
        # Each method is looked up and called in turn with (direction, channel).
        return {
            key: getattr(device, method_name)(direction, channel)
            for key, method_name in direction_queries
        }

    report = {}
    report['rx'] = query_direction(soapy.SOAPY_SDR_RX)
    report['tx'] = query_direction(soapy.SOAPY_SDR_TX)
    report['clock-sources'] = device.listClockSources()
    report['time-sources'] = device.listTimeSources()
    report['register-interfaces'] = device.listRegisterInterfaces()
    report['gpios'] = device.listGPIOBanks()
    report['uarts'] = device.listUARTs()
    return report
# Command-line interface. Numeric options are kept as given here (strings from
# the command line, or the integer defaults) and converted later.
parser = argparse.ArgumentParser(
    epilog='For numerical quantities, SI & IEC prefix suffixing is supported, ie: 1k -> 1000, 1K -> 1024, 1M -> 1000000.',
)
parser.add_argument(
    '-v', '--verbose',
    action='store_true',
    help='Print additional informational output',
)

# Exactly one of these two must be supplied: list the devices, or pick one.
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
    '-l', '--list-devices',
    action='store_true',
    help='List available devices and exit',
)
group.add_argument(
    '-d',
    metavar='device',
    help='Specify a device by driver or serial',
)

# -f is optional at the parser level so that --list-devices works without it.
parser.add_argument(
    '-f',
    metavar='frequency',
    help='Frequency to tune to (Hz)',
)
parser.add_argument(
    '-b',
    metavar='bandwidth',
    default=BANDWIDTH,
    help='Sampling bandwidth (Hz)',
)
parser.add_argument(
    '-s',
    metavar='rate',
    default=SAMPLE_RATE,
    help='Sample rate (Hz)',
)
parser.add_argument(
    '-g',
    metavar='gain',
    help='Signal gain (dB). Omit for automatic gain setting.',
)
parser.add_argument(
    '-n',
    metavar='count',
    default=SAMPLE_COUNT,
    help='Specify a number of samples to collect, or 0 for continuous.',
)
parser.add_argument(
    '-o',
    metavar='file',
    help='Specify an output file for received samples. For stdout use \'-\'.',
)
parser.add_argument(
    '--format',
    choices=FORMATS.keys(),
    help='Specify the binary format of each sample. Defaults to the first reported supported stream format for the device. Use `-v --list-devices` to find supported output formats.',
)

args = parser.parse_args()
if args.list_devices:
    # Listing mode: print every enumerated device, then exit.
    devices = list(map(dict, soapy.Device.enumerate()))
    for index, device in enumerate(devices):
        print(f"{Fore.RED}{index}: {device['label']}{Style.RESET_ALL}")
        if not args.verbose:
            continue
        # Verbose listing opens the device, merges its capability report into
        # the enumeration record, and pretty-prints the combined dict.
        sdr = soapy.Device(device)
        device.update(get_capabilities(sdr))
        pprint(device)
    # Exit status is 1 when nothing was found, 0 otherwise.
    sys.exit(1 if not devices else 0)
# Stays None until a matching device has been found and opened.
sdr = None
if args.d:
    if args.verbose:
        print(f"Searching for device '{args.d}'...", file=sys.stderr)
    devices = [dict(device) for device in soapy.Device.enumerate()]
    # Select the first enumeration record in which any value equals the
    # selector given with -d.
    device = next((entry for entry in devices if args.d in entry.values()), None)
    if device is not None:
        if args.verbose:
            print(f"Found device '{args.d}' as '{device['label']}'.", file=sys.stderr)
        sdr = soapy.Device(device)

# Nothing matched (or no selector was usable): report and stop.
if sdr is None:
    print(f"Unable to connect to device '{args.d}'.", file=sys.stderr)
    sys.exit(1)
# Capture mode needs an explicit tuning frequency; -f has no default.
if args.f is None:
    print('No frequency specified, exiting.', file=sys.stderr)
    sys.exit(1)

# Convert the numeric options before touching the filesystem, so that a
# malformed value aborts the run without creating or truncating the output
# file. prefixed.Float handles both the integer defaults and the
# prefix-suffixed strings described in the parser epilog.
frequency = int(prefixed.Float(args.f))
bandwidth = int(prefixed.Float(args.b))
sample_rate = int(prefixed.Float(args.s))
sample_count = int(prefixed.Float(args.n))
# None means "let the device manage gain automatically".
gain = float(prefixed.Float(args.g)) if args.g else None

# Fall back to the first RX stream format the device reports for channel 0.
format = args.format
if format is None:
    format = sdr.getStreamFormats(soapy.SOAPY_SDR_RX, 0)[0]

# Open the sample sink last: '-' selects binary stdout, anything else is a
# file path. No -o means samples are read but not stored.
outfile = None
if args.o:
    if args.o == '-':
        outfile = sys.stdout.buffer
    else:
        outfile = open(args.o, 'wb+')

if args.verbose:
    print(f"Frequency: {frequency} Hz", file=sys.stderr)
    print(f"Bandwidth: {bandwidth} Hz", file=sys.stderr)
    print(f"Sample rate: {sample_rate} Hz", file=sys.stderr)
    print(f"Sample count: {sample_count if sample_count > 0 else 'Continuous'}", file=sys.stderr)
    # Compare against None so an explicit gain of 0 dB is not reported as
    # automatic.
    print(f"Gain: {gain} dB" if gain is not None else 'Gain: Automatic', file=sys.stderr)
    print(f"Format: {format}", file=sys.stderr)
# Push the requested tuning to the receive side of channel 0.
rx = soapy.SOAPY_SDR_RX
channel = 0
sdr.setFrequency(rx, channel, frequency)
sdr.setSampleRate(rx, channel, sample_rate)
sdr.setBandwidth(rx, channel, bandwidth)

# Automatic gain mode is enabled exactly when no gain was given; otherwise the
# explicit value is applied after automatic mode has been switched off.
automatic_gain = gain is None
sdr.setGainMode(rx, channel, automatic_gain)
if not automatic_gain:
    sdr.setGain(rx, channel, gain)
# Set up the receive stream and a reusable read buffer.
# Up to `samples` samples are requested per read; the buffer holds two
# elements per sample (presumably interleaved I/Q components -- confirm
# against formats.TYPES / formats.PACKINGS).
samples = 1000
buffer = np.zeros(samples * 2, dtype=TYPES[format])
stream = sdr.setupStream(soapy.SOAPY_SDR_RX, FORMATS[format])

try:
    sdr.activateStream(stream)
    total_samples = 0
    while True:
        # In counted mode never request more than what is still missing, and
        # stop once the requested number of samples has been collected.
        collect_samples = samples
        if sample_count > 0:
            collect_samples = min(samples, sample_count - total_samples)
            if collect_samples <= 0:
                break

        result = sdr.readStream(stream, [buffer], collect_samples)
        if args.verbose:
            print('Rx:', result.ret, result.flags, result.timeNs, file=sys.stderr)

        # A return value below 1 is treated as fatal. Leave the loop right
        # away: a negative count must not reach the write path below, where it
        # would produce an invalid struct format and a wrong buffer slice.
        if result.ret < 1:
            print('Stream read failed, exiting.', file=sys.stderr)
            break

        if outfile:
            # Two buffer elements were filled per sample received.
            received = result.ret * 2
            outfile.write(struct.pack(PACKINGS[format] % received, *buffer[:received]))

        total_samples += result.ret
finally:
    # Runs on normal completion, on errors and on Ctrl-C (the only way to end
    # a continuous capture), so the file and the stream are always released.
    try:
        # stdout is left open for the interpreter to flush at exit.
        if outfile and outfile is not sys.stdout.buffer:
            outfile.close()
    finally:
        # Shut down the stream.
        sdr.deactivateStream(stream)
        sdr.closeStream(stream)