Unified byte format mappings

This commit is contained in:
Jono Targett 2023-05-15 12:52:05 +09:30
parent 56bb29ebdd
commit a4e4a11d86
4 changed files with 23 additions and 24 deletions

View File

@ -15,12 +15,12 @@ import numpy as np
import filters
import argparse
import prefixed
from formats import TYPES
from formats import FORMATS
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true')
parser.add_argument('-f', '--format', choices=list(TYPES.keys()), help='Input sample format', required=True)
parser.add_argument('-f', '--format', choices=list(FORMATS.keys()), help='Input sample format', required=True)
parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True)
parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True)
# TODO JMT: Output to file
@ -35,7 +35,7 @@ if DECIMATION != math.floor(DECIMATION):
print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr)
sys.exit(1)
FORMAT = TYPES[args.format]
FORMAT = FORMATS[args.format].numpy
DT = np.dtype(FORMAT)
BYTES_PER_SAMPLE = 2 * DT.itemsize

View File

@ -1,18 +1,17 @@
import numpy as np
from SoapySDR import SOAPY_SDR_CS16, SOAPY_SDR_CF32
from SoapySDR import *
from dataclasses import dataclass
@dataclass
class FormatSpec:
name: str
soapy: int
numpy: np.dtype
packing: str
FORMATS = {
'CS16': SOAPY_SDR_CS16,
'CF32': SOAPY_SDR_CF32,
}
PACKINGS = {
'CS16': '=%dh',
'CF32': '=%df',
}
TYPES = {
'CU8': np.uint8,
'CS16': np.int16,
'CF32': np.float32,
'CU8': FormatSpec('CU8', SOAPY_SDR_CU8, np.uint8, '=%dB'),
'CS8': FormatSpec('CS8', SOAPY_SDR_CS8, np.int8, '=%db'),
'CS16': FormatSpec('CS16', SOAPY_SDR_CS16, np.int16, '=%dh'),
'CF32': FormatSpec('CF32', SOAPY_SDR_CF32, np.float32, '=%df'),
}

View File

@ -87,7 +87,7 @@ class Radio:
else:
read_size = int(result.ret * 2)
self.demod.stdin.write(
struct.pack(PACKINGS[Radio.FORMAT] % read_size,
struct.pack(FORMATS[Radio.FORMAT].packing % read_size,
*self.buffer[:read_size])
)
@ -111,8 +111,8 @@ class Radio:
stderr=subprocess.DEVNULL
)
self.buffer = np.array([0] * Radio.SAMPLES * 2, TYPES[Radio.FORMAT])
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT])
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy)
self.device.activateStream(self.stream)
def _cleanup_stream(self):

View File

@ -8,7 +8,7 @@ import argparse
import prefixed
from pprint import pprint
from colorama import Fore, Style
from formats import FORMATS, PACKINGS, TYPES
from formats import FORMATS
FREQUENCY = 105500000
@ -130,8 +130,8 @@ if args.d:
#setup a stream
samples = 1000
buffer = np.array([0] * samples * 2, TYPES[format])
stream = sdr.setupStream(soapy.SOAPY_SDR_RX, FORMATS[format])
buffer = np.array([0] * samples * 2, FORMATS[format].numpy)
stream = sdr.setupStream(soapy.SOAPY_SDR_RX, FORMATS[format].soapy)
sdr.activateStream(stream)
@ -156,7 +156,7 @@ if args.d:
if outfile:
received = int(result.ret * 2)
outfile.write(struct.pack(PACKINGS[format] % received, *buffer[:received]))
outfile.write(struct.pack(FORMATS[format].packing % received, *buffer[:received]))
if sample_count > 0:
total_samples += result.ret