64 lines
1.9 KiB
Python
64 lines
1.9 KiB
Python
from kafka import KafkaProducer
|
|
import subprocess
|
|
import os
|
|
|
|
# Kafka bootstrap servers that every producer created by prep_io() connects to.
kafka_servers = ["localhost:9092"]

# Producer registry: producers[pid][pipe_name] -> KafkaProducer.
# prep_io() fills it in; check_io() looks producers up from it.
producers = dict()
|
|
|
|
|
|
# Do anything that might take more than a few ms (e.g. creating Kafka producers) up front; otherwise you hit overflow errors.
|
|
def prep_io(process, process_name=None):
    """Prepare a child process's output pipes for polling by check_io().

    For each of ``process.stdout`` and ``process.stderr`` that is not None:
    switch its file descriptor to non-blocking mode, and make sure a
    KafkaProducer exists for it at ``producers[process.pid][pipe_name]``.
    A pipe that already has a producer does not get a second one.

    Args:
        process: A ``subprocess.Popen``-like object exposing ``pid``,
            ``stdout`` and ``stderr``.
        process_name: Accepted for symmetry with check_io(); not used here.
    """
    pipes = (('stdout', process.stdout), ('stderr', process.stderr))
    for name, stream in pipes:
        if stream is not None:
            fd = stream.fileno()
            # Non-blocking reads let check_io() return immediately when the
            # child has not written anything yet.
            if os.get_blocking(fd):
                os.set_blocking(fd, False)

            per_process = producers.setdefault(process.pid, {})
            if name not in per_process:
                per_process[name] = KafkaProducer(
                    bootstrap_servers=kafka_servers,
                    max_block_ms=200,
                )
|
|
|
|
|
|
# Inspired by gist:
|
|
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
|
|
def check_io(process, process_name=None):
    """Forward any output currently waiting on a child's pipes to Kafka.

    For each of ``process.stdout`` and ``process.stderr`` that is not None,
    read lines until ``readline()`` returns an empty value. Send each line
    unchanged to the topic ``"<name>-<pipe>"``, where ``<name>`` is
    *process_name* (or the pid when no name is given). Then flush that
    pipe's producer.

    The pipes are expected to have been made non-blocking by prep_io(). An
    empty read then means either "nothing available yet" or EOF, so this
    returns instead of waiting. If prep_io() has not been run for this
    process, it is run here first.

    NOTE(review): on a non-blocking pipe, readline() can return an
    incomplete line when only part of it has arrived. That fragment is sent
    as its own message. Confirm whether consumers need whole lines.

    Args:
        process: A ``subprocess.Popen``-like object exposing ``pid``,
            ``stdout`` and ``stderr``.
        process_name: Optional topic prefix; defaults to the pid.
    """
    streams = {'stdout': process.stdout, 'stderr': process.stderr}
    for pipe_name, pipe in streams.items():
        if pipe is None:
            continue

        # Without prep_io() there is no registered producer (KeyError) and the
        # pipe is still blocking, which would stall here on a quiet process.
        if pipe_name not in producers.get(process.pid, {}):
            prep_io(process, process_name)
        producer = producers[process.pid][pipe_name]

        topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"

        # The Python docs recommend Popen.communicate() over reading the pipes
        # directly, but communicate() waits for the process to terminate.
        # That would stall callers polling processes that produce no output.
        while True:
            output = pipe.readline()
            if not output:
                break
            producer.send(topic_name, value=output)

        producer.flush()
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    import time

    # Demo: stream the output of `ping 1.1.1.1` into the Kafka topics
    # "ping-stdout" and "ping-stderr".
    ping = subprocess.Popen(
        [
            '/usr/bin/ping',
            '1.1.1.1',
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # check_io() looks producers up by pid and expects non-blocking pipes,
    # so prepare both right after the process starts.
    prep_io(ping, 'ping')

    try:
        # Poll until ping exits rather than spinning forever after it dies.
        while ping.poll() is None:
            check_io(ping, 'ping')
            time.sleep(0.1)
        # Forward anything written between the last poll and the exit.
        check_io(ping, 'ping')
    finally:
        # Don't leave ping running if we were interrupted (e.g. Ctrl+C).
        if ping.poll() is None:
            ping.terminate()
            ping.wait()