88 lines
2.6 KiB
Python
88 lines
2.6 KiB
Python
from kafka import KafkaProducer
|
|
from kafka.admin import KafkaAdminClient, NewTopic
|
|
import subprocess
|
|
from select import select
|
|
import os
|
|
|
|
kafka_servers = ['localhost:9092']
|
|
producers = {}
|
|
client = KafkaAdminClient(bootstrap_servers=kafka_servers)
|
|
|
|
|
|
# Inspired by gist:
|
|
# https://gist.github.com/krzemienski/8c7e8e76c8652984cca0da3fea5b5368
|
|
def check_io(process, process_name=None):
|
|
#if process.stdout is not None: os.set_blocking(process.stdout.fileno(), False)
|
|
#if process.stderr is not None: os.set_blocking(process.stderr.fileno(), False)
|
|
|
|
#ready_to_read, _, _ = select(
|
|
# [x for x in [process.stdout, process.stderr] if x is not None],
|
|
# [], [], 0
|
|
#)
|
|
|
|
#return
|
|
|
|
for pipe_name,pipe in {'stdout':process.stdout, 'stderr':process.stderr}.items():
|
|
if pipe is None:
|
|
continue
|
|
|
|
if os.get_blocking(pipe):
|
|
os.set_blocking(pipe, False)
|
|
|
|
#if pipe not in ready_to_read:
|
|
# continue
|
|
|
|
topic_name = f"{process.pid if process_name is None else process_name}-{pipe_name}"
|
|
|
|
#if process.pid not in producers:
|
|
# producers[process.pid] = {}
|
|
#if pipe_name not in producers[process.pid]:
|
|
# #client.create_topics(
|
|
# # [
|
|
# # NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
|
|
# # ]
|
|
# #)
|
|
# producers[process.pid][pipe_name] = KafkaProducer(bootstrap_servers=kafka_servers, max_block_ms=10)
|
|
|
|
while True:
|
|
#print(topic_name)
|
|
#break
|
|
# The python official docs recommend using Popen.communicate()
|
|
# instead of reading from the pipes directly. This isn't applicable
|
|
# here, as the python docs assume you're shelling out to a short-lived
|
|
# process and retrieving the results, not starting a service and
|
|
# pulling the log output. Popen.communicate() is a blocking call and
|
|
# would die here forever waiting for the service to exit.
|
|
output = pipe.readline()
|
|
if output:
|
|
print(output)
|
|
#producers[process.pid][pipe_name].send(topic_name, value=output)
|
|
else:
|
|
break
|
|
|
|
#break anyway, i think we're dying in this loop
|
|
break
|
|
|
|
#producers[process.pid][pipe_name].flush()
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import time
|
|
|
|
ping = subprocess.Popen(
|
|
[
|
|
'/usr/bin/ping',
|
|
'1.1.1.1',
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE
|
|
)
|
|
|
|
while True:
|
|
check_io(ping, 'ping')
|
|
time.sleep(0.1)
|
|
|
|
ping.kill()
|
|
ping.wait()
|
|
ping = None |