Is There a Simple Process-Based Parallel Map for Python

Is there a simple process-based parallel map for python?

It seems like what you need is the map method in multiprocessing.Pool():

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.

This method chops the iterable into a number of chunks which it submits to the
process pool as separate tasks. The (approximate) size of these chunks can be
specified by setting chunksize to a positive integer.

For example, if you wanted to map this function:

def f(x):
    return x**2

to range(10), you could do it using the built-in map() function:

map(f, range(10))

or using a multiprocessing.Pool() object's method map():

import multiprocessing

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    print(pool.map(f, range(10)))
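The chunksize argument described above can be passed explicitly. The following is a minimal sketch, assuming Python 3; the names square and parallel_squares, the input size, and the chunk size of 250 are illustrative choices rather than recommendations from the documentation.

```python
import multiprocessing


def square(x):
    """Worker function, defined at module level so child processes can find it."""
    return x ** 2


def parallel_squares(n, chunksize):
    # The with-block shuts the pool down once the results are collected.
    with multiprocessing.Pool() as pool:
        # Each task handed to a worker covers roughly `chunksize` items.
        return pool.map(square, range(n), chunksize=chunksize)


if __name__ == "__main__":
    results = parallel_squares(1000, chunksize=250)
    print(results[:5])
```

Larger chunks mean fewer round trips between the parent and the workers, which tends to matter when each call is cheap; the best value depends on the workload.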

Does the built-in map function in Python 3 run in parallel?

It's sequential because map, as a higher-order function, has to apply a function to the data and return the results in the same order as the original data:

map(f, [1,2,3,4]) => [f(1), f(2), f(3), f(4)]

Making it parallel would require synchronisation to keep the results in order, and that overhead can cancel out the benefit of parallelism.

multiprocessing.Pool.map is a parallel version of the built-in map that will split the workload into chunks and correctly organise the results.
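As a rough illustration of the ordering point, the sketch below compares Pool.map with Pool.imap_unordered. The function slow_square and its artificial sleep are assumptions made up for the demonstration.

```python
import multiprocessing
import time


def slow_square(x):
    # Smaller inputs sleep longer, so they tend to finish last.
    time.sleep(0.05 * (4 - x))
    return x * x


if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        ordered = pool.map(slow_square, [1, 2, 3, 4])
        as_completed = list(pool.imap_unordered(slow_square, [1, 2, 3, 4]))
    print(ordered)               # results follow the input order
    print(sorted(as_completed))  # same values; the arrival order may differ
```

Pool.map pays for ordering by collecting every result before returning, while imap_unordered yields each result as soon as a worker delivers it.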

python parallel map (multiprocessing.Pool.map) with global data

You need the list glob_data to be shared between processes. multiprocessing's Manager gives you just that, through a proxy object that forwards each operation to a manager process:

# Note: this module-level layout relies on the 'fork' start method.
import multiprocessing as multi
from multiprocessing import Manager

manager = Manager()

glob_data = manager.list([])

def func(a):
    glob_data.append(a)

list(map(func, range(10)))  # map is lazy in Python 3, so force evaluation
print(glob_data)  # [0,1,2,3,4 ... , 9] Good.

p = multi.Pool(processes=8)
p.map(func, range(80))

print(glob_data)  # Super Good.

For some background:

https://docs.python.org/3/library/multiprocessing.html#managers
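The example above relies on the worker processes inheriting the module-level glob_data, which is what happens with the fork start method. A sketch of a variant that passes the proxy to the workers explicitly, and so should also work under spawn, might look like this. The names record and shared are hypothetical.

```python
import multiprocessing as multi
from functools import partial


def record(shared, a):
    # `shared` is a manager list proxy; append is forwarded to the manager process.
    shared.append(a)


if __name__ == "__main__":
    with multi.Manager() as manager:
        shared = manager.list()
        with multi.Pool(processes=4) as pool:
            # partial binds the proxy as the first argument of every call.
            pool.map(partial(record, shared), range(20))
        # Arrival order is not guaranteed, so sort before displaying.
        print(sorted(list(shared)))
```

Every append is a round trip to the manager process, so this suits occasional updates better than tight loops; returning values from the mapped function is usually cheaper.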

How to process a list in parallel in Python?

Assuming CPython and the GIL here.

If your task is I/O bound, in general, threading may be more efficient since the threads are simply dumping work on the operating system and idling until the I/O operation finishes. Spawning processes is a heavy way to babysit I/O.

However, most file systems aren't concurrent, so using multithreading or multiprocessing may not be any faster than synchronous writes.

Nonetheless, here's a contrived example of multiprocessing.Pool.map which may help with your CPU-bound work:

from multiprocessing import cpu_count, Pool

def process(data):
    # best to do heavy CPU-bound work here...

    # file write for demonstration
    with open("%s.txt" % data, "w") as f:
        f.write(data)

    # example of returning a result to the map
    return data.upper()

if __name__ == "__main__":
    tasks = ["data1", "data2", "data3"]
    # leave one core free, but never ask for fewer than one worker
    pool = Pool(max(1, cpu_count() - 1))
    print(pool.map(process, tasks))

A similar setup for threading can be found in concurrent.futures.ThreadPoolExecutor.
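A minimal sketch of that threading setup, assuming the work is I/O-bound. The process function here is a stand-in without the file write, and run_in_threads and max_workers=4 are illustrative choices.

```python
from concurrent.futures import ThreadPoolExecutor


def process(data):
    # Stand-in for an I/O-bound step such as a network request or file read.
    return data.upper()


def run_in_threads(tasks, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(process, tasks))


if __name__ == "__main__":
    print(run_in_threads(["data1", "data2", "data3"]))
```

Because threads share memory, nothing has to be pickled and no __main__ guard is strictly required, which keeps the setup lighter than a process pool.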

As an aside, all is a builtin function and isn't a great variable name choice.

How to do parallel programming in Python?

You can use the multiprocessing module. For this case I might use a processing pool:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

This will spawn processes that can do generic work for you. Since we did not pass processes, it will spawn one process for each CPU core on your machine. Each CPU core can execute one process at a time.

If you want to map a single function over a list you would do this:

args = [A, B]
results = pool.map(solve1, args)

Don't use threads for CPU-bound work like this: in CPython the GIL lets only one thread execute Python bytecode at a time.
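The snippets above leave solve1, solve2, A and B undefined because they come from the question. A self-contained sketch with hypothetical stand-ins for all four could look like this:

```python
from multiprocessing import Pool


def solve1(values):
    # Hypothetical stand-in: sum of squares.
    return sum(v * v for v in values)


def solve2(values):
    # Hypothetical stand-in: largest value.
    return max(values)


if __name__ == "__main__":
    A = [1, 2, 3]
    B = [4, 5, 6]
    with Pool() as pool:
        result1 = pool.apply_async(solve1, [A])
        result2 = pool.apply_async(solve2, [B])
        # get() blocks until the result arrives or the timeout expires.
        print(result1.get(timeout=10), result2.get(timeout=10))
        # One function mapped over a list of inputs.
        print(pool.map(solve1, [A, B]))
```

apply_async fits a handful of different calls, while map is the simpler choice when the same function runs over many inputs.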

Parallel Processing in python

A good, simple way to start with parallel processing in Python is the pool mapping in multiprocessing. It's like the usual Python map, but the individual function calls are spread out over a number of worker processes.

Factoring is a nice example of this: you can brute-force check all the divisions, spreading them out over the available worker processes:

from multiprocessing import Pool
import numpy

numToFactor = 976

def isFactor(x):
    # return (x, quotient) if x divides numToFactor exactly, else None
    result = None
    div = numToFactor // x  # integer division; "/" yields a float in Python 3
    if div*x == numToFactor:
        result = (x,div)
    return result

if __name__ == '__main__':
    pool = Pool(processes=4)
    possibleFactors = list(range(1,int(numpy.floor(numpy.sqrt(numToFactor)))+1))
    print('Checking ', possibleFactors)
    result = pool.map(isFactor, possibleFactors)
    cleaned = [x for x in result if x is not None]
    print('Factors are', cleaned)

This gives me

Checking  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
Factors are [(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]

Python multiprocessing pool.map doesn't work parallel

The multiprocessing.Pool() documentation (going back at least to Python 2.7) is clear that Pool.map() blocks intentionally: it does not return until the whole queue of calls generated from the iterable, just 4 calls in the posted example, has been processed.

The multiprocessing-module documentation says this about its Pool.map() method:

map(func, iterable[, chunksize])


A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

This matches the observed behaviour. Different sub-process instantiation (start) methods add different overhead costs, mostly related to how much of the parent process has to be copied.

Also, mp.cpu_count() is not necessarily the number of CPU cores that the workers of a .Pool() instance will actually run on, because the O/S may restrict a user or process through its CPU-affinity settings:

Your code has to "obey" the subset of CPU cores that such multiprocessing-requested sub-processes are permitted to use,
and the size of that subset is no larger than: len( os.sched_getaffinity( 0 ) )
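One way to size a pool from the permitted cores is sketched below. The helper name usable_cpu_count is hypothetical, and the fallback exists because os.sched_getaffinity is only available on some platforms (Linux among them).

```python
import multiprocessing as mp
import os


def usable_cpu_count():
    """Number of CPUs this process may run on, falling back to mp.cpu_count()."""
    if hasattr(os, "sched_getaffinity"):
        # Size of the set of CPU ids the current process is allowed to use.
        return len(os.sched_getaffinity(0))
    return mp.cpu_count()


if __name__ == "__main__":
    print("reported:", mp.cpu_count(), "usable:", usable_cpu_count())
```

Passing usable_cpu_count() to mp.Pool() instead of mp.cpu_count() avoids starting more workers than the O/S will schedule in parallel.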


The best next step: re-evaluate your whole code-execution ecosystem

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642

def test_function( i = -1 ):
    thisframerecord = inspect.stack()[0] # 0 represents this line
    callerframerecord = inspect.stack()[1] # 1 represents line at caller
    _INFO_ = inspect.getframeinfo( thisframerecord[0] )
    _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format( time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format( _CALLER_.function, _CALLER_.lineno )
           )
    time.sleep( 10 )
    thisframerecord = inspect.stack()[0] # 0 represents this line
    _INFO_ = inspect.getframeinfo( thisframerecord[0] )
    print( "{0:_>30.10f} ::".format( time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i )
           )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    #---.map(                                       ?
    pool.map( test_function, [i for i in range(4) ] )
    #---.map(                                       ?
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    #---.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    #---.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic() ) )
    print( "EXECUTED on {0:}".format( platform.version() ) )
    print( "USING: python-{0:}:".format( platform.python_version() ) )

The output might look something like this on a Linux-class O/S:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:

Check the hidden detail in the output: test_function() was invoked from mapstar(), and on this SMP Linux-class O/S the sub-processes were created with the default instantiation method, 'fork'. That default is not a sure choice universally.
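Since the default start method differs between platforms, it can be pinned explicitly. A minimal sketch using a multiprocessing context follows; the worker function double is a hypothetical stand-in.

```python
import multiprocessing as mp


def double(x):
    return 2 * x


if __name__ == "__main__":
    print("available:", mp.get_all_start_methods())
    # 'spawn' exists on every platform; a context leaves the global default untouched.
    ctx = mp.get_context("spawn")
    with ctx.Pool(2) as pool:
        print(pool.map(double, range(4)))
```

With 'spawn' each worker starts a fresh interpreter and re-imports the main module, so the __main__ guard is required and start-up costs more than with 'fork'.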

How do I parallelize a simple Python loop?

Using multiple threads on CPython won't give you better performance for pure-Python code due to the global interpreter lock (GIL). I suggest using the multiprocessing module instead:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Note that this won't work in the interactive interpreter.

To avoid the usual FUD around the GIL: There wouldn't be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.
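The two-line snippet above depends on calc_stuff and offset from the question. A runnable sketch with a hypothetical calc_stuff that returns three values per input might look like this:

```python
import multiprocessing


def calc_stuff(parameter):
    # Hypothetical stand-in that returns three related values.
    return parameter, parameter * 2, parameter ** 2


if __name__ == "__main__":
    offset = 3
    with multiprocessing.Pool(4) as pool:
        rows = pool.map(calc_stuff, range(0, 10 * offset, offset))
    # zip(*rows) transposes the list of 3-tuples into three tuples.
    out1, out2, out3 = zip(*rows)
    print(out1)
    print(out2)
    print(out3)
```

Keeping calc_stuff at module level and the pool under the __main__ guard lets the worker processes import the function by name.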


