Solving Embarrassingly Parallel Problems Using Python Multiprocessing

Solving embarrassingly parallel problems using Python multiprocessing

My solution has an extra bell and whistle to make sure that the order of the output matches the order of the input. I use multiprocessing.Queue objects to send data between processes, sending 'STOP' messages so each process knows when to quit checking the queues. I think the comments in the source should make it clear what's going on, but if not, let me know.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                    for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
        """Parses the input CSV and puts tuples on the input queue, with the
        index of the row as the first element and the integers of the row as
        the second element.

        The index is zero-based.

        The data is sent over inq for the workers to do their thing. At the
        end, the input process sends a 'STOP' message for each worker.
        """
        for i, row in enumerate(self.in_csvfile):
            row = [ int(entry) for entry in row ]
            self.inq.put( (i, row) )

        for i in range(self.numprocs):
            self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq.
        """
        for i, row in iter(self.inq.get, "STOP"):
            self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open the outgoing CSV file, then start reading outq for answers.

        Since I chose to make sure the output is synchronized to the input,
        there are some extra goodies to do that. If you don't care that the
        output rows come out in input order, this isn't required (each output
        row carries its original row index anyway).
        """
        cur = 0
        buffer = {}
        # csv.writer misbehaves when shared across processes, so open the file
        # and use the writer entirely within this process, or else you'll find
        # the last several rows missing.
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        # Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # Verify rows are in order; if not, save them in the buffer
                if i != cur:
                    buffer[i] = val
                else:
                    # The row is next in order: write it out, then flush any
                    # buffered rows that are now ready
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])
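To try the script, you can generate a small input file first; here is a quick sketch (the file names and sizes are arbitrary, not part of the original program):

# gen_test_csv.py -- writes a small CSV of random integers for multiproc_sums.py
import csv
import random

with open("input.csv", "w") as stream:   # arbitrary file name
    writer = csv.writer(stream)
    for _ in range(1000):                # 1000 rows of 10 integers each
        writer.writerow([random.randint(0, 100) for _ in range(10)])

# Then run, e.g.:
#   python multiproc_sums.py -n 4 input.csv output.csv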

Expected speedup from embarrassingly parallel task using Python Multiprocessing

I think you want to divide the work up differently.

Although your program divides the range of candidate integers evenly across cores, the work in each range is not likely to be even. That means some cores finish early, and have nothing to do, while others are still running. That loses parallel efficiency, fast.

Just to make the point, imagine you have 1000 cores. The first core sees very small candidate numbers and doesn't take long to factor them, then goes idle. The last (thousandth) core sees only very big candidate numbers, and takes much longer to factor them. So it runs, while the first core sits idle. Wasted cycles. Similarly for 4 cores.

What you want to do, when the amount of work being handed to a core is unknown, is hand all the cores many modest-sized chunks, many more than there are cores. Then the cores can finish at uneven rates, and each core goes back to find a bit more work to do. This is essentially a work-list algorithm. You end up with unevenness at the very end, but only on small chunks, so not much is wasted. (A Python sketch of this idea follows.)
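In Python, a simple way to get this work-list behaviour is to hand a process pool many small tasks and let it schedule them; here is a minimal sketch (the work function and chunk size are placeholders, not taken from the question):

import multiprocessing

def do_work(item):
    # Stand-in for whatever uneven, CPU-heavy work each item requires.
    return item * item

if __name__ == "__main__":
    items = range(1000000)
    pool = multiprocessing.Pool()        # one worker per core by default
    # chunksize controls how many items a worker grabs at once: small chunks
    # keep every core busy near the end, at the cost of a bit more overhead.
    results = pool.imap_unordered(do_work, items, chunksize=100)
    total = sum(results)
    pool.close()
    pool.join()
    print(total)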

I'm not a Python programmer, so I coded a solution in Parlanse instead.

(includeunique `Console.par')
(includeunique `Timer.par')

(define upper_limit 10000000)

(define candidates_per_segment 10)
(define candidates_per_segment2 (constant (* candidates_per_segment 2)))

(= [prime_count natural] 0)
[prime_finding_team team]

(define primes_in_segment
   (action (procedure [lower natural] [upper natural])
      (;;
         (do [candidate natural] lower upper 2
            (block test_primality
               (local (= [divisor natural] 3)
                  (;;
                     (while (< (* divisor divisor) candidate)
                        (ifthenelse (== (modulo candidate divisor) 0)
                           (exitblock test_primality)
                           (+= divisor 2)
                        )ifthenelse
                     )while
                     (ifthen (~= (* divisor divisor) candidate)
                        (consume (atomic+= prime_count))
                     )ifthen
                  );;
               )local
            )block
         )do
      );;
   )action
)define

(define main
   (action (procedure void)
      (local [timer Timer:Timer]
         (;;
            (Console:Put (. `Number of primes found: '))
            (Timer:Reset (. timer))
            (do [i natural] 1 upper_limit candidates_per_segment2
               (consume (draft prime_finding_team primes_in_segment
                           `lower':i
                           `upper':(minimum upper_limit (- (+ i candidates_per_segment2) 2))))
            )do
            (consume (wait (@ (event prime_finding_team))))
            (Timer:Stop (. timer))
            (Console:PutNatural prime_count)
            (Console:PutNewline)
            (Timer:PrintElapsedTime (. timer) (. `Parallel computed in '))
            (Console:PutNewline)
         );;
      )local
   )action
)define

Parlanse looks like LISP, but works and compiles more like C.

The worker is primes_in_segment; it takes a range of candidate values defined by its parameters lower and upper. It tries each candidate in that range, and increments (atomically) a total prime_count if that candidate is a prime.

The full range is split into small packets of ranges (sequences of odd numbers) by the do loop in main. The parallelism happens on the draft command, which creates a parallel execution grain of computation (not a Windows thread) and adds it to the prime_finding_team, an aggregate set of work representing all the prime finding. (The purpose of a team is to allow all this work to be managed as a unit, e.g., destroyed if necessary; that isn't needed in this program.) The arguments to draft are the function to be run by the forked grain and its parameters. The work is carried out by a Parlanse-managed set of (Windows) threads using a work-stealing algorithm. If there is too much work, Parlanse throttles the work-generating grains and spends its energy executing grains that are pure computation.

One could pass only one candidate value to each grain, but then there's more fork overhead per candidate and the total runtime gets accordingly worse. We've chosen 10 empirically to ensure that the fork overhead per range of candidates is small; setting the candidates per segment to 1000 doesn't buy much additional speedup.

The do loop simply manufactures work as fast as it can; Parlanse throttles the draft step when there is enough parallelism to be useful. The wait on the team event causes the main program to wait for all team members to complete.

We ran this on an HP hex-core AMD Phenom II X6 1090T 3.2 GHz.
Execution runs are below; first for 1 CPU:

 >run -p1 -v ..\teamprimes
PARLANSE RTS: Version 19.1.53
# Processors = 1
Number of primes found: 664579
Parallel computed in 13.443294 seconds
---- PARLANSE RTS: Performance Statistics
Duration = 13.527557 seconds.
CPU Time Statistics:
Kernel CPU Time: 0.031s
User CPU Time: 13.432s
Memory Statistics:
Peak Memory Usage : 4.02 MBytes
Steals: 0 Attempts: 0 Success rate: 0.0% Work Rediscovered: 0
Exiting with final status 0.

Then for 6 CPUs (scales nicely):

>run -p6 -v ..\teamprimes
PARLANSE RTS: Version 19.1.53
# Processors = 6
Number of primes found: 664579
Parallel computed in 2.443123 seconds
---- PARLANSE RTS: Performance Statistics
Duration = 2.538972 seconds.
CPU Time Statistics:
Kernel CPU Time: 0.000s
User CPU Time: 14.102s
Total CPU Time: 14.102s
Memory Statistics:
Peak Memory Usage : 4.28 MBytes
Steals: 459817 Attempts: 487334 Success rate: 94.4% Work Rediscovered: 153

You'll note that the total CPU time for the parallel version is roughly the same as for the serial version; this is because they are doing the same work.

Given Python's "fork" and "join" operations, I'm sure there's a Python equivalent you can easily code. It might run out of space or threads because of the possibility of too many forks at the same time. (With candidates_per_segment at 10, there are up to 1 million live grains running under Parlanse.) This is where automatic throttling of work generation is a good thing to have. As a substitute, you can set candidates_per_segment to a much larger number, e.g., 10000, which means you only get 1000 threads worst case. (I think you will still pay a high price due to Python's interpreted nature.) As you set the candidates per segment closer and closer to 1e7/4, you'll approach the exact behavior of your present Python code.
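For what it's worth, here is a hedged sketch of what such a Python equivalent might look like, using a multiprocessing.Pool over segments of candidates (the segment size of 1000 is a guess chosen to amortize Python's per-task overhead; none of these names come from the original code):

import multiprocessing

UPPER_LIMIT = 10000000
CANDIDATES_PER_SEGMENT = 1000   # guessed; larger than Parlanse's 10 to hide Python overhead

def primes_in_segment(bounds):
    """Count primes among the odd candidates in the half-open range [lower, upper)."""
    lower, upper = bounds
    count = 0
    for candidate in range(lower | 1, upper, 2):   # odd candidates only
        if candidate < 3:
            continue                               # skip 1; 2 is added back in main
        divisor = 3
        is_prime = True
        while divisor * divisor <= candidate:
            if candidate % divisor == 0:
                is_prime = False
                break
            divisor += 2
        if is_prime:
            count += 1
    return count

if __name__ == "__main__":
    step = 2 * CANDIDATES_PER_SEGMENT
    segments = [(start, min(start + step, UPPER_LIMIT))
                for start in range(1, UPPER_LIMIT, step)]
    pool = multiprocessing.Pool()
    # 2 is prime but is skipped by the odd-only loop, hence the +1.
    total = sum(pool.imap_unordered(primes_in_segment, segments)) + 1
    pool.close()
    pool.join()
    print("Number of primes found: %d" % total)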

Multiprocessing in Python: Pool and Process with shared array

np.linalg.solve should already be executed in parallel by functions implemented in LAPACK. In fact, this is the case on my (Linux + Windows) machine: it calls LAPACK functions like dtrsm and dlaswp, and the main computational function, dgemm, implemented in BLAS libraries. This last function should take >90% of the time and is heavily optimized and parallelized, as long as you use a fast BLAS implementation. NumPy uses OpenBLAS by default on most systems, which is very good (and parallel). The Intel MKL is also a good alternative supporting LAPACK (and is certainly better on Intel hardware). If the computation is not parallel on your machine, that is a problem and you should check your BLAS implementation, as it may be very inefficient.
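If you want to verify which BLAS NumPy is linked against and how many threads it uses, one option is the following (threadpoolctl is an optional third-party package):

import numpy as np

np.show_config()   # prints the BLAS/LAPACK libraries NumPy was built against

# threadpoolctl inspects the thread pools of the loaded BLAS/OpenMP libraries.
from threadpoolctl import threadpool_info
for pool in threadpool_info():
    print(pool["internal_api"], pool["num_threads"])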

The thing is, parallelizing code that is already parallel makes it slower, because running more threads than there are available cores puts a lot of pressure on the OS scheduler, and the BLAS functions are not optimized for such a case. More specifically, profiling results show that parallelizing an already-parallel OpenBLAS function causes some synchronization functions to wait for a while because of work imbalance (certainly due to a static schedule of the computing kernels). This is the main source of slowdown of approaches 2/3/4 compared to the first (sequential) approach.

If you really want to parallelize the function yourself, you need to configure the BLAS to use 1 thread (e.g., with OMP_NUM_THREADS=1 for OpenBLAS), but this is likely to be less efficient than letting the BLAS do the parallelization. Indeed, the BLAS makes use of optimized multi-threaded kernels (working in shared memory), while Python nearly prevents such a design from being fast because of the global interpreter lock (GIL) in multi-threaded code. Multiprocessing is also limited by the overhead of pickling and of forking. That being said, such overheads are small in approaches 2 and 3 (not in 4, certainly due to the queue). This is generally why Python is often not great for writing fast parallel applications (except for long-running embarrassingly parallel work with small data transfers).
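For reference, here is one common way to do that; the environment variables must be set before NumPy (and thus the BLAS) is first imported, and which variable applies depends on how your BLAS was built:

import os
os.environ["OMP_NUM_THREADS"] = "1"        # OpenBLAS built with OpenMP, and most others
os.environ["OPENBLAS_NUM_THREADS"] = "1"   # OpenBLAS built with its own threading
os.environ["MKL_NUM_THREADS"] = "1"        # Intel MKL

import numpy as np

# Alternatively, threadpoolctl can limit the BLAS threads for a given block at runtime:
# from threadpoolctl import threadpool_limits
# with threadpool_limits(limits=1):
#     x = np.linalg.solve(a, b)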

Here are the average timings on my 6-core i5-9600KF machine with OpenBLAS:

Default OpenBLAS configuration:
- Approach 1: 0.129 seconds (stable)
- Approach 2: 1.623 seconds (highly-variable)
- Approach 3: 1.407 seconds (highly-variable)
- Approach 4: 1.532 seconds (highly-variable)

Sequential configuration of OpenBLAS:
- Approach 1: 0.338 seconds (stable) <--- Now sequential
- Approach 2: 0.152 seconds (quite stable)
- Approach 3: 0.151 seconds (stable)
- Approach 4: 0.177 seconds (stable) <--- >10% time in CPython overheads
- Parallel Numba: 0.144 seconds (stable) <--- Multi-threaded

Note that the best speedup is pretty bad (i.e., ~2.6) on a 6-core machine. I guess this might be because the computation is memory-bound (on my PC, the RAM throughput is about 20 GiB/s, while it can reach 32-35 GiB/s at most for such an access pattern).
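For completeness, the "Parallel Numba" row above refers to a multi-threaded Numba version; a rough sketch of what such a version might look like, assuming the workload is a batch of independent solves (shapes and names are illustrative, not from the question):

import numba as nb
import numpy as np

@nb.njit(parallel=True)
def solve_batch(mats, rhs):
    # One independent solve per batch element, distributed over threads by prange.
    out = np.empty_like(rhs)
    for i in nb.prange(mats.shape[0]):
        out[i] = np.linalg.solve(mats[i], rhs[i])
    return out

# Illustrative usage:
# mats = np.random.rand(1000, 64, 64)
# rhs = np.random.rand(1000, 64)
# x = solve_batch(mats, rhs)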


