Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs
I don't see any computations in the Python code. If you just need to execute several external programs in parallel, it is sufficient to use the subprocess module
to run the programs and the threading
module to maintain a constant number of running processes, but the simplest code uses multiprocessing.Pool
:
#!/usr/bin/env python
import os
import multiprocessing as mp
def run(filename_def_param):
    """Handle one work item given as a single ``(filename, def_param)`` pair.

    The pair arrives packed in one argument and is split apart here before
    the (elided) call to the external program.
    """
    [filename, def_param] = filename_def_param  # unpack arguments
    ...  # call external program on `filename`
def safe_run(*args, **kwargs):
    """Call run() with the given arguments and report, rather than raise, any exception."""
    try:
        run(*args, **kwargs)
    except Exception as exc:
        # Report the failure together with the arguments that triggered it.
        report = "error: %s run(*%r, **%r)" % (exc, args, kwargs)
        print(report)
def main():
    """Run safe_run() over every ``.npy`` file in the realizations directory.

    Each work item is a ``(path, ws)`` tuple. The items are distributed over
    a process pool sized to the number of available CPUs.
    """
    # populate files
    ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
    workdir = os.path.join(ws, r'fieldgen\reals')
    files = ((os.path.join(workdir, f), ws)
             for f in os.listdir(workdir) if f.endswith('.npy'))

    # start processes
    pool = mp.Pool()  # use all available CPUs
    try:
        pool.map(safe_run, files)
    finally:
        # Shut the workers down explicitly so no child processes are left
        # behind once the work is done (or if map() raises).
        pool.close()
        pool.join()


if __name__ == "__main__":
    mp.freeze_support()  # optional if the program is not frozen
    main()
If there are many files then pool.map()
could be replaced by for _ in pool.imap_unordered(safe_run, files): pass
.
There is also multiprocessing.dummy.Pool
that provides the same interface as multiprocessing.Pool
but uses threads instead of processes that might be more appropriate in this case.
You don't need to keep some CPUs free. Just use a command that starts your executables with a low priority (on Linux it is a nice
program).
ThreadPoolExecutor
example
concurrent.futures.ThreadPoolExecutor
would be both simple and sufficient but it requires 3rd-party dependency on Python 2.x (it is in the stdlib since Python 3.2).
#!/usr/bin/env python
import os
import concurrent.futures
def run(filename, def_param):
    """Handle one input file; the call to the external program is elided."""
    ...  # call external program on `filename`
# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
files = (os.path.join(wdir, name)
         for name in os.listdir(wdir)
         if name.endswith('.npy'))

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    # Remember which file each future belongs to so failures can be reported.
    future_to_file = {}
    for path in files:
        future_to_file[executor.submit(run, path, ws)] = path

    for future in concurrent.futures.as_completed(future_to_file):
        f = future_to_file[future]
        if future.exception() is None:
            continue
        print('%r generated an exception: %s' % (f, future.exception()))
        # run() doesn't return anything so `future.result()` is always `None`
Or if we ignore exceptions raised by run()
:
from itertools import repeat

...  # the same

# start threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    # repeat(ws) supplies the same second argument to every run() call.
    # run() doesn't return anything so `map()` results can be ignored
    executor.map(run, files, repeat(ws))
subprocess
+ threading
(manual pool) solution
#!/usr/bin/env python
from __future__ import print_function
import os
import subprocess
import sys
from Queue import Queue
from threading import Thread
def run(filename, def_param):
    """Run the external program for one input file and wait for it to finish.

    ``subprocess.check_call`` raises ``CalledProcessError`` when the program
    exits with a non-zero status.
    """
    ...  # define exe, swt_nam
    command = [exe, swt_nam]
    subprocess.check_call(command)  # run external program
def worker(queue):
    """Process files from the queue until a ``None`` sentinel is received."""
    while True:
        args = queue.get()
        if args is None:
            return
        try:
            run(*args)
        except Exception as e:
            # catch exceptions to avoid exiting the thread prematurely
            print('%r failed: %s' % (args, e,), file=sys.stderr)
# start threads
q = Queue()
threads = []
for _ in range(8):
    t = Thread(target=worker, args=(q,))
    t.daemon = True  # threads die if the program dies
    t.start()
    threads.append(t)

# populate files
ws = r'D:\Data\Users\jbellino\Project\stJohnsDeepening\model\xsec_a'
wdir = os.path.join(ws, r'fieldgen\reals')
for f in os.listdir(wdir):
    if not f.endswith('.npy'):
        continue
    q.put_nowait((os.path.join(wdir, f), ws))

# signal no more files: one sentinel per worker thread
for _ in threads:
    q.put_nowait(None)

# wait for completion
for t in threads:
    t.join()
multiprocessing - execute external command and wait before proceeding
You can use subprocess.Popen
to launch the external commands asynchronously, and store each Popen
object returned in a list. Once you've launched all the processes, just iterate over them and wait for each to finish using popen_object.wait
.
import shlex
import subprocess

