Added first run of kafka logging
This commit is contained in:
parent
2895050152
commit
d84268847c
43
docker-compose.yml
Normal file
43
docker-compose.yml
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
version: "3"
|
||||||
|
|
||||||
|
services:
|
||||||
|
kafka:
|
||||||
|
container_name: kafka
|
||||||
|
hostname: kafka
|
||||||
|
image: bitnami/kafka:latest
|
||||||
|
ports:
|
||||||
|
# Flip the ports around, external gets default.
|
||||||
|
- "9092:9094"
|
||||||
|
- "9094:9092"
|
||||||
|
networks:
|
||||||
|
- kafka-internal
|
||||||
|
volumes:
|
||||||
|
- "kafka_data:/bitnami"
|
||||||
|
environment:
|
||||||
|
# https://hub.docker.com/r/bitnami/kafka/
|
||||||
|
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||||
|
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
|
||||||
|
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
|
||||||
|
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
||||||
|
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
|
||||||
|
|
||||||
|
kafka-ui:
|
||||||
|
container_name: kafka-ui
|
||||||
|
image: provectuslabs/kafka-ui:latest
|
||||||
|
ports:
|
||||||
|
- 8080:8080
|
||||||
|
networks:
|
||||||
|
- kafka-internal
|
||||||
|
environment:
|
||||||
|
# https://docs.kafka-ui.provectus.io/configuration/misc-configuration-properties
|
||||||
|
KAFKA_CLUSTERS_0_NAME: test
|
||||||
|
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
|
||||||
|
depends_on:
|
||||||
|
- kafka
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
kafka_data:
|
||||||
|
driver: local
|
||||||
|
|
||||||
|
networks:
|
||||||
|
kafka-internal:
|
||||||
64
kafkalogging.py
Normal file
64
kafkalogging.py
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
from kafka import KafkaProducer
|
||||||
|
import subprocess
|
||||||
|
import os
|
||||||
|
|
||||||
|
kafka_servers = ['localhost:9092']
|
||||||
|
producers = {}
|
||||||
|
|
||||||
|
|
||||||
|
# Do anything that might take more than a few ms early, otherwise you hit overflow errors
|
||||||
|
def prep_io(process, process_name=None):
|
||||||
|
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
|
||||||
|
if pipe is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if os.get_blocking(pipe.fileno()):
|
||||||
|
os.set_blocking(pipe.fileno(), False)
|
||||||
|
|
||||||
|
if process.pid not in producers:
|
||||||
|
producers[process.pid] = {}
|
||||||
|
if pipe_name not in producers[process.pid]:
|
||||||
|
producers[process.pid][pipe_name] = KafkaProducer(
|
||||||
|
bootstrap_servers=kafka_servers,
|
||||||
|
max_block_ms=200
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Inspired by gist:
|
||||||
|
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
|
||||||
|
def check_io(process, process_name=None):
|
||||||
|
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
|
||||||
|
if pipe is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# The python official docs recommend using Popen.communicate() instead of reading
|
||||||
|
# from the pipes directly. Using Popen.communicate is a blocking call though,
|
||||||
|
# which would stall threads for processes producing no output.
|
||||||
|
output = pipe.readline()
|
||||||
|
if output:
|
||||||
|
producers[process.pid][pipe_name].send(topic_name, value=output)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
producers[process.pid][pipe_name].flush()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import time
|
||||||
|
|
||||||
|
ping = subprocess.Popen(
|
||||||
|
[
|
||||||
|
'/usr/bin/ping',
|
||||||
|
'1.1.1.1',
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE
|
||||||
|
)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
check_io(ping, 'ping')
|
||||||
|
time.sleep(0.1)
|
||||||
15
radio.py
15
radio.py
@ -7,6 +7,7 @@ import subprocess
|
|||||||
import struct
|
import struct
|
||||||
from soapyhelpers import *
|
from soapyhelpers import *
|
||||||
from samplerates import *
|
from samplerates import *
|
||||||
|
from kafkalogging import *
|
||||||
|
|
||||||
|
|
||||||
class Radio:
|
class Radio:
|
||||||
@ -131,6 +132,11 @@ class Radio:
|
|||||||
*self.buffer[:read_size])
|
*self.buffer[:read_size])
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# handle subprocess logs
|
||||||
|
check_io(self.demod, f"{self.name}-demod")
|
||||||
|
check_io(self.playback, f"{self.name}-playback")
|
||||||
|
|
||||||
|
|
||||||
self._cleanup_stream()
|
self._cleanup_stream()
|
||||||
|
|
||||||
def _init_stream(self):
|
def _init_stream(self):
|
||||||
@ -140,17 +146,20 @@ class Radio:
|
|||||||
'-f', 'rtsp', f"rtsp://localhost{self._stream_path()}"
|
'-f', 'rtsp', f"rtsp://localhost{self._stream_path()}"
|
||||||
],
|
],
|
||||||
stdin=subprocess.PIPE,
|
stdin=subprocess.PIPE,
|
||||||
stdout=subprocess.DEVNULL,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.DEVNULL
|
stderr=subprocess.PIPE
|
||||||
)
|
)
|
||||||
|
|
||||||
self.demod = subprocess.Popen(
|
self.demod = subprocess.Popen(
|
||||||
['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)],
|
['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)],
|
||||||
stdin=subprocess.PIPE,
|
stdin=subprocess.PIPE,
|
||||||
stdout=self.playback.stdin,
|
stdout=self.playback.stdin,
|
||||||
stderr=subprocess.DEVNULL
|
stderr=subprocess.PIPE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
prep_io(self.demod, f"{self.name}-demod")
|
||||||
|
prep_io(self.playback, f"{self.name}-playback")
|
||||||
|
|
||||||
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
|
self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
|
||||||
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy)
|
self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy)
|
||||||
result = self.device.activateStream(self.stream)
|
result = self.device.activateStream(self.stream)
|
||||||
|
|||||||
34
scripts/connect.sh
Executable file
34
scripts/connect.sh
Executable file
@ -0,0 +1,34 @@
|
|||||||
|
#! /bin/bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
|
||||||
|
server="localhost:5000"
|
||||||
|
radio="sdrplay"
|
||||||
|
frequency="105.5M"
|
||||||
|
|
||||||
|
trap deinit INT
|
||||||
|
|
||||||
|
deinit() {
|
||||||
|
curl $server/radio/$radio/end
|
||||||
|
curl $server/radio/$radio/disconnect
|
||||||
|
#kill $pid
|
||||||
|
killall ffplay
|
||||||
|
echo -n ""
|
||||||
|
}
|
||||||
|
|
||||||
|
init() {
|
||||||
|
curl $server/radio/$radio/connect
|
||||||
|
curl $server/radio/$radio/configure/$frequency
|
||||||
|
curl $server/radio/$radio/start
|
||||||
|
}
|
||||||
|
|
||||||
|
init
|
||||||
|
echo "Waiting for stream to become active..."
|
||||||
|
sleep 3
|
||||||
|
ffplay -nodisp -hide_banner rtsp://localhost:8554/radio/sdrplay &
|
||||||
|
pid=$!
|
||||||
|
|
||||||
|
while true; do
|
||||||
|
sleep 1
|
||||||
|
done
|
||||||
Loading…
Reference in New Issue
Block a user