multiprocessing.dummy in Python is not utilising 100% CPU

When you use multiprocessing.dummy, you're using threads, not processes:

"multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module."

That means you're restricted by the Global Interpreter Lock (GIL), and only one thread can actually execute CPU-bound operations at a time. That's going to keep you from fully utilizing your CPUs. If you want to get full parallelism across all available cores, you're going to need to address the pickling issue you're hitting with multiprocessing.Pool.

Note that multiprocessing.dummy might still be useful if the work you need to parallelize is IO bound, or utilizes a C-extension that releases the GIL. For pure Python code, however, you'll need multiprocessing.
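To see the difference for yourself, here is a minimal sketch that runs the same pure-Python, CPU-bound function on a thread-backed pool and on a process-backed pool and prints the wall-clock time of each. The count_primes and timed_map helpers and the workload size are made up for illustration, and the actual timings will depend on your machine and core count.

```python
import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool


def count_primes(limit):
    """CPU-bound pure-Python work: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed_map(pool_cls, func, items, workers=4):
    """Run func over items with the given pool class; return (results, seconds)."""
    start = time.perf_counter()
    with pool_cls(workers) as pool:
        results = pool.map(func, items)
    return results, time.perf_counter() - start


if __name__ == '__main__':
    limits = [30_000] * 8  # arbitrary workload, chosen only for illustration
    for name, cls in [('threads', ThreadPool), ('processes', Pool)]:
        results, seconds = timed_map(cls, count_primes, limits)
        print(f'{name}: {seconds:.2f}s')
```

Both pools return the same results; only the process pool can spread this kind of work over several cores.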

Why does this parallel search and replace not use 100% of CPU?

After a bit of playing, I think it's because joblib is spending all its time coordinating the parallel running of everything and no time actually doing any useful work, at least for me under OSX and Linux (I don't have any MS Windows machines).

I started by loading packages, pulling in your code, and generating a dummy file:

from random import choice
import re

from multiprocessing import Pool
from joblib import delayed, Parallel

regex = re.compile(r'a *a|b *b') # of course more complex IRL, with lookbehind/forward
mydict = {'aa': 'A', 'bb': 'B'}

def handler(match):
    return mydict[match[0].replace(' ', '')]

def replace_in(tweet):
    return re.sub(regex, handler, tweet)

examples = [
"Regex replace isn't that computationally expensive... I would suggest using Pandas, though, rather than just a plain loop",
"Hmm I don't use pandas anywhere else, but if it makes it faster, I'll try! Thanks for the suggestion. Regarding the question: expensive or not, if there is no reason for it to use only 19%, it should use 100%",
"Well, is tweets a generator, or an actual list?",
"an actual list of strings",
"That might be causing the main process to have the 419MB of memory, however, that doesn't mean that list will be copied over to the other processes, which only need to work over slices of the list",
"I think joblib splits the list in roughly equal chunks and sends these chunks to the worker processes.",
"Maybe, but if you use something like this code, 2 million lines should be done in less than a minute (assuming an SSD, and reasonable memory speeds).",
"My point is that you don't need the whole file in memory. You could type tweets.txt | python replacer.py > tweets_replaced.txt, and use the OS's native speeds to replace data line-by-line",
"I will try this",
"no, this is actually slower. My code takes 12mn using joblib.parallel and for line in f_in: f_out.write(re.sub(..., line)) takes 21mn. Concerning CPU and memory usage: CPU is same (17%) and memory much lower (60Mb) using files. But I want to minimize time spent, not memory usage.",
"I moved this to chat because StackOverflow suggested it",
"I don't have experience with joblib. Could you try the same with Pandas? pandas.pydata.org/pandas-docs/…",
]

with open('tweets.txt', 'w') as fd:
    for i in range(2_000_000):
        print(choice(examples), file=fd)

(see if you can guess where I got the lines from!)

As a baseline, I tried the naive solution:

with open('tweets.txt') as fin, open('tweets2.txt', 'w') as fout:
    for l in fin:
        fout.write(replace_in(l))

This takes 14.0s (wall clock time) on my OSX laptop, and 5.15s on my Linux desktop. Note that changing your definition of replace_in to use regex.sub(handler, tweet) instead of re.sub(regex, handler, tweet) reduces the above to 8.6s on my laptop, but I'll not use this change below.
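For reference, the regex.sub variant mentioned above would look roughly like this. It is only a sketch of that one change: the pattern and dictionary are copied from the code earlier in this answer, and the sample string is made up. My assumption is that the saving comes from avoiding the module-level function's extra per-call lookup of the pattern, but I haven't profiled it.

```python
import re

regex = re.compile(r'a *a|b *b')
mydict = {'aa': 'A', 'bb': 'B'}


def handler(match):
    # Strip the spaces from the matched text and look up its replacement
    return mydict[match[0].replace(' ', '')]


def replace_in(tweet):
    # Call the compiled pattern's own sub() rather than the module-level re.sub()
    return regex.sub(handler, tweet)


print(replace_in('a  a then bb'))  # prints "A then B"
```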

I then tried your joblib package:

with open('tweets.txt') as fin, open('tweets2.txt', 'w') as fout:
    with Parallel(n_jobs=-1) as parallel:
        for l in parallel(delayed(replace_in)(tweet) for tweet in fin):
            fout.write(l)

which takes 1min 16s on my laptop, and 34.2s on my desktop. CPU utilisation was pretty low as the child/worker tasks were all waiting for the coordinator to send them work most of the time.

I then tried using the multiprocessing package:

with open('tweets.txt') as fin, open('tweets2.txt', 'w') as fout:
    with Pool() as pool:
        for l in pool.map(replace_in, fin, chunksize=1024):
            fout.write(l)

which took 5.95s on my laptop and 2.60s on my desktop. I also tried with a chunk size of 8, which took 22.1s and 8.29s respectively. The chunk size allows the pool to send large chunks of work to its children, so it can spend less time coordinating and more time getting useful work done.

I'd therefore hazard a guess that joblib isn't particularly useful for this sort of usage as it doesn't seem to have a notion of chunksize.
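The same effect can be had by hand when an API gives you no chunk size to set: group the lines into batches yourself and make each task process a whole batch. The following is a rough sketch of that idea using only the standard library. The batches, replace_in_batch and replace_all helpers are names I made up, the batch size of 256 is arbitrary, and I haven't timed this against the runs above.

```python
import re
from itertools import islice
from multiprocessing import Pool

regex = re.compile(r'a *a|b *b')
mydict = {'aa': 'A', 'bb': 'B'}


def handler(match):
    return mydict[match[0].replace(' ', '')]


def batches(iterable, size):
    """Yield successive lists of up to `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def replace_in_batch(lines):
    """Process a whole batch in one task, so one message carries many lines."""
    return [regex.sub(handler, line) for line in lines]


def replace_all(lines, pool, batch_size=1024):
    """Yield replaced lines in input order, sending work to `pool` batch by batch."""
    for done in pool.imap(replace_in_batch, batches(lines, batch_size)):
        yield from done


if __name__ == '__main__':
    lines = ['a a\n', 'b  b\n', 'plain\n'] * 1000
    with Pool() as pool:
        out = list(replace_all(lines, pool, batch_size=256))
    print(len(out), out[:3])
```

Since replace_all is a generator, it has to be consumed while the pool is still open, as in the with block above.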

No improvements using multiprocessing

You are mapping wrap to (a, b, c), where a is a function and b and c are 100K element vectors. All of this data is pickled when it is sent to the chosen process in the pool, then unpickled when it reaches it. This is to ensure that processes have mutually exclusive access to data.

Your problem is that the pickling process is more expensive than the correlation. As a guideline, you want to minimize the amount of information that is sent between processes and maximize the amount of work each process does, while still spreading the work across the number of cores on the system.
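A quick way to get a feel for how much data is being shipped is to pickle the task arguments yourself and look at the size. This is just a sketch: plain Python lists stand in for the two 100K-element vectors, payload_size is a helper name I made up, and the small task is an arbitrary integer such as an index or a seed.

```python
import pickle
import random


def payload_size(obj):
    """Return the number of bytes pickle produces for `obj`."""
    return len(pickle.dumps(obj))


# Two 100K-element vectors, as in the question (plain lists used as stand-ins)
b = [random.random() for _ in range(100_000)]
c = [random.random() for _ in range(100_000)]

big_task = (b, c)  # shipping the data itself to a worker
small_task = 7     # shipping only a small task identifier or seed

print(payload_size(big_task), 'bytes vs', payload_size(small_task), 'bytes')
```

Every task that carries the vectors pays that serialization cost on the way in, and again on the way out for whatever it returns.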

How to do that depends on the actual problem you're trying to solve. By tweaking your toy example so that your vectors were a bit bigger (1 million elements) and randomly generated in the wrap function, I could get a 2X speedup over single core, by using a process pool with 4 elements. The code looks like this:

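import numpy as np
from multiprocessing import Pool

def wrap(a):
    # Generate the data inside the worker so nothing large has to be pickled
    x = np.random.rand(1000000)
    y = np.random.rand(1000000)
    return np.correlate(x, y)

if __name__ == '__main__':
    with Pool(4) as p:
        p.map(wrap, range(30))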

Using multiprocessing in python to return values

If you are using the map function, then each item of the iterable player_id_list will be passed as a separate task to the function send_player_result. Consequently, this function should no longer expect to be passed a list of player ids, but rather a single player id. And, as you know by now, if your tasks are largely I/O bound, then multithreading is a better model. You can use either:

from multiprocessing.dummy import Pool
# or
from multiprocessing.pool import ThreadPool

You will probably want to greatly increase the number of threads (but not greater than the size of player_id_list):

#from multiprocessing import Pool
from multiprocessing.dummy import Pool
from typing import Set

def send_player_result(player_id):
    success = player_api.send_results(player_id, user=user, send_health_results=True)
    return success

# Only required for Windows if you are doing multiprocessing:
if __name__ == '__main__':

    pool_size = 5 # This number can be increased to get more concurrency

    # Caller
    failed_player_ids: Set[str] = set()
    with Pool(pool_size) as pool:
        results = pool.map(func=send_player_result, iterable=player_id_list)
        for idx, success in enumerate(results):
            if not success:
                # failed for argument player_id_list[idx]:
                failed_player_ids.add(player_id_list[idx])
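If you want to try the pattern without access to the real API, here is a self-contained sketch. The send_player_result stub below is hypothetical: it stands in for the player_api.send_results call and simply pretends that any id ending in '0' fails. The collect_failures helper and the sample ids are also made up for illustration.

```python
from multiprocessing.pool import ThreadPool


def send_player_result(player_id):
    """Hypothetical stand-in for the real API call; 'fails' for ids ending in '0'."""
    return not player_id.endswith('0')


def collect_failures(player_ids, pool_size=5):
    """Send each id on a thread pool and return the set of ids whose call failed."""
    with ThreadPool(pool_size) as pool:
        # map() returns results in the same order as player_ids
        results = pool.map(send_player_result, player_ids)
    return {pid for pid, ok in zip(player_ids, results) if not ok}


if __name__ == '__main__':
    ids = [f'player-{n}' for n in range(1, 21)]
    print(sorted(collect_failures(ids)))  # prints ['player-10', 'player-20']
```

Because map preserves input order, pairing the ids with the results via zip is equivalent to the index lookup used above.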


