Subprocess.Popen: Cloning Stdout and Stderr Both to Terminal and Variables


You could spawn threads to read the stdout and stderr pipes, write to a common queue, and append to lists. Then use a third thread to print items from the queue.

import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE

def read_output(pipe, funcs):
    for line in iter(pipe.readline, ''):
        for func in funcs:
            func(line)
        # time.sleep(1)
    pipe.close()

def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
    t.daemon = True
    t.start()
process.wait()
for t in (tout, terr):
    t.join()
q.put(None)
print(out)
print(err)

The reason for using a third thread, instead of letting the first two threads print directly to the terminal, is to prevent the two streams' writes from interleaving, which can sometimes produce garbled output.


The above calls random_print.py, which prints to stdout and stderr at random:

import sys
import time
import random

for i in range(50):
    f = random.choice([sys.stdout, sys.stderr])
    f.write(str(i) + '\n')
    f.flush()
    time.sleep(0.1)
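
Note: for Popen(['random_print.py']) to launch the script directly, random_print.py needs a shebang line (e.g. #!/usr/bin/env python) and execute permission, and must be resolvable via PATH; alternatively, run it as [sys.executable, 'random_print.py'].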

This solution borrows code and ideas from J. F. Sebastian, here.


Here is an alternative solution for Unix-like systems, using select.select:

import collections
import errno
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE

def make_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(
        fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

def read_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    # time.sleep(1)
    try:
        return fd.read()
    except IOError as e:
        if e.errno != errno.EAGAIN:
            raise
        else:
            return ''

def write_output(fds, outmap):
    for fd in fds:
        line = read_async(fd)
        sys.stdout.write(line)
        outmap[fd.fileno()].append(line)

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)

make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        write_output([process.stdout, process.stderr], outmap)
        break

fileno = {'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()}

print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

This solution uses code and ideas from Adam Rosenfield's post, here.

Output of subprocess both to PIPE and directly to stdout

This snippet has helped me once in a similar situation:

process = subprocess.Popen(cmd, bufsize=1, universal_newlines=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in iter(process.stdout.readline, ''):
    print line,
    sys.stdout.flush()  # please see comments regarding the necessity of this line
process.wait()
errcode = process.returncode
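
If the combined output is also needed in a variable afterwards, a small variation of the same snippet collects the lines while echoing them (a sketch only; cmd stands for whatever command you are running):

import sys
import subprocess

cmd = ['/path/to/script']  # hypothetical command, as above
lines = []
process = subprocess.Popen(cmd, bufsize=1, universal_newlines=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in iter(process.stdout.readline, ''):
    print line,            # echo to the terminal as it arrives
    sys.stdout.flush()
    lines.append(line)     # keep a copy for later processing
process.wait()
output = ''.join(lines)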

Displaying subprocess output to stdout and redirecting it

To save the subprocess's stdout to a variable for further processing and to display it as it arrives, while the child process is still running:

#!/usr/bin/env python3
from io import StringIO
from subprocess import Popen, PIPE

with Popen('/path/to/script', stdout=PIPE, bufsize=1,
           universal_newlines=True) as p, StringIO() as buf:
    for line in p.stdout:
        print(line, end='')
        buf.write(line)
    output = buf.getvalue()
rc = p.returncode

Saving both the subprocess's stdout and stderr is more complex, because both streams have to be consumed concurrently to avoid a deadlock:

stdout_buf, stderr_buf = StringIO(), StringIO()
rc = teed_call('/path/to/script', stdout=stdout_buf, stderr=stderr_buf,
               universal_newlines=True)
output = stdout_buf.getvalue()
...

where teed_call() is defined here.
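
The linked teed_call() is not reproduced in this answer; as a rough illustration of the idea (not the linked implementation), a thread-per-stream version could look something like this:

import sys
import threading
from subprocess import Popen, PIPE

def teed_call(*args, **kwargs):
    # sketch only: run the command, echo each pipe to the terminal,
    # and tee it into the buffer supplied by the caller
    stdout_buf = kwargs.pop('stdout', None)
    stderr_buf = kwargs.pop('stderr', None)
    p = Popen(args,
              stdout=PIPE if stdout_buf is not None else None,
              stderr=PIPE if stderr_buf is not None else None,
              **kwargs)

    def tee(pipe, buf, display):
        # assumes text mode (universal_newlines=True), so '' marks EOF
        for line in iter(pipe.readline, ''):
            display.write(line)
            buf.write(line)
        pipe.close()

    threads = []
    if stdout_buf is not None:
        threads.append(threading.Thread(target=tee,
                                        args=(p.stdout, stdout_buf, sys.stdout)))
    if stderr_buf is not None:
        threads.append(threading.Thread(target=tee,
                                        args=(p.stderr, stderr_buf, sys.stderr)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return p.wait()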


Update: here's a simpler asyncio version.
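
That linked version is not shown here either; as a minimal async/await sketch along the same lines (assuming Python 3.8+, with tee_stream being a hypothetical helper name, not the linked code):

import asyncio
import sys
from asyncio.subprocess import PIPE

async def tee_stream(stream, buf, display):
    # read one pipe line by line, echoing each line and keeping a copy
    while True:
        line = await stream.readline()
        if not line:  # EOF
            break
        buf.append(line)
        display.write(line)
        display.flush()

async def read_and_display(*cmd):
    process = await asyncio.create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
    stdout, stderr = [], []
    await asyncio.gather(
        tee_stream(process.stdout, stdout, sys.stdout.buffer),
        tee_stream(process.stderr, stderr, sys.stderr.buffer))
    rc = await process.wait()
    return rc, b''.join(stdout), b''.join(stderr)

rc, out, err = asyncio.run(read_and_display('/path/to/script'))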


Old version:

Here's a single-threaded solution based on the child_process.py example from tulip:

import asyncio
import sys
from asyncio.subprocess import PIPE

@asyncio.coroutine
def read_and_display(*cmd):
    """Read cmd's stdout, stderr while displaying them as they arrive."""
    # start process
    process = yield from asyncio.create_subprocess_exec(*cmd,
                                                        stdout=PIPE, stderr=PIPE)

    # read child's stdout/stderr concurrently
    stdout, stderr = [], []  # stdout, stderr buffers
    tasks = {
        asyncio.Task(process.stdout.readline()): (
            stdout, process.stdout, sys.stdout.buffer),
        asyncio.Task(process.stderr.readline()): (
            stderr, process.stderr, sys.stderr.buffer)}
    while tasks:
        done, pending = yield from asyncio.wait(tasks,
                                                return_when=asyncio.FIRST_COMPLETED)
        assert done
        for future in done:
            buf, stream, display = tasks.pop(future)
            line = future.result()
            if line:  # not EOF
                buf.append(line)     # save for later
                display.write(line)  # display in terminal
                # schedule to read the next line
                tasks[asyncio.Task(stream.readline())] = buf, stream, display

    # wait for the process to exit
    rc = yield from process.wait()
    return rc, b''.join(stdout), b''.join(stderr)

The script runs the '/path/to/script' command and reads both its stdout and stderr concurrently, line by line. The lines are printed to the parent's stdout/stderr respectively and saved as bytestrings for later processing. To run the read_and_display() coroutine, we need an event loop:

import os

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    rc, *output = loop.run_until_complete(read_and_display("/path/to/script"))
    if rc:
        sys.exit("child failed with '{}' exit code".format(rc))
finally:
    loop.close()

subprocess.Popen handling stdout and stderr as they come

I was able to solve this by using select.select().

import subprocess
import sys
from select import select

process = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    close_fds=True,
    **kw
)

while True:
    reads, _, _ = select(
        [process.stdout.fileno(), process.stderr.fileno()],
        [], []
    )

    for descriptor in reads:
        if descriptor == process.stdout.fileno():
            read = process.stdout.readline()
            if read:
                print 'stdout: %s' % read

        if descriptor == process.stderr.fileno():
            read = process.stderr.readline()
            if read:
                print 'stderr: %s' % read
        sys.stdout.flush()

    if process.poll() is not None:
        break

The file descriptors are passed to select() as the reads argument (the first argument of select()), and the loop keeps reading from whichever descriptors are ready, as long as process.poll() indicates that the process is still alive.

No need for threads. The code was adapted from this Stack Overflow answer.


