Python multiprocessing Pool.apply_async with shared variables (Value)

As the error message states, you can't pass a multiprocessing.Value via pickle. However, you can use a multiprocessing.Manager().Value:

import multiprocessing
import urllib2
import random
import myurllist  # list of all destination urls for all 10 servers
import time
import socbindtry  # script that binds various virtual/aliased client ips to the script

def send_request3(response_time, error_count, lock):  # send requests from alias client ip 1
    opener = urllib2.build_opener(socbindtry.BindableHTTPHandler3)  # bind to alias client ip1
    try:
        tstart = time.time()
        for i in range(len(myurllist.url)):
            x = random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:", x
            response_time.append(time.time() - tstart)
    except urllib2.URLError as e:
        # A manager Value proxy has no get_lock(), so use the shared manager lock.
        with lock:
            error_count.value += 1

def send_request4(response_time, error_count, lock):  # send requests from alias client ip 2
    opener = urllib2.build_opener(socbindtry.BindableHTTPHandler4)  # bind to alias client ip2
    try:
        tstart = time.time()
        for i in range(len(myurllist.url)):
            x = random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:", x
            response_time.append(time.time() - tstart)
    except urllib2.URLError as e:
        with lock:
            error_count.value += 1

#50 such functions are defined here for 50 clients

def func(response_time, error_count, lock):
    pool = multiprocessing.Pool(processes=2*multiprocessing.cpu_count())
    args = (response_time, error_count, lock)
    for i in range(5):
        pool.apply_async(send_request3, args=args)
        pool.apply_async(send_request4, args=args)
        #append 50 functions here
    pool.close()
    pool.join()
    print "All work Done..!!"
    return

if __name__ == "__main__":
    m = multiprocessing.Manager()
    response_time = m.list()  # some shared variables
    error_count = m.Value('i', 0)
    lock = m.Lock()  # protects the increment of error_count

    start = time.time()
    func(response_time, error_count, lock)
    end = time.time() - start
    print end

A few other notes here:

  1. Using a Pool with 750 processes is not a good idea. Unless you're using a server with hundreds of CPU cores, that's going to overwhelm your machine. It would be faster, and put less strain on your machine, to use significantly fewer processes: something more like 2 * multiprocessing.cpu_count().
  2. As a best practice, you should explicitly pass all the shared arguments you need to the child processes, rather than using global variables. This increases the chances that the code will work on Windows.
  3. It looks like all your send_request* functions do almost the exact same thing. Why not just make one function and use a variable to decide which socbindtry.BindableHTTPHandler to use? You would avoid a ton of code duplication by doing this.
  4. The way you're incrementing error_count is not process/thread-safe, and is susceptible to race conditions. You need to protect the increment with a lock (as I did in the example code above).
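
Notes 3 and 4 can be combined in one worker function. The following is only a rough Python 3 sketch, not the original poster's code: the client_id parameter stands in for choosing a socbindtry.BindableHTTPHandler, the URLs are made up, and the download is simulated so that the example runs without a network. The explicit manager lock is there because a manager Value proxy does not offer get_lock().

```python
import multiprocessing
import time


def send_request(client_id, urls, response_time, error_count, lock):
    """Handle all URLs for one client and record timings and errors.

    client_id is a hypothetical stand-in for selecting a bindable handler;
    the download is simulated rather than performed.
    """
    tstart = time.time()
    for url in urls:
        try:
            if "bad" in url:  # simulated failure in place of a URLError
                raise IOError("cannot reach %s" % url)
            response_time.append((client_id, time.time() - tstart))
        except IOError:
            # The increment is a read-modify-write, so guard it with the lock.
            with lock:
                error_count.value += 1


def main():
    manager = multiprocessing.Manager()
    response_time = manager.list()
    error_count = manager.Value("i", 0)
    lock = manager.Lock()
    urls = ["http://server1/a", "http://bad/b"]  # made-up URLs

    workers = min(8, 2 * multiprocessing.cpu_count())
    pool = multiprocessing.Pool(processes=workers)
    for client_id in range(50):  # one task per client instead of 50 functions
        pool.apply_async(
            send_request,
            args=(client_id, urls, response_time, error_count, lock),
        )
    pool.close()
    pool.join()
    print(len(response_time), error_count.value)


if __name__ == "__main__":
    main()
```

Because every shared object is passed explicitly as an argument, the same sketch should behave the same way whether processes are forked or spawned.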

pool.apply_async and global variable

A solution for anyone who needs it.
I adapted the example from section 16.6.1.4, "Sharing state between processes", of the multiprocessing documentation:

from multiprocessing import Pool, Manager
import config
from threading import Thread
import time

def f(d):
    # toggle the shared flag
    if d['flag1'] is False:
        d['flag1'] = True
    else:
        d['flag1'] = False
    # l.reverse()

def stop():
    print('stop')
    # spin until the main loop has copied True back into config.variable1
    while 1:
        if config.variable1 is True:
            break

if __name__ == '__main__':
    manager = Manager()
    print(config.variable1)

    d = manager.dict()

    thread1 = Thread(target=stop)
    thread1.start()

    while 1:
        d['flag1'] = config.variable1
        pool = Pool(4)
        p = pool.apply_async(f, args=(d,))
        pool.close()
        pool.join()

        # copy the value changed by the worker back into the module-level variable
        config.variable1 = d['flag1']

        print(d)
        print(config.variable1)
        if thread1.is_alive() is False:
            break
        time.sleep(3)

How to share a global variable with another script in multiprocessing?

Here is an example of creating a shared managed string value per the comment offered by @martineau.

On a platform such as Linux, where fork is used by default to create new processes (the default on Linux before Python 3.14), you could code:

import multiprocessing
from ctypes import c_char_p

s = multiprocessing.Manager().Value(c_char_p, '')
event = multiprocessing.Event()

def function1():
    s.value = 'New value' # updates global variable s
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new s value
    print(s.value)

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()

Prints:

New value

On platforms such as Windows, where spawn is used to create new processes, the shared string is passed to the processes as an argument so that only one instance of the string is created:

import multiprocessing
from ctypes import c_char_p

def function1(s, event):
    s.value = 'New value'
    event.set() # show we have a new value

def function2(s, event):
    event.wait() # wait for new s value
    print(s.value)

# I need this for Windows:
if __name__ == '__main__':
    s = multiprocessing.Manager().Value(c_char_p, '')
    event = multiprocessing.Event()
    p1 = multiprocessing.Process(target=function1, args=(s, event))
    p2 = multiprocessing.Process(target=function2, args=(s, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Prints:

New value

The if __name__ == '__main__': check above is needed because each newly created process starts executing the source from the top; without the check, every new process would create further processes ad infinitum. For the same reason, the definitions of s and event cannot be outside that check, or else each newly created process would create its own instances of these variables. That in turn means the variables now have to be passed as arguments, whereas in the forking example they are simply inherited.
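
The argument-passing version can also be tried on Linux or macOS by explicitly requesting the spawn start method. The following is a minimal sketch under that assumption and is not part of the original answer: main(), the results list, and the 30-second timeout are illustrative additions, and multiprocessing.get_context("spawn") is used to mimic how Windows creates processes.

```python
import multiprocessing
from ctypes import c_char_p


def function1(s, event):
    s.value = "New value"  # update the shared string through its proxy
    event.set()            # signal that a new value is available


def function2(s, event, results):
    # Wait with a timeout so the sketch cannot hang, then record what was seen.
    if event.wait(timeout=30):
        results.append(s.value)


def main():
    ctx = multiprocessing.get_context("spawn")  # same start method as Windows
    manager = ctx.Manager()
    s = manager.Value(c_char_p, "")
    results = manager.list()  # illustrative: lets the parent see what function2 read
    event = ctx.Event()
    p1 = ctx.Process(target=function1, args=(s, event))
    p2 = ctx.Process(target=function2, args=(s, event, results))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    return list(results)


if __name__ == "__main__":
    print(main())
```

Everything the children need arrives through args, so nothing depends on module-level state being inherited.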

Update: Creating a Shared numpy Array on Linux/Unix

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new arr value
    print('arr =', arr)

p1 = multiprocessing.Process(target=function1)
p2 = multiprocessing.Process(target=function2)
p1.start()
p2.start()
p1.join()
p2.join()
print('arr =', arr)

Prints:

arr = [[1 1 1]
 [1 1 1]]
arr = [[1 1 1]
 [1 1 1]]

Creating a Shared numpy Array on Windows

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def function1(shared_array, shape, event):
    # Rebuild the numpy array in the child: passing the numpy array itself
    # would pickle a private copy instead of sharing the memory.
    arr = to_numpy_array(shared_array, shape)
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2(shared_array, shape, event):
    arr = to_numpy_array(shared_array, shape)
    event.wait() # wait for new arr value
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()

    p1 = multiprocessing.Process(target=function1, args=(shared_array, shape, event))
    p2 = multiprocessing.Process(target=function2, args=(shared_array, shape, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)

Using a Shared numpy Array With a Multiprocessing Pool on Windows

When using a multiprocessing pool on Windows, whether you pass the array to the worker function as an argument or, as in this case, use it to initialize a global variable in each pool process, every process must be given the shared array and must recreate the numpy array from it.

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def init_pool(shared_array, the_shape, the_event):
    global arr, shape, event
    shape = the_shape
    event = the_event
    # recreate the numpy array from the shared array:
    arr = to_numpy_array(shared_array, shape)

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new arr value
    print('arr =', arr)

if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2, initializer=init_pool, initargs=(shared_array, shape, event))
    pool.apply_async(function1)
    pool.apply_async(function2)
    # wait for tasks to complete
    pool.close()
    pool.join()
    print('arr =', arr)

Using a Shared numpy Array With a Multiprocessing Pool on Linux/Unix

import multiprocessing
import ctypes
import numpy as np

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()

def function1():
    for x in range(shape[0]):
        for y in range(shape[1]):
            arr[x, y] = 1
    event.set() # show we have a new value

def function2():
    event.wait() # wait for new arr value
    print('arr =', arr)

pool = multiprocessing.Pool(2)
pool.apply_async(function1)
pool.apply_async(function2)
# wait for tasks to complete
pool.close()
pool.join()
print('arr =', arr)

python multiprocessing shared variable safe

Your subprocesses do not share the lock object. Even if each of them ends up with a lock, those locks are independent copies that are not linked together, so they protect nothing. That leaves a race condition, and the code eventually fails.

You can solve this by using Manager().Lock(), since you are already using a Manager.

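import random
import time
from multiprocessing import Manager, Pool

def a_complex_operation(counter, alock):
    with alock:
        time.sleep(random.random())
        counter.value += 1

def main():
    pool = Pool(16)
    ma = Manager()
    counter = ma.Value('i', 0)
    lock = ma.Lock()
    for i in range(100):
        pool.apply_async(a_complex_operation, args=(counter, lock))
    # wait for all tasks to finish before reading the counter
    pool.close()
    pool.join()
    print(counter.value)

if __name__ == '__main__':
    main()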

This works, although your subprocesses will now be much slower: expect around 50 seconds per run (100 tasks averaging 0.5 seconds each, executed one at a time because the sleep happens while the lock is held).

But now your counter.value is always 100.
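
If the slow part of the operation does not itself need to be serialized, one possible variation is to hold the lock only around the increment. This is a sketch under that assumption rather than the answer's own code: the shortened sleep, the pool size of 4, and the main(tasks) wrapper are illustrative choices.

```python
import multiprocessing
import random
import time


def a_complex_operation(counter, alock):
    time.sleep(random.random() / 100)  # slow work runs in parallel, outside the lock
    with alock:                        # only the read-modify-write is serialized
        counter.value += 1


def main(tasks=100):
    manager = multiprocessing.Manager()
    counter = manager.Value("i", 0)
    lock = manager.Lock()
    pool = multiprocessing.Pool(4)
    for _ in range(tasks):
        pool.apply_async(a_complex_operation, args=(counter, lock))
    pool.close()
    pool.join()  # wait for every task before reading the counter
    return counter.value


if __name__ == "__main__":
    print(main())
```

The increment is still protected, so the final count should match the number of tasks, while the sleeps overlap across the worker processes.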

How to increment a shared counter from multiple processes?

The problem is that the counter variable is not shared between your processes: each separate process is creating its own local instance and incrementing that.

See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here's a working version of your example (with some dummy input data). Note it uses global values which I would really try to avoid in practice:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter
    # as it starts.
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print(i.get())

