Multiple Instances of Python Running Simultaneously Limited to 35


Decomposing the Error Message

Your error message includes the following hint:

OpenBLAS blas_thread_init: pthread_create: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 1024 current, 2067021 max

The RLIMIT_NPROC limit controls the total number of processes that a user can have (on Linux, threads count toward this total as well). More specifically, because it is a per-process setting, when a process calls fork(), clone(), vfork(), &c, the RLIMIT_NPROC value for that process is compared to the total process count of the user that owns it. If that value would be exceeded, the call fails, as you've experienced.

The error message indicates that OpenBLAS was unable to create additional threads because your user had used all the threads RLIMIT_NPROC had given it.

Since you're running on a cluster, it's unlikely that your user is running many threads (unlike, say, if you were on your personal machine and browsing the web, playing music, &c), so it's reasonable to conclude that OpenBLAS is trying to start multiple threads.
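If you want to check the limit from inside Python rather than from the error message, the standard library's resource module (Unix only) reports the same pair of numbers. This is a minimal sketch; the helper names nproc_limits and describe are made up for illustration:

```python
import resource


def nproc_limits():
    """Return the (soft, hard) RLIMIT_NPROC values for this process."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
    return soft, hard


def describe(value):
    """Render a limit value; resource.RLIM_INFINITY means no limit."""
    return "unlimited" if value == resource.RLIM_INFINITY else str(value)


if __name__ == "__main__":
    soft, hard = nproc_limits()
    # Mirrors the wording of the OpenBLAS hint: "<soft> current, <hard> max".
    print("RLIMIT_NPROC", describe(soft), "current,", describe(hard), "max")
```

The "current" value in the OpenBLAS message corresponds to the soft limit and "max" to the hard limit.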

How OpenBLAS Uses Threads

OpenBLAS can use multiple threads to accelerate linear algebra. You may want many threads for solving a single, larger problem quickly. You may want fewer threads for solving many smaller problems simultaneously.

OpenBLAS has several ways to limit the number of threads it uses. These are controlled via:

export OPENBLAS_NUM_THREADS=4
export GOTO_NUM_THREADS=4
export OMP_NUM_THREADS=4

The priorities are OPENBLAS_NUM_THREADS > GOTO_NUM_THREADS > OMP_NUM_THREADS. (I think this means that OPENBLAS_NUM_THREADS overrides OMP_NUM_THREADS; however, OpenBLAS ignores OPENBLAS_NUM_THREADS and GOTO_NUM_THREADS when compiled with USE_OPENMP=1.)

If none of the foregoing variables are set, OpenBLAS will run using a number of threads equal to the number of cores on your machine (32 on your machine).
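The same variables can be set from Python instead of the shell. As far as I know OpenBLAS reads them when the library is loaded, so they need to be in place before NumPy (or anything else linked against OpenBLAS) is imported. A minimal sketch; limit_blas_threads is a hypothetical helper, not part of any library:

```python
import os

# The three variables named above, in the priority order described.
BLAS_THREAD_VARS = ("OPENBLAS_NUM_THREADS", "GOTO_NUM_THREADS", "OMP_NUM_THREADS")


def limit_blas_threads(n, env=None):
    """Set all three thread-count variables to n in env (default: os.environ)."""
    env = os.environ if env is None else env
    for name in BLAS_THREAD_VARS:
        env[name] = str(n)
    return env


if __name__ == "__main__":
    limit_blas_threads(4)
    # Import NumPy (or other OpenBLAS users) only after this point.
    print({name: os.environ[name] for name in BLAS_THREAD_VARS})
```

Setting all three avoids having to know which one your particular OpenBLAS build honours.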

Your Situation

Your cluster has 32-core CPUs. You're trying to run 36 instances of Python. Each instance requires 1 thread for Python + 32 threads for OpenBLAS. You'll also need 1 thread for your SSH connection and 1 thread for your shell. That means that you need 36*(32+1)+2=1190 threads.

The nuclear option for fixing the problem is to use:

export OPENBLAS_NUM_THREADS=1

which should bring you down to 36*(1+1)+2=74 threads.

Since you have spare capacity, you could adjust OPENBLAS_NUM_THREADS to a higher value, but then the OpenBLAS instances owned by your separate Python processes will interfere with each other. So there's a trade-off between how fast you get one solution versus how fast you can get many solutions. Ideally, you can solve this trade-off by running fewer Pythons per node and using more nodes.
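The arithmetic above is easy to turn into a small calculator for picking a thread count. This sketch assumes the same accounting as the text (one interpreter thread per Python, plus two threads for SSH and the shell) and that nothing else of yours counts toward the limit; the function names are made up:

```python
def threads_needed(instances, blas_threads, overhead=2):
    """Threads used by `instances` Pythons, each with one interpreter thread
    plus `blas_threads` OpenBLAS threads, plus `overhead` for SSH and shell."""
    return instances * (blas_threads + 1) + overhead


def max_blas_threads(instances, limit, overhead=2):
    """Largest per-instance OpenBLAS thread count that stays within `limit`."""
    return (limit - overhead) // instances - 1


if __name__ == "__main__":
    print(threads_needed(36, 32))      # 1190
    print(threads_needed(36, 1))       # 74
    print(max_blas_threads(36, 1024))  # 27
```

With a limit of 1024 and 36 instances, 27 OpenBLAS threads per instance gives 36*(27+1)+2=1010 threads, while 28 would already give 1046.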

OpenBLAS blas_thread_init: pthread_create: Resource temporarily unavailable

This is for others in the future who encounter this error. The cluster setup most likely limits the number of processes that a user can run on an interactive node. The clue is in the second line of the error:

OpenBLAS blas_thread_init: pthread_create: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 64 current, 64 max

Here the limit is set to 64. While this is quite sufficient for normal CLI use, it's probably not sufficient for interactively running Keras jobs (like the OP); or in my case, trying to run an interactive Dask cluster.

It may be possible to increase the limit from your shell using, say, ulimit -u 10000, but that's not guaranteed to work. It's best to notify the admins, as the OP did.
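The Python counterpart of ulimit -u is resource.setrlimit (Unix only). An unprivileged process can only raise its soft limit up to the hard limit, which is why this is not guaranteed to help: in the message above, current and max are both 64, so there is no room. A minimal sketch with a hypothetical helper name:

```python
import resource


def raise_nproc_soft_limit(wanted):
    """Raise the soft RLIMIT_NPROC toward `wanted`, never above the hard limit.

    The limit is never lowered. Returns the (soft, hard) pair in effect afterwards.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
    if soft == resource.RLIM_INFINITY:
        return soft, hard  # already unlimited, nothing to do
    # Cap the request at the hard limit unless the hard limit is unlimited.
    new_soft = wanted if hard == resource.RLIM_INFINITY else min(wanted, hard)
    if new_soft > soft:
        resource.setrlimit(resource.RLIMIT_NPROC, (new_soft, hard))
    return resource.getrlimit(resource.RLIMIT_NPROC)


if __name__ == "__main__":
    print(raise_nproc_soft_limit(10000))
```

The new limit applies to the calling process and the children it starts afterwards, so call it before creating any worker processes.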

Executing one function multiple times simultaneously

  • target=func1() calls func1 and passes its return value (in this case None) as the target argument for Process instead of passing the function func1 itself.

  • Process(...) just creates a Process object. You never actually start any of the Process objects you create; you need to add a call to .start(). Depending on the OS (i.e. if the new process is spawned as a fresh interpreter rather than created with os.fork), this also requires a __main__ guard (which is best practice anyway, regardless of the OS).

  • You are using p1 twice.

Try this:

from multiprocessing import Process
import time

def func1():
    time.sleep(1)
    print('Finished sleeping')

if __name__ == '__main__':
    t1_start = time.perf_counter()
    p1 = Process(target=func1)  # note no ()
    p2 = Process(target=func1)  # note no ()

    p1.start()
    p2.start()

    t1_stop = time.perf_counter()

    print("elapsed time: {} sec".format(round(t1_stop - t1_start)))

This gives the output

elapsed time: 0 sec
Finished sleepingFinished sleeping

which makes sense. The function is executed twice, each time in a process separate from the main process, so the main process does not execute time.sleep(1) at all and reaches the final print immediately.

However, if we add .join() then the main process will be forced to wait for the child processes:

p1.start()
p2.start()

p1.join()
p2.join()

Now the output is the same as your required output:

Finished sleeping
Finished sleeping
elapsed time: 1 sec

EDIT: If you want an arbitrary number of processes with a loop:

...
times = 3
processes = []
for _ in range(times):
    p = Process(target=func1)
    p.start()
    processes.append(p)

for p in processes:
    p.join()
...

Using multiprocessing.Process with a maximum number of simultaneous processes

It might be most sensible to use multiprocessing.Pool which produces a pool of worker processes based on the max number of cores available on your system, and then basically feeds tasks in as the cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of worker processes:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)  # start 4 worker processes
    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))  # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))  # prints "[0, 1, 4,..., 81]"

And it's also handy to know that there is the multiprocessing.cpu_count() method to count the number of cores on a given system, if needed in your code.

Edit: Here's some draft code that seems to work for your specific case:

import multiprocessing

def f(name):
    print('hello', name)

if __name__ == '__main__':
    pool = multiprocessing.Pool()  # use all available cores, otherwise specify the number you want as an argument
    for i in range(0, 512):
        pool.apply_async(f, args=(i,))
    pool.close()
    pool.join()

python: calling method over multiple instances parallelly

Ok, let's do it. First the code (from the multiprocessing docs):

In [1]: from multiprocessing import Process

In [2]: def f():
   ...:     print(1)
   ...:     for i in range(100):
   ...:         # do something
   ...:         pass
   ...:

In [3]: p1 = Process(target=f)

In [4]: p1.start()

1
In [5]: p2 = Process(target=f)

In [6]: p2.start()

1
In [7]: import time

In [8]: def f():
   ...:     for i in range(100):
   ...:         print(i)
   ...:         # do something
   ...:         time.sleep(1)
   ...:         pass
   ...:

In [9]: p1 = Process(target=f)

In [10]: p1.start()

0
In [11]: p2 1
= Process(target=f)2
3
4
5
In [11]: p2 = Process(target=f)

In [12]: 6
p2.7
start8
In [12]: p2.start()

0
In [13]: 9

This is an example of how a function can be called in parallel. From In [10]: p1.start() onward you can see the output gets jumbled, because process p1 keeps printing in parallel while we type the commands for p2.

When running the program as a Python script, you want to make sure the script only ends when all the processes have finished. You can do this by joining them:

def multi_process(instance_params, *funcs):
    process = []
    for f in funcs:
        prog = Process(target=f, args=instance_params)
        prog.start()
        process.append(prog)
    for p in process:
        p.join()

multi_process(params, f, f)

Because of the GIL, Python doesn't have C++- or Java-like multithreading support: only one thread runs Python bytecode at a time. If your program does more I/O than CPU-intensive work, you can still use multithreading. For CPU-intensive tasks, multiprocessing is recommended.

In a comment, @ytutow asked what the difference is between a pool of workers and a process. From PyMOTW:
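To see why threads are still fine for I/O-heavy work, here is a small sketch in which time.sleep stands in for a blocking I/O call (sleeping releases the GIL, as real I/O waits do). The names fake_io and run_in_threads are made up for illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(seconds):
    """Stand-in for an I/O-bound call; the GIL is released while sleeping."""
    time.sleep(seconds)
    return seconds


def run_in_threads(delays):
    """Run fake_io for every delay in its own thread; return (results, elapsed)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        results = list(pool.map(fake_io, delays))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_in_threads([0.2] * 4)
    print(results, f"{elapsed:.2f} s")
```

The four waits overlap, so the elapsed time should be close to one delay rather than the sum of all four. A CPU-bound function in place of fake_io would not see that benefit.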

The Pool class can be used to manage a fixed number of workers for
simple cases where the work to be done can be broken up and
distributed between workers independently.

The return values from the jobs are collected and returned as a list.

The pool arguments include the number of processes and a function to
run when starting the task process (invoked once per child).

You can use Pool as:

def your_instance_method(instance):
    return instance.do_some_computation_over_a_dataset()

with Pool(3) as p:
    instances = [instance_1, instance_2, instance_3]
    print(p.map(your_instance_method, instances))

About the correct number of workers: a general recommendation is to have 2*cpu_cores workers.

Multiprocessing with pool in python: About several instances with same name at the same time

Actually, you will have 8 instances of SomeKindOfClass (one for each worker), but only 4 will ever be active at the same time.

multiprocessing vs multiprocessing.dummy

Your program will only work if you continue to use the multiprocessing.dummy module, which is just a wrapper around the threading module. You are still using "python threads" (not separate processes). "Python threads" share the same global state; "Processes" don't. Python threads also share the same GIL, so they're still limited to running one python bytecode statement at a time, unlike processes, which can all run python code simultaneously.

If you were to change your import to from multiprocessing import Pool, you would notice that the allOutputs array remains unchanged after all the workers finish executing (also, you would likely get an error because you're creating the pool in the global scope, you should probably put that inside a main() function). This is because multiprocessing makes a new copy of the entire global state when it makes a new process. When the worker modifies the global allOutputs, it will be modifying a copy of that initial global state. When the process ends, nothing will be returned to the main process and the global state of the main process will remain unchanged.
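The difference is easy to reproduce. In this sketch all_outputs stands in for the allOutputs array from the question, and record is a made-up worker; the thread-backed pool mutates the list the main program sees, while the process-backed pool only mutates copies:

```python
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool

all_outputs = []  # module-level state, standing in for allOutputs


def record(x):
    """Worker that stores its result in the global list."""
    all_outputs.append(x * x)


def run_with_threads():
    """Threads share the interpreter's globals, so the appends are visible."""
    all_outputs.clear()
    with ThreadPool(4) as pool:
        pool.map(record, range(5))
    return sorted(all_outputs)


def run_with_processes():
    """Each worker process appends to its own copy; the parent's list stays empty."""
    all_outputs.clear()
    with multiprocessing.Pool(4) as pool:
        pool.map(record, range(5))
    return sorted(all_outputs)


if __name__ == "__main__":
    print("threads:  ", run_with_threads())
    print("processes:", run_with_processes())
```

Run as a script, the thread version reports the five squares and the process version reports an empty list, because nothing is sent back to the main process.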

Sharing State Between Processes

Unlike threads, processes don't share the same memory.

If you want to share state between processes, you have to explicitly declare shared variables and pass them to each process, or use pipes or some other method to allow the worker processes to communicate with each other or with the main process.

There are several ways to do this, but perhaps the simplest is using the Manager class:

import multiprocessing

def worker(args):
    index, array = args
    a = SomeKindOfClass()
    a.some_expensive_function()
    array[index] = a.output

def main():
    n = 8
    manager = multiprocessing.Manager()
    array = manager.list([0] * n)  # list proxy shared with the workers
    pool = multiprocessing.Pool(4)
    pool.map(worker, [(i, array) for i in range(n)])
    print(array)

if __name__ == '__main__':
    main()

How to run multiple python scripts simultaneously from a wrapper script in such a way that CPU utilization is maximized?

This is a technique I developed for calling many external programs using subprocess.Popen. In this example, I'm calling convert to make JPEG images from DICOM files.

In short, it uses manageprocs to keep checking a list of running subprocesses. If one is finished, it is removed, and a new one is started as long as unprocessed files remain. After that, the remaining processes are watched until they are all finished.

from datetime import datetime
from functools import partial
import argparse
import logging
import os
import subprocess as sp
import sys
import time

def main():
    """
    Entry point for dicom2jpg.
    """
    args = setup()
    if not args.fn:
        logging.error("no files to process")
        sys.exit(1)
    if args.quality != 80:
        logging.info(f"quality set to {args.quality}")
    if args.level:
        logging.info("applying level correction.")
    start_partial = partial(start_conversion, quality=args.quality, level=args.level)

    starttime = str(datetime.now())[:-7]
    logging.info(f"started at {starttime}.")
    # List of subprocesses
    procs = []
    # Do not launch more processes concurrently than your CPU has cores.
    # That will only lead to the processes fighting over CPU resources.
    maxprocs = os.cpu_count()
    # Launch and manage subprocesses for all files.
    for path in args.fn:
        while len(procs) == maxprocs:
            manageprocs(procs)
        procs.append(start_partial(path))
    # Wait for all subprocesses to finish.
    while len(procs) > 0:
        manageprocs(procs)
    endtime = str(datetime.now())[:-7]
    logging.info(f"completed at {endtime}.")

def start_conversion(filename, quality, level):
    """
    Convert a DICOM file to a JPEG file.

    Removing the blank areas from the Philips detector.

    Arguments:
        filename: name of the file to convert.
        quality: JPEG quality to apply
        level: Boolean to indicate whether level adjustment should be done.
    Returns:
        Tuple of (input filename, output filename, subprocess.Popen)
    """
    outname = filename.strip() + ".jpg"
    size = "1574x2048"
    args = [
        "convert",
        filename,
        "-units",
        "PixelsPerInch",
        "-density",
        "300",
        "-depth",
        "8",
        "-crop",
        size + "+232+0",
        "-page",
        size + "+0+0",
        "-auto-gamma",
        "-quality",
        str(quality),
    ]
    if level:
        args += ["-level", "-35%,70%,0.5"]
    args.append(outname)
    proc = sp.Popen(args, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
    return (filename, outname, proc)

def manageprocs(proclist):
    """Check a list of subprocesses for processes that have ended and
    remove them from the list.

    Arguments:
        proclist: List of tuples. The last item in the tuple must be
                  a subprocess.Popen object.
    """
    # Iterate over a copy so that removing items does not skip entries.
    for item in list(proclist):
        filename, outname, proc = item
        if proc.poll() is not None:
            logging.info(f"conversion of “{filename}” to “{outname}” finished.")
            proclist.remove(item)
    # since manageprocs is called from a loop, keep CPU usage down.
    time.sleep(0.05)

if __name__ == "__main__":
    main()

I've left out setup(); it's using argparse to deal with command-line arguments.

Here the thing to be processed is just a list of file names.
But it could also be (in your case) a list of tuples of script names and arguments.
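Stripped of the DICOM specifics, the same idea might look like the sketch below. It is not the code above, just a condensed illustration: run_limited is a made-up name, and the jobs are throwaway python -c commands where your (script, arguments) pairs would go.

```python
import os
import subprocess as sp
import sys
import time


def run_limited(commands, maxprocs=None):
    """Run every command, keeping at most `maxprocs` subprocesses alive at once.

    Returns the exit codes in the order the commands were given.
    """
    maxprocs = maxprocs or os.cpu_count() or 1
    pending = list(enumerate(commands))
    codes = [None] * len(pending)
    running = []  # list of (index, Popen)
    while pending or running:
        # Start new subprocesses while there is room.
        while pending and len(running) < maxprocs:
            index, cmd = pending.pop(0)
            proc = sp.Popen(cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
            running.append((index, proc))
        # Collect the ones that have finished; keep the rest.
        still_running = []
        for index, proc in running:
            code = proc.poll()
            if code is None:
                still_running.append((index, proc))
            else:
                codes[index] = code
        running = still_running
        time.sleep(0.05)  # keep the polling loop from spinning
    return codes


if __name__ == "__main__":
    # Hypothetical workload: replace with [sys.executable, script, arg, ...] lists.
    jobs = [[sys.executable, "-c", f"print({n} ** 2)"] for n in range(6)]
    print(run_limited(jobs, maxprocs=2))
```

Polling with a short sleep keeps the wrapper itself nearly idle, so the CPU time goes to the child scripts.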


