Stop Reading Process Output in Python Without Hang

Stop reading process output in Python without hang?

#!/usr/bin/env python
"""Start process; wait 2 seconds; kill the process; print all process output."""
import subprocess
import tempfile
import time

def main():
    """Run ``top`` for two seconds, terminate it, then print all of its output.

    The child's stdout is redirected to a temporary file, so nothing has to
    be read from a pipe while the child is running.
    """
    import sys  # local import: this snippet's import header does not provide `sys`

    # Open a temporary file (it is deleted automatically when it is closed).
    # `Popen` requires `f.fileno()` so `SpooledTemporaryFile` adds nothing here.
    # Text mode makes `f.read()` return `str` on both Python 2 and Python 3.
    with tempfile.TemporaryFile(mode="w+") as f:
        # start process, redirect stdout
        p = subprocess.Popen(["top"], stdout=f)
        try:
            # wait 2 seconds
            time.sleep(2)
        finally:
            # kill process
            # NOTE: if it doesn't kill the process then `p.wait()` blocks forever
            p.terminate()
            p.wait()  # wait for the process to terminate otherwise the output is garbled

        # print saved output
        f.seek(0)  # rewind to the beginning of the file
        sys.stdout.write(f.read())  # written as-is, no extra trailing newline


if __name__ == "__main__":
    main()

Tail-like Solutions that print only the portion of the output

You could read the process output in another thread and save the required number of the last lines in a queue:

import collections
import subprocess
import time
import threading

def read_output(process, append):
    """Pass every line read from ``process.stdout`` to ``append`` until EOF.

    Args:
        process: object whose ``stdout`` attribute provides ``readline()``.
        append: callable invoked once per line, in order.

    EOF is detected by an empty read, so the loop stops for both text-mode
    (``''``) and binary-mode (``b''``) streams.
    """
    readline = process.stdout.readline
    while True:
        line = readline()
        if not line:  # '' or b'' -- end of stream
            break
        append(line)

def main():
    """Run ``top`` for two seconds and print the last lines of its output.

    A daemon thread drains the child's stdout into a bounded deque while the
    main thread sleeps; the child is terminated afterwards.
    """
    # start process, redirect stdout; text mode so that lines are `str` and
    # can be joined with '' on both Python 2 and Python 3
    process = subprocess.Popen(["top"], stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)

    # save last `number_of_lines` lines of the process output
    # (bound before `try` so the final print never sees an undefined name)
    number_of_lines = 200
    q = collections.deque(maxlen=number_of_lines)  # atomic .append()
    t = threading.Thread(target=read_output, args=(process, q.append))
    t.daemon = True
    try:
        t.start()
        time.sleep(2)
    finally:
        process.terminate()  # NOTE: it doesn't ensure the process termination

    # Give the reader a bounded chance to reach EOF so the deque is not being
    # appended to while it is joined below; the timeout keeps a child that
    # ignores terminate() from hanging the script.
    t.join(1)

    # print saved lines
    print(''.join(q))


if __name__ == "__main__":
    main()

This variant requires q.append() to be an atomic operation. Otherwise the output might be corrupted.

signal.alarm() solution

You could use signal.alarm() to call the process.terminate() after specified timeout instead of reading in another thread. Though it might not interact very well with the subprocess module. Based on @Alex Martelli's answer:

import collections
import signal
import subprocess

class Alarm(Exception):
    """Raised by the SIGALRM handler to signal that the timeout expired."""

def alarm_handler(signum, frame):
    """Signal handler: raise `Alarm` in the interrupted code.

    Both arguments are supplied by the `signal` machinery and are unused.
    """
    raise Alarm

def main():
    """Read the output of ``top`` until SIGALRM fires, then print the last lines.

    The alarm is scheduled for two seconds; its handler raises `Alarm`, which
    interrupts the read loop and triggers ``process.terminate()``.
    """
    # start process, redirect stdout; text mode so that lines are `str` and
    # the "" EOF sentinel matches on both Python 2 and Python 3
    process = subprocess.Popen(["top"], stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)

    # save last `number_of_lines` lines of the process output
    # (bound before `try` so the `finally` clause can always print it)
    number_of_lines = 200
    q = collections.deque(maxlen=number_of_lines)

    # set signal handler
    signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(2)  # produce SIGALRM in 2 seconds

    try:
        for line in iter(process.stdout.readline, ""):
            q.append(line)
        signal.alarm(0)  # cancel alarm
    except Alarm:
        process.terminate()
    finally:
        # print saved lines
        print(''.join(q))


if __name__ == "__main__":
    main()

This approach works only on *nix systems. It might block if process.stdout.readline() doesn't return.

threading.Timer solution

import collections
import subprocess
import threading

