diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d04a1ec --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,43 @@ +version: "3" + +services: + kafka: + container_name: kafka + hostname: kafka + image: bitnami/kafka:latest + ports: + # Flip the ports around, external gets default. + - "9092:9094" + - "9094:9092" + networks: + - kafka-internal + volumes: + - "kafka_data:/bitnami" + environment: + # https://hub.docker.com/r/bitnami/kafka/ + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + networks: + - kafka-internal + environment: + # https://docs.kafka-ui.provectus.io/configuration/misc-configuration-properties + KAFKA_CLUSTERS_0_NAME: test + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + depends_on: + - kafka + +volumes: + kafka_data: + driver: local + +networks: + kafka-internal: \ No newline at end of file diff --git a/kafkalogging.py b/kafkalogging.py new file mode 100644 index 0000000..806ad53 --- /dev/null +++ b/kafkalogging.py @@ -0,0 +1,64 @@ +from kafka import KafkaProducer +import subprocess +import os + +kafka_servers = ['localhost:9092'] +producers = {} + + +# Do anything that might take more than a few ms early, otherwise you hit overflow errors +def prep_io(process, process_name=None): + for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items(): + if pipe is None: + continue + + if os.get_blocking(pipe.fileno()): + os.set_blocking(pipe.fileno(), False) + + if process.pid not in producers: + producers[process.pid] = {} + if pipe_name not in producers[process.pid]: + producers[process.pid][pipe_name] = KafkaProducer( + bootstrap_servers=kafka_servers, + max_block_ms=200 + ) + + +# Inspired by gist: +# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368 +def check_io(process, process_name=None): + for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items(): + if pipe is None: + continue + + topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}" + + while True: + # The python official docs recommend using Popen.communicate() instead of reading + # from the pipes directly. Using Popen.communicate is a blocking call though, + # which would stall threads for processes producing no output. + output = pipe.readline() + if output: + producers[process.pid][pipe_name].send(topic_name, value=output) + else: + break + + producers[process.pid][pipe_name].flush() + + + +if __name__ == '__main__': + import time + + ping = subprocess.Popen( + [ + '/usr/bin/ping', + '1.1.1.1', + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + while True: + check_io(ping, 'ping') + time.sleep(0.1) \ No newline at end of file diff --git a/radio.py b/radio.py index c36f949..f10eefb 100644 --- a/radio.py +++ b/radio.py @@ -7,6 +7,7 @@ import subprocess import struct from soapyhelpers import * from samplerates import * +from kafkalogging import * class Radio: @@ -131,6 +132,11 @@ class Radio: *self.buffer[:read_size]) ) + # handle subprocess logs + check_io(self.demod, f"{self.name}-demod") + check_io(self.playback, f"{self.name}-playback") + + self._cleanup_stream() def _init_stream(self): @@ -140,17 +146,20 @@ class Radio: '-f', 'rtsp', f"rtsp://localhost{self._stream_path()}" ], stdin=subprocess.PIPE, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stdout=subprocess.PIPE, + stderr=subprocess.PIPE ) self.demod = subprocess.Popen( ['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)], stdin=subprocess.PIPE, stdout=self.playback.stdin, - stderr=subprocess.DEVNULL + stderr=subprocess.PIPE ) + prep_io(self.demod, f"{self.name}-demod") + prep_io(self.playback, f"{self.name}-playback") + self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy) self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy) result = self.device.activateStream(self.stream) diff --git a/scripts/connect.sh b/scripts/connect.sh new file mode 100755 index 0000000..a6079eb --- /dev/null +++ b/scripts/connect.sh @@ -0,0 +1,34 @@ +#! /bin/bash + +set -e + + +server="localhost:5000" +radio="sdrplay" +frequency="105.5M" + +trap deinit INT + +deinit() { + curl $server/radio/$radio/end + curl $server/radio/$radio/disconnect + #kill $pid + killall ffplay + echo -n "" +} + +init() { + curl $server/radio/$radio/connect + curl $server/radio/$radio/configure/$frequency + curl $server/radio/$radio/start +} + +init +echo "Waiting for stream to become active..." +sleep 3 +ffplay -nodisp -hide_banner rtsp://localhost:8554/radio/sdrplay & +pid=$! + +while true; do + sleep 1 +done \ No newline at end of file