Using the multiprocessing module for cluster computing
If by cluster computing you mean distributed-memory systems (multiple nodes rather than SMP), then Python's multiprocessing is probably not a suitable choice. It can spawn multiple processes, but they will still be bound to a single node.
What you need is a framework that handles spawning processes across multiple nodes and provides a mechanism for communication between them (pretty much what MPI does).
See the page on Parallel Processing on the Python wiki for a list of frameworks which will help with cluster computing.
From the list, pp, jug, pyro and celery look like sensible options, although I can't personally vouch for any of them since I have no experience with them (I mainly use MPI).
If ease of installation/use is important, I would start by exploring jug. It's easy to install, supports common batch cluster systems, and looks well documented.
Behavior of multiprocessing module on cluster
Your qsub call gives you 4 processors per node, with 1 node. Thus multiprocessing will be limited to using at most 4 processors.
BTW, if you want to do hierarchical parallel computing (across multiple clusters using sockets or ssh, using MPI in coordination with cluster schedulers, and using multiprocessing and threading), you might want to have a look at pathos and its sister package pyina (which interacts with MPI and the cluster scheduler).
For example, see: https://stackoverflow.com/questions/28203774/how-to-do-hierarchical-parallelism-in-ipython-parallel
Get pathos here: https://github.com/uqfoundation
Using the multiprocessing module to run parallel processes where one is fed by (dependent on) the other, for the Viterbi algorithm
Welcome to SO. Consider taking a look at the producer-consumer pattern, which is heavily used in multiprocessing.
Beware that on Windows, multiprocessing re-instantiates your code for every process you create. So your Viterbi objects, and therefore their Queue fields, are not the same.
Observe this behaviour through:
import os
import numpy as np

# methods added to the Viterbi class from the question
def get_arg(self):
    '''Dependent Process'''
    print("Dependent ", self)
    print("Dependent ", self.output)
    print("Dependent ", os.getpid())

def get_val(self):
    '''Independent Process'''
    print("Independent ", self)
    print("Independent ", self.output)
    print("Independent ", os.getpid())

if __name__ == "__main__":
    print("Hello from main process", os.getpid())
    obs = np.array([0, 1, 2])  # normal, then cold, finally dizzy
    pi = np.array([0.6, 0.4])
    A = np.array([[0.7, 0.3],
                  [0.4, 0.6]])
    B = np.array([[0.5, 0.4, 0.1],
                  [0.1, 0.3, 0.6]])
    viterbi = Viterbi(A, B, pi)
    print("Main viterbi object", viterbi)
    print("Main viterbi object queue", viterbi.output)
    path = viterbi.get_path_parallel(obs)
There are three different Viterbi objects because there are three different processes. So what you need, in terms of parallelism, is not processes. You should explore the threading library that Python offers.
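To see the contrast, here is a sketch using threading, where both threads share a single object inside one process. Worker is a hypothetical stand-in for the Viterbi class, and its methods mirror the get_val/get_arg pair above:

```python
import os
import threading
import queue

class Worker:
    # hypothetical stand-in for the Viterbi class in the question
    def __init__(self):
        self.output = queue.Queue()

    def get_val(self):
        '''Independent thread'''
        self.output.put(("Independent", id(self), os.getpid()))

    def get_arg(self):
        '''Dependent thread'''
        self.output.put(("Dependent", id(self), os.getpid()))

w = Worker()
t1 = threading.Thread(target=w.get_val)
t2 = threading.Thread(target=w.get_arg)
t1.start(); t2.start()
t1.join(); t2.join()
a, b = w.output.get(), w.output.get()
# unlike with processes, both threads saw the very same object and PID
assert a[1] == b[1] == id(w)
assert a[2] == b[2] == os.getpid()
```

Because threads share memory, the single Queue field really is shared, which is exactly the dependency structure the question needs.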
How to distribute multiprocess CPU usage over multiple nodes?
To run your job as-is, you could simply request ncpu=32 and then in your Python script set num_cores = 2. Obviously this has you paying for 32 cores and then leaving 30 of them idle, which is wasteful.
The real problem here is that your current algorithm is memory-bound, not CPU-bound. You should be going to great lengths to read only chunks of your files into memory, operating on the chunks, and then writing the result chunks to disk to be organized later.
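A stdlib-only sketch of that chunked pattern, where only a bounded number of rows is ever in memory at once. The chunk size and the row transform here are arbitrary illustrations:

```python
import csv
import io

def process_in_chunks(reader, writer, chunk_size=2):
    # read only chunk_size rows at a time, transform them, and write
    # the results out, so memory stays bounded regardless of file size
    chunk = []
    for row in reader:
        chunk.append(row)
        if len(chunk) == chunk_size:
            for r in chunk:
                writer.writerow([int(r[0]) + int(r[1])])
            chunk = []
    for r in chunk:  # flush the final partial chunk
        writer.writerow([int(r[0]) + int(r[1])])

# in-memory stand-ins for the input and output files
src = io.StringIO("1,2\n3,4\n5,6\n")
dst = io.StringIO()
process_in_chunks(csv.reader(src), csv.writer(dst))
print(dst.getvalue().split())  # ['3', '7', '11']
```

Dask automates exactly this chunk/apply/write cycle (and parallelizes it), which is why it is the natural next step here.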
Fortunately, Dask is built to do exactly this kind of thing. As a first step, you can take out the parallelize_dataframe function and directly load and map your some_algorithm with a dask.dataframe and dask.array:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import dask.dataframe as dd
import dask.array as da

def main():
    # Loading input data
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = dd.read_csv(direc + file, blocksize=25e6)
    a_and_b = da.from_np_stack(direc)
    df['col3'] = df.apply(some_algorithm, args=(a_and_b,))

    # dask is lazy; this is the only line that does any work
    # Saving:
    df.to_csv(
        direc + 'outfile.csv',
        index=False,
        compute_kwargs={"scheduler": "threads"},  # also "processes", but try threads first
    )

if __name__ == '__main__':
    main()
That will require some tweaks to some_algorithm, and to_csv and from_np_stack work a bit differently, but you will be able to reasonably run this thing just on your own laptop, and it will scale to your cluster hardware. You can level up from here by using the distributed scheduler, or even deploy it directly to your cluster with dask-jobqueue.