Multiprocessing: Use tqdm to Display a Progress Bar

Multiprocessing: use tqdm to display a progress bar

Solution found. Be careful! Due to multiprocessing, the estimated times (iterations per loop, total time, etc.) can be unstable, but the progress bar works perfectly.

Note: the context manager for Pool is only available in Python 3.3+.

from multiprocessing import Pool
import time
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
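If you are on an interpreter older than 3.3, where Pool cannot be used as a context manager, a rough equivalent (my sketch, not part of the original answer; it reuses _foo from above) is to manage the pool by hand:

from multiprocessing import Pool
from tqdm import tqdm

if __name__ == '__main__':
    p = Pool(processes=2)
    try:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(max_)):
                pbar.update()
    finally:
        p.terminate()  # what the with-block would do on exit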

join() output from multiprocessing when using tqdm for progress bar

Your call to p.join() just waits for all the pool processes to end and returns None. The call is actually unnecessary, since you are using the pool as a context manager (with Pool(processes=2) as p:). When that block terminates, an implicit call is made to p.terminate(), which immediately terminates the pool processes and any tasks that may be running or queued up to run (there are none in your case).
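To make the distinction concrete, here is a sketch of the two shutdown styles, using a trivial square worker as a stand-in:

from multiprocessing import Pool

def square(n):
    return n * n

if __name__ == '__main__':
    # Context-manager form: on exiting the block, p.terminate() is called
    # implicitly, stopping the workers at once. That is safe here because
    # iterating imap_unordered has already consumed every result.
    with Pool(processes=2) as p:
        for _ in p.imap_unordered(square, range(5)):
            pass

    # Explicit form: close() forbids new submissions, join() waits for the
    # workers to finish and exit. This is the pair you need when you are
    # not otherwise blocking until all results have arrived.
    p = Pool(processes=2)
    results = p.map(square, range(5))
    p.close()
    p.join()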

It is, in fact, iterating the iterator returned by the call to p.imap_unordered that yields each return value from your worker function, _foo. But since you are using method imap_unordered, the results may not be returned in submission order. In other words, you cannot assume that the return values will arrive in the sequence 0, 1, 4, 9, etc. There are many ways to handle this, such as having your worker function return the original argument along with the squared value:

from multiprocessing import Pool
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    return my_number, square  # return the argument along with the result

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        results = [None] * max_  # preallocate the results list
        with tqdm(total=max_) as pbar:
            for x, result in p.imap_unordered(_foo, range(0, max_)):
                results[x] = result
                pbar.update()
        print(results)

The second way is not to use imap_unordered, but rather apply_async with a callback function. The disadvantage of this approach is that for large iterables you cannot specify a chunksize argument as you can with imap_unordered:

from multiprocessing import Pool
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    return square

if __name__ == '__main__':
    def my_callback(_):  # ignore the result
        pbar.update()  # update the progress bar when a result is produced

    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            async_results = [p.apply_async(_foo, (x,), callback=my_callback) for x in range(0, max_)]
            # wait for all tasks to complete:
            p.close()
            p.join()
        results = [async_result.get() for async_result in async_results]
        print(results)

Progress bar with multiprocessing

First, a few general comments concerning your code. In your main process you use a path to a file to open the zip archive just to retrieve back the original file name; that does not make much sense. Then, in count_files_7z, you iterate the return value of zf.namelist() to build a list of the files within the archive, when zf.namelist() is already a list of those files; that does not make much sense either. You also use the context-manager function closing to ensure that the archive is closed at the end of the block, but the with block itself is a context manager that serves the same purpose.
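For illustration, those points reduce to something like this hypothetical rewrite of count_files_7z (the name comes from your code; the body is my sketch):

from zipfile import ZipFile

def count_files_7z(zip_path):
    # ZipFile accepts the path directly, namelist() is already a list,
    # and the with statement closes the archive; no closing() needed.
    with ZipFile(zip_path) as zf:
        return len(zf.namelist())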

I tried installing alive-progress and the progress bars were a mess. This is a task better suited to multithreading than multiprocessing. Actually, it is probably better suited to serial processing, since doing concurrent I/O operations against your disk, unless it is a solid-state drive, is probably going to hurt performance. You will gain performance only if there is heavy CPU-intensive processing involved for the files you read. For that case, I have passed to each thread a multiprocessing pool to which you can submit calls to apply, specifying functions in which you have placed the CPU-intensive code. But the progress bars should work better when done under multithreading rather than multiprocessing. Even then I could not get any sort of decent display with alive-progress, which admittedly I did not spend too much time on. So I have switched to using the more common tqdm module, available from the PyPI repository.

Even with tqdm there is a problem in that when a progress bar reaches 100%, tqdm must be writing something (a newline?) that relocates the other progress bars. Therefore, what I have done is specified leave=False, which causes the bar to disappear when it reaches 100%. But at least you can see all the progress bars without distortion as they are progressing.

from multiprocessing.pool import Pool, ThreadPool
from threading import Lock
import tqdm
from zipfile import ZipFile
import os
import heapq
import time

def get_filepaths(directory):
    file_paths = []  # List which will store all of the full file paths.
    # Walk the tree.
    for root, directories, files in os.walk(directory):
        for filename in files:
            # Join the two strings in order to form the full file path.
            filepath = os.path.join(root, filename)
            file_paths.append(filepath)  # Add it to the list.
    return file_paths

def get_free_position():
    """ Return the minimum possible position """
    with lock:
        free_position = heapq.heappop(free_positions)
    return free_position

def return_free_position(position):
    with lock:
        heapq.heappush(free_positions, position)

def run_performance(zip_file):
    position = get_free_position()
    with ZipFile(zip_file) as zf:
        file_list = zf.namelist()
        with tqdm.tqdm(total=len(file_list), position=position, leave=False) as bar:
            for f in file_list:
                with zf.open(f) as myfile:
                    ...  # do things with myfile (perhaps myfile.read())
                    # for CPU-intensive tasks: result = pool.apply(some_function, args=(arg1, arg2, ... argn))
                    time.sleep(.005)  # simulate doing something
                bar.update()
    return_free_position(position)

def generate_zip_files():
    list_dir = ['path1', 'path2']
    for folder in list_dir:
        get_all_zips = get_filepaths(folder)
        for zip_file in get_all_zips:
            yield zip_file

# Required for Windows:
if __name__ == '__main__':
    N_THREADS = 5
    free_positions = list(range(N_THREADS))  # already a heap
    lock = Lock()
    pool = Pool()
    thread_pool = ThreadPool(N_THREADS)
    for result in thread_pool.imap_unordered(run_performance, generate_zip_files()):
        pass
    pool.close()
    pool.join()
    thread_pool.close()
    thread_pool.join()

The code above uses a multiprocessing thread pool arbitrarily limited in size to 5, just as a demo. You can increase or decrease N_THREADS to whatever value you want, but, as I said, it may or may not help performance. If you want one thread per zip file, then:

if __name__ == '__main__':
    zip_files = list(generate_zip_files())
    N_THREADS = len(zip_files)
    free_positions = list(range(N_THREADS))  # already a heap
    lock = Lock()
    pool = Pool()
    thread_pool = ThreadPool(N_THREADS)
    for result in thread_pool.imap_unordered(run_performance, zip_files):
        pass
    pool.close()
    pool.join()
    thread_pool.close()
    thread_pool.join()

Show the progress of a Python multiprocessing pool imap_unordered call?

There is no need to access private attributes of the result set:

import sys

for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i / num_tasks))
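If tqdm is available (as in the answers above), the same iteration can drive a real progress bar. A minimal sketch, assuming the same p, do_work, and num_tasks:

from tqdm import tqdm

# total= is needed because imap_unordered returns an iterator with no length
for _ in tqdm(p.imap_unordered(do_work, range(num_tasks)), total=num_tasks):
    pass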

Unexpected output when running tqdm inside multiprocessing.Process

You can manage your tqdm bars manually and use a multiprocessing.Queue to signal that a bar should update.

This example will create 4 processes that do the work and one process that updates the bars through the Queue:

import random
from tqdm import tqdm
from time import sleep
import multiprocessing

N = 4

def cluster(indexes, n_proc, q):
    for index in indexes:
        # do some work
        sleep(random.randint(1, 10) / 10)

        # update progress bar:
        q.put_nowait(n_proc)

def update_bar(q):
    bars = [tqdm(total=10, position=i, desc=f"Process {i}") for i in range(N)]

    while True:
        n = q.get()
        bars[n].update()

if __name__ == "__main__":
    q = multiprocessing.Queue()

    # daemon=True means the script doesn't wait for this process to end
    process = multiprocessing.Process(target=update_bar, args=(q,), daemon=True)
    process.start()

    processes = []
    for i in range(N):
        process = multiprocessing.Process(
            target=cluster, args=(range(10), i, q)
        )
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    print("\n" * N)

Prints:

Process 0: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:06<00:00,  1.50it/s]
Process 1: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:05<00:00, 1.71it/s]
Process 2: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:04<00:00, 1.63it/s]
Process 3: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:05<00:00, 1.97it/s]


