#! /usr/bin/env python3
"""Command-line receiver built on SoapySDR.

Lists the available SoapySDR devices, or tunes one device's receive
channel 0 and reads samples from it, optionally writing them to a file or
to stdout in the binary layout described by the selected FORMATS entry.
"""

import SoapySDR as soapy
import numpy as np
import sys
import struct
import argparse
import prefixed
from pprint import pprint
from colorama import Fore, Style
from formats import FORMATS

FREQUENCY = 105500000
BANDWIDTH = 200000
SAMPLE_RATE = 1000000
SAMPLE_COUNT = 0
BLOCK_SIZE = 16384


def get_capabilities(device):
    """Collect the capability listings a SoapySDR device reports.

    Args:
        device: An opened ``soapy.Device``.

    Returns:
        A dict with per-direction listings under ``'rx'`` and ``'tx'``
        (antennas, gains, frequencies, sample rates, bandwidths, sensors
        and stream formats, all queried for channel 0) plus the
        device-wide clock sources, time sources, register interfaces,
        GPIO banks and UARTs.
    """
    def get_direction_capabilities(direction):
        # Only channel 0 of each direction is queried.
        return {
            'antennas': device.listAntennas(direction, 0),
            'gains': device.listGains(direction, 0),
            'frequencies': device.listFrequencies(direction, 0),
            'sample-rates': device.listSampleRates(direction, 0),
            'bandwidths': device.listBandwidths(direction, 0),
            'sensors': device.listSensors(direction, 0),
            'formats': device.getStreamFormats(direction, 0),
        }

    return {
        'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX),
        'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX),
        'clock-sources': device.listClockSources(),
        'time-sources': device.listTimeSources(),
        'register-interfaces': device.listRegisterInterfaces(),
        'gpios': device.listGPIOBanks(),
        'uarts': device.listUARTs(),
    }


def _receive_samples(device, stream, sample_format, sample_count, outfile, verbose):
    """Read samples from an activated stream until done or a read fails.

    Args:
        device: The opened ``soapy.Device`` that owns ``stream``.
        stream: An activated receive stream of ``device``.
        sample_format: The FORMATS entry in use; its ``numpy`` attribute is
            used as the buffer dtype and its ``packing`` attribute as a
            ``%``-template for the ``struct`` format string.
        sample_count: Number of samples to collect, or 0 (or less) to keep
            reading until a read fails.
        outfile: Binary file object to write to, or ``None`` to discard.
        verbose: When true, report every read result on stderr.
    """
    samples = 1000
    # Two buffer elements are kept per sample (presumably interleaved I/Q
    # -- TODO confirm against the FORMATS definitions).
    buffer = np.zeros(samples * 2, sample_format.numpy)
    total_samples = 0

    while True:
        collect_samples = samples
        if sample_count > 0:
            collect_samples = min(samples, sample_count - total_samples)
            if collect_samples <= 0:
                break

        result = device.readStream(stream, [buffer], collect_samples)
        if verbose:
            print('Rx:', result.ret, result.flags, result.timeNs, file=sys.stderr)

        if result.ret < 1:
            print('Stream read failed, exiting.', file=sys.stderr)
            # Stop right away: a negative count must never reach the
            # struct format string or the buffer slice below.
            break

        if outfile:
            received = int(result.ret * 2)
            outfile.write(struct.pack(sample_format.packing % received, *buffer[:received]))

        if sample_count > 0:
            total_samples += result.ret


parser = argparse.ArgumentParser(epilog='For numerical quantities, SI & IEC prefix suffixing is supported, ie: 1k -> 1000, 1K -> 1024, 1M -> 1000000.')
parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-l', '--list-devices', help='List available devices and exit', action='store_true')
group.add_argument('-d', metavar='device', help='Specify a device by driver or serial')
# Not marked required here because --list-devices works without a
# frequency; its presence is checked once a device has been selected.
parser.add_argument('-f', metavar='frequency', help='Frequency to tune to (Hz)')
parser.add_argument('-b', metavar='bandwidth', help='Sampling bandwidth (Hz)', default=BANDWIDTH)
parser.add_argument('-s', metavar='rate', help='Sample rate (Hz)', default=SAMPLE_RATE)
parser.add_argument('-g', metavar='gain', help='Signal gain (dB). Omit for automatic gain setting.')
parser.add_argument('-n', metavar='count', help='Specify a number of samples to collect, or 0 for continuous.', default=SAMPLE_COUNT)
parser.add_argument('-o', metavar='file', help='Specify an output file for received samples. For stdout use \'-\'.')
parser.add_argument('--format', choices=FORMATS.keys(), help='Specify the binary format of each sample. Defaults to the first reported supported stream format for the device. Use `-v --list-devices` to find supported output formats.')
args = parser.parse_args()

