Created an SDRPlay sample generator that is compatible with rtl_sdr interface

This commit is contained in:
Jono Targett 2023-05-10 14:06:04 +09:30
parent 1974dd8605
commit fcb86176c3
3 changed files with 184 additions and 83 deletions

83
demo.py
View File

@ -1,83 +0,0 @@
#! /usr/bin/env python3
#
# Example script for interfacing with SDRPlay radio.
#
# Modified from: https://github.com/pothosware/SoapySDR/wiki/PythonSupport
#
import SoapySDR
from SoapySDR import * #SOAPY_SDR_ constants
import numpy as np
import sys
import struct
#enumerate devices
devices = [dict(device) for device in SoapySDR.Device.enumerate()]
if len(devices) == 0:
print('No SDR devices available.')
sys.exit(1)
#create device instance
sdr = SoapySDR.Device(devices[0])
#apply settings
sdr.setSampleRate(SOAPY_SDR_RX, 0, 1e6)
sdr.setFrequency(SOAPY_SDR_RX, 0, 105500000)
#setup a stream (complex floats)
samples = 10000
buff = np.array([0]*samples*2, np.int16)
rxStream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CS16)
sdr.activateStream(rxStream)
#receive some samples
for i in range(1000):
sr = sdr.readStream(rxStream, [buff], samples*2)
print(sr.ret, sr.flags, sr.timeNs, file=sys.stderr)
sys.stdout.buffer.write(struct.pack('=%dh' % len(buff), *buff))
#shutdown the stream
sdr.deactivateStream(rxStream) #stop streaming
sdr.closeStream(rxStream)
'''
#create a re-usable buffer for rx samples
# https://stackoverflow.com/a/32877245
#while True:
sr = sdr.readStream(rxStream, [buff], samples)
#print(sr.ret) #num samples or error code
#print(sr.flags) #flags set by receive operation
#print(sr.timeNs) #timestamp for receive buffer
# we now have a np buffer of [(re_16:1, im_16:1), re_16:2, im_16:2), ...]
# that needs to become a buffer of [re_8:1, im_8:1, re_8:2, im_8:2, ...]
#print(buff)
#output = buff.flatten('F')
#print(output)
#output = np.empty(len(buff) * 2, dtype=np.float32)
#output[0::2] = output_left
#output[1::2] = output_right
#output = output.astype(int)
#print(output)
#scale_factor = 127 / np.max(buff)
# scale the values using numpy.interp
#uint8_buffer = np.interp(buff, (buff.min(), buff.max()), (-127, 127)).astype(np.int8)
output = buff
#output /= (np.max(np.int16) / np.max(np.int8))
#output /= (32768 / 127)
#output = np.divide(output, 1).astype(np.int8)
'''

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
colorama==0.4.3
numpy==1.17.4
prefixed==0.7.0

181
sdrplay_sample Executable file
View File

