Create a Custom Transformer in PySpark ML

Create a custom Transformer in PySpark ML

Can I extend the default one?

Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, like the other transformers and estimators in pyspark.ml.feature, delegates the actual processing to its Scala counterpart. Since you want to use Python, you should extend pyspark.ml.pipeline.Transformer directly.

import nltk

from pyspark import keyword_only ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(
        Transformer, HasInputCol, HasOutputCol,
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):

    stopwords = Param(Params._dummy(), "stopwords", "stopwords",
                      typeConverter=TypeConverters.toListString)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=[])
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        return self._set(stopwords=list(value))

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        stopwords = set(self.getStopwords())

        def f(s):
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

Example usage (data from ML - Features):

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",
    stopwords=nltk.corpus.stopwords.words('english'))

tokenizer.transform(sentenceDataFrame).show()
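
Because the class mixes in DefaultParamsReadable and DefaultParamsWritable (PySpark >= 2.3.0), the custom tokenizer can also be persisted and reloaded like a built-in stage. A minimal sketch (the path below is just an example):

# Save the params-only transformer and load it back (PySpark >= 2.3.0).
tokenizer.write().overwrite().save("/tmp/nltk_tokenizer")
reloaded = NLTKWordPunctTokenizer.load("/tmp/nltk_tokenizer")
reloaded.transform(sentenceDataFrame).show()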

For a custom Python Estimator, see How to Roll a Custom Estimator in PySpark mllib.

⚠ This answer depends on internal APIs and is compatible with Spark 2.0.3, 2.1.1, 2.2.0 and later (SPARK-19348). For code compatible with earlier Spark versions, see revision 8.

How to add my own function as a custom stage in a PySpark ML Pipeline?

I believe this does what you want. You can create a custom Transformer and add it to the stages of the Pipeline. Note that I slightly changed your functions because we do not have access to all the variables you mentioned, but the concept remains the same.

Hope this helps!

import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd

# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnDropper(Transformer):
    """
    A custom Transformer which drops all columns that have at least one of the
    words from the banned_list in the name.
    """

    def __init__(self, banned_list: Iterable[str]):
        super(ColumnDropper, self).__init__()
        self.banned_list = banned_list

    def _transform(self, df: DataFrame) -> DataFrame:
        df = df.drop(*[x for x in df.columns if any(y in x for y in self.banned_list)])
        return df


# SAMPLE DATA -----------------------------------------------------------------------
df = pd.DataFrame({'ball_column': [0, 1, 2, 3, 4, 5, 6],
                   'keep_the': [6, 5, 4, 3, 2, 1, 0],
                   'hall_column': [2, 2, 2, 2, 2, 2, 2]})
df = spark.createDataFrame(df)

# EXAMPLE 1: USE THE TRANSFORMER WITHOUT PIPELINE -----------------------------------
column_dropper = ColumnDropper(banned_list=["ball", "fall", "hall"])
df_example = column_dropper.transform(df)

# EXAMPLE 2: USE THE TRANSFORMER WITH PIPELINE --------------------------------------
column_dropper = ColumnDropper(banned_list=["ball", "fall", "hall"])
bagging = Bucketizer(
    splits=[-float("inf"), 3, float("inf")],
    inputCol='keep_the',
    outputCol="keep_the_bucket")
model = Pipeline(stages=[column_dropper, bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()

Output:

+--------+---------------+
|keep_the|keep_the_bucket|
+--------+---------------+
|       6|            1.0|
|       5|            1.0|
|       4|            1.0|
|       3|            1.0|
|       2|            0.0|
|       1|            0.0|
|       0|            0.0|
+--------+---------------+

Also note that if your custom method needs to be fitted (e.g. a custom StringIndexer), you should also create a custom Estimator:

from pyspark.ml import Estimator, Transformer


class CustomTransformer(Transformer):

    def _transform(self, df) -> DataFrame:
        ...


class CustomEstimator(Estimator):

    def _fit(self, df) -> CustomTransformer:
        ...
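
For illustration, here is a minimal sketch of such an Estimator/Transformer pair (the names MeanCenterEstimator and MeanCenterModel and the mean-centering logic are made up for this example): the estimator learns a column mean during fit() and returns a transformer that applies it. Plain instance attributes like these are not persisted by DefaultParamsWritable; use Params if you need saving and loading.

from pyspark.ml import Estimator, Transformer
import pyspark.sql.functions as F


class MeanCenterModel(Transformer):
    """Transformer returned by MeanCenterEstimator.fit(); holds the fitted mean."""

    def __init__(self, input_col, mean_value):
        super(MeanCenterModel, self).__init__()
        self.input_col = input_col
        self.mean_value = mean_value

    def _transform(self, df):
        # Subtract the mean learned during fit().
        return df.withColumn(self.input_col + "_centered",
                             F.col(self.input_col) - F.lit(self.mean_value))


class MeanCenterEstimator(Estimator):
    """Estimator that computes a column mean and returns a MeanCenterModel."""

    def __init__(self, input_col):
        super(MeanCenterEstimator, self).__init__()
        self.input_col = input_col

    def _fit(self, df):
        mean_value = df.agg(F.avg(self.input_col)).first()[0]
        return MeanCenterModel(self.input_col, mean_value)


# Illustrative usage on the sample df from above:
centered = MeanCenterEstimator(input_col="keep_the").fit(df).transform(df)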

Best way to create a custom Transformer in Java Spark ML

You need to extend the org.apache.spark.ml.Transformer class. It is abstract, so you have to provide implementations of its abstract methods.

In most cases you need to provide an implementation of the transform(Dataset<?> dataset) method and of String uid().

Example:

import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.util.Identifiable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

public class CustomTransformer extends Transformer {

    private final String uid_;

    public CustomTransformer() {
        this(Identifiable.randomUID("CustomTransformer"));
    }

    // Constructor taking an explicit uid, needed for the chained call above.
    public CustomTransformer(String uid) {
        this.uid_ = uid;
    }

    @Override
    public String uid() {
        return uid_;
    }

    @Override
    public Transformer copy(ParamMap extra) {
        return defaultCopy(extra);
    }

    @Override
    public Dataset<Row> transform(Dataset<?> dataset) {
        // do your work here and return the resulting Dataset;
        // as a placeholder, return the input unchanged
        return dataset.toDF();
    }

    @Override
    public StructType transformSchema(StructType schema) {
        return schema;
    }
}

I am also new to this, so I suggest you learn what these abstract methods are used for.

pyspark.ml pipelines: are custom transformers necessary for basic preprocessing tasks?

I'd say it is primarily opinion-based, although the approach looks unnecessarily verbose and Python Transformers don't integrate well with the rest of the Pipeline API.

It is also worth pointing out that everything you have here can be easily achieved with SQLTransformer. For example:

from pyspark.ml.feature import SQLTransformer

def column_selector(columns):
    return SQLTransformer(
        statement="SELECT {} FROM __THIS__".format(", ".join(columns))
    )

or

def na_dropper(columns):
    return SQLTransformer(
        statement="SELECT * FROM __THIS__ WHERE {}".format(
            " AND ".join(["{} IS NOT NULL".format(x) for x in columns])
        )
    )
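
These factory functions then drop into a Pipeline like any other stage. A small usage sketch (the sample data is made up for illustration):

from pyspark.ml import Pipeline

df = spark.createDataFrame(
    [(1, 2.0, None), (2, None, "x"), (3, 4.0, "y")],
    ["id", "value", "label"])

pipeline = Pipeline(stages=[
    column_selector(["id", "value"]),  # keep only id and value
    na_dropper(["value"]),             # drop rows where value is NULL
])
pipeline.fit(df).transform(df).show()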

With a little effort you can use SQLAlchemy with the Hive dialect to avoid handwritten SQL.
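
As a rough sketch of that idea (assuming SQLAlchemy 1.4+ and PyHive, which provides a SQLAlchemy Hive dialect, are installed; the function name sa_column_selector is made up), you can build the statement with SQLAlchemy Core and compile it with Hive-style backtick quoting, which Spark SQL accepts:

from pyhive.sqlalchemy_hive import HiveDialect  # provided by PyHive
from sqlalchemy import column, select, table

def sa_column_selector(columns):
    # Build SELECT <columns> FROM __THIS__ without hand-writing the SQL.
    stmt = select(*[column(c) for c in columns]).select_from(table("__THIS__"))
    return SQLTransformer(statement=str(stmt.compile(dialect=HiveDialect())))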


