from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
import subprocess
from select import select
import os

kafka_servers = ['localhost:9092']
producers = {}
client = KafkaAdminClient(bootstrap_servers=kafka_servers)


# Inspired by gist:
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
def check_io(process, process_name=None):
    """Drain and print whatever output a running subprocess has produced.

    Each of the process's stdout and stderr pipes that exists is switched
    to non-blocking mode (once) and then read line by line until no more
    data is currently available. Every line read is printed.

    Args:
        process: A ``subprocess.Popen``-like object exposing ``stdout``,
            ``stderr`` and ``pid``. Either pipe may be ``None``, in which
            case it is skipped.
        process_name: Optional label used instead of the pid when building
            the per-pipe topic name.

    Returns:
        None.
    """
    for pipe_name, pipe in (('stdout', process.stdout), ('stderr', process.stderr)):
        if pipe is None:
            continue

        # os.get_blocking()/os.set_blocking() operate on an integer file
        # descriptor, not on the file object wrapping it.
        fd = pipe.fileno()
        if os.get_blocking(fd):
            os.set_blocking(fd, False)

        # TODO: publish each line to Kafka instead of printing it. The plan
        # sketched so far is one KafkaProducer per (pid, pipe) cached in
        # `producers`, sending to `topic_name` and flushing after the read
        # loop; topic creation via `client.create_topics([NewTopic(...)])`
        # was also considered. `topic_name` is computed here for that.
        topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"

        # The python official docs recommend using Popen.communicate()
        # instead of reading from the pipes directly. This isn't applicable
        # here, as the python docs assume you're shelling out to a short-lived
        # process and retrieving the results, not starting a service and
        # pulling the log output. Popen.communicate() is a blocking call and
        # would die here forever waiting for the service to exit.
        while True:
            # With the descriptor in non-blocking mode readline() comes back
            # empty when nothing is buffered (and also at EOF), which is the
            # signal to stop draining this pipe for now. A line that has only
            # partially arrived may be returned without its newline.
            output = pipe.readline()
            if not output:
                break
            print(output)


if __name__ == '__main__':
    import time

    ping = subprocess.Popen(
        [
            '/usr/bin/ping',
            '1.1.1.1',
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    try:
        # Poll until the child exits on its own; Ctrl-C ends the loop early.
        while ping.poll() is None:
            check_io(ping, 'ping')
            time.sleep(0.1)
        # Pick up anything written between the last poll and process exit.
        check_io(ping, 'ping')
    except KeyboardInterrupt:
        pass
    finally:
        # Always reap the child so it is not left running or as a zombie.
        ping.kill()
        ping.wait()