def main():
    """Print the last lines of ``top``'s output, terminating it after a timeout.

    A `threading.Timer` calls ``process.terminate()`` after `timeout` seconds;
    the main thread consumes stdout until EOF.
    """
    import sys  # local import: this snippet's import header does not provide `sys`

    # start process, redirect stdout; text mode so that lines are `str` and
    # can be joined with '' on both Python 2 and Python 3
    process = subprocess.Popen(["top"], stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)

    # terminate process in timeout seconds
    timeout = 2  # seconds
    timer = threading.Timer(timeout, process.terminate)
    timer.start()

    # save last `number_of_lines` lines of the process output
    number_of_lines = 200
    try:
        q = collections.deque(process.stdout, maxlen=number_of_lines)
    finally:
        # cancel even when reading fails so no pending timer outlives the read
        timer.cancel()

    # print saved lines
    sys.stdout.write(''.join(q))  # lines keep their own newlines


if __name__ == "__main__":
    main()

This approach should also work on Windows. Here I've used process.stdout as an iterable; it might introduce additional output buffering, so you could switch to the iter(process.stdout.readline, "") approach if that is not desirable. If the process doesn't terminate on process.terminate() then the script hangs.

No threads, no signals solution

import collections
import subprocess
import sys
import time

def main():
    """Run a command (default ``top``) and print its last lines after a timeout.

    The command is taken from ``sys.argv[1:]``. Lines are read one at a time
    and the elapsed time is checked after each line, so a blocking
    ``readline()`` delays the timeout check.
    """
    args = sys.argv[1:]
    if not args:
        args = ['top']

    # start process, redirect stdout; text mode so that lines are `str` and
    # can be joined with '' on both Python 2 and Python 3
    process = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)

    # save last `number_of_lines` lines of the process output
    number_of_lines = 200
    q = collections.deque(maxlen=number_of_lines)

    timeout = 2  # seconds
    now = start = time.time()
    while (now - start) < timeout:
        line = process.stdout.readline()
        if not line:  # EOF: the process closed its stdout
            break
        q.append(line)
        now = time.time()
    else:  # on timeout (the loop ended without `break`)
        process.terminate()

    # print saved lines
    sys.stdout.write(''.join(q))  # lines keep their own newlines


if __name__ == "__main__":
    main()

This variant uses neither threads nor signals, but it produces garbled output in the terminal. It will block if process.stdout.readline() blocks.

A non-blocking read on a subprocess.PIPE in Python

fcntl, select, asyncproc won't help in this case.

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading import Thread

try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    """Put every line read from `out` on `queue`, then close `out`.

    Args:
        out: stream providing ``readline()`` and ``close()``.
        queue: object providing ``put(line)``.

    EOF is detected by an empty read, so the loop stops for both binary
    (``b''``) and text (``''``) streams. `out` is closed even if reading or
    queueing raises.
    """
    try:
        while True:
            line = out.readline()
            if not line:  # b'' or '' -- end of stream
                break
            queue.put(line)
    finally:
        out.close()

# Start the child and drain its stdout on a background thread, so that the
# main thread only ever touches the queue and never blocks on the pipe.
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True  # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:
    line = q.get_nowait()  # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else:  # got line
    pass  # ... do something with line

Python subprocess readlines() hangs

I assume you use pty due to reasons outlined in Q: Why not just use a pipe (popen())? (all other answers so far ignore your "NOTE: I don't want to print out everything at once").

pty is Linux only as said in the docs:

Because pseudo-terminal handling is highly platform dependent, there
is code to do it only for Linux. (The Linux code is supposed to work
on other platforms, but hasn’t been tested yet.)

It is unclear how well it works on other OSes.

You could try pexpect:

import sys
import pexpect

# Run the Ruby script under pexpect, handing it sys.stdout as the logfile.
pexpect.run("ruby ruby_sleep.rb", logfile=sys.stdout)

Or stdbuf to enable line-buffering in non-interactive mode:

from subprocess import Popen, PIPE, STDOUT

import sys  # for sys.stdout.write below; this snippet's header does not import it

# `stdbuf -oL` asks the child to line-buffer its stdout even though it is not
# attached to a terminal. Text mode (universal_newlines=True) makes
# `bufsize=1` mean line buffering and yields `str` lines on Python 2 and 3.
proc = Popen(['stdbuf', '-oL', 'ruby', 'ruby_sleep.rb'],
             bufsize=1, stdout=PIPE, stderr=STDOUT, close_fds=True,
             universal_newlines=True)
for line in iter(proc.stdout.readline, ''):
    sys.stdout.write(line)  # each line already carries its own newline
proc.stdout.close()
proc.wait()

Or using pty from stdlib based on @Antti Haapala's answer:

#!/usr/bin/env python
import errno
import os
import pty
from subprocess import Popen, STDOUT

