Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs

I don't see any computations in the Python code. If you just need to execute several external programs in parallel, it is sufficient to use subprocess to run the programs and the threading module to maintain a constant number of processes running, but the simplest code uses multiprocessing.Pool:

#!/usr/bin/env python
import os
import multiprocessing as mp

def run(filename_def_param):
    filename, def_param = filename_def_param  # unpack arguments
    ...  # call external program on `filename`

def safe_run(*args, **kwargs):
    """Call run(), catch exceptions."""
    try:
        run(*args, **kwargs)
    except Exception as e:
        print("error: %s run(*%r, **%r)" % (e, args, kwargs))

def main():
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    workdir = os.path.join(ws, r'fieldgen\reals')
    files = ((os.path.join(workdir, f), ws)
             for f in os.listdir(workdir) if f.endswith('.npy'))

    # start processes
    pool = mp.Pool()  # use all available CPUs
    pool.map(safe_run, files)

if __name__ == "__main__":
    mp.freeze_support()  # optional if the program is not frozen
    main()

If there are many files, pool.map() could be replaced by for _ in pool.imap_unordered(safe_run, files): pass, which consumes the input lazily and avoids building a list of results.
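For example, the end of main() above could look like this (a minimal sketch reusing the names defined in the script above):

    # start processes
    pool = mp.Pool()  # use all available CPUs
    for _ in pool.imap_unordered(safe_run, files):
        pass  # results are discarded; only the side effects matter
    pool.close()
    pool.join()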

There is also multiprocessing.dummy.Pool, which provides the same interface as multiprocessing.Pool but uses threads instead of processes; that might be more appropriate here, since the workers spend their time waiting on an external program rather than running Python code.
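Switching is a one-line change because the thread-based pool exposes the same API; a minimal sketch (the worker count of 8 is an arbitrary choice for illustration):

from multiprocessing.dummy import Pool as ThreadPool  # threads, same Pool API

pool = ThreadPool(8)           # 8 worker threads (arbitrary for this sketch)
pool.map(safe_run, files)      # same call as with multiprocessing.Pool
pool.close()
pool.join()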

You don't need to keep some CPUs free. Just start your executables with a low priority (on Linux, the nice command does this).
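For example, the external call inside run() could be wrapped like this on Linux (a sketch; exe and swt_nam are placeholders for the program and its input file, as in the subprocess example later on this page):

import subprocess

def run_low_priority(exe, swt_nam):
    # `nice -n 19` gives the model run the lowest scheduling priority,
    # so the machine stays responsive without reserving CPUs
    subprocess.check_call(['nice', '-n', '19', exe, swt_nam])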

ThreadPoolExecutor example

concurrent.futures.ThreadPoolExecutor would be both simple and sufficient, but it requires a third-party dependency on Python 2.x (the futures backport; it has been in the stdlib since Python 3.2).

#!/usr/bin/env python
import os
import concurrent.futures

def run(filename, def_param):
    ...  # call external program on `filename`

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith('.npy'))

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    future_to_file = dict((executor.submit(run, f, ws), f) for f in files)

    for future in concurrent.futures.as_completed(future_to_file):
        f = future_to_file[future]
        if future.exception() is not None:
            print('%r generated an exception: %s' % (f, future.exception()))
        # run() doesn't return anything so `future.result()` is always `None`

Or if we ignore exceptions raised by run():

from itertools import repeat

... # the same

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    executor.map(run, files, repeat(ws))
    # run() doesn't return anything so `map()` results can be ignored

subprocess + threading (manual pool) solution

#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue  # Python 2; on Python 3 use: from queue import Queue
from threading import Thread

def run(filename, def_param):
    ...  # define exe, swt_nam
    subprocess.check_call([exe, swt_nam])  # run external program

def worker(queue):
    """Process files from the queue."""
    for args in iter(queue.get, None):
        try:
            run(*args)
        except Exception as e:
            # catch exceptions to avoid exiting the thread prematurely
            print('%r failed: %s' % (args, e), file=sys.stderr)

# start threads
q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(8)]
for t in threads:
    t.daemon = True  # threads die if the program dies
    t.start()

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
    if f.endswith('.npy'):
        q.put_nowait((os.path.join(wdir, f), ws))

for _ in threads: q.put_nowait(None) # signal no more files
for t in threads: t.join() # wait for completion

multiprocessing - execute external command and wait before proceeding

You can use subprocess.Popen to launch the external commands asynchronously and store each returned Popen object in a list. Once you've launched all the processes, just iterate over the list and wait for each one to finish using popen_object.wait().

import shlex
import subprocess

processes = []
arguments = ""
for i in range(1, 20):
    # arguments accumulate across iterations, as in the original code
    arguments += str(i) + "_image.jpg "
    processes.append(subprocess.Popen(shlex.split("./combine " + arguments)))

for p in processes:
    p.wait()
subprocess.call("./merge_resized_images")

However, this will launch all nineteen processes at once, which is probably going to hurt performance.

To avoid that, you can use a ThreadPool to limit yourself to some lower number of concurrent processes (multiprocessing.cpu_count() is a good number), and then use pool.close() followed by pool.join() to wait for them all to finish.

import multiprocessing
import subprocess
import shlex

from multiprocessing.pool import ThreadPool

def call_proc(cmd):
    """ This runs in a separate thread. """
    #subprocess.call(shlex.split(cmd))  # This will block until cmd finishes
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    return (out, err)

pool = ThreadPool(multiprocessing.cpu_count())
results = []
arguments = ""
for i in range(1, 20):
    arguments += str(i) + "_image.jpg "
    results.append(pool.apply_async(call_proc, ("./combine " + arguments,)))

# Close the pool and wait for each running task to complete
pool.close()
pool.join()
for result in results:
    out, err = result.get()
    print("out: {} err: {}".format(out, err))
subprocess.call("./merge_resized_images")

Each thread will release the GIL while waiting for the subprocess to complete, so they'll all run in parallel.

How to spawn parallel child processes on a multi-processor system?

What you are looking for is the process pool class in multiprocessing.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print(pool.map(work, ['ls'] * count))

And here is a calculation example to make it easier to understand. The following divides 10,000 tasks among N processes, where N is the CPU count. Note that I'm passing None as the number of processes; this causes the Pool class to use cpu_count() for the number of processes.

import multiprocessing

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    # note: the callback is called once with the full list of results
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait()  # Wait on the results
    print(results)

Using the multiprocessing module

Change self.sum = 0 to self.sum = multiprocessing.Value('d', 0.0), and use self.sum.value to access or change the value.

class AdderProcess(multiprocessing.Process):
    def __init__(self):
        ...
        self.sum = multiprocessing.Value('d', 0.0)
        ...

    def run(self):
        while True:
            number = self.queue.get()
            self.sum.value += number  # <-- use self.sum.value
            self.queue.task_done()

    def get_result(self):
        self.queue.join()
        return self.sum.value  # <-- use self.sum.value

The problem is this: Once you call self.start() in __init__, the main process forks off a child process. All values are copied. Now there are two versions of p. In the main process, p.sum is 0. In the child process, the run method is called and p.sum is augmented to 2. But when the main process calls p.get_result(), its version of p still has p.sum equal to 0.
So 0 is printed.

When you want to share a float value between processes, you need to use a sharing mechanism, such as mp.Value.

See "Sharing state between processes" for more options on how to share values.

Correct way of using multiprocessing Process() for parallel execution

Instead of spawning and managing the processes yourself, use a Pool of workers; it is designed to handle all of that for you.

Since each worker just spawns a subprocess and waits for it, you can use threads rather than processes.

Moreover, it seems that the workers write to the same file. You will need to protect access to it from concurrent writers, or the output will end up interleaved and out of order.

import subprocess
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

mutex = Lock()
task_dir = "/tmp/tasks"

def task_fn(task_nr):
    """This function will run in a separate thread."""
    cmd_str = ["my_exe", "-my_exe_arguments"]
    try:
        msg = subprocess.check_output(cmd_str, cwd=task_dir,
                                      stderr=subprocess.STDOUT,
                                      universal_newlines=True)
    except subprocess.CalledProcessError as e:
        with mutex:
            # append so errors from earlier tasks are not overwritten
            with open("a_unique_PROTECTED_err_log_file.log", "a") as f:
                f.write(e.output)

    return task_nr

with ThreadPoolExecutor() as pool:
    iterator = pool.map(task_fn, range(100000))
    for result in iterator:
        print("Task %d done" % result)

Python Multiprocessing queue

The queue is actually getting populated. You need to call queue.get() once for each object you put on the queue. So you just need to call queue.get() one more time.

>>> import multiprocessing
>>> from multiprocessing import Queue
>>> queue = Queue()
>>> jobs = [['a', 'b'], ['c', 'd']]
>>> for job in jobs:
...     queue.put(job)
...

>>> queue.get()
['a', 'b']
>>> queue.get()
['c', 'd']

importing and using a module that uses multiprocessing without causing infinite loop on Windows

I don't quite get what you're asking. You don't need to do anything to prevent this from spawning infinitely many processes. I just ran it on Windows XP, imported the file and ran multi.start(), and it completed fine in a couple of seconds.

The reason you have to do the if __name__=="__main__" protection is that, on Windows, multiprocessing has to import the main script in order to run the target function, which means top-level module code in that file will be executed. The problem only arises if that top-level module code itself tries to spawn a new process. In your example, the top level module code doesn't use multiprocessing, so there's no infinite process chain.

Edit: Now I get what you're asking. You don't need to protect multi.py. You need to protect your main script, whatever it is. If you're getting a crash, it's because your main script calls multi.start() in its top-level module code. Your script needs to look like this:

import multi

if __name__ == "__main__":
    multi.start()

The "protection" is always needed in the main script.

Parallel Processing in python

A good, simple way to start with parallel processing in Python is the pool map in multiprocessing: it works like the usual Python map(), but the individual function calls are spread across a pool of worker processes.

Factoring is a nice example of this: you can brute-force check all the candidate divisors, spreading the work across all available processes:

from multiprocessing import Pool
import numpy

numToFactor = 976

def isFactor(x):
    result = None
    div = numToFactor // x  # integer division
    if div * x == numToFactor:
        result = (x, div)
    return result

if __name__ == '__main__':
    pool = Pool(processes=4)
    possibleFactors = list(range(1, int(numpy.floor(numpy.sqrt(numToFactor))) + 1))
    print('Checking', possibleFactors)
    result = pool.map(isFactor, possibleFactors)
    cleaned = [x for x in result if x is not None]
    print('Factors are', cleaned)

This gives me

Checking [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
Factors are [(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]

