import os
import subprocess
import time

from kafka import KafkaProducer

kafka_servers = ['localhost:9092']

# Cache of producers: process pid -> {pipe name ('stdout'/'stderr') -> KafkaProducer}.
producers = {}


def _pipes(process):
    """Return the (name, pipe) pairs of *process* that were captured (not None)."""
    return [
        (pipe_name, pipe)
        for pipe_name, pipe in (('stdout', process.stdout), ('stderr', process.stderr))
        if pipe is not None
    ]


# Do anything that might take more than a few ms early, otherwise you hit overflow errors
def prep_io(process, process_name=None):
    """Prepare *process*'s captured pipes for polling with check_io().

    Switches every captured stdout/stderr pipe to non-blocking mode and creates
    one KafkaProducer per pipe, cached in ``producers`` under the process pid.
    Calling it again for the same process is harmless: existing producers are
    reused.

    process_name is accepted for symmetry with check_io() and is not used.
    """
    for pipe_name, pipe in _pipes(process):
        fd = pipe.fileno()
        if os.get_blocking(fd):
            # Non-blocking so check_io() can drain what is available without
            # stalling on a pipe the child is not currently writing to.
            os.set_blocking(fd, False)
        pipe_producers = producers.setdefault(process.pid, {})
        if pipe_name not in pipe_producers:
            pipe_producers[pipe_name] = KafkaProducer(
                bootstrap_servers=kafka_servers,
                max_block_ms=200,
            )


# Inspired by gist:
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
def check_io(process, process_name=None):
    """Forward the lines currently readable from *process*'s pipes to Kafka.

    Each line is sent unchanged as the message value to the topic
    "<name>-stdout" or "<name>-stderr". Here <name> is process_name, or the
    process pid when process_name is None. Each pipe's producer is flushed
    after that pipe is drained.

    If prep_io() has not been called for this process, it is called here. Without it,
    the pipes stay blocking and no producer exists to send with.
    """
    prepared = producers.get(process.pid, {})
    if any(pipe_name not in prepared for pipe_name, _ in _pipes(process)):
        # Late preparation is slower than calling prep_io() up front, but it
        # beats a KeyError or a readline() that blocks forever.
        prep_io(process, process_name)

    name = process.pid if process_name is None else process_name
    for pipe_name, pipe in _pipes(process):
        producer = producers[process.pid][pipe_name]
        topic_name = f"{name}-{pipe_name}"
        # The python official docs recommend using Popen.communicate() instead of reading
        # from the pipes directly. Using Popen.communicate is a blocking call though,
        # which would stall threads for processes producing no output.
        # NOTE(review): on a non-blocking pipe readline() may return a partial
        # line if the child has not finished writing it yet.
        while True:
            output = pipe.readline()
            if not output:
                break
            producer.send(topic_name, value=output)
        producer.flush()


def close_io(process):
    """Flush, close and forget the producers created for *process*.

    Safe to call when prep_io() never ran for the process.
    """
    for producer in producers.pop(process.pid, {}).values():
        producer.flush()
        producer.close()


if __name__ == '__main__':
    ping = subprocess.Popen(
        [
            '/usr/bin/ping',
            '1.1.1.1',
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Create producers and make the pipes non-blocking before polling starts.
    prep_io(ping, 'ping')
    try:
        while ping.poll() is None:
            check_io(ping, 'ping')
            time.sleep(0.1)
        # Drain anything the process wrote just before it exited.
        check_io(ping, 'ping')
    finally:
        if ping.poll() is None:
            ping.terminate()
        close_io(ping)