# Launch every ./combine invocation without waiting, keeping the Popen
# handles so they can all be waited on afterwards.
arguments = ""  # must exist before the loop appends to it
processes = []
for i in range(1, 20):
    # NOTE(review): the argument string accumulates across iterations, so
    # run i receives images 1..i -- confirm this is what ./combine expects.
    arguments += str(i) + "_image.jpg "
    # The space after the program name keeps it separate from the first argument.
    processes.append(subprocess.Popen(shlex.split("./combine " + arguments)))

# Block until every child has exited before starting the merge step.
for p in processes:
    p.wait()

subprocess.call("./merge_resized_images")
However, this will launch nineteen concurrent processes (one per loop iteration), which is probably going to hurt performance.
To avoid that, you can use a ThreadPool
to limit yourself to some lower number of concurrent processes (multiprocessing.cpu_count
is a good number), and then use pool.join
to wait for them all to finish.
import multiprocessing
import subprocess
import shlex
from multiprocessing.pool import ThreadPool
def call_proc(cmd):
    """Run ``cmd`` in a child process and return its ``(stdout, stderr)`` output.

    This runs in a separate thread. The command string is tokenised with
    ``shlex.split`` and both output streams are captured through pipes.
    """
    # subprocess.call(shlex.split(cmd))  # This will block until cmd finishes
    argv = shlex.split(cmd)
    child = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    captured_out, captured_err = child.communicate()
    return (captured_out, captured_err)
# One worker thread per CPU bounds how many ./combine processes run at once.
pool = ThreadPool(multiprocessing.cpu_count())
results = []
arguments = ""  # must exist before the loop appends to it
for i in range(1, 20):
    # NOTE(review): the argument string accumulates across iterations, so
    # task i receives images 1..i -- confirm this is what ./combine expects.
    arguments += str(i) + "_image.jpg "
    # The space after the program name keeps it separate from the first argument.
    results.append(pool.apply_async(call_proc, ("./combine " + arguments,)))

# Close the pool and wait for each running task to complete
pool.close()
pool.join()
for result in results:
    out, err = result.get()
    print("out: {} err: {}".format(out, err))
subprocess.call("./merge_resized_images")
Each thread will release the GIL while waiting for the subprocess to complete, so they'll all run in parallel.
How to spawn parallel child processes on a multi-processor system?
What you are looking for is the process pool class in multiprocessing.
import multiprocessing
import subprocess
def work(cmd):
    """Run ``cmd`` without a shell, wait for it, and return its exit status."""
    exit_status = subprocess.call(cmd, shell=False)
    return exit_status
if __name__ == '__main__':
    # One worker process per CPU, and one `ls` invocation per worker.
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    try:
        # print() with a single argument behaves the same on Python 2 and 3.
        print(pool.map(work, ['ls'] * count))
    finally:
        # Shut the workers down so no child processes are left behind.
        pool.close()
        pool.join()
And here is a calculation example to make it easier to understand. The following will divide 10000 tasks on N processes where N is the cpu count. Note that I'm passing None as the number of processes. This will cause the Pool class to use cpu_count for the number of processes (reference)
import multiprocessing
import subprocess
def calculate(value):
    """Return ``value`` multiplied by ten."""
    scaled = value * 10
    return scaled
if __name__ == '__main__':
    pool = multiprocessing.Pool(None)  # None: Pool picks cpu_count() workers
    tasks = range(10000)
    results = []
    # NOTE: map_async calls the callback once with the complete result list,
    # so `results` ends up holding a single list of all computed values.
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait()  # Wait on the results
    # print() with a single argument behaves the same on Python 2 and 3.
    print(results)
    # Shut the workers down so no child processes are left behind.
    pool.close()
    pool.join()
Using the multiprocessing module
Change self.sum = 0
to self.sum = multiprocessing.Value('d', 0.0)
, and use self.sum.value
to access or change the value.
class AdderProcess(multiprocessing.Process):
    """Process that accumulates numbers from its queue into a shared total."""

    def __init__(self):
        ...
        # Double-precision value that can be shared between processes.
        self.sum = multiprocessing.Value('d', 0.0)
        ...

    def run(self):
        """Keep adding each number taken from the queue to the shared total."""
        while True:
            item = self.queue.get()
            self.sum.value += item  # <-- use self.sum.value
            self.queue.task_done()

    def get_result(self):
        """Wait until every queued number is processed, then return the total."""
        self.queue.join()
        total = self.sum.value  # <-- use self.sum.value
        return total
