Python: Execute Cat Subprocess in Parallel

Python: execute cat subprocess in parallel

Another approach (rather than the other suggestion of putting shell processes in the background) is to use multithreading.

The run method that you have would then do something like this:

thread.start_new_thread(myFuncThatDoesZGrep, ())

To collect results, you can do something like this:

import threading

class MyThread(threading.Thread):
    def run(self):
        self.finished = False
        # Your code to run the command here.
        blahBlah()
        # When finished, store the output and flag completion.
        self.finished = True
        self.results = []  # populate with whatever the command produced

Start the thread as described in the threading documentation. When your thread object has myThread.finished == True, you can collect the results via myThread.results.
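For completeness, here's a minimal usage sketch (my addition, not part of the original answer); blahBlah() and the actual zgrep call are still placeholders:

import time

myThread = MyThread()
myThread.start()

while not getattr(myThread, 'finished', False):
    time.sleep(0.1)  # poll until the worker sets self.finished = True

print(myThread.results)

In practice, myThread.join() is the more idiomatic way to wait for the thread to finish.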

Python subprocess in parallel

You can do it in a single thread.

Suppose you have a script that prints lines at random times:

#!/usr/bin/env python
#file: child.py
import os
import random
import sys
import time

for i in range(10):
    print("%2d %s %s" % (int(sys.argv[1]), os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.random())

If you'd like to collect the output as soon as it becomes available, you could use select() on POSIX systems, as @zigg suggested:

#!/usr/bin/env python
from __future__ import print_function
from select import select
from subprocess import Popen, PIPE

# start several subprocesses
processes = [Popen(['./child.py', str(i)], stdout=PIPE,
                   bufsize=1, close_fds=True,
                   universal_newlines=True)
             for i in range(5)]

# read output
timeout = 0.1  # seconds
while processes:
    # remove finished processes from the list (O(N**2))
    for p in processes[:]:
        if p.poll() is not None:  # process ended
            print(p.stdout.read(), end='')  # read the rest
            p.stdout.close()
            processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout for p in processes], [], [], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='')  # NOTE: it can block

A more portable solution (one that should work on Windows, Linux, and OS X) can use a reader thread for each process; see Non-blocking read on a subprocess.PIPE in python.
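As a rough sketch of that reader-thread approach (my own illustration, not from the linked answer), each child gets a thread that pushes its output lines onto a shared queue that the main thread drains:

#!/usr/bin/env python
from queue import Queue, Empty
from subprocess import Popen, PIPE
from threading import Thread
import sys

def reader(proc, queue):
    # forward each line from one child process to the shared queue
    for line in proc.stdout:
        queue.put(line)
    proc.stdout.close()

q = Queue()
processes = [Popen([sys.executable, 'child.py', str(i)], stdout=PIPE,
                   universal_newlines=True)
             for i in range(5)]
threads = [Thread(target=reader, args=(p, q), daemon=True) for p in processes]
for t in threads:
    t.start()

# print lines as they arrive, until all readers are done and the queue is empty
while any(t.is_alive() for t in threads) or not q.empty():
    try:
        print(q.get(timeout=0.1), end='')
    except Empty:
        pass

for p in processes:
    p.wait()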

Here's an os.pipe()-based solution that works on Unix and Windows:

#!/usr/bin/env python
from __future__ import print_function
import io
import os
import sys
from subprocess import Popen

ON_POSIX = 'posix' in sys.builtin_module_names

# create a pipe to get data
input_fd, output_fd = os.pipe()

# start several subprocesses
processes = [Popen([sys.executable, 'child.py', str(i)], stdout=output_fd,
                   close_fds=ON_POSIX)  # close input_fd in children
             for i in range(5)]
os.close(output_fd)  # close unused end of the pipe

# read output line by line as soon as it is available
with io.open(input_fd, 'r', buffering=1) as file:
    for line in file:
        print(line, end='')

for p in processes:
    p.wait()

Control the number of subprocesses used to call external commands in Python

You can use subprocess.call if you want to wait for the command to complete. See pydoc subprocess for more information.
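For instance (a minimal sketch; the sleep command is just a stand-in for your real command):

import subprocess

# subprocess.call() blocks until the command finishes and returns its exit status
status = subprocess.call(["sleep", "2"])
print("exit status:", status)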

You could also call the Popen.wait method in your worker:

def worker(cmd):
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile)
    p.wait()

Because there seems to be some confusion about this answer, here's a complete example:

import concurrent.futures
import multiprocessing
import random
import subprocess

def worker(workerid):
    print(f"start {workerid}")
    p = subprocess.Popen(["sleep", f"{random.randint(1,30)}"])
    p.wait()
    print(f"stop {workerid}")
    return workerid

def main():
    tasks = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        for i in range(20):
            tasks.append(pool.submit(worker, i))

        print("waiting for tasks...", flush=True)
        for task in concurrent.futures.as_completed(tasks):
            print(f"completed {task.result()}", flush=True)
        print("done.")

if __name__ == "__main__":
    main()

If you run the above code, you will see that all of the worker processes start in parallel and that we are able to gather values as they are completed.

running multiple bash commands with subprocess

You have to use shell=True in subprocess, and not shlex.split:

import subprocess

command = "echo a; echo b"

ret = subprocess.run(command, capture_output=True, shell=True)

# before Python 3.7:
# ret = subprocess.run(command, stdout=subprocess.PIPE, shell=True)

print(ret.stdout.decode())

returns:

a
b
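As a small variation (my addition, not part of the original answer), passing text=True on Python 3.7+ makes ret.stdout a string, so the decode() call isn't needed:

import subprocess

command = "echo a; echo b"

# text=True (Python 3.7+) returns stdout/stderr as str instead of bytes
ret = subprocess.run(command, capture_output=True, shell=True, text=True)
print(ret.stdout)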

Rewrite shell script with GNU Parallel to Python

I would use multiprocessing:

from multiprocessing import Pool
import os
import sys

def run(file):
    # do something with the input file
    os.system(...)  # placeholder: run your external command here

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(run, sys.argv[1:])

Call it with:

python test.py "${files[@]}"
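If you'd rather use subprocess than os.system in the worker, a sketch might look like this (some_command is a hypothetical placeholder for whatever the original shell script runs per file):

from multiprocessing import Pool
import subprocess
import sys

def run(file):
    # some_command is a placeholder for the real per-file command
    subprocess.run(["some_command", file], check=True)

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(run, sys.argv[1:])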

Run multiple subprocesses in parallel - python 2.7

Along with the modifications mentioned by @chepner, you can try to use subprocess.call() instead of subprocess.Popen(). The latter is non-blocking, which causes all the commands to be executed simultaneously. call(), however, is blocking, so your script will wait until each ping has finished before entering the next iteration of the loop. This keeps the output of your commands in sequential order rather than interleaved.
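A quick sketch of the difference (the host addresses are placeholders):

import subprocess

hosts = ["192.168.0.1", "192.168.0.2"]  # placeholder addresses

# blocking: each ping finishes before the next one starts, output stays sequential
for host in hosts:
    subprocess.call(["ping", "-c", "3", host])

# non-blocking: all pings start at once and their output may interleave
procs = [subprocess.Popen(["ping", "-c", "3", host]) for host in hosts]
for p in procs:
    p.wait()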

If you need to execute the commands in parallel, I would suggest writing the outputs into different files and combining them after all commands have finished.

Edit: I have no particular experience in this area, but I guess the termination issue is related to the ping command itself. Check the manual page: https://linux.die.net/man/8/ping. In our case, we need to ping the destination X times; this is specified with the -c X parameter, where X is the number of packets to send. It can also be combined with -w / -W, which set the timeout limit for the ping command. For some examples, take a look at https://www.thegeekstuff.com/2009/11/ping-tutorial-13-effective-ping-command-examples/
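Putting both suggestions together, a rough sketch of the per-file approach could look like this (host addresses and file names are placeholders; -c and -W as described above):

import subprocess

hosts = ["192.168.0.1", "192.168.0.2"]  # placeholder addresses

# start all pings in parallel, each writing to its own log file
procs = []
for host in hosts:
    logfile = open("ping_%s.log" % host, "w")
    # -c 4: send four packets, -W 2: wait at most 2 seconds for each reply
    p = subprocess.Popen(["ping", "-c", "4", "-W", "2", host],
                         stdout=logfile, stderr=subprocess.STDOUT)
    procs.append((p, logfile))

# wait for every ping to finish, then combine the per-host logs
with open("ping_combined.log", "w") as combined:
    for p, logfile in procs:
        p.wait()
        logfile.close()
        with open(logfile.name) as f:
            combined.write(f.read())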

Python subprocess return code without waiting

You need neither multiprocessing nor threading here. You can run multiple child processes in parallel and collect their statuses all in a single thread:

#!/usr/bin/env python3
from subprocess import Popen

def run(cmd, log_filename):
    with open(log_filename, 'wb', 0) as logfile:
        return Popen(cmd, stdout=logfile)

# start several subprocesses
processes = {run(['echo', c], 'subprocess.%s.log' % c) for c in 'abc'}
# now they all run in parallel
# report as soon as a child process exits
while processes:
    for p in processes:
        if p.poll() is not None:
            processes.remove(p)
            print('{} done, status {}'.format(p.args, p.returncode))
            break

p.args stores cmd in Python 3.3+; keep track of cmd yourself on earlier Python versions.
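On earlier versions, one simple way to keep track of cmd yourself is a dict from each Popen object to its command (a sketch of my own, reusing run() from above):

# for Python < 3.3: remember each command alongside its Popen object
processes = {}
for c in 'abc':
    cmd = ['echo', c]
    processes[run(cmd, 'subprocess.%s.log' % c)] = cmd

while processes:
    for p in list(processes):
        if p.poll() is not None:
            cmd = processes.pop(p)
            print('{} done, status {}'.format(cmd, p.returncode))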

See also:

  • Python threading multiple bash subprocesses?
  • Python subprocess in parallel
  • Python: execute cat subprocess in parallel
  • Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs

To limit the number of parallel jobs, a ThreadPool could be used (as shown in the first link):

#!/usr/bin/env python3
from multiprocessing.dummy import Pool # use threads
from subprocess import Popen

def run_until_done(args):
    cmd, log_filename = args
    try:
        with open(log_filename, 'wb', 0) as logfile:
            p = Popen(cmd, stdout=logfile)
        return cmd, p.wait(), None
    except Exception as e:
        return cmd, None, str(e)

commands = ((('echo', str(d)), 'subprocess.%03d.log' % d) for d in range(500))
pool = Pool(128)  # 128 concurrent commands at a time
for cmd, status, error in pool.imap_unordered(run_until_done, commands):
    if error is None:
        fmt = '{cmd} done, status {status}'
    else:
        fmt = 'failed to run {cmd}, reason: {error}'
    print(fmt.format_map(vars()))  # or fmt.format(**vars()) on older versions

The thread pool in the example has 128 threads (no more, no less), so it can't execute more than 128 jobs concurrently. As soon as any of the threads is free (done with a job), it picks up another, and so on. The total number of jobs executed concurrently is limited by the number of threads. A new job doesn't wait for all 128 previous jobs to finish; it starts as soon as any of the old jobs is done.


