sdrplay-fm-radio/data/kafkalogging.py
2024-06-29 19:28:01 +09:30

88 lines
2.6 KiB
Python

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
import subprocess
from select import select
import os
kafka_servers = ['localhost:9092']
producers = {}
client = KafkaAdminClient(bootstrap_servers=kafka_servers)
# Inspired by gist:
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
def check_io(process, process_name=None):
#if process.stdout is not None: os.set_blocking(process.stdout.fileno(), False)
#if process.stderr is not None: os.set_blocking(process.stderr.fileno(), False)
#ready_to_read, _, _ = select(
# [x for x in [process.stdout, process.stderr] if x is not None],
# [], [], 0
#)
#return
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
if pipe is None:
continue
if os.get_blocking(pipe):
os.set_blocking(pipe, False)
#if pipe not in ready_to_read:
# continue
topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"
#if process.pid not in producers:
# producers[process.pid] = {}
#if pipe_name not in producers[process.pid]:
# #client.create_topics(
# # [
# # NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
# # ]
# #)
# producers[process.pid][pipe_name] = KafkaProducer(bootstrap_servers=kafka_servers, max_block_ms=10)
while True:
#print(topic_name)
#break
# The python official docs recommend using Popen.communicate()
# instead of reading from the pipes directly. This isn't applicable
# here, as the python docs assume you're shelling out to a short-lived
# process and retrieving the results, not starting a service and
# pulling the log output. Popen.communicate() is a blocking call and
# would die here forever waiting for the service to exit.
output = pipe.readline()
if output:
print(output)
#producers[process.pid][pipe_name].send(topic_name, value=output)
else:
break
#break anyway, i think we're dying in this loop
break
#producers[process.pid][pipe_name].flush()
if __name__ == '__main__':
import time
ping = subprocess.Popen(
[
'/usr/bin/ping',
'1.1.1.1',
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
while True:
check_io(ping, 'ping')
time.sleep(0.1)
ping.kill()
ping.wait()
ping = None