Shipping Python Modules in Pyspark to Other Nodes

If you can package your module into a .egg or .zip file, you should be able to list it in pyFiles when constructing your SparkContext (or you can add it later through sc.addPyFile).

For Python libraries that use setuptools, you can run python setup.py bdist_egg to build an egg distribution.

Another option is to install the library cluster-wide, either by using pip/easy_install on each machine or by sharing a Python installation over a cluster-wide filesystem (like NFS).
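As a rough illustration of the first option, the sketch below zips a package directory with the standard library so that it can be listed in pyFiles. The helper names zip_package and make_context and the package name mylib are hypothetical; only the pyFiles argument and sc.addPyFile come from the answer above.

```python
import os
import zipfile


def zip_package(package_dir, zip_path):
    """Zip a package directory so that `import <package>` works from the archive.

    Paths inside the archive are stored relative to the parent of
    package_dir, so the package folder itself sits at the archive root.
    """
    package_dir = os.path.abspath(package_dir)
    parent = os.path.dirname(package_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                if name.endswith(".pyc"):
                    continue  # skip compiled files
                full = os.path.join(root, name)
                zf.write(full, os.path.relpath(full, parent))
    return zip_path


def make_context(zip_path, app_name="example"):
    """Create a SparkContext that ships zip_path to the executors (requires pyspark)."""
    # Imported lazily so that zip_package can be used without Spark installed.
    from pyspark import SparkContext
    return SparkContext(appName=app_name, pyFiles=[zip_path])
```

With Spark available, something like make_context(zip_package("mylib", "mylib.zip")) would build the archive and hand it to the context in one step; on an existing context, sc.addPyFile("mylib.zip") should have a similar effect.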

How do I get Python libraries in pyspark?

On your SparkContext instance (sc below), try using:

sc.addPyFile("module.py")  # Also supports .zip

Quoting from the docs:

Add a .py or .zip dependency for all tasks to be executed on this
SparkContext in the future. The path passed can be either a local
file, a file in HDFS (or other Hadoop-supported filesystems), or an
HTTP, HTTPS or FTP URI.

Easiest way to install Python dependencies on Spark executor nodes?

Having actually tried it, I think the link I posted as a comment doesn't do exactly what you want with dependencies. What you are quite reasonably asking for is a way to have Spark play nicely with setuptools and pip when installing dependencies. It blows my mind that this isn't supported better in Spark. The third-party dependency problem is largely solved in general-purpose Python, but under Spark, the assumption seems to be that you'll go back to manual dependency management or something.

I have been using an imperfect but functional pipeline based on virtualenv. The basic idea is

  1. Create a virtualenv purely for your Spark nodes
  2. Each time you run a Spark job, run a fresh pip install of all your own in-house Python libraries. If you have set these up with setuptools, this will install their dependencies
  3. Zip up the site-packages dir of the virtualenv. This will include your library and its dependencies, which the worker nodes will need, but not the standard Python library, which they already have
  4. Pass the single .zip file, containing your libraries and their dependencies, as an argument to --py-files

Of course you would want to code up some helper scripts to manage this process. Here is a helper script adapted from one I have been using, which could doubtless be improved a lot:

#!/usr/bin/env bash
# Helper script to fulfil Spark's Python packaging requirements.
# Installs everything into a designated virtualenv, then zips up its
# site-packages so the archive can be supplied as the --py-files
# argument of `pyspark` or `spark-submit`.
# First argument: the top-level virtualenv directory
# Second argument: the zip file which will be created, and
# which you can subsequently supply as the --py-files argument to
# spark-submit
# Subsequent arguments: all the private packages you wish to install.
# If these are set up with setuptools, their dependencies will be installed.

VENV=$1; shift
ZIPFILE=$1; shift

. "$VENV/bin/activate"
for pkg in "$@"; do
    pip install --upgrade "$pkg"
done
# Absolute path. Use a random number to avoid clashes with other processes.
# Falls back to /tmp when TMPDIR is not set.
TMPZIP="${TMPDIR:-/tmp}/$RANDOM.zip"
# Adjust python2.7 to match the Python version of your virtualenv.
( cd "$VENV/lib/python2.7/site-packages" && zip -q -r "$TMPZIP" . )
mv "$TMPZIP" "$ZIPFILE"

I have a collection of other simple wrapper scripts I run to submit my spark jobs. I simply call this script first as part of that process and make sure that the second argument (name of a zip file) is then passed as the --py-files argument when I run spark-submit (as documented in the comments). I always run these scripts, so I never end up accidentally running old code. Compared to the Spark overhead, the packaging overhead is minimal for my small scale project.

There are loads of improvements that could be made: for example, being smart about when to create a new zip file, or splitting it into two zip files, one containing often-changing private packages and one containing rarely changing dependencies, which don't need to be rebuilt so often. You could be smarter about checking for file changes before rebuilding the zip. Checking the validity of the arguments would also be a good idea. However, for now this suffices for my purposes.
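One possible way to check for file changes before rebuilding the zip is to fingerprint the source tree and compare it with a stamp written at the last build. This is only a sketch: the function names and the idea of a separate stamp file are assumptions, not part of the script above, and the stamp and zip files are assumed to live outside the directory being fingerprinted.

```python
import hashlib
import os


def tree_fingerprint(root):
    """Return a SHA-256 digest over the relative paths and contents of all files under root."""
    digest = hashlib.sha256()
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # make the traversal order deterministic
        for name in sorted(filenames):
            full = os.path.join(dirpath, name)
            digest.update(os.path.relpath(full, root).encode("utf-8"))
            with open(full, "rb") as fh:
                digest.update(fh.read())
    return digest.hexdigest()


def needs_rebuild(source_dir, zip_path, stamp_path):
    """True if the zip or stamp is missing, or source_dir changed since the stamp was written."""
    if not (os.path.exists(zip_path) and os.path.exists(stamp_path)):
        return True
    with open(stamp_path) as fh:
        return fh.read().strip() != tree_fingerprint(source_dir)


def record_build(source_dir, stamp_path):
    """Store the current fingerprint after a successful rebuild."""
    with open(stamp_path, "w") as fh:
        fh.write(tree_fingerprint(source_dir))
```

A wrapper script could call needs_rebuild before running the packaging step and record_build after it succeeds. Hashing file contents is slower than comparing modification times, but it does not get confused by files that were touched without changing.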

The solution I have come up with is not designed for large-scale dependencies like NumPy specifically (although it may work for them). Also, it won't work if you are building C-based extensions, and your driver node has a different architecture to your cluster nodes.

I have seen recommendations elsewhere to just run a Python distribution like Anaconda on all your nodes since it already includes NumPy (and many other packages), and that might be the better way to get NumPy as well as other C-based extensions going. Regardless, we can't always expect Anaconda to have the PyPI package we want in the right version, and in addition you might not be able to control your Spark environment to be able to put Anaconda on it, so I think this virtualenv-based approach is still helpful.

pyspark import user defined module or .py files

It turned out that, since I'm submitting my application in client mode, the machine I run the spark-submit command from runs the driver program and needs access to the module files.

I added my module to the PYTHONPATH environment variable on the node I'm submitting my job from by adding the following line to my .bashrc file (or by executing it before submitting my job):

export PYTHONPATH=$PYTHONPATH:/home/welshamy/modules

And that solved the problem. Since the path is on the driver node, I don't have to zip and ship the module with --py-files or use sc.addPyFile().

The key to solving any pyspark module import error problem is understanding whether the driver or worker (or both) nodes need the module files.
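To find out which side is missing a module, a small probe along the following lines may help. It is a sketch under assumptions: can_import and check_cluster are hypothetical helper names, and sc is assumed to be an already created SparkContext.

```python
import importlib.util


def can_import(module_name):
    """Return True if module_name can be found by the current Python interpreter."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised, for example, when the parent package of a dotted name is missing.
        return False


def check_cluster(sc, module_name, partitions=8):
    """Report importability on the driver and on the executors.

    sc is an existing SparkContext. The executor result is the set of
    distinct (hostname, importable) pairs reported by the tasks.
    """
    import socket

    def probe(_):
        return (socket.gethostname(), can_import(module_name))

    rdd = sc.parallelize(range(partitions), partitions)
    return {
        "driver": can_import(module_name),
        "executors": set(rdd.map(probe).collect()),
    }
```

If "driver" is False, the fix belongs on the submitting machine (for example via PYTHONPATH); if any executor pair reports False, the module has to be shipped or installed on the workers.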

Important
If the worker nodes need your module files, then you need to pass them as a zip archive with --py-files, and this argument must precede your .py file argument. For example, notice the order of arguments in these examples:

This is correct:

./bin/spark-submit --py-files wesam.zip mycode.py

This is not correct, because everything after mycode.py is passed to the application as its own arguments:

./bin/spark-submit mycode.py --py-files wesam.zip
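If you launch spark-submit from a Python wrapper, building the argument list in one place makes it harder to get the order wrong. The helper below is a hypothetical sketch, not part of Spark; it only assembles the command and leaves running it to the caller.

```python
def build_submit_command(script, py_files=(), app_args=(), spark_submit="spark-submit"):
    """Return an argv list with --py-files placed before the application script."""
    cmd = [spark_submit]
    if py_files:
        # spark-submit expects a single comma-separated list of files
        cmd += ["--py-files", ",".join(py_files)]
    cmd.append(script)      # everything after the script goes to the application
    cmd.extend(app_args)
    return cmd
```

For example, build_submit_command("mycode.py", ["wesam.zip"]) yields the same ordering as the correct command above, and the resulting list can be passed to subprocess.run.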

Bundling Python3 packages for PySpark results in missing imports

Update: There's a cohesive repo that includes a sample project that does this quite wonderfully. You should take a look, especially if my example below doesn't work for you. The repo is here: https://github.com/massmutual/sample-pyspark-application
and includes this example for running on YARN:
https://github.com/massmutual/sample-pyspark-application/blob/master/setup-and-submit.sh
that expects you to first export several environment variables. (The values I provided are specific to EMR, so your values might be different.)

export HADOOP_CONF_DIR="/etc/hadoop/conf"
export PYTHON="/usr/bin/python3"
export SPARK_HOME="/usr/lib/spark"
export PATH="$SPARK_HOME/bin:$PATH"

As mentioned here: I can't seem to get --py-files on Spark to work
it is necessary to use something like virtualenv (or perhaps conda) to avoid problems with Python packages that compile C libraries (such as NumPy). Those builds depend on the underlying hardware architecture and may fail to port to other machines in the cluster, either because of hard links in the dependencies or because task nodes have different hardware from the master node instance.

Some of the differences between --archives and --py-files are discussed here: Shipping and using virtualenv in a pyspark job

I suggest using --archives with virtualenv for providing the zipped file that contains package dependencies to avoid some of the problems I mentioned above.

For example, from an Amazon Elastic MapReduce (EMR) cluster, while ssh'd into the master instance, I was able to successfully use spark-submit to execute a test Python script from a virtualenv environment like this:

pip-3.4 freeze | egrep -v sagemaker > requirements.txt
# Above line is just in case you want to port installed packages across environments.
virtualenv -p python3 spark_env3
virtualenv -p python3 --relocatable spark_env3
source spark_env3/bin/activate
sudo pip-3.4 install -U pandas boto3 findspark jaydebeapi
# Note that the above libraries weren't required for the test script, but I'm showing how you can add additional dependencies if needed.
sudo pip-3.4 install -r requirements.txt
# The above line is just to show how you can load from a requirements file if needed.
cd spark_env3
# We must cd into the directory before we zip it for Spark to find the resources.
zip -r ../spark_env3_inside.zip *
# Be sure to cd back out after building the zip file.
cd ..

PYSPARK_PYTHON=./spark_env3/bin/python3 spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./spark_env3/bin/python3 \
--master yarn-cluster \
--archives /home/hadoop/spark_env3_inside.zip#spark_env3 \
test_spark.py

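Note that the hash sign (#) near the end of the --archives line above does not start a comment. It is a directive to spark-submit: the name after it (spark_env3) is the directory name under which the archive is unpacked in each container's working directory, as explained here: Upload zip file using --archives option of spark-submit on yarn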

The source of the test script that I'm running is from this article that talks about using conda instead of virtualenv for running pyspark jobs: http://quasiben.github.io/blog/2016/4/15/conda-spark/

and contains this code for the test_spark.py script:

# test_spark.py
import os
import sys
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf()
conf.setAppName("get-hosts")

sc = SparkContext(conf=conf)

def noop(x):
    import socket
    import sys
    return socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)

rdd = sc.parallelize(range(1000), 100)
hosts = rdd.map(noop).distinct().collect()
print(hosts)

If you want some background information about using virtualenv to execute a pyspark job, as @Mariusz mentioned already, there is a useful example in this blog post: https://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv/ (though it doesn't explain some of the subtleties that I clarified with the other links that I provided).

There is also an additional example in the answer post provided here: Elephas not loaded in PySpark: No module named elephas.spark_model

There's yet another example available here: https://community.hortonworks.com/articles/104947/using-virtualenv-with-pyspark.html

PySpark Copy objects and libraries to all worker nodes - Pandas UDF

If a user-defined function references objects outside of its scope, Spark will automatically try to pickle those objects and ship them with each parallel task.

Multiple Spark tasks can run in parallel on one machine, each on a separate core.

Check the doc about shared variables.

These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program.

There is also an option to broadcast the variable to each node, so that it exists only once per executor JVM.

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
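As a minimal sketch of the broadcast approach, the function below sends a lookup dictionary to the executors with sc.broadcast instead of letting it be pickled into every task. The helper names and the labelling example are made up for illustration, and sc is assumed to be an existing SparkContext.

```python
def label_rows(rows, lookup):
    """Pure helper: map each key in rows to its label, or 'unknown'."""
    return [lookup.get(key, "unknown") for key in rows]


def label_with_broadcast(sc, keys, lookup):
    """Label keys on the executors using a broadcast copy of lookup.

    sc is an existing SparkContext. The dictionary is shipped via
    sc.broadcast rather than being captured by the task closure.
    """
    lookup_bc = sc.broadcast(lookup)

    def label_partition(partition):
        # .value gives read-only access to the broadcast data on the executor
        return label_rows(list(partition), lookup_bc.value)

    return sc.parallelize(keys).mapPartitions(label_partition).collect()
```

Keeping the per-row logic in a plain function such as label_rows makes it possible to test it without a cluster; only the thin wrapper depends on Spark.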

Importing user-defined module fails in PySpark

If you are running your jobs through spark-submit you need to provide the python files using the --py-files flag. First, create a .zip file with all the dependencies:

pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .

Then pass the dependencies using --py-files:

spark-submit --py-files dependencies.zip your_spark_job.py

Finally, inside your Spark job's script, add the following line:

sc.addPyFile("dependencies.zip")

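Alternatively, if you are using a Jupyter Notebook, all you have to do is append the directory that contains the module to PYTHONPATH (entries in PYTHONPATH are directories or zip files, not individual .py files):

export PYTHONPATH="${PYTHONPATH}:/path/to/your"  # the directory that contains service.py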