if args.list_devices:
    devices = [dict(device) for device in soapy.Device.enumerate()]
    for index, device in enumerate(devices):
        print(f"{Fore.RED}{index}: {device['label']}{Style.RESET_ALL}")
        if args.verbose:
            sdr = soapy.Device(device)
            device.update(get_capabilities(sdr))
            pprint(device)
    # Exit status signals whether any device was found.
    sys.exit(0 if devices else 1)

sdr = None
if args.d:
    if args.verbose:
        print(f"Searching for device '{args.d}'...", file=sys.stderr)
    devices = [dict(device) for device in soapy.Device.enumerate()]
    for device in devices:
        # The selector matches any value of the enumeration record.
        if args.d in device.values():
            if args.verbose:
                print(f"Found device '{args.d}' as '{device['label']}'.", file=sys.stderr)
            sdr = soapy.Device(device)
            break

if sdr is None:
    print(f"Unable to connect to device '{args.d}'.", file=sys.stderr)
    sys.exit(1)

if args.f is None:
    print('No frequency specified, exiting.', file=sys.stderr)
    sys.exit(1)

# Parse every numeric argument before touching the output file so that a
# malformed value cannot truncate an existing capture.
frequency = int(prefixed.Float(args.f))
bandwidth = int(prefixed.Float(args.b))
sample_rate = int(prefixed.Float(args.s))
sample_count = int(prefixed.Float(args.n))
gain = float(prefixed.Float(args.g)) if args.g else None

stream_format = args.format
if stream_format is None:
    reported_formats = list(sdr.getStreamFormats(soapy.SOAPY_SDR_RX, 0))
    if not reported_formats:
        print('Device reports no receive stream formats, exiting.', file=sys.stderr)
        sys.exit(1)
    stream_format = reported_formats[0]
if stream_format not in FORMATS:
    print(f"Unsupported stream format '{stream_format}', exiting.", file=sys.stderr)
    sys.exit(1)
sample_format = FORMATS[stream_format]

if args.verbose:
    print(f"Frequency: {frequency} Hz", file=sys.stderr)
    print(f"Bandwidth: {bandwidth} Hz", file=sys.stderr)
    print(f"Sample rate: {sample_rate} Hz", file=sys.stderr)
    print(f"Sample count: {sample_count if sample_count > 0 else 'Continuous'}", file=sys.stderr)
    # Compare against None so an explicit 0 dB gain is not shown as automatic.
    print(f"Gain: {gain} dB" if gain is not None else 'Gain: Automatic', file=sys.stderr)
    print(f"Format: {stream_format}", file=sys.stderr)

# Apply settings to receive channel 0.
sdr.setFrequency(soapy.SOAPY_SDR_RX, 0, frequency)
sdr.setSampleRate(soapy.SOAPY_SDR_RX, 0, sample_rate)
sdr.setBandwidth(soapy.SOAPY_SDR_RX, 0, bandwidth)
# Automatic gain mode is enabled only when no explicit gain was given.
sdr.setGainMode(soapy.SOAPY_SDR_RX, 0, gain is None)
if gain is not None:
    sdr.setGain(soapy.SOAPY_SDR_RX, 0, gain)

outfile = None
if args.o:
    if args.o == '-':
        outfile = sys.stdout.buffer
    else:
        outfile = open(args.o, 'wb+')

try:
    # Set up a stream; the nested finally clauses guarantee it is shut
    # down even when the capture is interrupted or raises.
    stream = sdr.setupStream(soapy.SOAPY_SDR_RX, sample_format.soapy)
    try:
        sdr.activateStream(stream)
        try:
            _receive_samples(sdr, stream, sample_format, sample_count, outfile, args.verbose)
        finally:
            sdr.deactivateStream(stream)
    finally:
        sdr.closeStream(stream)
finally:
    # stdout is left open for the interpreter to manage.
    if outfile is not None and outfile is not sys.stdout.buffer:
        outfile.close()