The problem is this: Once you call self.start()
in __init__
, the main process forks off a child process. All values are copied. Now there are two versions of p
. In the main process, p.sum
is 0. In the child process, the run
method is called and p.sum
is augmented to 2. But when the main process calls p.get_result()
, its version of p
still has p.sum
equal to 0.
So 0 is printed.
When you want to share a float value between processes, you need to use a sharing mechanism, such as mp.Value
.
See "Sharing state between processes" for more options on how to share values.
Correct way of using multiprocessing Process() for parallel execution
Instead of spawning and managing the processes yourself, use a Pool of workers. It is designed to deal with all of that for you.
As your workers are spawning a subprocess, you can use threads rather than processes.
Moreover, it seems that the workers will write on the same file. You will need to protect its access from concurrent instances or the result will be totally out of order.
import subprocess
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
mutex = Lock()
task_dir = "/tmp/tasks"
def task_fn(task_nr):
    """Run the external command for one task; this runs in a separate thread.

    The command is executed in ``task_dir`` with stderr merged into stdout.
    If it exits with a non-zero status, its combined output is appended to
    the shared error log while holding ``mutex``.

    Returns ``task_nr`` so the caller can tell which task finished.
    """
    cmd = ["my_exe", "-my_exe_arguments"]
    try:
        subprocess.check_output(cmd, cwd=task_dir, stderr=subprocess.STDOUT,
                                universal_newlines=True)
    except subprocess.CalledProcessError as e:
        # Serialise writers so output from concurrent failures is not interleaved.
        with mutex:
            # Append: opening with "w" would erase the errors already logged
            # by earlier failing tasks.
            with open("a_unique_PROTECTED_err_log_file.log", "a") as f:
                f.write(e.output)
    return task_nr
# Fan the tasks out over the executor's threads and report each one as its
# result comes back; map() yields results in submission order.
with ThreadPoolExecutor() as pool:
    for result in pool.map(task_fn, range(100000)):
        print("Task %d done" % result)
Python Multiprocessing queue
The queue is actually getting populated. You need to call queue.get() once for each object you put on the queue, so you just need to call queue.get() one more time.
>>> import multiprocessing
>>> from multiprocessing import Queue
>>> queue = Queue()
>>> jobs = [['a', 'b'], ['c', 'd']]
>>> for job in jobs:
queue.put(job)
>>> queue.get()
['a', 'b']
>>> queue.get()
['c', 'd']
importing and using a module that uses multiprocessing without causing infinite loop on Windows
I don't quite get what you're asking. You don't need to do anything to prevent this from spawning infinitely many processes. I just ran it on Windows XP --- imported the file and ran multi.start()
--- and it completed fine in a couple seconds.
The reason you have to do the if __name__=="__main__"
protection is that, on Windows, multiprocessing has to import the main script in order to run the target function, which means top-level module code in that file will be executed. The problem only arises if that top-level module code itself tries to spawn a new process. In your example, the top level module code doesn't use multiprocessing, so there's no infinite process chain.
Edit: Now I get what you're asking. You don't need to protect multi.py
. You need to protect your main script, whatever it is. If you're getting a crash, it's because in your main script you are doing multi.start()
in the top level module code. Your script needs to look like this:
import multi
if __name__ == "__main__":
    # Start the workers only when this file is executed as the main script,
    # not when it is imported.
    multi.start()
The "protection" is always needed in the main script.
Parallel Processing in python
A good, simple way to start with parallel processing in Python is the pool mapping in multiprocessing -- it's like the usual Python map, but the individual function calls are spread out over a number of processes.
Factoring is a nice example of this - you can brute-force check all the divisions spreading out over all available tasks:
from multiprocessing import Pool
import numpy
numToFactor = 976


def isFactor(x):
    """Return ``(x, numToFactor // x)`` if ``x`` divides ``numToFactor``, else ``None``.

    ``divmod`` keeps the arithmetic in integers. With ``/`` on Python 3 the
    quotient is a float, and rounding can make ``div * x == numToFactor``
    true for values that are not factors (e.g. 3).
    """
    div, remainder = divmod(numToFactor, x)
    if remainder == 0:
        return (x, div)
    return None
if __name__ == '__main__':
    pool = Pool(processes=4)
    try:
        # Only candidates up to floor(sqrt(n)) are tried; isFactor() returns
        # the matching co-factor alongside each hit.
        # list(...) so the candidates print as a list on Python 3 as well.
        possibleFactors = list(
            range(1, int(numpy.floor(numpy.sqrt(numToFactor))) + 1))
        # Single-argument print() works identically on Python 2 and 3; the
        # format strings reproduce the spacing of the original print statements.
        print('Checking  %s' % (possibleFactors,))
        result = pool.map(isFactor, possibleFactors)
    finally:
        # Shut the workers down so no child processes are left behind.
        pool.close()
        pool.join()
    cleaned = [x for x in result if x is not None]
    print('Factors are %s' % (cleaned,))
This gives me
Checking [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
Factors are [(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]
Related Topics
How to Print Utf-8 Encoded Text to the Console in Python < 3
Difference between a Pandas Series and a Single-Column Dataframe
How to Read the Contents of an Url with Python
Plot a Bar Using Matplotlib Using a Dictionary
Determine Complete Django Url Configuration
Best Way to Determine If a Sequence Is in Another Sequence
Combining Two Series into a Dataframe in Pandas
Logging, Streamhandler and Standard Streams
Why Does Python's Multiprocessing Module Import _Main_ When Starting a New Process on Windows
How to Search a Word in a Word 2007 .Docx File
How to Add an Empty Column to a Dataframe
"Ssl: Certificate_Verify_Failed" Error When Scraping Https://Www.Thenewboston.Com/