Appending to the Same List from Different Processes Using Multiprocessing

Appending to the same list from different processes using multiprocessing

Global variables are not shared between processes.

You need to use multiprocessing.Manager.list:

from multiprocessing import Process, Manager

def dothing(L, i): # the managed list `L` passed explicitly.
L.append("anything")

if __name__ == "__main__":
with Manager() as manager:
L = manager.list() # <-- can be shared between processes.
processes = []
for i in range(5):
p = Process(target=dothing, args=(L,i)) # Passing the list
p.start()
processes.append(p)
for p in processes:
p.join()
print L

See Sharing state between processes¶ (Server process part).

Share a list between different processes?

One way is to use a manager object and create your shared list object from it:

from multiprocessing import Manager, Pool

input_list = ['A', 'B', 'C', 'D', 'E', 'F']

manager = Manager()
shared_list = manager.list()

def do_stuff(element):
element_dict = {}
element_dict['name'] = element
shared_list.append(element_dict)
if len(shared_list) > 3:
print('list > 3')

pool = Pool(processes=6)
pool.map(do_stuff, input_list)
pool.close()

Remember, unlike threads, processes do not share memory space. (When spawned, each process gets its own copy of the memory footprint of the spawning process, and then runs with it.) So they can only communicate via some form of IPC (interprocess communication). In Python, one such method is multiprocessing.Manager and the data structures it exposes, e.g. list or dict. These are used in code as easily as their built-in equivalents, but under the hood utilize some form of IPC (sockets probably).

Edit Feb 1, 2022: Removed unneeded
global shared_list declaration from the function, since the object is not being replaced.

Python: multiprocessing append to list outside function

You could just return the data once it has been manipulated and then add it to the items list.

from multiprocessing import Pool

items = []
def myFunc(a,b):
data = (a,b)

# manipulate data...

return data

if __name__ == '__main__':
with Pool(2) as p:
items += p.starmap(myFunc,[(0,1),(2,3)])

print(items)

output:

[(0, 1), (2, 3)]

Appending an item to a list using multiprocessing in Python

It's important to understand that processes run in isolated areas of memory. Each process will have their own instance of hotels_url_list and there's no (easy) way of "sticking" those values into the parent process' list: if in the parent process you create an instance of list, that instance is not the same that the subprocesses use: When you do a .fork() (a.k.a. create a subprocess), the memory of the parent process is cloned on the child process. So, if the parent had an instance of list in the hotels_url_list variable, you'll also have an instance of list (also called hotels_url_list) in the child process BUT they will not be the same (they'll occupy different areas in memory).

This doesn't happen with Threads. They do share memory.

I would say (it's not like I'm much of an expert here) that the canonical way of communicating processes in this case would be a Queue: The child process puts things in the queue, the parent process grabs them:

from multiprocessing import Process, Queue

def get_spain_accomodations():
q = Queue()
processes = []
links = ['http://foo.com', 'http://bar.com', 'http://baz.com']
hotels_url_list = []
for link in links:
p = Process(target=get_page_links, args=(link, q,))
p.start()
processes.append(p)
for p in processes:
p.join()
hotels_url_list.append(q.get())
print("Collected: %s" % hotels_url_list)

def get_page_links(link, q):
print("link==%s" % link)
hotel_url = "https://www.booking.com" + link
q.put(hotel_url)

if __name__ == "__main__":
get_spain_accomodations()

This outputs each link prepended with https://www.booking.com, the pre-pending happening on independent processes:

link==http://foo.com
link==http://bar.com
link==http://baz.com
Collected: ['https://www.booking.comhttp://foo.com', 'https://www.booking.comhttp://bar.com', 'https://www.booking.comhttp://baz.com']

I don't know if it will help you, but to me, it helps seeing the Queue as a "shared file" that both processes know about. Imagine you have two complete different programs, and one of them knows that has to write things into a file called /tmp/foobar.txt and the other one knows that has to read from a file called /tmp/foobar.txt. That way they can "communicate" with each other. This paragraph is just a "metaphor" (although that's pretty much how Unix pipes work)... Is not like queues work exactly like that, but maybe it helps understanding the concept? Dunno, really, maybe I made it more confusing...

