#! /usr/bin/env python3 import SoapySDR as soapy import numpy as np import sys import struct import argparse import prefixed from pprint import pprint from colorama import Fore, Style FREQUENCY = 105500000 BANDWIDTH = 200000 SAMPLE_RATE = 1000000 SAMPLE_COUNT = 0 BLOCK_SIZE = 16384 FORMATS = { 'CS16': soapy.SOAPY_SDR_CS16, 'CF32': soapy.SOAPY_SDR_CF32, } PACKINGS = { soapy.SOAPY_SDR_CS16: '=%dh', soapy.SOAPY_SDR_CF32: '=%df', } TYPES = { soapy.SOAPY_SDR_CS16: np.int16, soapy.SOAPY_SDR_CF32: np.float32, } def get_capabilities(device): def get_direction_capabilities(direction): return { 'antennas': device.listAntennas(direction, 0), 'gains': device.listGains(direction, 0), 'frequencies': device.listFrequencies(direction, 0), 'sample-rates': device.listSampleRates(direction, 0), 'bandwidths': device.listBandwidths(direction, 0), 'sensors': device.listSensors(direction, 0), 'formats': device.getStreamFormats(direction, 0), } capabilities = { 'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX), 'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX), 'clock-sources': device.listClockSources(), 'time-sources': device.listTimeSources(), 'register-interfaces': device.listRegisterInterfaces(), 'gpios': device.listGPIOBanks(), 'uarts': device.listUARTs(), } return capabilities parser = argparse.ArgumentParser(epilog='For numerical quantities, SI & IEC prefix suffixing is supported, ie: 1k -> 1000, 1K -> 1024, 1M -> 1000000.') parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true') group = parser.add_mutually_exclusive_group(required=True) group.add_argument('-l', '--list-devices', help='List available devices and exit', action='store_true') group.add_argument('-d', metavar='device', help='Specify a device by driver or serial') parser.add_argument('-f', metavar='frequency', help='Frequency to tune to (Hz)')#, required=True) parser.add_argument('-b', metavar='bandwidth', help='Sampling bandwidth (Hz)', default=BANDWIDTH) parser.add_argument('-s', metavar='rate', help=f'Sample rate (Hz)', default=SAMPLE_RATE) parser.add_argument('-g', metavar='gain', help='Signal gain (dB). Omit for automatic gain setting.') parser.add_argument('-n', metavar='count', help='Specify a number of samples to collect, or 0 for continuous.', default=SAMPLE_COUNT) parser.add_argument('-o', metavar='file', help='Specify an output file for received samples. For stdout use \'-\'.') parser.add_argument('--format', choices=FORMATS.keys(), help='Specify the binary format of each sample. Defaults to the first reported supported stream format for the device. Use `-v --list-devices` to find supported output formats.') args = parser.parse_args() if args.list_devices: devices = [dict(device) for device in soapy.Device.enumerate()] for index,device in enumerate(devices): print(f"{Fore.RED}{index}: {device['label']}{Style.RESET_ALL}") if args.verbose: sdr = soapy.Device(device) device.update(get_capabilities(sdr)) pprint(device) sys.exit(0 if len(devices) > 0 else 1) sdr = None if args.d: if args.verbose: print(f"Searching for device '{args.d}'...", file=sys.stderr) devices = [dict(device) for device in soapy.Device.enumerate()] for index,device in enumerate(devices): if args.d in device.values(): if args.verbose: print(f"Found device '{args.d}' as '{device['label']}'.", file=sys.stderr) sdr = soapy.Device(device) break if sdr is None: print(f"Unable to connect to device '{args.d}'.", file=sys.stderr) sys.exit(1) if args.f is None: print('No frequency specified, exiting.', file=sys.stderr) sys.exit(1) outfile = None if args.o: if args.o == '-': outfile = sys.stdout.buffer else: outfile = open(args.o, 'wb+') frequency = int(prefixed.Float(args.f)) bandwidth = int(prefixed.Float(args.b)) sample_rate = int(prefixed.Float(args.s)) sample_count = int(prefixed.Float(args.n)) gain = float(prefixed.Float(args.g)) if args.g else None format = args.format if format is None: format = sdr.getStreamFormats(soapy.SOAPY_SDR_RX, 0)[0] format = FORMATS[format] if args.verbose: print(f"Frequency: {frequency} Hz", file=sys.stderr) print(f"Bandwidth: {bandwidth} Hz", file=sys.stderr) print(f"Sample rate: {sample_rate} Hz", file=sys.stderr) print(f"Sample count: {sample_count if sample_count > 0 else 'Continuous'}", file=sys.stderr) print(f"Gain: {gain} dB" if gain else 'Gain: Automatic', file=sys.stderr) print(f"Format: {format}", file=sys.stderr) #apply settings sdr.setFrequency(soapy.SOAPY_SDR_RX, 0, frequency) sdr.setSampleRate(soapy.SOAPY_SDR_RX, 0, sample_rate) sdr.setBandwidth(soapy.SOAPY_SDR_RX, 0, bandwidth) sdr.setGainMode(soapy.SOAPY_SDR_RX, 0, (gain == None)) if gain is not None: sdr.setGain(soapy.SOAPY_SDR_RX, 0, gain) #setup a stream samples = 1000 buffer = np.array([0] * samples * 2, TYPES[format]) stream = sdr.setupStream(soapy.SOAPY_SDR_RX, format) sdr.activateStream(stream) total_samples = 0 keep_going = True while keep_going: collect_samples = samples if sample_count > 0: collect_samples = min(samples, sample_count - total_samples) if collect_samples <= 0: keep_going = False break result = sdr.readStream(stream, [buffer], collect_samples) if args.verbose: print('Rx:', result.ret, result.flags, result.timeNs, file=sys.stderr) if result.ret != samples: continue if result.ret < 1: print('Stream read failed, exiting.', file=sys.stderr) keep_going = False if outfile: outfile.write(struct.pack(PACKINGS[format] % len(buffer), *buffer)) if sample_count > 0: total_samples += result.ret if outfile and outfile != sys.stdout.buffer: outfile.close() #shutdown the stream sdr.deactivateStream(stream) sdr.closeStream(stream)