sdrplay-fm-radio/fm_demod.py

205 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
# FM demodulator based on I/Q (quadrature) samples
#
# This code is modified from the original demodulator by:
# https://github.com/elvis-epx/sdr
# No licence information is provided.
#
import struct
import math
import random
import sys
import numpy as np
import filters
import argparse
import prefixed
TYPES = {
'CU8': np.uint8,
'CS16': np.int16,
'CF32': np.float32,
}
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true')
parser.add_argument('-f', '--format', choices=list(TYPES.keys()), help='Input sample format', required=True)
parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True)
parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True)
# TODO JMT: Output to file
#parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.')
args = parser.parse_args()
INPUT_RATE = int(prefixed.Float(args.sample_rate))
OUTPUT_RATE = int(prefixed.Float(args.demod_rate))
DECIMATION = INPUT_RATE / OUTPUT_RATE
if DECIMATION != math.floor(DECIMATION):
print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr)
sys.exit(1)
FORMAT = TYPES[args.format]
DT = np.dtype(FORMAT)
BYTES_PER_SAMPLE = 2 * DT.itemsize
MAX_DEVIATION = 200000.0 # Hz
FM_BANDWIDTH = 15000 # Hz
STEREO_CARRIER = 38000 # Hz
DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2))
pll = math.pi - random.random() * 2 * math.pi
last_pilot = 0.0
deviation_avg = math.pi - random.random() * 2 * math.pi
last_deviation_avg = deviation_avg
tau = 2 * math.pi
# Downsample mono audio
decimate1 = filters.decimator(DECIMATION)
# Deemph + Low-pass filter for mono (L+R) audio
lo = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
# Downsample jstereo audio
decimate2 = filters.decimator(DECIMATION)
# Deemph + Low-pass filter for joint-stereo demodulated audio (L-R)
lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
# Band-pass filter for stereo (L-R) modulated audio
hi = filters.band_pass(INPUT_RATE,
STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120)
# Filter to extract pilot signal
pilot = filters.band_pass(INPUT_RATE,
STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120)
last_angle = 0.0
remaining_data = b''
while True:
# Ingest 0.1s worth of data
data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10)
if not data:
break
# TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1
#data = remaining_data + data
if len(data) < 2 * BYTES_PER_SAMPLE:
remaining_data = data
continue
# Save one sample to next batch, and the odd byte if exists
if len(data) % 2 == 1:
print("Odd byte, that's odd", file=sys.stderr)
remaining_data = data[-3:]
data = data[:-1]
else:
remaining_data = data[-2:]
samples = len(data) // BYTES_PER_SAMPLE
# Find angle (phase) of I/Q pairs
iqdata = np.frombuffer(data, dtype=FORMAT)
if args.verbose:
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
if np.issubdtype(FORMAT, np.integer):
iinfo = np.iinfo(FORMAT)
if np.issubdtype(FORMAT, np.unsignedinteger):
iqdata = iqdata - (iinfo.max / 2.0)
iqdata = iqdata / (iinfo.max / 2.0)
else:
iqdata = iqdata / np.float64(iinfo.max)
else:
iqdata = iqdata.astype(np.float64)
if args.verbose:
print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr)
iqdata = iqdata.view(complex)
angles = np.angle(iqdata)
# Determine phase rotation between samples
rotations = np.ediff1d(angles)
# Wrap rotations >= +/-180º
rotations = (rotations + np.pi) % (2 * np.pi) - np.pi
# Convert rotations to baseband signal
output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL)
output_raw = np.clip(output_raw, -0.999, +0.999)
# At this point, output_raw contains two audio signals:
# L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC,
# carrier 38kHz
# Downsample and low-pass L+R (mono) signal
output_mono = lo.feed(output_raw)
output_mono = decimate1.feed(output_mono)
# Filter pilot tone
detected_pilot = pilot.feed(output_raw)
# Separate ultrasonic L-R signal by high-pass filtering
output_jstereo_mod = hi.feed(output_raw)
output_jstereo = []
# Demodulate L-R, which is AM-SC with 53kHz carrier
for n in range(0, len(output_jstereo_mod)):
# Advance carrier
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
# Standard demodulation
output_jstereo.append(math.cos(pll) * output_jstereo_mod[n])
# Detect pilot zero-crossing
cur_pilot = detected_pilot[n]
zero_crossed = (cur_pilot * last_pilot) <= 0
last_pilot = cur_pilot
if not zero_crossed:
continue
# When pilot is at 90º or 270º, carrier should be around 180º
ideal = math.pi
deviation = pll - ideal
if deviation > math.pi:
deviation -= tau
deviation_avg = 0.99 * deviation_avg + 0.01 * deviation
rotation = deviation_avg - last_deviation_avg
last_deviation_avg = deviation_avg
if abs(deviation_avg) > math.pi / 8:
# big phase deviation, reset PLL
pll = ideal
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
deviation_avg = 0.0
last_deviation_avg = 0.0
# Translate rotation to frequency deviation
STEREO_CARRIER /= (1 + (rotation * 1.05) / tau)
# Downsample, Low-pass/deemphasis demodulated L-R
output_jstereo = lo_r.feed(output_jstereo)
output_jstereo = decimate2.feed(output_jstereo)
assert len(output_jstereo) == len(output_mono)
# Scale to 16-bit and divide by 2 for channel sum
output_mono = np.multiply(output_mono, 32767 / 2.0)
output_jstereo = np.multiply(output_jstereo, 32767 / 2.0)
# Output stereo by adding or subtracting joint-stereo to mono
output_left = output_mono + output_jstereo
output_right = output_mono - output_jstereo
# Interleave L and R samples using np trickery
output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype)
output[0::2] = output_left
output[1::2] = output_right
output = output.astype(int)
sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output))