Another way would be using Threads and collect their return value, as explained here.

Python Multiprocessing appending list

With your current code, you're not actually sharing CURRENT_SUCCESSES between processes. callback is executed in the main process, in a result handling thread. There is only one result handling thread, so each callback will be run one at a time, not concurrently. So your code as written is process/thread safe.

However, you are forgetting to return successes from func, which you'll want to fix.

Edit:

Also, this could be much more succinctly written using map:

def func(inputs):
successes = []

for input in inputs:
result = #something with return code
if result == 0:
successes.append(input)
return successes

def main():
pool = mp.Pool()
total_successes = pool.map(func, myInputs) # Returns a list of lists
# Flatten the list of lists
total_successes = [ent for sublist in total_successes for ent in sublist]

Multiprocessing for-loop appending to a list

I'm not quite sure what you meant about "the all_processes list growing too large before the files can be uploaded". Below is example code that limits the number of processes to be started at one time by making use of a multiprocessing.Queue with a fixed maximum size. The queue will block whenever an attempt is made to insert more that that into it than its max. The all-processes list in it is exactly that, and is used to determine when they are all done.

On my system it takes a little over 20 seconds for 1000 processes to each sleep for 1 second (because only a limited number of them are allowed to run concurrently).

from multiprocessing import Process, Queue
from time import sleep, time

QSIZE = 10
SENTINEL = 'SENTINEL'
START_BLOCK, END_BLOCK = 1, 1_000

def worker(i):
sleep(1)
# print(f'{i} finished')

def runner(queue):
''' Get args from queue and start a task to process them. '''
all_processes = []

while True:
args = queue.get() # Remove from queue.
if args == SENTINEL:
break
process = Process(target=worker, args=args)
process.start()
all_processes.append(process)

for process in all_processes: # Wait for them all to finish.
process.join()

if __name__ == "__main__":
args_gueue = Queue(QSIZE)

start_time = time()

runner_process = Process(target=runner, args=(args_gueue,))
runner_process.start()

for i in range(START_BLOCK, END_BLOCK):
args_gueue.put((i,)) # Blocks whenever queue is full.

args_gueue.put('SENTINEL') # Signal the end.

runner_process.join() # Wait for all processes to complete.

print('Completed in', time()-start_time, 'seconds')

Add element to list using multiprocessing python

IIUC you need:

If your integer process is more IO bound, threads might work better.

Threads are more IO intensive, therefore if that's what you need you could try:

from concurrent.futures import ThreadPoolExecutor
def get_output(n):
output = n ** 2
return output

input_list = [1,2,3,5,6,8,5,5,8,6,5,2,5,2,5,4,5,2]
output_list = []

if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=6) as pool:
output_list.extend(pool.map(get_output, input_list))

print(output_list)

This processes the list and squares all the elements, it applies this to every 6 elements parallelly, as you can see I specified max_workers=6.

If your integer process is more CPU bound, go with multiprocessing.

With the virtually same code:

from concurrent.futures import ProcessPoolExecutor
def get_output(n):
output = n ** 2
return output

input_list = [1,2,3,5,6,8,5,5,8,6,5,2,5,2,5,4,5,2]
output_list = []

if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=6) as pool:
output_list.extend(pool.map(get_output, input_list))

print(output_list)

This does the same, it processes and squares all elements for every 6 elements parellelly.

Both codes output:

[1, 4, 9, 25, 36, 64, 25, 25, 64, 36, 25, 4, 25, 4, 25, 16, 25, 4]

Multiprocessing: append to 2 lists simultanously

Instead of trying to append from sub-processes, you can make the function return the values and append them in main process; you don't need to care about mutual access between sub-processes (also no need to use manager).

from multiprocessing import Pool

def f(args):
a, b = args
# do something with a and b
return a, b

if __name__ == '__main__':
data = [(1,2), (3,4), (5,6)]
x, y = [], []
with Pool() as p:
for a, b in p.map(f, data): # or imap()
x.append(a)
y.append(b)

# do something with x and y
assert x == [1,3,5]
assert y == [2,4,6]


Related Topics



Leave a reply



Submit