Multiple instances of Python running simultaneously limited to 35
Decomposing the Error Message
Your error message includes the following hint:
OpenBLAS blas_thread_init: pthread_create: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 1024 current, 2067021 max
The RLIMIT_NPROC resource limit controls the total number of processes that a user can have (on Linux, each thread counts as well). More specifically, as it is a per-process setting, when fork(), clone(), vfork(), etc. are called by a process, the RLIMIT_NPROC value for that process is compared to the total process count of the user who owns that process. If that value is exceeded, the call fails with EAGAIN ("Resource temporarily unavailable"), as you've experienced.
The error message indicates that OpenBLAS was unable to create additional threads because your user had already used all the threads that RLIMIT_NPROC allows.
Since you're running on a cluster, it's unlikely that your user is running many threads (unlike, say, if you were on your personal machine and browsing the web, playing music, etc.), so it's reasonable to conclude that OpenBLAS is the one starting many threads.
How OpenBLAS Uses Threads
OpenBLAS can use multiple threads to accelerate linear algebra. You may want many threads for solving a single, larger problem quickly. You may want fewer threads for solving many smaller problems simultaneously.
OpenBLAS has several ways to limit the number of threads it uses. These are controlled via:
export OPENBLAS_NUM_THREADS=4
export GOTO_NUM_THREADS=4
export OMP_NUM_THREADS=4
The priorities are OPENBLAS_NUM_THREADS > GOTO_NUM_THREADS > OMP_NUM_THREADS. (I think this means that OPENBLAS_NUM_THREADS overrides OMP_NUM_THREADS; however, OpenBLAS ignores OPENBLAS_NUM_THREADS and GOTO_NUM_THREADS when compiled with USE_OPENMP=1.)
If none of the foregoing variables are set, OpenBLAS will run using a number of threads equal to the number of cores on your machine (32 in your case).
Your Situation
Your cluster has 32-core CPUs. You're trying to run 36 instances of Python. Each instance requires 1 thread for Python + 32 threads for OpenBLAS. You'll also need 1 thread for your SSH connection and 1 thread for your shell. That means that you need 36*(32+1)+2=1190 threads.
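To check this before launching a batch of jobs, you could compare an estimate like the one above against the soft RLIMIT_NPROC value, which Python's standard resource module exposes (Unix only). This is a minimal sketch: estimate_threads and its default overhead of 2 (SSH connection plus shell) are illustrative helpers that mirror the arithmetic above, and since the limit counts every thread your user owns on the node, the estimate is a lower bound.

```python
import resource

def estimate_threads(instances, blas_threads, overhead=2):
    """Estimate as in the answer: each instance runs 1 Python thread plus
    blas_threads OpenBLAS threads, plus a fixed overhead (SSH + shell)."""
    return instances * (1 + blas_threads) + overhead

def nproc_soft_limit():
    """Return the current user's soft RLIMIT_NPROC, or None if unlimited."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NPROC)
    return None if soft == resource.RLIM_INFINITY else soft

if __name__ == "__main__":
    needed = estimate_threads(instances=36, blas_threads=32)  # 1190
    limit = nproc_soft_limit()
    print(f"estimated threads: {needed}, soft RLIMIT_NPROC: {limit}")
    if limit is not None and needed > limit:
        print("expect 'pthread_create: Resource temporarily unavailable'")
```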
The nuclear option for fixing the problem is to use:
export OPENBLAS_NUM_THREADS=1
which should bring you down to 36*(1+1)+2=74 threads.
Since you have spare capacity, you could set OPENBLAS_NUM_THREADS to a higher value, but then the OpenBLAS instances owned by your separate Python processes will interfere with each other. So there's a trade-off between how fast you get one solution and how fast you can get many solutions. Ideally, you can resolve this trade-off by running fewer Pythons per node and using more nodes.
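If the instances are started from a Python wrapper, one way to apply this trade-off is to pass OPENBLAS_NUM_THREADS to each child through the environment of subprocess.Popen, splitting the cores evenly and never going below 1. OpenBLAS reads the variable when it initializes, so setting it in the child's environment before the child starts is a safe place to do it. A minimal sketch; launch_instances and the demo child command are hypothetical, so substitute your own script.

```python
import os
import subprocess
import sys

def launch_instances(cmd, n_instances, cores=None):
    """Start n_instances copies of cmd, each with OPENBLAS_NUM_THREADS set so
    that the BLAS threads of all instances together roughly match the cores."""
    cores = cores or os.cpu_count() or 1
    per_instance = max(1, cores // n_instances)  # never below 1 thread
    env = dict(os.environ, OPENBLAS_NUM_THREADS=str(per_instance))
    procs = [subprocess.Popen(cmd, env=env) for _ in range(n_instances)]
    # Wait for every child and collect the exit codes.
    return per_instance, [p.wait() for p in procs]

if __name__ == "__main__":
    # Hypothetical child: just prints the value it sees; replace with your own script.
    child = [sys.executable, "-c",
             "import os; print(os.environ['OPENBLAS_NUM_THREADS'])"]
    per_instance, codes = launch_instances(child, n_instances=4)
    print(per_instance, codes)
```

With 36 instances on 32 cores, 32 // 36 is 0, so every child gets 1 thread (the nuclear option); with 8 instances per node, each would get 4.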
OpenBLAS blas_thread_init: pthread_create: Resource temporarily unavailable
This is for others in the future who encounter this error. The cluster setup most likely limits the number of processes that can be run by a user on an interactive node. The clue is in the second line of the error:
OpenBLAS blas_thread_init: pthread_create: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 64 current, 64 max
Here the limit is set to 64. While this is quite sufficient for normal CLI use, it's probably not sufficient for interactively running Keras jobs (like the OP) or, in my case, an interactive Dask cluster.
It may be possible to increase the limit from your shell using, say, ulimit -u 10000, but that's not guaranteed to work: without admin rights you can only raise the soft limit up to the hard limit, which is also 64 here. It's best to notify the admins, as the OP did.
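From Python, the standard resource module exposes the same limit. An unprivileged process can raise its own soft RLIMIT_NPROC, but only up to the hard limit (the second number in the error message), and only for itself and the processes it starts afterwards. A minimal sketch (Unix only); raise_nproc_soft_limit is a hypothetical helper name. Because OpenBLAS creates its threads when the library loads, it would have to run before NumPy is imported.

```python
import resource

def raise_nproc_soft_limit():
    """Raise the soft RLIMIT_NPROC to the hard limit and return (soft, hard).

    Without privileges the soft limit can go no higher than the hard limit,
    so with '64 current, 64 max' this changes nothing."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
    if soft != hard:
        resource.setrlimit(resource.RLIMIT_NPROC, (hard, hard))
    return resource.getrlimit(resource.RLIMIT_NPROC)

if __name__ == "__main__":
    print("RLIMIT_NPROC (soft, hard):", raise_nproc_soft_limit())
```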
Executing one function multiple times simultaneously
- target=func1() calls func1 and passes its return value (in this case None) as the target argument for Process, instead of passing the function func1 itself.
- Process(...) just creates a Process object. You never actually spawn/execute any of the created Process objects; you need to call .start() on each of them. Depending on the OS (i.e. if the new process is started with the spawn method rather than os.fork, as is the default on Windows and macOS), this also requires adding an if __name__ == '__main__': guard (which is best practice anyway, regardless of the OS).
- You are using p1 twice.
Try this:
from multiprocessing import Process
import time
def func1():
    time.sleep(1)
    print('Finished sleeping')
if __name__ == '__main__':
    t1_start = time.perf_counter()
    p1 = Process(target=func1)  # note no ()
    p2 = Process(target=func1)  # note no ()
    p1.start()
    p2.start()
    t1_stop = time.perf_counter()
    print("elapsed time: {} sec".format(round(t1_stop - t1_start)))
This gives the output
elapsed time: 0 sec
Finished sleepingFinished sleeping
which makes sense: the function is executed twice in separate processes rather than in the main process, so the main process never executes time.sleep(1) itself and stops the timer right after starting the children.
However, if we add .join(), the main process is forced to wait for the child processes:
p1.start()
p2.start()
p1.join()
p2.join()
Now the output is the same as your required output:
Finished sleeping
Finished sleeping
elapsed time: 1 sec
EDIT: If you want an arbitrary number of processes with a loop:
...
times = 3
processes = []
for _ in range(times):
    p = Process(target=func1)
    p.start()
    processes.append(p)
for p in processes:
    p.join()
...
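For reference, here's a self-contained sketch of the loop version that also times the run, so you can check that the processes overlap: the elapsed time should stay close to one sleep rather than growing with times. The run function and the delay parameter are illustrative additions, not part of the original code.

```python
import time
from multiprocessing import Process

def func1(delay=1.0):
    time.sleep(delay)
    print('Finished sleeping')

def run(times=3, delay=1.0):
    """Start `times` processes running func1, wait for all, return elapsed seconds."""
    start = time.perf_counter()
    processes = []
    for _ in range(times):
        p = Process(target=func1, args=(delay,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()  # block until this child has finished
    return time.perf_counter() - start

if __name__ == '__main__':
    print("elapsed time: {:.1f} sec".format(run()))
```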
Using multiprocessing.Process with a maximum number of simultaneous processes
It might be most sensible to use multiprocessing.Pool, which produces a pool of worker processes based on the number of cores available on your system (by default, one per CPU reported by os.cpu_count()), and then feeds tasks in as workers become available.
The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of cores:
from multiprocessing import Pool
def f(x):
    return x*x
if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"
It's also handy to know about the multiprocessing.cpu_count() function, which returns the number of cores on a given system, if you need it in your code.
Edit: Here's some draft code that seems to work for your specific case:
import multiprocessing
def f(name):
    print('hello', name)
if __name__ == '__main__':
    pool = multiprocessing.Pool()  # use all available cores, otherwise specify the number you want as an argument
    for i in range(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()
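One caveat with the draft above: apply_async returns an AsyncResult, and an exception raised inside the worker function only surfaces when you call .get() on it, so a fire-and-forget loop can fail silently. A minimal sketch that keeps the results, using the pool as a context manager; square is just a stand-in for your own function.

```python
import multiprocessing

def square(x):
    return x * x

def run(n=512, processes=None):
    # processes=None means one worker per CPU (os.cpu_count()).
    with multiprocessing.Pool(processes) as pool:
        async_results = [pool.apply_async(square, args=(i,)) for i in range(n)]
        # .get() waits for each task and re-raises any exception from the worker.
        # Collect inside the with block: leaving it terminates the pool.
        return [r.get() for r in async_results]

if __name__ == '__main__':
    results = run()
    print(results[:5])  # [0, 1, 4, 9, 16]
```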
python: calling method over multiple instances parallelly
Ok, let's do it. First the code (multiprocessing docs):
In [1]: from multiprocessing import Process
In [2]: def f():
   ...:     print(1)
   ...:     for i in range(100):
   ...:         # do something
   ...:         pass
   ...:
In [3]: p1 = Process(target=f)
In [4]: p1.start()
1
In [5]: p2 = Process(target=f)
In [6]: p2.start()
1
In [7]: import time
In [8]: def f():
   ...:     for i in range(100):
   ...:         print(i)
   ...:         # do something
   ...:         time.sleep(1)
   ...:         pass
   ...:
In [9]: p1 = Process(target=f)
In [10]: p1.start()
0
In [11]: p2 1
= Process(target=f)2
3
4
5
In [11]: p2 = Process(target=f)
In [12]: 6
p2.7
start8
In [12]: p2.start()
0
In [13]: 9
This is an example of how a function can be called in parallel. From In [10]: p1.start() onward you can see the output getting jumbled, because process p1 keeps running (and printing) in parallel while we type the commands for p2.
When running this from a Python script, you want to make sure the script only ends when all the processes have finished. You can do this by joining each of them:
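from multiprocessing import Process
def multi_process(instance_params, *funcs):
    processes = []
    for f in funcs:
        # instance_params must be a tuple of positional arguments for f
        prog = Process(target=f, args=instance_params)
        prog.start()
        processes.append(prog)
    # block until every child process has finished
    for p in processes:
        p.join()
if __name__ == '__main__':
    multi_process(params, f, f)  # params: tuple of arguments for f, e.g. () for the f above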
In standard CPython, threads can't run Python code in parallel the way C++ or Java threads can, because of the GIL (global interpreter lock). If your program spends more of its time on I/O operations than on CPU-intensive work, you can still use multithreading; for CPU-intensive tasks, multiprocessing is recommended.
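To make the distinction concrete, here's a minimal sketch using concurrent.futures, a standard-library alternative to driving threading and multiprocessing directly: a thread pool for work that mostly waits (time.sleep stands in for I/O) and a process pool for CPU-bound work. Both task functions are hypothetical placeholders.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def fake_io(i):
    """Stand-in for an I/O-bound task: sleeping releases the GIL, like waiting on a socket."""
    time.sleep(0.5)
    return i

def cpu_bound(n):
    """Stand-in for a CPU-bound task: pure-Python arithmetic holds the GIL."""
    return sum(k * k for k in range(n))

def io_with_threads(tasks):
    tasks = list(tasks)
    # Threads are fine here: they spend their time waiting, not computing.
    with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as ex:
        return list(ex.map(fake_io, tasks))

def cpu_with_processes(sizes):
    # Each process has its own interpreter and GIL, so they compute in parallel.
    with ProcessPoolExecutor() as ex:
        return list(ex.map(cpu_bound, sizes))

if __name__ == '__main__':
    start = time.perf_counter()
    print(io_with_threads(range(4)))
    print("threads took", round(time.perf_counter() - start, 2), "sec")  # sleeps overlap
    print(cpu_with_processes([10**6] * 4))
```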
In a comment, @ytutow asked what the difference is between a pool of workers and a process. From PyMOTW:
The Pool class can be used to manage a fixed number of workers for simple cases where the work to be done can be broken up and distributed between workers independently. The return values from the jobs are collected and returned as a list. The pool arguments include the number of processes and a function to run when starting the task process (invoked once per child).
You can use Pool as:
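from multiprocessing import Pool
def your_instance_method(instance):
    # return the result so that p.map can collect it
    return instance.do_some_computation_over_a_dataset()
if __name__ == '__main__':
    with Pool(3) as p:
        instances = [instance_1, instance_2, instance_3]
        print(p.map(your_instance_method, instances))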
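Here's a runnable version of the same idea with a hypothetical Dataset class standing in for your own. Each worker receives a pickled copy of its instance, so what comes back to the parent is the method's return value; changes the worker makes to the instance itself are not seen in the parent. Pool.map returns the results in the same order as the inputs.

```python
from multiprocessing import Pool

class Dataset:
    """Hypothetical stand-in for your class; it must be picklable (defined at module level)."""
    def __init__(self, values):
        self.values = values
        self.total = None

    def do_some_computation_over_a_dataset(self):
        self.total = sum(v * v for v in self.values)  # mutates the worker's copy only
        return self.total

def your_instance_method(instance):
    return instance.do_some_computation_over_a_dataset()

if __name__ == '__main__':
    instances = [Dataset([1, 2]), Dataset([3]), Dataset([4, 5, 6])]
    with Pool(3) as p:
        print(p.map(your_instance_method, instances))  # [5, 9, 77]
    print(instances[0].total)  # None: the parent's instance was never modified
```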
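As for the right number of workers, a common rule of thumb is 2*cpu_cores workers. That mainly helps when workers spend time waiting on I/O; for purely CPU-bound tasks, more workers than cores mostly adds contention, which is why Pool defaults to os.cpu_count() workers.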
Multiprocessing with pool in python: About several instances with same name at the same time
Actually, you will have 8 instances of SomeKindOfClass (one per task), but only 4 will ever be active at the same time.
multiprocessing vs multiprocessing.dummy
Your program will only work if you continue to use the multiprocessing.dummy module, which is just a wrapper around the threading module. You are still using Python threads (not separate processes). Python threads share the same global state; processes don't. Python threads also share the same GIL, so they're still limited to running one Python bytecode instruction at a time, unlike processes, which can all run Python code simultaneously.
If you were to change your import to from multiprocessing import Pool, you would notice that the allOutputs array remains unchanged after all the workers finish executing. (You would also likely get an error because you're creating the pool in the global scope; you should probably move that into a main() function.) This is because multiprocessing makes a new copy of the entire global state when it makes a new process. When the worker modifies the global allOutputs, it modifies its own copy of that initial global state. When the process ends, nothing is sent back to the main process, so the global state of the main process remains unchanged.
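A small experiment makes the difference visible. In this sketch, all_outputs and record are illustrative names modeled on allOutputs from the question: the same worker appends to a module-level list, and the parent sees the appends with the thread-based multiprocessing.dummy.Pool but not with the process-based multiprocessing.Pool. The pools are created inside functions rather than at global scope.

```python
import multiprocessing
import multiprocessing.dummy

all_outputs = []  # module-level state, like allOutputs in the question

def record(x):
    all_outputs.append(x * 10)  # modifies whichever copy of all_outputs this worker sees

def run(pool_factory):
    all_outputs.clear()
    with pool_factory(4) as pool:
        pool.map(record, range(4))
    return list(all_outputs)

def main():
    print("threads:  ", sorted(run(multiprocessing.dummy.Pool)))  # [0, 10, 20, 30]
    print("processes:", run(multiprocessing.Pool))  # []: each process changed its own copy

if __name__ == '__main__':
    main()
```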
Sharing State Between Processes
Unlike threads, processes don't share memory.
If you want to share state between processes, you have to explicitly declare shared variables and pass them to each process, or use pipes or some other method to allow the worker processes to communicate with each other or with the main process.
There are several ways to do this, but perhaps the simplest is to use the Manager class:
import multiprocessing
def worker(args):
    index, array = args
    a = SomeKindOfClass()
    a.some_expensive_function()
    array[index] = a.output  # written through the manager proxy, visible to main()
def main():
    n = 8
    manager = multiprocessing.Manager()
    array = manager.list([0] * n)
    with multiprocessing.Pool(4) as pool:
        pool.map(worker, [(i, array) for i in range(n)])
    print(array)
if __name__ == '__main__':
    main()
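SomeKindOfClass comes from the question, so to try the Manager approach end to end, here's a self-contained sketch with a hypothetical stand-in for it (its seed argument is an invention for the demo). For comparison, it also shows the often simpler alternative of returning each output from the worker and letting pool.map collect the results, which needs no shared state at all.

```python
import multiprocessing

class SomeKindOfClass:
    """Hypothetical stand-in for the class in the question."""
    def __init__(self, seed):
        self.seed = seed
        self.output = None

    def some_expensive_function(self):
        self.output = self.seed ** 2

def shared_worker(args):
    index, array = args
    a = SomeKindOfClass(index)
    a.some_expensive_function()
    array[index] = a.output  # written through the manager proxy, visible to the parent

def returning_worker(index):
    a = SomeKindOfClass(index)
    a.some_expensive_function()
    return a.output  # pickled and sent back to the parent by pool.map

def main(n=8):
    with multiprocessing.Manager() as manager, multiprocessing.Pool(4) as pool:
        array = manager.list([0] * n)
        pool.map(shared_worker, [(i, array) for i in range(n)])
        shared = list(array)  # copy out before the manager shuts down
        returned = pool.map(returning_worker, range(n))
    return shared, returned

if __name__ == '__main__':
    shared, returned = main()
    print(shared)    # [0, 1, 4, 9, 16, 25, 36, 49]
    print(returned)  # [0, 1, 4, 9, 16, 25, 36, 49]
```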
How to run multiple python scripts simultaneously from a wrapper script in such a way that CPU utilization is maximized?
This is a technique I developed for calling many external programs using subprocess.Popen. In this example, I'm calling convert (ImageMagick) to make JPEG images from DICOM files.
In short, it uses manageprocs to keep checking a list of running subprocesses. If one has finished, it is removed and a new one is started, as long as unprocessed files remain. After that, the remaining processes are watched until they have all finished.
from datetime import datetime
from functools import partial
import argparse
import logging
import os
import subprocess as sp
import sys
import time
def main():
    """
    Entry point for dicom2jpg.
    """
    args = setup()
    if not args.fn:
        logging.error("no files to process")
        sys.exit(1)
    if args.quality != 80:
        logging.info(f"quality set to {args.quality}")
    if args.level:
        logging.info("applying level correction.")
    start_partial = partial(start_conversion, quality=args.quality, level=args.level)
    starttime = str(datetime.now())[:-7]
    logging.info(f"started at {starttime}.")
    # List of subprocesses
    procs = []
    # Do not launch more processes concurrently than your CPU has cores.
    # That will only lead to the processes fighting over CPU resources.
    maxprocs = os.cpu_count()
    # Launch and manage subprocesses for all files.
    for path in args.fn:
        while len(procs) == maxprocs:
            manageprocs(procs)
        procs.append(start_partial(path))
    # Wait for all subprocesses to finish.
    while len(procs) > 0:
        manageprocs(procs)
    endtime = str(datetime.now())[:-7]
    logging.info(f"completed at {endtime}.")
def start_conversion(filename, quality, level):
    """
    Convert a DICOM file to a JPEG file.
    Removing the blank areas from the Philips detector.
    Arguments:
        filename: name of the file to convert.
        quality: JPEG quality to apply
        level: Boolean to indicate whether level adjustment should be done.
    Returns:
        Tuple of (input filename, output filename, subprocess.Popen)
    """
    outname = filename.strip() + ".jpg"
    size = "1574x2048"
    args = [
        "convert",
        filename,
        "-units",
        "PixelsPerInch",
        "-density",
        "300",
        "-depth",
        "8",
        "-crop",
        size + "+232+0",
        "-page",
        size + "+0+0",
        "-auto-gamma",
        "-quality",
        str(quality),
    ]
    if level:
        args += ["-level", "-35%,70%,0.5"]
    args.append(outname)
    proc = sp.Popen(args, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
    return (filename, outname, proc)
def manageprocs(proclist):
    """Check a list of subprocesses for processes that have ended and
    remove them from the list.
    Arguments:
        proclist: List of tuples. The last item in the tuple must be
            a subprocess.Popen object.
    """
    # Iterate over a copy: removing items from the list being iterated would skip entries.
    for item in proclist[:]:
        filename, outname, proc = item
        if proc.poll() is not None:
            logging.info(f"conversion of “{filename}” to “{outname}” finished.")
            proclist.remove(item)
    # since manageprocs is called from a loop, keep CPU usage down.
    time.sleep(0.05)
if __name__ == "__main__":
    main()
I've left out setup(); it uses argparse to deal with the command-line arguments.
Here the thing to be processed is just a list of file names, but it could also be (in your case) a list of tuples of script names and arguments.
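Adapting the same poll-and-replace loop to a list of (script, arguments) tuples might look like the following minimal sketch. The function run_scripts is hypothetical; each job runs under the current interpreter (sys.executable), and at most maxprocs jobs are alive at once.

```python
import os
import subprocess as sp
import sys
import time

def run_scripts(jobs, maxprocs=None):
    """Run (script, args) jobs with at most maxprocs running concurrently.

    Returns a list of (script, args, returncode) in completion order."""
    maxprocs = maxprocs or os.cpu_count() or 1
    pending = list(jobs)
    running = []   # (script, args, Popen)
    finished = []
    while pending or running:
        # Start new jobs while there is a free slot.
        while pending and len(running) < maxprocs:
            script, args = pending.pop(0)
            proc = sp.Popen([sys.executable, script, *args])
            running.append((script, args, proc))
        # Iterate over a copy so that removing finished jobs is safe.
        for item in running[:]:
            script, args, proc = item
            if proc.poll() is not None:
                running.remove(item)
                finished.append((script, args, proc.returncode))
        time.sleep(0.05)  # avoid busy-waiting
    return finished
```

For example, run_scripts([("a.py", ["--part", "1"]), ("a.py", ["--part", "2"])]) would run two jobs of a hypothetical a.py side by side and report their exit codes.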