Is there a simple process-based parallel map for python?
It seems like what you need is the map method of multiprocessing.Pool. The multiprocessing documentation describes it like this:

map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks till the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

For example, if you wanted to map this function:

def f(x):
    return x**2

to range(10), you could do it using the built-in map() function:

map(f, range(10))

or using a multiprocessing.Pool() object's map() method:

import multiprocessing

pool = multiprocessing.Pool()
print(pool.map(f, range(10)))
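On Python 3 the same example might look like this minimal sketch, using the Pool as a context manager and guarding the entry point (the guard is required on spawn-based platforms such as Windows):

import multiprocessing

def f(x):
    return x**2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:      # defaults to os.cpu_count() worker processes
        print(pool.map(f, range(10)))         # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]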
Does Python 3's built-in map function run in parallel?
It's sequential, because map, as a higher-order function, in general has to apply a function to data and return the results in the same order as the original data:
map(f, [1,2,3,4]) => [f(1), f(2), f(3), f(4)]
multiprocessing.Pool.map is a parallel version of the built-in map that will split the workload into chunks and correctly organise the results.
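A minimal sketch contrasting the two (the squaring function is a stand-in): both return results in input order; only the Pool version computes them in parallel.

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == "__main__":
    print(list(map(f, [1, 2, 3, 4])))      # sequential: [1, 4, 9, 16]
    with Pool(4) as pool:
        print(pool.map(f, [1, 2, 3, 4]))   # parallel, same order: [1, 4, 9, 16]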
python parallel map (multiprocessing.Pool.map) with global data
You need the list glob_data to be backed by shared memory; multiprocessing's Manager gives you just that:
import multiprocessing as multi
from multiprocessing import Manager

manager = Manager()
glob_data = manager.list([])

def func(a):
    glob_data.append(a)

list(map(func, range(10)))   # built-in map is lazy in Python 3, so force it
print(glob_data)             # [0, 1, 2, 3, 4, ..., 9] Good.

p = multi.Pool(processes=8)
p.map(func, range(80))
print(glob_data)             # Super Good.
For some background: https://docs.python.org/3/library/multiprocessing.html#managers
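One caveat, as a hedged sketch: on start methods that don't fork (Windows, and macOS on newer Pythons), module-level Manager state isn't inherited by the workers, so it is safer to create the Manager under the main guard and pass the proxy in explicitly. The functools.partial wiring below is my assumption, not part of the original answer:

from multiprocessing import Manager, Pool
from functools import partial

def func(shared_list, a):
    shared_list.append(a)

if __name__ == "__main__":
    with Manager() as manager:
        glob_data = manager.list()
        with Pool(processes=8) as p:
            p.map(partial(func, glob_data), range(80))
        print(sorted(glob_data))   # append order across workers isn't guaranteed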
How to process a list in parallel in Python?
Assuming CPython and the GIL here.
If your task is I/O bound, in general, threading may be more efficient since the threads are simply dumping work on the operating system and idling until the I/O operation finishes. Spawning processes is a heavy way to babysit I/O.
However, most file systems aren't concurrent, so using multithreading or multiprocessing may not be any faster than synchronous writes.
Nonetheless, here's a contrived example of multiprocessing.Pool.map
which may help with your CPU-bound work:
from multiprocessing import cpu_count, Pool

def process(data):
    # best to do heavy CPU-bound work here...

    # file write for demonstration
    with open("%s.txt" % data, "w") as f:
        f.write(data)

    # example of returning a result to the map
    return data.upper()

if __name__ == "__main__":
    tasks = ["data1", "data2", "data3"]
    pool = Pool(max(1, cpu_count() - 1))   # never ask for zero workers on a 1-core box
    print(pool.map(process, tasks))
A similar setup for threading can be found in concurrent.futures.ThreadPoolExecutor. As an aside, all is a built-in function and isn't a great variable name choice.
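For comparison, a minimal ThreadPoolExecutor sketch of the same pattern (same stand-in task as above):

from concurrent.futures import ThreadPoolExecutor

def process(data):
    with open("%s.txt" % data, "w") as f:   # I/O-bound work is where threads shine
        f.write(data)
    return data.upper()

if __name__ == "__main__":
    tasks = ["data1", "data2", "data3"]
    with ThreadPoolExecutor(max_workers=3) as executor:
        print(list(executor.map(process, tasks)))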
How to do parallel programming in Python?
You can use the multiprocessing module. For this case I might use a processing pool:
from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
This will spawn processes that can do generic work for you. Since we did not pass processes, it will spawn one process for each CPU core on your machine; each core can run one process at a time.

If you want to map a list over a single function you would do this:
args = [A, B]
results = pool.map(solve1, args)
Don't use threads, because the GIL locks any operations on Python objects.
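If the function takes more than one argument, Pool.starmap (Python 3.3+) unpacks argument tuples; a small sketch with a hypothetical stand-in for the solver:

from multiprocessing import Pool

def solve(a, b):
    return a + b   # hypothetical stand-in for real work

if __name__ == "__main__":
    with Pool() as pool:
        print(pool.starmap(solve, [(1, 2), (3, 4)]))   # [3, 7]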
Parallel Processing in Python
A good, simple way to start with parallel processing in Python is the pool map in multiprocessing -- it's like the usual Python map, but individual function calls are spread out over a number of processes.
Factoring is a nice example of this -- you can brute-force check all the candidate divisors, spreading the work over all available tasks:
from multiprocessing import Pool
import numpy

numToFactor = 976

def isFactor(x):
    result = None
    div, rem = divmod(numToFactor, x)   # integer division keeps (x, div) integral on Python 3
    if rem == 0:
        result = (x, div)
    return result

if __name__ == '__main__':
    pool = Pool(processes=4)
    possibleFactors = range(1, int(numpy.floor(numpy.sqrt(numToFactor))) + 1)
    print('Checking', list(possibleFactors))
    result = pool.map(isFactor, possibleFactors)
    cleaned = [x for x in result if x is not None]
    print('Factors are', cleaned)
This gives me:
Checking [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
Factors are [(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]
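For many tiny tasks like these divisor checks, per-task IPC overhead can dominate; passing a chunksize batches the inputs into larger tasks. A sketch reusing isFactor and possibleFactors from above (the chunk size of 8 is just illustrative):

from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # each worker now receives batches of 8 inputs instead of one at a time
        result = pool.map(isFactor, possibleFactors, chunksize=8)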
Python multiprocessing pool.map doesn't run in parallel
The multiprocessing.Pool() documentation has been clear (since at least Py27) that Pool.map() intentionally blocks while it processes the queue of calls -- here the four calls generated sequentially from the iterator in the posted example. The multiprocessing module documentation says this about its Pool.map() method:

map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.

This blocking is the expected behaviour; the different instantiation methods merely accrue different add-on (process-copying-related) overhead costs.
Anyway, mp.cpu_count() need not be the number of CPU cores on which any such dispatched .Pool()-instance's worker tasks will actually execute, because of the O/S settings of affinity (user/process-related restriction policies): your code will have to "obey" the subset of CPU cores that any such multiprocessing-requested sub-process is permitted to harness, and the number of those cores is never higher than len( os.sched_getaffinity( 0 ) ).
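A minimal sketch of sizing the Pool by that permitted subset rather than the raw core count (os.sched_getaffinity is Linux-only; the work function is a stand-in):

import os
import multiprocessing as mp

def work(i):
    return i * i   # stand-in task

if __name__ == '__main__':
    n_usable = len(os.sched_getaffinity(0))   # cores this process may actually use
    with mp.Pool(n_usable) as pool:
        print(pool.map(work, range(8)))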
The best next step: re-evaluate your whole code-execution ecosystem
import multiprocessing as mp                # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect          # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642

def test_function( i = -1 ):
    thisframerecord   = inspect.stack()[0]                      # 0 represents this line
    callerframerecord = inspect.stack()[1]                      # 1 represents line at caller
    _INFO_   = inspect.getframeinfo( thisframerecord[0] )
    _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format( time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format( _CALLER_.function, _CALLER_.lineno )
           )
    time.sleep( 10 )
    thisframerecord = inspect.stack()[0]                        # 0 represents this line
    _INFO_ = inspect.getframeinfo( thisframerecord[0] )
    print( "{0:_>30.10f} ::".format( time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i )
           )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )

    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )

    pool.map( test_function, [ i for i in range( 4 ) ] )
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )

    pool.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )

    pool.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic() ) )

    print( "EXECUTED on {0:}".format( platform.version() ) )
    print( "USING: python-{0:}:".format( platform.python_version() ) )
might look something like this on a Linux-class O/S:
(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d
EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:
Check the hidden detail: what your O/S uses for invoking test_function(). Here it was mapstar() (not a sure choice universally), under the local SMP-Linux-class O/S's default sub-process instantiation method, performed via 'fork'.
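To compare the overheads of the different instantiation methods yourself, you can force one explicitly; a sketch (my assumption, not from the original answer):

import multiprocessing as mp

def work(i):
    return i

if __name__ == '__main__':
    mp.set_start_method('spawn')   # or 'fork' / 'forkserver', where available
    with mp.Pool(4) as pool:
        print(pool.map(work, range(4)))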
How do I parallelize a simple Python loop?
Using multiple threads on CPython won't give you better performance for pure-Python code due to the global interpreter lock (GIL). I suggest using the multiprocessing module instead:
import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Note that this won't work in the interactive interpreter. To avoid the usual FUD around the GIL: there wouldn't be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.
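An equivalent sketch with concurrent.futures.ProcessPoolExecutor, which carries the same top-level-function restriction (calc_stuff and offset come from the question; the body below is a hypothetical stand-in):

from concurrent.futures import ProcessPoolExecutor

def calc_stuff(i):
    return i, i ** 2, i ** 3   # hypothetical stand-in returning three outputs

if __name__ == "__main__":
    offset = 1
    with ProcessPoolExecutor(max_workers=4) as executor:
        out1, out2, out3 = zip(*executor.map(calc_stuff, range(0, 10 * offset, offset)))
    print(out1, out2, out3)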