@ -0,0 +1,181 @@
#! /usr/bin/env python3
import SoapySDR as soapy
import numpy as np
import sys
import struct
import argparse
import prefixed
from pprint import pprint
from colorama import Fore, Style
FREQUENCY = 105500000
BANDWIDTH = 200000
SAMPLE_RATE = 1000000
SAMPLE_COUNT = 0
BLOCK_SIZE = 16384
FORMATS = {
'CS16': soapy.SOAPY_SDR_CS16,
'CF32': soapy.SOAPY_SDR_CF32,
}
PACKINGS = {
soapy.SOAPY_SDR_CS16: '=%dh',
soapy.SOAPY_SDR_CF32: '=%df',
}
TYPES = {
soapy.SOAPY_SDR_CS16: np.int16,
soapy.SOAPY_SDR_CF32: np.float32,
}
def get_capabilities(device):
def get_direction_capabilities(direction):
return {
'antennas': device.listAntennas(direction, 0),
'gains': device.listGains(direction, 0),
'frequencies': device.listFrequencies(direction, 0),
'sample-rates': device.listSampleRates(direction, 0),
'bandwidths': device.listBandwidths(direction, 0),
'sensors': device.listSensors(direction, 0),
'formats': device.getStreamFormats(direction, 0),
}
capabilities = {
'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX),
'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX),
'clock-sources': device.listClockSources(),
'time-sources': device.listTimeSources(),
'register-interfaces': device.listRegisterInterfaces(),
'gpios': device.listGPIOBanks(),
'uarts': device.listUARTs(),
}
return capabilities
parser = argparse.ArgumentParser(epilog='For numerical quantities, SI & IEC prefix suffixing is supported, ie: 1k -> 1000, 1K -> 1024, 1M -> 1000000.')
parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-l', '--list-devices', help='List available devices and exit', action='store_true')
group.add_argument('-d', metavar='device', help='Specify a device by driver or serial')
parser.add_argument('-f', metavar='frequency', help='Frequency to tune to (Hz)')#, required=True)
parser.add_argument('-b', metavar='bandwidth', help='Sampling bandwidth (Hz)', default=BANDWIDTH)
parser.add_argument('-s', metavar='rate', help=f'Sample rate (Hz)', default=SAMPLE_RATE)
parser.add_argument('-g', metavar='gain', help='Signal gain (dB). Omit for automatic gain setting.')
parser.add_argument('-n', metavar='count', help='Specify a number of samples to collect, or 0 for continuous.', default=SAMPLE_COUNT)
parser.add_argument('-o', metavar='file', help='Specify an output file for received samples. For stdout use \'-\'.')
parser.add_argument('--format', metavar='format', help='Specify the binary format of each sample. Defaults to the first reported supported stream format for the device. Use `-v --list-devices` to find supported output formats.')
args = parser.parse_args()
if args.list_devices:
devices = [dict(device) for device in soapy.Device.enumerate()]
for index,device in enumerate(devices):
print(f"{Fore.RED}{index}: {device['label']}{Style.RESET_ALL}")
if args.verbose:
sdr = soapy.Device(device)
device.update(get_capabilities(sdr))
pprint(device)
sys.exit(0 if len(devices) > 0 else 1)
sdr = None
if args.d:
if args.verbose:
print(f"Searching for device '{args.d}'...", file=sys.stderr)
devices = [dict(device) for device in soapy.Device.enumerate()]
for index,device in enumerate(devices):
if args.d in device.values():
if args.verbose:
print(f"Found device '{args.d}' as '{device['label']}'.", file=sys.stderr)
sdr = soapy.Device(device)
break
if sdr is None:
print(f"Unable to connect to device '{args.d}'.", file=sys.stderr)
sys.exit(1)
if args.f is None:
print('No frequency specified, exiting.', file=sys.stderr)
sys.exit(1)
outfile = None
if args.o:
if args.o == '-':
outfile = sys.stdout.buffer
else:
outfile = open(args.o, 'wb+')
frequency = int(prefixed.Float(args.f))
bandwidth = int(prefixed.Float(args.b))
sample_rate = int(prefixed.Float(args.s))
sample_count = int(prefixed.Float(args.n))
gain = float(prefixed.Float(args.g)) if args.g else None
format = args.format
if format is None:
format = sdr.getStreamFormats(soapy.SOAPY_SDR_RX, 0)[0]
format = FORMATS[format]
if args.verbose:
print(f"Frequency: {frequency} Hz", file=sys.stderr)
print(f"Bandwidth: {bandwidth} Hz", file=sys.stderr)
print(f"Sample rate: {sample_rate} Hz", file=sys.stderr)
print(f"Sample count: {sample_count if sample_count > 0 else 'Continuous'}", file=sys.stderr)
print(f"Gain: {gain} dB" if gain else 'Gain: Automatic', file=sys.stderr)
print(f"Format: {format}", file=sys.stderr)
#apply settings
sdr.setFrequency(soapy.SOAPY_SDR_RX, 0, frequency)
sdr.setSampleRate(soapy.SOAPY_SDR_RX, 0, sample_rate)
sdr.setBandwidth(soapy.SOAPY_SDR_RX, 0, bandwidth)
sdr.setGainMode(soapy.SOAPY_SDR_RX, 0, (gain == None))
if gain is not None:
sdr.setGain(soapy.SOAPY_SDR_RX, 0, gain)
#setup a stream
samples = 10000
buffer = np.array([0] * samples * 2, TYPES[format])
stream = sdr.setupStream(soapy.SOAPY_SDR_RX, format)
sdr.activateStream(stream)
total_samples = 0
keep_going = True
while keep_going:
collect_samples = samples
if sample_count > 0:
collect_samples = min(samples, sample_count - total_samples)
if collect_samples <= 0:
keep_going = False
break
result = sdr.readStream(stream, [buffer], collect_samples)
print('Rx:', result.ret, result.flags, result.timeNs, file=sys.stderr)
if result.ret < 1:
print('Stream read failed, exiting.', file=sys.stderr)
keep_going = False
if outfile:
outfile.write(struct.pack(PACKINGS[format] % len(buffer), *buffer))
if sample_count > 0:
total_samples += result.ret
if outfile and outfile != sys.stdout.buffer:
outfile.close()
#shutdown the stream
sdr.deactivateStream(stream)
sdr.closeStream(stream)