Create a custom Transformer in PySpark ML
Can I extend the default one?
Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, like the other transformers and estimators in pyspark.ml.feature, delegates the actual processing to its Scala counterpart. Since you want to use Python, you should extend pyspark.ml.pipeline.Transformer directly.
import nltk
from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(
        Transformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):

    stopwords = Param(Params._dummy(), "stopwords", "stopwords",
                      typeConverter=TypeConverters.toListString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=[])
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        return self._set(stopwords=list(value))

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        stopwords = set(self.getStopwords())

        def f(s):
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))
Example usage (data from ML - Features):
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",
    stopwords=nltk.corpus.stopwords.words('english'))

tokenizer.transform(sentenceDataFrame).show()
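To see what the UDF inside _transform does to a single row, here is a minimal plain-Python sketch that needs neither Spark nor NLTK. It assumes that nltk.tokenize.wordpunct_tokenize behaves like the regular expression \w+|[^\w\s]+ (my understanding of NLTK's WordPunctTokenizer pattern); the function name tokenize_and_filter and the sample stopword list are hypothetical and only for illustration.

```python
import re

# Assumption: this pattern mirrors NLTK's WordPunctTokenizer
# (runs of word characters, or runs of punctuation).
_WORDPUNCT = re.compile(r"\w+|[^\w\s]+")


def tokenize_and_filter(sentence, stopwords):
    """Split a sentence into tokens and drop the ones listed in stopwords.

    Tokens are lower-cased before the lookup, as in the UDF of the
    transformer; the stopwords themselves are used as given.
    """
    banned = set(stopwords)
    tokens = _WORDPUNCT.findall(sentence)
    return [t for t in tokens if t.lower() not in banned]


result = tokenize_and_filter("Hi, I heard about Spark!", ["i", "about"])
print(result)
```

Because only the tokens are lower-cased, a stopword passed in as "About" would never match; the stopword list is expected to be lower case already.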
For a custom Python Estimator, see How to Roll a Custom Estimator in PySpark mllib.
⚠ This answer depends on internal API and is compatible with Spark 2.0.3, 2.1.1, 2.2.0 or later (SPARK-19348). For code compatible with previous Spark versions please see revision 8.
How to add my own function as a custom stage in a ML pyspark Pipeline?
I believe this does what you want. You can create a custom Transformer and add it to the stages in the Pipeline. Note that I slightly changed your functions because we do not have access to all the variables you mentioned, but the concept remains the same.
Hope this helps!
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd

# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnDropper(Transformer):
    """
    A custom Transformer which drops all columns that have at least one of the
    words from the banned_list in the name.
    """

    def __init__(self, banned_list: Iterable[str]):
        super(ColumnDropper, self).__init__()
        self.banned_list = banned_list

    def _transform(self, df: DataFrame) -> DataFrame:
        df = df.drop(*[x for x in df.columns if any(y in x for y in self.banned_list)])
        return df


# SAMPLE DATA -----------------------------------------------------------------------
df = pd.DataFrame({'ball_column': [0,1,2,3,4,5,6],
                   'keep_the': [6,5,4,3,2,1,0],
                   'hall_column': [2,2,2,2,2,2,2] })
df = spark.createDataFrame(df)


# EXAMPLE 1: USE THE TRANSFORMER WITHOUT PIPELINE -----------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
df_example = column_dropper.transform(df)


# EXAMPLE 2: USE THE TRANSFORMER WITH PIPELINE --------------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
bagging = Bucketizer(
    splits=[-float("inf"), 3, float("inf")],
    inputCol= 'keep_the',
    outputCol="keep_the_bucket")
model = Pipeline(stages=[column_dropper,bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()
Output:
+--------+---------------+
|keep_the|keep_the_bucket|
+--------+---------------+
|       6|            1.0|
|       5|            1.0|
|       4|            1.0|
|       3|            1.0|
|       2|            0.0|
|       1|            0.0|
|       0|            0.0|
+--------+---------------+
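The column selection inside ColumnDropper._transform is plain substring matching, which can be checked without a Spark session. The sketch below repeats that list comprehension on a plain list of column names; the helper name columns_to_drop is hypothetical and not part of the answer's class.

```python
def columns_to_drop(columns, banned_list):
    """Return the column names that contain at least one banned substring."""
    return [c for c in columns if any(b in c for b in banned_list)]


columns = ["ball_column", "keep_the", "hall_column"]
banned = ["ball", "fall", "hall"]

dropped = columns_to_drop(columns, banned)
kept = [c for c in columns if c not in dropped]

print(dropped)  # ['ball_column', 'hall_column']
print(kept)     # ['keep_the']
```

Note that the match is on substrings, not whole words, so a column named "football_score" would also be dropped by the banned word "ball".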
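Also, note that if your custom method needs to be fitted (e.g. a custom StringIndexer), you should also create a custom Estimator:
from pyspark.ml import Estimator

class CustomTransformer(Transformer):
    def _transform(self, df) -> DataFrame:
        ...  # apply the fitted state to df and return the result

class CustomEstimator(Estimator):
    def _fit(self, df) -> CustomTransformer:
        ...  # learn the state from df and return a CustomTransformer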
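The split between the two classes is easier to see with a concrete case. The following is a plain-Python illustration of the fit/transform contract only, not PySpark code: the class names LabelIndexer and LabelIndexerModel are hypothetical, and the ordering rule (most frequent label first, ties broken alphabetically) is an assumption made for this sketch.

```python
from collections import Counter


class LabelIndexerModel:
    """Plays the role of the fitted Transformer: it only applies stored state."""

    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, labels):
        return [self.mapping[label] for label in labels]


class LabelIndexer:
    """Plays the role of the Estimator: fit() learns state and returns a model."""

    def fit(self, labels):
        counts = Counter(labels)
        # Most frequent label gets index 0; ties are broken alphabetically.
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        return LabelIndexerModel({label: i for i, label in enumerate(ordered)})


model = LabelIndexer().fit(["a", "b", "b", "c", "b", "a"])
print(model.mapping)                      # {'b': 0, 'a': 1, 'c': 2}
print(model.transform(["c", "a", "b"]))   # [2, 1, 0]
```

In a real pipeline the same idea applies: _fit inspects the training DataFrame once, and the object it returns carries whatever was learned into every later _transform call.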
Best way to Create a custom Transformer In Java spark ml
You need to extend the org.apache.spark.ml.Transformer class. It is an abstract class, so you have to provide implementations of its abstract methods.
From what I have seen, in most cases the ones that need real work are transform(Dataset<?> dataset) and String uid().
Example:
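import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.util.Identifiable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

public class CustomTransformer extends Transformer {
    private final String uid_;

    public CustomTransformer() {
        this(Identifiable.randomUID("Custom Transformer"));
    }

    // Constructor that the no-argument constructor delegates to
    public CustomTransformer(String uid) {
        this.uid_ = uid;
    }

    @Override
    public String uid() {
        return uid_;
    }

    @Override
    public Transformer copy(ParamMap extra) {
        return defaultCopy(extra);
    }

    @Override
    public Dataset<Row> transform(Dataset<?> dataset) {
        // do your work and return a Dataset object;
        // this placeholder returns the input unchanged
        return dataset.toDF();
    }

    @Override
    public StructType transformSchema(StructType schema) {
        return schema;
    }
}
I am also new to this, so I suggest you learn what each of these abstract methods is used for.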
pyspark.ml pipelines: are custom transformers necessary for basic preprocessing tasks?
I'd say it is primarily opinion based, although it looks unnecessarily verbose, and Python Transformers don't integrate well with the rest of the Pipeline API.
It is also worth pointing out that everything you have here can be easily achieved with SQLTransformer. For example:
from pyspark.ml.feature import SQLTransformer

def column_selector(columns):
    return SQLTransformer(
        statement="SELECT {} FROM __THIS__".format(", ".join(columns))
    )

or

def na_dropper(columns):
    return SQLTransformer(
        statement="SELECT * FROM __THIS__ WHERE {}".format(
            " AND ".join(["{} IS NOT NULL".format(x) for x in columns])
        )
    )
With a little bit of effort you can use SQLAlchemy with Hive dialect to avoid handwritten SQL.
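To check what the two helpers above actually hand to SQLTransformer, the statement strings can be built and printed on their own. This sketch uses the same format expressions but leaves Spark out; the function names select_statement and not_null_statement and the column names are hypothetical.

```python
def select_statement(columns):
    """Build the SELECT statement used by a column-selecting SQLTransformer."""
    return "SELECT {} FROM __THIS__".format(", ".join(columns))


def not_null_statement(columns):
    """Build the statement that keeps only rows where all columns are non-null."""
    return "SELECT * FROM __THIS__ WHERE {}".format(
        " AND ".join("{} IS NOT NULL".format(c) for c in columns)
    )


print(select_statement(["label", "sentence"]))
# SELECT label, sentence FROM __THIS__
print(not_null_statement(["label", "sentence"]))
# SELECT * FROM __THIS__ WHERE label IS NOT NULL AND sentence IS NOT NULL
```

The column names are interpolated verbatim, so names containing spaces or reserved words would need to be quoted with backticks before being passed in.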