# Give the child a pseudo-terminal for stdin/stdout/stderr and read whatever
# it writes from the master side, in chunks of up to 512 bytes.
master_fd, slave_fd = pty.openpty()  # provide tty to enable
                                     # line-buffering on ruby's side
proc = Popen(['ruby', 'ruby_sleep.rb'],
             stdin=slave_fd, stdout=slave_fd, stderr=STDOUT, close_fds=True)
os.close(slave_fd)  # the parent's copy of the slave end is closed right after spawning
try:
    while 1:
        try:
            data = os.read(master_fd, 512)
        except OSError as e:
            if e.errno != errno.EIO:
                raise
            break  # EIO means EOF on some systems
        else:
            if not data:  # EOF
                break
            print('got ' + repr(data))
finally:
    # always release the master fd and reap the child, killing it first if
    # it is still running
    os.close(master_fd)
    if proc.poll() is None:
        proc.kill()
    proc.wait()
print("This is reached!")

All three code examples print 'hello' immediately (as soon as the first EOL is seen).


I leave the old, more complicated code example here because it may be referenced and discussed in other posts on SO.

Or using pty based on @Antti Haapala's answer:

import os
import pty
import select
from subprocess import Popen, STDOUT

# Give the child a pseudo-terminal for stdout/stderr and poll the master side
# with select() so that the loop can also notice when the child has exited.
master_fd, slave_fd = pty.openpty()  # provide tty to enable
                                     # line-buffering on ruby's side
proc = Popen(['ruby', 'ruby_sleep.rb'],
             stdout=slave_fd, stderr=STDOUT, close_fds=True)
timeout = .04  # seconds
try:
    while 1:
        ready, _, _ = select.select([master_fd], [], [], timeout)
        if ready:
            data = os.read(master_fd, 512)
            if not data:
                break
            print("got " + repr(data))
        elif proc.poll() is not None:  # select timeout
            assert not select.select([master_fd], [], [], 0)[0]  # detect race condition
            break  # proc exited
finally:
    # close both ends even if the loop raises, so the descriptors do not leak
    os.close(slave_fd)  # can't do it sooner: it leads to errno.EIO error
    os.close(master_fd)
proc.wait()

print("This is reached!")

Python hang when reading data from C subprocess

Finally found the solution.

If Python attempts to read data from an empty buffer,
in this case, it will wait for input from C and hang.

There are two problems that should be solved in the original code:

  1. Add fflush(stdout) after printf.

    Otherwise, the output will be buffered and not actually written unless fflush(stdout) is called.

  2. Python needs to know how many chars or how many lines it should read.

    Don't use fpi_c.stdout.read(). This will wait for input from C and hang. Instead, use fpi_c.stdout.read(1) to read 1 char or
    fpi_c.stdout.readline() to read 1 line.

Something like this.

terminating a function call in python after n seconds

Finally the below code worked:

import subprocess
import threading
import time

def process_tree_kill(process_pid):
    """Run ``taskkill /F /T /PID <process_pid>``.

    Args:
        process_pid: pid to pass to ``taskkill``; converted with ``str()`` so
            both ``int`` and ``str`` values are accepted.
    """
    subprocess.call(['taskkill', '/F', '/T', '/PID', str(process_pid)])

def main():
    """Compile ``a.c`` with gcc, run the result with a 3-second timeout, print the elapsed time.

    A `threading.Timer` calls `process_tree_kill` with the child's pid if the
    child is still running when the timeout expires.
    """
    cmd = ["gcc", "-O2", "a.c", "-o", "a"]
    p = subprocess.Popen(cmd)
    p.wait()
    print("Compiled")
    start = time.time()

    process = subprocess.Popen("a", shell=True)
    print(str(process.pid))

    # terminate process in timeout seconds
    timeout = 3  # seconds
    timer = threading.Timer(timeout, process_tree_kill, [str(process.pid)])
    timer.start()
    try:
        process.wait()
    finally:
        # cancel even if wait() is interrupted so no pending timer is left behind
        timer.cancel()

    elapsed = (time.time() - start)
    print(elapsed)


if __name__ == "__main__":
    main()

Timeout on subprocess readline in Python

Thanks for all the answers!

I found a way to solve my problem by simply using select.poll to peek into standard output.

import select
...
# Fragment: `command`, `some_criterium`, `time_limit`, `do_something` and
# `update` are not defined in this snippet.
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
poll_obj = select.poll()
poll_obj.register(scan_process.stdout, select.POLLIN)
while some_criterium and not time_limit:
    # poll(0) does not wait: it only reports whether stdout is readable now,
    # so readline() is attempted only when there is data to read
    poll_result = poll_obj.poll(0)
    if poll_result:
        line = scan_process.stdout.readline()
        some_criterium = do_something(line)
    update(time_limit)


Related Topics



Leave a reply



Submit