Using the Multiprocessing Module for Cluster Computing

If by cluster computing you mean distributed memory systems (multiple nodes rather than SMP), then Python's multiprocessing may not be a suitable choice. It can spawn multiple processes, but they will still be bound within a single node.
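
To see the single-node limit in practice, a minimal sketch along these lines has each pool worker report where it runs. The function name `where_am_i` and the pool size of 4 are illustrative choices, not something from the question.

```python
import multiprocessing as mp
import os
import socket


def where_am_i(task_id):
    """Return the task ID plus the host name and PID of the worker running it."""
    return task_id, socket.gethostname(), os.getpid()


if __name__ == "__main__":
    # The pool size of 4 is arbitrary; every worker is a child of this process.
    with mp.Pool(processes=4) as pool:
        reports = pool.map(where_am_i, range(8))

    hosts = {host for _, host, _ in reports}
    pids = {pid for _, _, pid in reports}
    print("hosts used:", hosts)  # a single host name: the local node
    print("distinct worker processes:", len(pids))
```

However many nodes the scheduler hands you, the set of hosts stays at one entry, because the pool only ever forks or spawns children on the machine that started it.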

What you will need is a framework that handles spawning of processes across multiple nodes and provides a mechanism for communication between the processors (pretty much what MPI does).

See the page on Parallel Processing on the Python wiki for a list of frameworks which will help with cluster computing.

From the list, pp, jug, pyro and celery look like sensible options, although I can't personally vouch for any of them since I have no experience with them (I mainly use MPI).

If ease of installation/use is important, I would start by exploring jug. It's easy to install, supports common batch cluster systems, and looks well documented.

Behavior of multiprocessing module on cluster

Your qsub call gives you 4 processors per node, with 1 node. Thus multiprocessing is going to be limited to using a maximum of 4 processors.
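
One way to keep the pool within that limit is to size it from the scheduler's allocation rather than from the node's total core count. The sketch below assumes the scheduler exports the per-node core count in an environment variable; `PBS_NUM_PPN` is used here as an assumed name, so check what your site actually sets. The helper `allocated_cpus` and the toy `square` task are hypothetical.

```python
import multiprocessing as mp
import os


def allocated_cpus(env=None, var="PBS_NUM_PPN"):
    """Return the number of cores granted on this node.

    `var` is an assumed variable name; check what your scheduler exports.
    Falls back to the node's total core count when the variable is
    missing or not a positive integer.
    """
    env = os.environ if env is None else env
    value = env.get(var)
    if value is not None and value.isdigit() and int(value) > 0:
        return int(value)
    return os.cpu_count() or 1


def square(x):
    """Toy task standing in for the real work."""
    return x * x


if __name__ == "__main__":
    n = allocated_cpus()
    with mp.Pool(processes=n) as pool:
        print(n, pool.map(square, range(8)))
```

Asking for more workers than the allocation does not buy more throughput; the extra processes just compete for the same 4 processors.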

BTW, if you want to do hierarchical parallel computing (across multiple clusters using sockets or ssh, using MPI in coordination with cluster schedulers, and using multiprocessing and threading), you might want to have a look at pathos and its sister package pyina (which interacts with MPI and the cluster scheduler).

For example, see: https://stackoverflow.com/questions/28203774/how-to-do-hierarchical-parallelism-in-ipython-parallel

Get pathos here: https://github.com/uqfoundation

Using the multiprocessing module to run parallel processes where one is fed by (dependent on) the other for the Viterbi algorithm

Welcome to SO. Consider taking a look at the producer-consumer pattern, which is heavily used in multiprocessing.

Beware that on Windows, Python's multiprocessing re-imports your script in every process you create. Your Viterbi objects, and therefore their Queue fields, are not the same across processes.

Observe this behaviour through:

import os
import numpy as np

# get_arg and get_val take `self`, so they presumably belong inside
# your Viterbi class (class definition not shown here).
def get_arg(self):
    '''Dependent Process'''
    print("Dependent ", self)
    print("Dependent ", self.output)
    print("Dependent ", os.getpid())

def get_val(self):
    '''Independent Process'''
    print("Independent ", self)
    print("Independent ", self.output)
    print("Independent ", os.getpid())

if __name__ == "__main__":
    print("Hello from main process", os.getpid())
    obs = np.array([0, 1, 2])  # normal then cold and finally dizzy

    pi = np.array([0.6, 0.4])

    A = np.array([[0.7, 0.3],
                  [0.4, 0.6]])

    B = np.array([[0.5, 0.4, 0.1],
                  [0.1, 0.3, 0.6]])

    viterbi = Viterbi(A, B, pi)
    print("Main viterbi object", viterbi)
    print("Main viterbi object queue", viterbi.output)
    path = viterbi.get_path_parallel(obs)

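There are three different Viterbi objects because there are three different processes. So what you need in terms of parallelism is not processes. Threads share the memory of a single process, so they all see the same Viterbi object and the same queue. You should explore the threading library that Python offers.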
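
As a rough illustration of the producer-consumer pattern with threads, the sketch below uses a hypothetical `Pipeline` class as a stand-in for the Viterbi class: one object, one `queue.Queue` attribute, and two threads that both operate on that same object. The doubling step is a placeholder for the real dependent computation.

```python
import queue
import threading

_DONE = object()  # sentinel telling the consumer that no more items will arrive


class Pipeline:
    """Hypothetical stand-in for the Viterbi class: one object, one shared queue."""

    def __init__(self):
        self.output = queue.Queue()
        self.results = []

    def produce(self, values):
        """Independent step: push each value onto the shared queue."""
        for value in values:
            self.output.put(value)
        self.output.put(_DONE)

    def consume(self):
        """Dependent step: block until a value is available, then use it."""
        while True:
            value = self.output.get()
            if value is _DONE:
                break
            self.results.append(value * 2)  # placeholder computation

    def run(self, values):
        """Run producer and consumer concurrently and return the results."""
        producer = threading.Thread(target=self.produce, args=(values,))
        consumer = threading.Thread(target=self.consume)
        consumer.start()
        producer.start()
        producer.join()
        consumer.join()
        return self.results


if __name__ == "__main__":
    print(Pipeline().run([0, 1, 2]))  # [0, 2, 4]
```

Because both threads are bound methods of the same instance, `self.output` refers to one queue, which is exactly what the separate processes did not have.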

How to distribute multiprocess CPU usage over multiple nodes?

To run your job as-is, you could simply request ncpu=32 and then set num_cores = 2 in your Python script. Obviously this has you paying for 32 cores while leaving 30 of them idle, which is wasteful.

The real problem here is that your current algorithm is memory-bound, not CPU-bound. You should be going to great lengths to read only chunks of your files into memory, operate on those chunks, and then write the result chunks to disk to be organized later.
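
The read-a-chunk, process, write-out loop can be sketched with nothing but the standard library. This is only an illustration of the idea, not the asker's algorithm: `process_in_chunks`, `double_second_column`, and the two-column CSV layout are all made up for the example.

```python
import csv
import io
from itertools import islice


def process_in_chunks(src, dst, transform, chunk_rows=1000):
    """Stream CSV rows from `src` to `dst`, holding at most `chunk_rows` rows at once.

    Returns the number of data rows processed (the header is copied as-is).
    """
    reader = csv.reader(src)
    writer = csv.writer(dst)
    header = next(reader, None)
    if header is None:
        return 0
    writer.writerow(header)
    total = 0
    while True:
        chunk = list(islice(reader, chunk_rows))  # only this chunk is in memory
        if not chunk:
            break
        writer.writerows(transform(row) for row in chunk)
        total += len(chunk)
    return total


def double_second_column(row):
    """Placeholder for the real per-row work."""
    return [row[0], str(int(row[1]) * 2)]


if __name__ == "__main__":
    # In-memory files keep the example self-contained; real code would
    # pass file objects opened with open(..., newline="").
    src = io.StringIO("id,value\n1,10\n2,20\n3,30\n")
    dst = io.StringIO()
    print(process_in_chunks(src, dst, double_second_column, chunk_rows=2))
    print(dst.getvalue())
```

Peak memory is set by `chunk_rows` rather than by the file size, which is the property a memory-bound job needs.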

Fortunately Dask is built to do exactly this kind of thing. As a first step, you can take out the parallelize_dataframe function and directly load and map your some_algorithm with a dask.dataframe and dask.array:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import dask.dataframe as dd
import dask.array as da

# some_algorithm is your existing per-row function; define or import it here.

def main():
    # Loading input data
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    # from_npy_stack reads a directory written by da.to_npy_stack, so
    # array_a.npy and array_b.npy need converting to that layout first
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = dd.read_csv(direc + file, blocksize="25MB")
    a_and_b = da.from_npy_stack(direc)

    # dask only supports row-wise apply, hence axis=1
    df['col3'] = df.apply(some_algorithm, axis=1, args=(a_and_b,))

    # dask is lazy, this is the only line that does any work
    # Saving:
    df.to_csv(
        direc + 'outfile.csv',
        index=False,
        compute_kwargs={"scheduler": "threads"},  # also "processes", but try threads first
    )

if __name__ == '__main__':
    main()

That will require some tweaks to some_algorithm, and to_csv and from_npy_stack work a bit differently, but you will be able to reasonably run this thing just on your own laptop and it will scale to your cluster hardware. You can level up from here by using the distributed scheduler or even deploying it directly to your cluster with dask-jobqueue.


