#!/usr/bin/env python3 # FM demodulator based on I/Q (quadrature) samples # # This code is modified from the original demodulator by: # https://github.com/elvis-epx/sdr # No licence information is provided. # import struct import math import random import sys import numpy as np import filters import argparse import prefixed from formats import TYPES parser = argparse.ArgumentParser() parser.add_argument('-v', '--verbose', help='Print additional informational output', action='store_true') parser.add_argument('-f', '--format', choices=list(TYPES.keys()), help='Input sample format', required=True) parser.add_argument('-s', '--sample-rate', metavar='rate', help='Source sample rate (Hz)', required=True) parser.add_argument('-d', '--demod-rate', metavar='rate', help='Output sample rate (Hz)', required=True) # TODO JMT: Output to file #parser.add_argument('-o', metavar='file', help='Specify an output file for demodulated audio. Omit for stdout or use \'-\'.') args = parser.parse_args() INPUT_RATE = int(prefixed.Float(args.sample_rate)) OUTPUT_RATE = int(prefixed.Float(args.demod_rate)) DECIMATION = INPUT_RATE / OUTPUT_RATE if DECIMATION != math.floor(DECIMATION): print(f'The output rate must be an integer divisor of the input rate: {INPUT_RATE}/{OUTPUT_RATE} = {DECIMATION}', file=sys.stderr) sys.exit(1) FORMAT = TYPES[args.format] DT = np.dtype(FORMAT) BYTES_PER_SAMPLE = 2 * DT.itemsize MAX_DEVIATION = 200000.0 # Hz FM_BANDWIDTH = 15000 # Hz STEREO_CARRIER = 38000 # Hz DEVIATION_X_SIGNAL = 0.999 / (math.pi * MAX_DEVIATION / (INPUT_RATE / 2)) pll = math.pi - random.random() * 2 * math.pi last_pilot = 0.0 deviation_avg = math.pi - random.random() * 2 * math.pi last_deviation_avg = deviation_avg tau = 2 * math.pi # Downsample mono audio decimate1 = filters.decimator(DECIMATION) # Deemph + Low-pass filter for mono (L+R) audio lo = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120) # Downsample jstereo audio decimate2 = filters.decimator(DECIMATION) # Deemph + Low-pass filter for joint-stereo demodulated audio (L-R) lo_r = filters.deemphasis(INPUT_RATE, 75, FM_BANDWIDTH, 120) # Band-pass filter for stereo (L-R) modulated audio hi = filters.band_pass(INPUT_RATE, STEREO_CARRIER - FM_BANDWIDTH, STEREO_CARRIER + FM_BANDWIDTH, 120) # Filter to extract pilot signal pilot = filters.band_pass(INPUT_RATE, STEREO_CARRIER / 2 - 100, STEREO_CARRIER / 2 + 100, 120) last_angle = 0.0 remaining_data = b'' while True: # Ingest 0.1s worth of data data = sys.stdin.buffer.read((INPUT_RATE * BYTES_PER_SAMPLE) // 10) if not data: break # TODO JMT: Something about this is broken for BYTES_PER_SAMPLE > 1 #data = remaining_data + data if len(data) < 2 * BYTES_PER_SAMPLE: remaining_data = data continue # Save one sample to next batch, and the odd byte if exists if len(data) % 2 == 1: print("Odd byte, that's odd", file=sys.stderr) remaining_data = data[-3:] data = data[:-1] else: remaining_data = data[-2:] samples = len(data) // BYTES_PER_SAMPLE # Find angle (phase) of I/Q pairs iqdata = np.frombuffer(data, dtype=FORMAT) if args.verbose: print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) if np.issubdtype(FORMAT, np.integer): iinfo = np.iinfo(FORMAT) if np.issubdtype(FORMAT, np.unsignedinteger): iqdata = iqdata - (iinfo.max / 2.0) iqdata = iqdata / (iinfo.max / 2.0) else: iqdata = iqdata / np.float64(iinfo.max) else: iqdata = iqdata.astype(np.float64) if args.verbose: print(iqdata.dtype, iqdata.shape, iqdata, file=sys.stderr) iqdata = iqdata.view(complex) angles = np.angle(iqdata) # Determine phase rotation between samples rotations = np.ediff1d(angles) # Wrap rotations >= +/-180º rotations = (rotations + np.pi) % (2 * np.pi) - np.pi # Convert rotations to baseband signal output_raw = np.multiply(rotations, DEVIATION_X_SIGNAL) output_raw = np.clip(output_raw, -0.999, +0.999) # At this point, output_raw contains two audio signals: # L+R (mono-compatible) and L-R (joint-stereo) modulated in AM-SC, # carrier 38kHz # Downsample and low-pass L+R (mono) signal output_mono = lo.feed(output_raw) output_mono = decimate1.feed(output_mono) # Filter pilot tone detected_pilot = pilot.feed(output_raw) # Separate ultrasonic L-R signal by high-pass filtering output_jstereo_mod = hi.feed(output_raw) output_jstereo = [] # Demodulate L-R, which is AM-SC with 53kHz carrier for n in range(0, len(output_jstereo_mod)): # Advance carrier pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau # Standard demodulation output_jstereo.append(math.cos(pll) * output_jstereo_mod[n]) # Detect pilot zero-crossing cur_pilot = detected_pilot[n] zero_crossed = (cur_pilot * last_pilot) <= 0 last_pilot = cur_pilot if not zero_crossed: continue # When pilot is at 90º or 270º, carrier should be around 180º ideal = math.pi deviation = pll - ideal if deviation > math.pi: deviation -= tau deviation_avg = 0.99 * deviation_avg + 0.01 * deviation rotation = deviation_avg - last_deviation_avg last_deviation_avg = deviation_avg if abs(deviation_avg) > math.pi / 8: # big phase deviation, reset PLL pll = ideal pll = (pll + tau * STEREO_CARRIER / INPUT_RATE) % tau deviation_avg = 0.0 last_deviation_avg = 0.0 # Translate rotation to frequency deviation STEREO_CARRIER /= (1 + (rotation * 1.05) / tau) # Downsample, Low-pass/deemphasis demodulated L-R output_jstereo = lo_r.feed(output_jstereo) output_jstereo = decimate2.feed(output_jstereo) assert len(output_jstereo) == len(output_mono) # Scale to 16-bit and divide by 2 for channel sum output_mono = np.multiply(output_mono, 32767 / 2.0) output_jstereo = np.multiply(output_jstereo, 32767 / 2.0) # Output stereo by adding or subtracting joint-stereo to mono output_left = output_mono + output_jstereo output_right = output_mono - output_jstereo # Interleave L and R samples using np trickery output = np.empty(len(output_mono) * 2, dtype=output_mono.dtype) output[0::2] = output_left output[1::2] = output_right output = output.astype(int) sys.stdout.buffer.write(struct.pack('<%dh' % len(output), *output))