Same Output in Different Workers in Multiprocessing

Same output in different workers in multiprocessing

I think you'll need to re-seed the random number generator using numpy.random.seed in your do_calculation function.

My guess is that the random number generator (RNG) gets seeded when you import the module. Then, when you use multiprocessing, you fork the current process with the RNG already seeded -- Thus, all your processes are sharing the same seed value for the RNG and so they'll generate the same sequences of numbers.

e.g.:

def do_calculation(data):
np.random.seed()
rand=np.random.randint(10)
print data, rand
return data * 2

Appending to the same list from different processes using multiprocessing

Global variables are not shared between processes.

You need to use multiprocessing.Manager.list:

from multiprocessing import Process, Manager

def dothing(L, i): # the managed list `L` passed explicitly.
L.append("anything")

if __name__ == "__main__":
with Manager() as manager:
L = manager.list() # <-- can be shared between processes.
processes = []
for i in range(5):
p = Process(target=dothing, args=(L,i)) # Passing the list
p.start()
processes.append(p)
for p in processes:
p.join()
print L

See Sharing state between processes¶ (Server process part).

Joining output of multiprocessing workers - python2

I assume the issue you encountered was the inability of both processes to share new_id_list.

What you need to do is to create another Queue which will represent the result queue and pass it to both processes. Append to the queue as needed inside the processes and at the end of both processes execution (after the process.join()) you just extract everything from the queue in a list.

Log output of multiprocessing.Process

The easiest way might be to just override sys.stdout. Slightly modifying an example from the multiprocessing manual:

from multiprocessing import Process
import os
import sys

def info(title):
print title
print 'module name:', __name__
print 'parent process:', os.getppid()
print 'process id:', os.getpid()

def f(name):
sys.stdout = open(str(os.getpid()) + ".out", "w")
info('function f')
print 'hello', name

if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
q = Process(target=f, args=('fred',))
q.start()
p.join()
q.join()

And running it:


$ ls
m.py
$ python m.py
$ ls
27493.out 27494.out m.py
$ cat 27493.out
function f
module name: __main__
parent process: 27492
process id: 27493
hello bob
$ cat 27494.out
function f
module name: __main__
parent process: 27492
process id: 27494
hello fred

Using multiprocessing.Process with a maximum number of simultaneous processes

It might be most sensible to use multiprocessing.Pool which produces a pool of worker processes based on the max number of cores available on your system, and then basically feeds tasks in as the cores become available.

The example from the standard docs (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) shows that you can also manually set the number of cores:

from multiprocessing import Pool

def f(x):
return x*x

if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"

And it's also handy to know that there is the multiprocessing.cpu_count() method to count the number of cores on a given system, if needed in your code.

Edit: Here's some draft code that seems to work for your specific case:

import multiprocessing

def f(name):
print 'hello', name

if __name__ == '__main__':
pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument
for i in xrange(0, 512):
pool.apply_async(f, args=(i,))
pool.close()
pool.join()

Can a python pool worker return values from initialization?

I don't really think I understand how exactly returning the data incrementally isn't sufficient, but it kinda seems like you need some sort of finalization function to send the data similar to how you have an initialization function. Unfortunately, I don't think this sort of thing exists for mp.Pool, so it'll require you to use a couple mp.Process's, and send input args, and return results with a couple mp.Queue's

On a side note your use of Semaphore is unncessary, as the call to the "load_data" iterator always happens on the main process. I have moved that to another "producer" process, which puts inputs to a queue, which is also already synchronized automatically by default. This allows you to have one process for gathering inputs, several processes for processing the inputs to outputs, and leaves the main (parent) process to gather outputs. If the "producer" generating the inputs is IO limited by file read speed (very likely), it could also be in a thread rather than a process, but in this case the difference is probably minimal.

I have created an example of a custom "Pool" which allows you to return some data at the end of each worker's "life" using aforementioned "producer-consumer" scheme. there are print statements to track what is going on in each process, but please also read the comments to track what's going on and why:

import multiprocessing as mp
from time import sleep
from queue import Empty

class ExitFlag:
def __init__(self, exit_value=None):
self.exit_value = exit_value #optionally pass value along with exit flag

def producer_func(input_q, n_workers):
for i in range(100): #100 lines of some long file
print(f"put {i}")
input_q.put(i) #put each line of the file to the work queue
print('stopping consumers')
for i in range(n_workers):
input_q.put(ExitFlag()) #send shut down signal to each of the workers
print('producer exiting')

def consumer_func(input_q, output_q, work_func):
counter = 0
while True:
try:
item = input_q.get(.1) #never wait forever on a "get". It's a recipe for deadlock.
except Empty:
continue
print(f"get {item}")
if isinstance(item, ExitFlag):
break
else:
counter += 1
output_q.put(work_func(item))
output_q.put(ExitFlag(exit_value=counter))
print('consumer exiting')

def work_func(number):
sleep(.1) #some heavy nltk work...
return number*2

if __name__ == '__main__':
input_q = mp.Queue(maxsize=10) #only bother limiting size if you have memory usage constraints
output_q = mp.Queue(maxsize=10)

n_workers = mp.cpu_count()

producer = mp.Process(target=producer_func, args=(input_q, n_workers)) #generate the input from another process. (this could just as easily be a thread as it seems it will be IO limited anyway)
producer.start()

consumers = [mp.Process(target=consumer_func, args=(input_q, output_q, work_func)) for _ in range(n_workers)]
for c in consumers: c.start()

total = 0
stop_signals = 0
exit_values = []
while True:
try:
item = output_q.get(.1)
except Empty:
continue
if isinstance(item, ExitFlag):
stop_signals += 1
if item.exit_value is not None:
exit_values.append(item.exit_value) #do something with the return at the end
if stop_signals >= n_workers: #stop waiting for more results once all consumers finish
break
else:
total += item #do something with the incremental return values
print(total)
print(exit_values)

#cleanup
producer.join()
print("producer joined")
for c in consumers: c.join()
print("consumers joined")


Related Topics



Leave a reply



Submit