Read streaming input from subprocess.communicate()

Please note, I think J.F. Sebastian's method (below) is better.


Here is a simple example (with no error checking):

import subprocess

proc = subprocess.Popen('ls',
                        shell=True,
                        stdout=subprocess.PIPE,
                        )
while proc.poll() is None:
    output = proc.stdout.readline()
    print output,

If ls ends too fast, then the while loop may end before you've read all the data.

You can catch the remainder in stdout this way:

output = proc.communicate()[0]
print output,
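
Putting the two pieces together, here is a minimal Python 3 sketch of the same pattern (ls is just a stand-in command; the original answer above is Python 2):

import subprocess

proc = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE)
while proc.poll() is None:
    line = proc.stdout.readline()  # bytes; may be b'' just before exit
    if line:
        print(line.decode(), end='')
# catch whatever is still buffered in the pipe after the loop exits
remainder = proc.communicate()[0]
if remainder:
    print(remainder.decode(), end='')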

Streaming read from subprocess

subprocess.run always spawns a child process and blocks the calling thread until it exits.

The only option for you is to use p = subprocess.Popen(...) and read lines with s = p.stdout.readline() or p.stdout.__iter__() (see below).

This code works for me if the child process flushes stdout after printing a line (see the extended note below).

import subprocess
import time

cmd = ["/usr/bin/python3", "zzz.py"]
test_proc = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)

out_data = ""
print(time.time(), "START")
while "QUIT" not in str(out_data):
    out_data = test_proc.stdout.readline()
    print(time.time(), "MAIN received", out_data)
test_proc.communicate()  # shut it down

See my terminal log (dots removed from zzz.py):

ibug@ubuntu:~/t $ python3 p.py
1546450821.9174328 START
1546450821.9793346 MAIN received b'0 sleeping \n'
1546450822.987753 MAIN received b'1 sleeping \n'
1546450823.993136 MAIN received b'2 sleeping \n'
1546450824.997726 MAIN received b'3 sleeping \n'
1546450825.9975247 MAIN received b'4 sleeping \n'
1546450827.0094354 MAIN received b'QUIT this exercise\n'
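
For reference, the child could look roughly like this (a hypothetical reconstruction of zzz.py; flush=True is what makes each line show up immediately):

# zzz.py (hypothetical reconstruction; the dots mentioned above are omitted)
import time

for i in range(5):
    print(i, "sleeping", flush=True)
    time.sleep(1)
print("QUIT this exercise", flush=True)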

You can also do it with a for loop:

for out_data in test_proc.stdout:
    if "QUIT" in str(out_data):
        break
    print(time.time(), "MAIN received", out_data)

If you cannot modify the child process, unbuffer (from the expect package; install it with APT or YUM) may help. This is my working parent code, with the child code unchanged.

test_proc = subprocess.Popen(
    ["unbuffer"] + cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)

subprocess.communicate(input_string) still tries to read input from stdin

Passing an argument to process.communicate() is correct, but you also need to pass stdin=subprocess.PIPE. Thus:

command = ['../path/to/bin/executable-script', '-option1=x', '-topic', 'my_kafka_topic']
process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# ^^^^^^^^^^^^^^^^^^^^^
process.communicate(input_data)
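
For a self-contained illustration of the same fix, here is a sketch using sort as a stand-in for the executable (communicate expects bytes here because no text mode was requested):

import subprocess

process = subprocess.Popen(['sort'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = process.communicate(b'banana\napple\ncherry\n')
print(out.decode())  # apple, banana, cherry, one per line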

How to capture streaming output in python from subprocess.communicate()

I can think of a few solutions.

#1: You can just go into the source to grab the code for communicate, copy and paste it, and add code that prints each line as it comes in as well as buffering things up. (If it's possible for your own stdout to block because of, say, a deadlocked parent, you can use a threading.Queue or something instead; see the sketch below.) This is obviously a bit hacky, but it's pretty easy, and it will be safe.
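
A hedged sketch of that Queue variant (the function name and the None sentinel are illustrative, not from the original answer):

import queue
import subprocess
import threading

def stream_lines(cmd):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    q = queue.Queue()

    def reader():
        # the reader thread does nothing slow between reads
        for line in proc.stdout:
            q.put(line)
        q.put(None)  # sentinel: EOF

    threading.Thread(target=reader, daemon=True).start()
    # the consumer may block on print without stalling the pipe
    while True:
        line = q.get()
        if line is None:
            break
        print(line.decode(), end="")
    proc.wait()

# usage: stream_lines(["ls"])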

But really, communicate is complicated because it needs to be fully general and handle cases you don't have. All you need here is the central trick: throw threads at the problem. A dedicated reader thread that doesn't do anything slow or blocking between read calls is all you need.

Something like this:

self.process = subprocess.Popen(self.cmd, stdout=subprocess.PIPE)
lines = []

def reader():
    for line in self.process.stdout:
        lines.append(line)
        sys.stdout.write(line)

t = threading.Thread(target=reader)
t.start()
self.process.wait()
t.join()

You may need some error handling in the reader thread. And I'm not 100% sure you can safely use readline here. But this will either work, or be close.

#2: Or you can create a wrapper class that takes a file object and tees to stdout/stderr every time anyone reads from it. Then create the pipes manually, and pass in wrapped pipes, instead of using the automagic PIPE. This has the exact same issues as #1 (meaning either no issues, or you need to use a Queue or something if sys.stdout.write can block).

Something like this:

class TeeReader(object):
    def __init__(self, input_file, tee_file):
        self.input_file = input_file
        self.tee_file = tee_file

    def read(self, size=-1):
        ret = self.input_file.read(size)
        if ret:
            self.tee_file.write(ret)
        return ret

In other words, it wraps a file object (or something that acts like one), and acts like a file object. (When you use PIPE, process.stdout is a real file object on Unix, but may just be something that acts like one on Windows.) Any other methods you need to delegate to input_file can probably be delegated directly, without any extra wrapping. Either try this and see which methods communicate raises AttributeErrors looking for and code those explicitly, or do the usual __getattr__ trick to delegate everything. PS, if you're worried about this "file object" idea meaning disk storage, read Everything is a file at Wikipedia.
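
As a hedged usage sketch (ls is a stand-in command; sys.stdout.buffer is used because the pipe yields bytes):

import subprocess
import sys

proc = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
tee = TeeReader(proc.stdout, sys.stdout.buffer)
while True:
    chunk = tee.read(4096)  # every read is echoed to our own stdout
    if not chunk:
        break
proc.wait()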

#3: Finally, you can grab one of the "async subprocess" modules on PyPI, or use one included in Twisted or another async framework. (This makes it possible to avoid the deadlock problems, but it's not guaranteed; you still have to make sure to service the pipes properly.)
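
These days the standard library's asyncio can play the same role; a minimal sketch (ls is a stand-in command):

import asyncio

async def stream(cmd):
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE
    )
    while True:
        line = await proc.stdout.readline()
        if not line:  # EOF
            break
        print(line.decode(), end="")
    await proc.wait()

asyncio.run(stream(["ls"]))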

Constantly print Subprocess output while process is running

You can use iter to process lines as soon as the command outputs them: lines = iter(fd.readline, ""). Here's a full example showing a typical use case (thanks to @jfs for helping out):

from __future__ import print_function  # Only Python 2.x
import subprocess

def execute(cmd):
    popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
    for stdout_line in iter(popen.stdout.readline, ""):
        yield stdout_line
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)

# Example
for path in execute(["locate", "a"]):
    print(path, end="")

streaming data into command with subprocess.Popen

Just write to the pipe directly:

#!/usr/bin/env python2
import fileinput
import subprocess

process = subprocess.Popen(['sort'], stdin=subprocess.PIPE)
with process.stdin as pipe, fileinput.FileInput() as file:
    for line in file:
        if file.isfirstline():  # print header
            print line,
        else:  # pipe tails
            pipe.write(line)
process.wait()
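
A Python 3 version of the same idea would look roughly like this (a sketch assuming Python 3.7+ for text=True):

#!/usr/bin/env python3
import fileinput
import subprocess

process = subprocess.Popen(['sort'], stdin=subprocess.PIPE, text=True)
with process.stdin as pipe, fileinput.input() as file:
    for line in file:
        if file.isfirstline():  # print the header ourselves
            print(line, end='')
        else:  # pipe the remaining lines into sort
            pipe.write(line)
process.wait()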

Python read from subprocess stdout and stderr separately while preserving order

Here's a solution based on selectors, but one that preserves order and streams variable-length chunks (even single characters).

The trick is to use read1() instead of read().

import selectors
import subprocess
import sys

p = subprocess.Popen(
    ["python", "random_out.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)

while True:
    for key, _ in sel.select():
        data = key.fileobj.read1().decode()
        if not data:
            exit()
        if key.fileobj is p.stdout:
            print(data, end="")
        else:
            print(data, end="", file=sys.stderr)

If you want a test program, use this.

import sys
from time import sleep


for i in range(10):
    print(f" x{i} ", file=sys.stderr, end="")
    sleep(0.1)
    print(f" y{i} ", end="")
    sleep(0.1)

read subprocess stdout line by line

I think the problem is with the statement for line in proc.stdout, which reads the entire input before iterating over it. The solution is to use readline() instead:

# filters output
import subprocess

proc = subprocess.Popen(['python', 'fake_utility.py'], stdout=subprocess.PIPE)
while True:
    line = proc.stdout.readline()
    if not line:
        break
    # the real code does filtering here
    print "test:", line.rstrip()

Of course you still have to deal with the subprocess' buffering.
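
For example, if the child is itself a Python script, you can disable its buffering from the parent side (a sketch; -u is CPython's unbuffered-output flag):

import subprocess
import sys

# -u makes the child Python unbuffered, so lines arrive as they are printed
proc = subprocess.Popen(
    [sys.executable, '-u', 'fake_utility.py'],
    stdout=subprocess.PIPE,
)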

Note: according to the documentation, the solution with an iterator should be equivalent to using readline(), except for the read-ahead buffer, but (or exactly because of this) the proposed change did produce different results for me (Python 2.5 on Windows XP).


