Working FM radio demodulator for NESDR smart radio

This commit is contained in:
Jono Targett 2023-05-05 11:19:31 +09:30
parent 9c2b1c0ad5
commit 1ef2ee731d
4 changed files with 404 additions and 0 deletions

165
filters.py Normal file
View File

@ -0,0 +1,165 @@
#!/usr/bin/env python3
# Common DSP filters using pure Python
#
# This code belongs to https://github.com/elvis-epx/sdr
# No licence information is provided.
#
import numpy, math, sys, time
from numpy import fft
def impulse(mask):
''' Convert frequency domain mask to time-domain '''
# Negative side, a mirror of positive side
negatives = mask[1:-1]
negatives.reverse()
mask = mask + negatives
fft_length = len(mask)
# Convert FFT filter mask to FIR coefficients
impulse_response = fft.ifft(mask).real.tolist()
# swap left and right sides
left = impulse_response[:fft_length // 2]
right = impulse_response[fft_length // 2:]
impulse_response = right + left
return impulse_response
def lo_mask(sample_rate, tap_count, freq, dboct):
''' Create a freq domain mask for a lowpass filter '''
order = dboct / 6
max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0)
# Convert freq to filter step unit
freq /= f2s
l = tap_count // 2
mask = []
for f in range(0, l+1):
H = 1.0 / ( 1 + (f / freq) ** (2 * order) ) ** 0.5
mask.append(H)
return mask
def hi_mask(sample_rate, tap_count, freq, dboct):
''' Create a freq domain mask for a highpass filter '''
order = dboct / 6
max_freq = sample_rate / 2.0
f2s = max_freq / (tap_count / 2.0)
# Convert freq frequency to filter step unit
freq /= f2s
l = tap_count // 2
mask = []
for f in range(0, l+1):
H = 1.0 / ( 1 + (freq / (f + 0.0001)) ** (2 * order) ) ** 0.5
mask.append(H)
return mask
def combine_masks(mask1, mask2):
''' Combine two filter masks '''
assert len(mask1) == len(mask2)
return [ mask1[i] * mask2[i] for i in range(0, len(mask1)) ]
def taps(sample_rate, freq, dboct, is_highpass):
cutoff_octaves = 60 / dboct
if is_highpass:
cutoff = freq / 2 ** cutoff_octaves
else:
cutoff = freq * 2 ** cutoff_octaves
cutoff = min(cutoff, sample_rate / 2)
transition_band = abs(freq - cutoff)
Bt = transition_band / sample_rate
taps = int(60 / (22 * Bt))
# print("Freq=%f,%f number of taps: %d" % (freq, cutoff, taps), file=sys.stderr)
return taps
class filter:
def __init__(self, sample_rate, cutoff):
raise "Abstract"
def feed(self, original):
unfiltered = numpy.concatenate((self.buf, original))
self.buf = unfiltered[-len(self.coefs):]
filtered = numpy.convolve(unfiltered, self.coefs, mode='valid')
assert len(filtered) == len(original) + 1
return filtered[1:]
class low_pass(filter):
def __init__(self, sample_rate, f, dbo):
tap_count = taps(sample_rate, f, dbo, False)
mask = lo_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
class high_pass(filter):
def __init__(self, sample_rate, f, dbo):
tap_count = taps(sample_rate, f, dbo, True)
mask = hi_mask(sample_rate, tap_count, f, dbo)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
class band_pass(filter):
def __init__(self, sample_rate, lo, hi, dbo):
tap_count = max(taps(sample_rate, lo, dbo, True),
taps(sample_rate, hi, dbo, False))
lomask = lo_mask(sample_rate, tap_count, hi, dbo)
himask = hi_mask(sample_rate, tap_count, lo, dbo)
mask = combine_masks(lomask, himask)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
class deemphasis(filter):
def __init__(self, sample_rate, us, hi, final_dbo):
# us = RC constant of the hypothetical deemphasis filter
us /= 1000000
# 0..lo is not deemphasized
lo = 1.0 / (2 * math.pi * us)
# attenuation from lo to hi should be 10dB
octaves = math.log(hi / lo) / math.log(2)
# slope in dB/octave of deemphasis filter
dedbo = 10 / octaves
tap_count = max(taps(sample_rate, lo, dedbo, False),
taps(sample_rate, hi, final_dbo, False))
# Calculate deemphasis filter
demask = lo_mask(sample_rate, tap_count, lo, dedbo)
# Calculate low-pass filter after deemphasis
fmask = lo_mask(sample_rate, tap_count, hi, final_dbo)
mask = combine_masks(demask, fmask)
self.coefs = impulse(mask)
self.buf = [ 0 for n in self.coefs ]
class decimator(filter):
def __init__(self, factor):
self.buf2 = []
self.factor = int(factor)
def feed(self, original):
original = numpy.concatenate((self.buf2, original))
# Gets the last n-th sample of every n (n = factor)
# If e.g. gets 12 samples, gets s[4] and s[9], and
# stoves s[10:] to the next round
'''
decimated = [ original[ self.factor * i + self.factor - 1 ] \
for i in range(0, len(original) // self.factor) ]
'''
decimated = original[(self.factor - 1)::self.factor]
self.buf2 = original[:-len(original) % self.factor]
return decimated

223
fm1s.py Executable file
View File

@ -0,0 +1,223 @@
#!/usr/bin/env python3
# FM demodulator based on I/Q (quadrature) samples
#
# This code belongs to https://github.com/elvis-epx/sdr
# No licence information is provided.
#
import struct, math, random, sys, numpy, filters, time
optimized = "-o" in sys.argv
debug_mode = "-d" in sys.argv
disable_pll = "--disable-pll" in sys.argv
if disable_pll:
optimized = False
if optimized:
import fastfm # Cython
MAX_DEVIATION = 200000.0 # Hz
INPUT_RATE = 256000
OUTPUT_RATE = 32000
#INPUT_RATE = 1e6
#OUTPUT_RATE = 50000
if debug_mode:
OUTPUT_RATE=256000
DECIMATION = INPUT_RATE / OUTPUT_RATE
assert DECIMATION == math.floor(DECIMATION)
FM_BANDWIDTH = 15000 # Hz
STEREO_CARRIER = 38000 # Hz
DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2))
pll = math.pi - random.random() * 2 * math.pi
last_pilot = 0.0
deviation_avg = math.pi - random.random() * 2 * math.pi
last_deviation_avg = deviation_avg
tau = 2 * math.pi
# Downsample mono audio
decimate1 = filters.decimator(DECIMATION)
# Deemph + Low-pass filter for mono (L+R) audio
lo = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
# Downsample jstereo audio
decimate2 = filters.decimator(DECIMATION)
# Deemph + Low-pass filter for joint-stereo demodulated audio (L-R)
lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120)
# Band-pass filter for stereo (L-R) modulated audio
hi = filters.band_pass(INPUT_RATE,
STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120)
# Filter to extract pilot signal
pilot = filters.band_pass(INPUT_RATE,
STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120)
last_angle = 0.0
remaining_data = b''
while True:
# Ingest 0.1s worth of data
data = sys.stdin.buffer.read((INPUT_RATE * 2) // 10)
if not data:
break
data = remaining_data + data
if len(data) < 4:
remaining_data = data
continue
# Save one sample to next batch, and the odd byte if exists
if len(data) % 2 == 1:
print("Odd byte, that's odd", file=sys.stderr)
remaining_data = data[-3:]
data = data[:-1]
else:
remaining_data = data[-2:]
samples = len(data) // 2
# Find angle (phase) of I/Q pairs
iqdata = numpy.frombuffer(data, dtype=numpy.uint8)
iqdata = iqdata - 127.5
iqdata = iqdata / 128.0
iqdata = iqdata.view(complex)
angles = numpy.angle(iqdata)
# Determine phase rotation between samples
# (Output one element less, that's we always save last sample
# in remaining_data)
rotations = numpy.ediff1d(angles)
# Wrap rotations >= +/-180º
rotations = (rotations + numpy.pi) % (2 * numpy.pi) - numpy.pi
# Convert rotations to baseband signal
output_raw = numpy.multiply(rotations, DEVIATION_X_SIGNAL)
output_raw = numpy.clip(output_raw, -0.999, +0.999)
# At this point, output_raw contains two audio signals:
# L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC,
# carrier 38kHz
# Downsample and low-pass L+R (mono) signal
output_mono = lo.feed(output_raw)
output_mono = decimate1.feed(output_mono)
# Filter pilot tone
detected_pilot = pilot.feed(output_raw)
# Separate ultrasonic L-R signal by high-pass filtering
output_jstereo_mod = hi.feed(output_raw)
# Demodulate L-R, which is AM-SC with 53kHz carrier
if optimized:
output_jstereo, pll, STEREO_CARRIER, \
last_pilot, deviation_avg, last_deviation_avg = \
fastfm.demod_stereo(output_jstereo_mod,
pll,
STEREO_CARRIER,
INPUT_RATE,
detected_pilot,
last_pilot,
deviation_avg,
last_deviation_avg)
else:
output_jstereo = []
for n in range(0, len(output_jstereo_mod)):
# Advance carrier
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
# Standard demodulation
output_jstereo.append(math.cos(pll) * output_jstereo_mod[n])
if disable_pll:
continue
############ Carrier PLL #################
# Detect pilot zero-crossing
cur_pilot = detected_pilot[n]
zero_crossed = (cur_pilot * last_pilot) <= 0
last_pilot = cur_pilot
if not zero_crossed:
continue
# When pilot is at 90º or 270º, carrier should be around 180º
# t=0 => cos(t) = 1, cos(2t) = 1
# t=π/2 => cos(t) = 0, cos(2t) = -1
# t=π => cos(t) = -1, cos(2t) = 1
# t=-π/2 => cos(t) = 0, cos(2t) = -1
ideal = math.pi
deviation = pll - ideal
if deviation > math.pi:
# 350º => -10º
deviation -= tau
deviation_avg = 0.99 * deviation_avg + 0.01 * deviation
rotation = deviation_avg - last_deviation_avg
last_deviation_avg = deviation_avg
if abs(deviation_avg) > math.pi / 8:
# big phase deviation, reset PLL
# print("Resetting PLL", file=sys.stderr)
pll = ideal
pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau
deviation_avg = 0.0
last_deviation_avg = 0.0
# Translate rotation to frequency deviation e.g.
# cos(tau + 3.6º) = cos(1.01 * tau)
# cos(tau - 9º) = cos(tau * 0.975)
#
# Overcorrect by 5% to (try to) sync phase,
# otherwise only the frequency would be synced.
STEREO_CARRIER /= (1 + (rotation * 1.05) / tau)
'''
print("%d deviationavg=%f rotation=%f freq=%f" %
(n,
deviation_avg * 180 / math.pi,
rotation * 180 / math.pi,
STEREO_CARRIER),
file=sys.stderr)
time.sleep(0.05)
'''
# Downsample, Low-pass/deemphasis demodulated L-R
output_jstereo = lo_r.feed(output_jstereo)
output_jstereo = decimate2.feed(output_jstereo)
assert len(output_jstereo) == len(output_mono)
# Scale to 16-bit and divide by 2 for channel sum
output_mono = numpy.multiply(output_mono, 32767 / 2.0)
output_jstereo = numpy.multiply(output_jstereo, 32767 / 2.0)
# Output stereo by adding or subtracting joint-stereo to mono
output_left = output_mono + output_jstereo
output_right = output_mono - output_jstereo
if not debug_mode:
# Interleave L and R samples using NumPy trickery
output = numpy.empty(len(output_mono) * 2, dtype=output_mono.dtype)
output[0::2] = output_left
output[1::2] = output_right
output = output.astype(int)
else:
output = numpy.empty(len(output_mono) * 3, dtype=output_mono.dtype)
output[0::3] = output_mono
output[1::3] = output_jstereo
output[2::3] = numpy.multiply(detected_pilot, 32767)
output = output.astype(int)
sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output))

11
fm_rt_stereo Executable file
View File

@ -0,0 +1,11 @@
#!/bin/sh -x
# Decode and play stereo broadcast FM in realtime.
#
# This code belongs to https://github.com/elvis-epx/sdr
# No licence information is provided.
#
rtl_sdr -f 106.3M -s 256k - | ./fm1s.py | sox -t raw -r 32000 -b 16 -c 2 -L -e signed-integer - -d

5
setup Executable file
View File

@ -0,0 +1,5 @@
#! /bin/bash
sudo apt install rtl-sdr portaudio19-dev python3-all-dev
pip install pyaudio filters