Compare commits

...

1 Commits

Author SHA1 Message Date
d84268847c Added first run of kafka logging 2023-06-15 13:33:32 +09:30
4 changed files with 153 additions and 3 deletions

43
docker-compose.yml Normal file
View File

@ -0,0 +1,43 @@
version: "3"
services:
kafka:
container_name: kafka
hostname: kafka
image: bitnami/kafka:latest
ports:
# Flip the ports around, external gets default.
- "9092:9094"
- "9094:9092"
networks:
- kafka-internal
volumes:
- "kafka_data:/bitnami"
environment:
# https://hub.docker.com/r/bitnami/kafka/
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
networks:
- kafka-internal
environment:
# https://docs.kafka-ui.provectus.io/configuration/misc-configuration-properties
KAFKA_CLUSTERS_0_NAME: test
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
- kafka
volumes:
kafka_data:
driver: local
networks:
kafka-internal:

64
kafkalogging.py Normal file
View File

@ -0,0 +1,64 @@
from kafka import KafkaProducer
import subprocess
import os
kafka_servers = ['localhost:9092']
producers = {}
# Do anything that might take more than a few ms early, otherwise you hit overflow errors
def prep_io(process, process_name=None):
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
if pipe is None:
continue
if os.get_blocking(pipe.fileno()):
os.set_blocking(pipe.fileno(), False)
if process.pid not in producers:
producers[process.pid] = {}
if pipe_name not in producers[process.pid]:
producers[process.pid][pipe_name] = KafkaProducer(
bootstrap_servers=kafka_servers,
max_block_ms=200
)
# Inspired by gist:
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
def check_io(process, process_name=None):
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
if pipe is None:
continue
topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"
while True:
# The python official docs recommend using Popen.communicate() instead of reading
# from the pipes directly. Using Popen.communicate is a blocking call though,
# which would stall threads for processes producing no output.
output = pipe.readline()
if output:
producers[process.pid][pipe_name].send(topic_name, value=output)
else:
break
producers[process.pid][pipe_name].flush()
if __name__ == '__main__':
import time
ping = subprocess.Popen(
[
'/usr/bin/ping',
'1.1.1.1',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
while True:
check_io(ping, 'ping')
time.sleep(0.1)

View File

@ -7,6 +7,7 @@ import subprocess
import struct
from soapyhelpers import *
from samplerates import *
from kafkalogging import *
class Radio:
@ -131,6 +132,11 @@ class Radio:
*self.buffer[:read_size])
)
# handle subprocess logs
check_io(self.demod, f"{self.name}-demod")
check_io(self.playback, f"{self.name}-playback")
self._cleanup_stream()
def _init_stream(self):
@ -140,17 +146,20 @@ class Radio:
'-f', 'rtsp', f"rtsp://localhost{self._stream_path()}"
],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.demod = subprocess.Popen(
['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)],
stdin=subprocess.PIPE,
stdout=self.playback.stdin,
stderr=subprocess.DEVNULL
stderr=subprocess.PIPE
)
prep_io(self.demod, f"{self.name}-demod")
prep_io(self.playback, f"{self.name}-playback")
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy)
result = self.device.activateStream(self.stream)

34
scripts/connect.sh Executable file
View File

@ -0,0 +1,34 @@
#! /bin/bash
set -e
server="localhost:5000"
radio="sdrplay"
frequency="105.5M"
trap deinit INT
deinit() {
curl $server/radio/$radio/end
curl $server/radio/$radio/disconnect
#kill $pid
killall ffplay
echo -n ""
}
init() {
curl $server/radio/$radio/connect
curl $server/radio/$radio/configure/$frequency
curl $server/radio/$radio/start
}
init
echo "Waiting for stream to become active..."
sleep 3
ffplay -nodisp -hide_banner rtsp://localhost:8554/radio/sdrplay &
pid=$!
while true; do
sleep 1
done