How to Do Parallel Programming in Python

How to do parallel programming in Python?

You can use the multiprocessing module. For this case I might use a processing pool:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

This will spawn worker processes that can do generic work for you. Since we did not pass the processes argument, the pool starts one worker process for each CPU core on your machine, so every core can be kept busy with one process at a time.
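If you want something you can run as-is, here is a minimal sketch. The bodies of solve1 and solve2 and the inputs A and B are placeholders I made up, since the real functions are not shown in the question. The pool is created under an if __name__ == "__main__" guard, which is needed on platforms such as Windows.

```python
from multiprocessing import Pool


def solve1(a):
    # Placeholder workload (assumption): sum of squares below a.
    return sum(i * i for i in range(a))


def solve2(b):
    # Placeholder workload (assumption): sum of cubes below b.
    return sum(i ** 3 for i in range(b))


if __name__ == "__main__":
    A, B = 10, 5  # made-up inputs
    # Leaving the with-block shuts the worker processes down.
    with Pool() as pool:
        result1 = pool.apply_async(solve1, [A])  # returns immediately
        result2 = pool.apply_async(solve2, [B])  # can run alongside solve1
        answer1 = result1.get(timeout=10)  # block until solve1(A) is done
        answer2 = result2.get(timeout=10)
    print(answer1, answer2)
```

Both get() calls happen inside the with-block on purpose: once the block exits, the pool is gone and pending results can no longer be fetched.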

If you want to map a single function over a list of arguments, you would do this:

args = [A, B]
results = pool.map(solve1, args)

Don't use threads for CPU-bound work like this: in CPython the GIL lets only one thread execute Python bytecode at a time.

How do I parallelize a simple Python loop?

Using multiple threads on CPython won't give you better performance for pure-Python code due to the global interpreter lock (GIL). I suggest using the multiprocessing module instead:

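import multiprocessing
pool = multiprocessing.Pool(4)
# calc_stuff returns three values per call; zip(*...) regroups them into three sequences
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))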

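Note that this won't work in the interactive interpreter, because the worker processes must be able to import the function you pass to the pool. Put the code in a script instead.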
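Saved as a script, a complete version might look like the sketch below. The body of calc_stuff and the value of offset are assumptions on my part (the question's versions are not shown); all that matters is that calc_stuff returns three values per call.

```python
import multiprocessing


def calc_stuff(parameter):
    # Placeholder body (assumption): any function that returns three values.
    return parameter, parameter * 2, parameter ** 2


if __name__ == "__main__":
    offset = 3  # made-up step size
    with multiprocessing.Pool(4) as pool:
        # One (a, b, c) tuple per input, in input order.
        rows = pool.map(calc_stuff, range(0, 10 * offset, offset))
    # zip(*rows) transposes the tuples into three parallel sequences.
    out1, out2, out3 = zip(*rows)
    print(out1)
    print(out2)
    print(out3)
```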

To avoid the usual FUD around the GIL: There wouldn't be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.

Parallel Programming - Python

Since you are a beginner, here is the same thing with threading:
(Note that the output is not controlled in any way and might mix up!)

import threading

def f1():
    for i in range(1000):
        print('Process1:', i)

def f2():
    for j in range(1000):
        print('Process2:', j)

t1 = threading.Thread(target=f1)
t2 = threading.Thread(target=f2)
t1.start()
t2.start()
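If the mixed-up output bothers you, one option is to guard print with a lock and to join the threads before using their results. This is only a sketch: worker, run, print_lock and the collected list are names I made up for illustration.

```python
import threading

print_lock = threading.Lock()  # hypothetical name; serializes the critical section


def worker(label, count, collected):
    for i in range(count):
        # Only one thread at a time prints and appends, so lines stay whole.
        with print_lock:
            print(label, i)
            collected.append((label, i))


def run(count=5):
    collected = []
    threads = [
        threading.Thread(target=worker, args=(label, count, collected))
        for label in ("Thread1:", "Thread2:")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for both threads before reading the results
    return collected


if __name__ == "__main__":
    run()
```

The lock keeps each line intact, but the order in which the two threads take turns is still up to the scheduler.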

Parallel Processing in python

A good, simple way to start with parallel processing in Python is the pool mapping in multiprocessing. It's like the usual Python map, but the individual function calls are spread out over a number of processes.

Factoring is a nice example of this: you can brute-force check all the candidate divisors, spreading the checks over all available worker processes:

from multiprocessing import Pool
import numpy

numToFactor = 976

def isFactor(x):
    result = None
    div = numToFactor // x  # integer division, so div stays an int on Python 3
    if div * x == numToFactor:
        result = (x, div)
    return result

if __name__ == '__main__':
    pool = Pool(processes=4)
    possibleFactors = range(1, int(numpy.floor(numpy.sqrt(numToFactor))) + 1)
    print('Checking ', list(possibleFactors))
    result = pool.map(isFactor, possibleFactors)
    cleaned = [x for x in result if x is not None]
    print('Factors are', cleaned)

This gives me

Checking  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
Factors are [(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]
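For comparison, the same brute-force check can be sketched with concurrent.futures from the standard library instead of multiprocessing.Pool. This is an alternative I am adding, not part of the answer above; factor_pair and NUM_TO_FACTOR are my own names, and math.isqrt assumes Python 3.8 or later.

```python
import math
from concurrent.futures import ProcessPoolExecutor

NUM_TO_FACTOR = 976


def factor_pair(x):
    # divmod keeps everything in integers, so there is no float rounding.
    quotient, remainder = divmod(NUM_TO_FACTOR, x)
    return (x, quotient) if remainder == 0 else None


if __name__ == "__main__":
    candidates = range(1, math.isqrt(NUM_TO_FACTOR) + 1)
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map yields results in input order, like Pool.map.
        pairs = [p for p in executor.map(factor_pair, candidates) if p is not None]
    print("Factors are", pairs)
```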

How do global variables work in parallel programming with Python?

TL;DR. You can skip to the last paragraph for the solution or read everything to understand what is actually going on.

You did not tag your question with your platform (e.g. Windows or Linux), as the guidelines for posting questions tagged with multiprocessing request that you do; the behavior ("behaviour" for Anglos) of global variables very much depends on the platform.

On platforms that use the spawn method to create new processes, such as Windows, each process in the pool created by your pool = multiprocessing.Pool() statement starts out with a new, empty address space. A new Python interpreter is launched that re-reads and re-executes the source program to initialize that address space before ultimately calling the worker function test. That means that every statement at global scope, i.e. import statements, variable declarations, function declarations, etc., is executed for this purpose. However, in the new subprocess the variable __name__ will not be "__main__", so any statements within the if __name__ == "__main__": block will not be executed. That is why on Windows you must put code that creates new processes within such a block. Failing to do so would result in an infinitely recursive process-creation loop if it went undetected.

So if you are running under Windows, your main process has set globVar to 'not ok' just prior to creating the pool. But when the processes are initialized prior to calling test, your source is re-executed, and each process, which runs in its own address space and therefore has its own copy of globVar, re-initializes that variable back to 'ok'. That is the value that test will see. It also follows that modifying that local copy of globVar will not be reflected back to the main process.

Now on platforms that use fork to create new processes, such as Linux, things are a bit different. When the subprocesses are created, each one inherits the address space of the parent process as read-only and only when it attempts to modify memory does it get a copy ("copy on write"). This is clearly a more efficient process-creating mechanism. So in this case test will see globVar having a value of 'not ok' because that was the value it had at the time the subprocesses were created. But if test updates globVar, the "copy on write" mechanism will ensure that it is updating a globVar that exists in a local address space. So again the main process will not see the updated value.

So having worker functions return values, as your test function does, is the standard way of passing results back to the main process. Your problem is that the workers do not start with the globVar value you expected. This can be solved by initializing the pool's processes with the correct globVar value using the initializer and initargs arguments to the Pool constructor (see the documentation):

import multiprocessing

globVar = 'ok'

def init_processes(gVar):
    # Runs once in each worker process as it starts.
    global globVar
    globVar = gVar

def test(arg1):
    print(arg1)
    return globVar

if __name__ == "__main__":
    globVar = 'not ok'

    # Sequential
    print(test(0))

    # Parallel
    pool = multiprocessing.Pool(initializer=init_processes, initargs=(globVar,))
    argList = [0, 1, 2]
    result = pool.map(test, argList)
    pool.close()
    print(result)

Prints:

0
not ok
0
1
2
['not ok', 'not ok', 'not ok']

