Encode and assemble multiple features in PySpark
Spark >= 2.3, >= 3.0
Since Spark 2.3, OneHotEncoder is deprecated in favor of OneHotEncoderEstimator. If you use a recent release, please modify the encoder code:
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(
inputCols=["gender_numeric"],
outputCols=["gender_vector"]
)
In Spark 3.0 this variant has been renamed to OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
inputCols=["gender_numeric"],
outputCols=["gender_vector"]
)
Additionally, StringIndexer has been extended to support multiple input columns:
StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
Spark < 2.3
Well, you can write a UDF, but why would you? There are already quite a few tools designed to handle this category of tasks:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector
row = Row("gender", "foo", "bar")
df = sc.parallelize([
row("0", 3.0, DenseVector([0, 2.1, 1.0])),
row("1", 1.0, DenseVector([0, 1.1, 1.0])),
row("1", -1.0, DenseVector([0, 3.4, 0.0])),
row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
First of all, StringIndexer:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()
## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## | 0| 3.0| 0.0|
## | 1| 1.0| 1.0|
## | 1|-1.0| 1.0|
## | 0|-3.0| 0.0|
## +------+----+--------------+
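To see where the gender_numeric values come from, here is a plain-Python sketch of StringIndexer's default ordering (an illustration, not Spark's implementation): labels are ranked by descending frequency, and the most frequent label gets 0.0. The alphabetical tie-break is an assumption based on the documented behavior of recent releases; older releases may order ties differently. The function name fit_string_indexer is hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically (assumed to mirror the frequencyDesc
    ordering documented for recent Spark releases).
    """
    counts = Counter(values)
    # Sort by descending count, then by the label itself for ties
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


gender = ["0", "1", "1", "0"]
mapping = fit_string_indexer(gender)
print([mapping[g] for g in gender])  # [0.0, 1.0, 1.0, 0.0]
```

Here "0" and "1" both occur twice, so only the tie-break decides that "0" maps to 0.0, which matches the table above.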
Next, OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()
## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## | 0| 3.0| 0.0|(1,[0],[1.0])|
## | 1| 1.0| 1.0| (1,[],[])|
## | 1|-1.0| 1.0| (1,[],[])|
## | 0|-3.0| 0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
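The gender_vector values are SparseVectors printed as (size, [indices], [values]). OneHotEncoder drops the last category by default (its dropLast parameter), so with two genders the vector has size 1 and the last category becomes the all-zero vector (1,[],[]). A minimal plain-Python sketch of that rule, where the function one_hot is hypothetical and only mimics the output format:

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a one-hot encoding as a (size, indices, values) sparse triple."""
    # With drop_last the final category is represented by the all-zero vector
    size = num_categories - 1 if drop_last else num_categories
    index = int(index)
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])


print(one_hot(0.0, 2))                   # (1, [0], [1.0])
print(one_hot(1.0, 2))                   # (1, [], [])
print(one_hot(1.0, 2, drop_last=False))  # (2, [1], [1.0])
```

Dropping the last level avoids a redundant column when the model also has an intercept; setting dropLast=False keeps every category.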
Then VectorAssembler:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["gender_vector", "bar", "foo"], outputCol="features")
final_df = assembler.transform(encoded_df)
If bar contained categorical variables, you could use VectorIndexer to set the required metadata:
from pyspark.ml.feature import VectorIndexer
vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
# Detect categorical slots in "bar" and attach the metadata to "bar_indexed"
encoded_df_with_indexed_bar = (vector_indexer
.fit(encoded_df)
.transform(encoded_df))
but that is not the case here.
Finally, you can wrap all of that in a pipeline:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
Arguably it is a much more robust and clean approach than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
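The consistency caveat is easy to trip over: the index assigned to a label depends on the data the indexer was fitted on. The plain-Python sketch below (hypothetical helpers, not Spark code) shows two fits producing conflicting codes, and the usual fix of fitting once and reusing the fitted model. In Spark that means reusing the fitted StringIndexerModel or PipelineModel; its handleInvalid parameter controls unseen labels ("error", "skip" or "keep"), of which only the first two are modeled here.

```python
from collections import Counter


def fit_string_indexer(values):
    """Most frequent label gets 0.0; ties broken alphabetically (assumed)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform(mapping, values, handle_invalid="error"):
    """Apply a fitted mapping; unseen labels either raise or are skipped."""
    out = []
    for v in values:
        if v in mapping:
            out.append(mapping[v])
        elif handle_invalid == "skip":
            continue
        else:
            raise ValueError(f"Unseen label: {v!r}")
    return out


train_labels = ["a", "b", "b"]
test_labels = ["a", "a", "b", "c"]

# Fitting separately on each dataset gives conflicting codes for "a" and "b"
print(fit_string_indexer(train_labels))  # {'b': 0.0, 'a': 1.0}
print(fit_string_indexer(test_labels))   # {'a': 0.0, 'b': 1.0, 'c': 2.0}

# Fitting once and reusing the mapping keeps the codes consistent
model = fit_string_indexer(train_labels)
print(transform(model, test_labels, handle_invalid="skip"))  # [1.0, 1.0, 0.0]
```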
Another way to get comparable output is RFormula, which:
RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles. If the label column is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.
from pyspark.ml.feature import RFormula
rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)
As you can see, it is much more concise, but it is harder to compose and doesn't allow much customization. Nevertheless, the result for a simple pipeline like this one will be identical:
final_df_rf.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
final_df.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
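The mix of dense ([...]) and sparse ((size,[indices],[values])) rows in both outputs is only a storage choice. As far as I know, Spark compresses each assembled vector and stores it sparse when 1.5 * (nnz + 1) < size, where nnz is the number of non-zeros. The plain-Python sketch below reproduces the assembly and that choice under this assumption. The helpers assemble and compressed are hypothetical illustrations, not Spark's code.

```python
def assemble(*parts):
    """Concatenate scalars and lists of numbers into one flat list of floats."""
    out = []
    for part in parts:
        if isinstance(part, (int, float)):
            out.append(float(part))
        else:
            out.extend(float(x) for x in part)
    return out


def compressed(values):
    """Return a sparse (size, indices, values) triple or the dense list,
    using the assumed 1.5 * (nnz + 1) < size rule."""
    size = len(values)
    nnz = sum(1 for v in values if v != 0.0)
    if 1.5 * (nnz + 1.0) < size:
        indices = [i for i, v in enumerate(values) if v != 0.0]
        return (size, indices, [values[i] for i in indices])
    return values


# gender_vector (written densely), bar and foo for rows 1 and 3
print(compressed(assemble([1.0], [0, 2.1, 1.0], 3.0)))   # [1.0, 0.0, 2.1, 1.0, 3.0]
print(compressed(assemble([0.0], [0, 3.4, 0.0], -1.0)))  # (5, [2, 4], [3.4, -1.0])
```

Row 3 has only two non-zeros (1.5 * 3 = 4.5 < 5), so it is printed sparse, while rows with three or more non-zeros stay dense.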
Regarding your questions:
make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)
It is just a UDF like any other. Make sure you use supported types, and beyond that everything should work just fine.
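For example, a one-hot UDF can be an ordinary function that returns a plain Python list of floats, which maps to a Spark SQL array<double>; returning NumPy arrays is typically what breaks such UDFs. The sketch below is only the Python part (one_hot_list is a hypothetical name). In Spark you would then wrap it with something like pyspark.sql.functions.udf(lambda v: one_hot_list(v, categories), ArrayType(DoubleType())), where categories is a list you fix in advance so every row gets the same layout.

```python
def one_hot_list(value, categories):
    """Plain one-hot encoding that returns a list of Python floats.

    Values not present in categories produce the all-zero list.
    """
    return [1.0 if value == c else 0.0 for c in categories]


print(one_hot_list("drinks", ["bakery", "drinks", "grocery"]))  # [0.0, 1.0, 0.0]
```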
take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField
schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
# `result` is the RDD produced by the map described in the question
result.map(lambda x: row(DenseVector(x))).toDF(schema)
Note: For Spark 1.x replace pyspark.ml.linalg with pyspark.mllib.linalg.
Create features column in PySpark with both numerical and categorical variables
I have found a way to do it, but I am not sure if this is the most efficient way of achieving what I want.
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
cat_cols = ["cat_1", "cat_2", "cat_3"]
num_cols = ["num_1", "num_2", "num_3", "num_4"]
# Index each categorical column, then one-hot encode the resulting index
indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in cat_cols]
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol()))
for indexer in indexers]
assemblerCat = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders], outputCol="cat")
pipelineCat = Pipeline(stages=indexers + encoders + [assemblerCat])
df = pipelineCat.fit(df).transform(df)
assemblerNum = VectorAssembler(inputCols=num_cols, outputCol="num")
pipelineNum = Pipeline(stages=[assemblerNum])
df = pipelineNum.fit(df).transform(df)
assembler = VectorAssembler(inputCols=["cat", "num"], outputCol="features")
pipeline = Pipeline(stages=[assembler])
df = pipeline.fit(df).transform(df)
Essentially I am creating one pipeline for categorical and one for numerical variables and then I am merging them to create a single "features" column which contains both.
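Since VectorAssembler accepts numeric, boolean and vector columns together, the three pipelines could most likely be collapsed into one whose assembler takes the encoded columns and num_cols directly. Either way, it helps to know which slot of "features" holds what. The plain-Python sketch below names each slot, assuming one-hot encoding with the default dropLast and the ["cat", "num"] order used above. feature_layout and the example labels are hypothetical.

```python
def feature_layout(cat_levels, num_cols, drop_last=True):
    """Name each slot of the merged "features" vector.

    cat_levels maps each categorical column to its labels in index order
    (as a fitted StringIndexer would assign them); numeric columns follow.
    """
    names = []
    for col, levels in cat_levels.items():
        # dropLast removes the slot of the last (least frequent) label
        kept = levels[:-1] if drop_last else levels
        names.extend(f"{col}={level}" for level in kept)
    names.extend(num_cols)
    return names


layout = feature_layout(
    {"cat_1": ["x", "y", "z"], "cat_2": ["p", "q"]},  # hypothetical labels
    ["num_1", "num_2"],
)
print(layout)  # ['cat_1=x', 'cat_1=y', 'cat_2=p', 'num_1', 'num_2']
```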
Aggregating a One-Hot Encoded feature in pyspark
You can use CountVectorizer for this. It converts an array of categories into an encoded (count) vector.
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import functions as F
df = sc.parallelize([
(1, 'grocery'),
(1, 'drinks'),
(1, 'bakery'),
(2, 'grocery'),
(3, 'bakery'),
(3, 'bakery'),
]).toDF(["id", "category"]) \
.groupBy('id') \
.agg(F.collect_list('category').alias('categoryIndexes'))
cv = CountVectorizer(inputCol='categoryIndexes', outputCol='categoryVec')
transformed_df = cv.fit(df).transform(df)
transformed_df.show()
Result:
+---+--------------------+--------------------+
| id| categoryIndexes| categoryVec|
+---+--------------------+--------------------+
| 1|[grocery, drinks,...|(3,[0,1,2],[1.0,1...|
| 3| [bakery, bakery]| (3,[0],[2.0])|
| 2| [grocery]| (3,[1],[1.0])|
+---+--------------------+--------------------+
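Note that CountVectorizer counts occurrences, which is why id 3 gets 2.0 for bakery; setting binary=True on CountVectorizer gives 0/1 values instead. The vocabulary is ordered by term frequency over the whole column (bakery 3, grocery 2, drinks 1), which explains the indices above. A plain-Python sketch of the same computation, with hypothetical helper names and an alphabetical tie-break that Spark does not necessarily use:

```python
from collections import Counter


def fit_vocabulary(docs):
    """Order terms by total count across all documents, most frequent first."""
    counts = Counter(term for doc in docs for term in doc)
    # Ties are broken alphabetically here (an assumption, not Spark's rule)
    return sorted(counts, key=lambda t: (-counts[t], t))


def count_vector(doc, vocab):
    """Encode one document as a (size, indices, values) sparse triple."""
    index = {term: i for i, term in enumerate(vocab)}
    counts = Counter(index[t] for t in doc if t in index)
    indices = sorted(counts)
    return (len(vocab), indices, [float(counts[i]) for i in indices])


docs = {
    1: ["grocery", "drinks", "bakery"],
    2: ["grocery"],
    3: ["bakery", "bakery"],
}
vocab = fit_vocabulary(docs.values())
print(vocab)  # ['bakery', 'grocery', 'drinks']
for id_, doc in sorted(docs.items()):
    print(id_, count_vector(doc, vocab))
# 1 (3, [0, 1, 2], [1.0, 1.0, 1.0])
# 2 (3, [1], [1.0])
# 3 (3, [0], [2.0])
```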
One-Hot Encoding to a list feature. Pyspark
I'm not sure whether there is a way to apply one-hot encoding directly; I would also like to know.
In the meantime, a straightforward way of doing it is to explode the tags and collect the distinct values in order to create one-hot encoded columns.
Example:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
{"ID": 1, "tags": ["A", "B", "C"]},
{"ID": 2, "tags": ["A", "D", "E"]},
{"ID": 3, "tags": ["A", "C", "F"]},
]
)
tags = [
x[0]
for x in df.select(F.explode("tags").alias("tags"))
.distinct()
.orderBy("tags")
.collect()
]
df = df.select(
"*",
*[
F.array_contains("tags", tag).cast("integer").alias("tags{}".format(tag))
for tag in tags
]
)
Result:
+---+---------+-----+-----+-----+-----+-----+-----+
|ID |tags |tagsA|tagsB|tagsC|tagsD|tagsE|tagsF|
+---+---------+-----+-----+-----+-----+-----+-----+
|1 |[A, B, C]|1 |1 |1 |0 |0 |0 |
|2 |[A, D, E]|1 |0 |0 |1 |1 |0 |
|3 |[A, C, F]|1 |0 |1 |0 |0 |1 |
+---+---------+-----+-----+-----+-----+-----+-----+
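The same logic in plain Python makes the two steps explicit: first gather the sorted distinct tags (the explode, distinct, orderBy and collect part), then emit one 0/1 column per tag (the array_contains part). This is a sketch with a hypothetical one_hot_tags helper. If a single vector column is acceptable instead of separate columns, CountVectorizer with binary=True, as in the previous answer, probably does this directly without the collect step.

```python
def one_hot_tags(rows, prefix="tags"):
    """Add a 0/1 column per distinct tag, like explode + array_contains."""
    # Equivalent of explode -> distinct -> orderBy -> collect
    tags = sorted({tag for row in rows for tag in row["tags"]})
    out = []
    for row in rows:
        new_row = dict(row)
        for tag in tags:
            new_row[f"{prefix}{tag}"] = int(tag in row["tags"])
        out.append(new_row)
    return out


rows = [
    {"ID": 1, "tags": ["A", "B", "C"]},
    {"ID": 2, "tags": ["A", "D", "E"]},
    {"ID": 3, "tags": ["A", "C", "F"]},
]
encoded = one_hot_tags(rows)
for r in encoded:
    print(r)
```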
How to merge multiple feature vectors in DataFrame?
You can use VectorAssembler:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
val df: DataFrame = ???
val assembler = new VectorAssembler()
.setInputCols(Array("text_features", "color_features", "type_features"))
.setOutputCol("features")
val transformed = assembler.transform(df)
For a PySpark example, see: Encode and assemble multiple features in PySpark
encode pyspark column creating another column of factorial values
You can use StringIndexer from pyspark.ml after casting the array column to a string:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F
stringIndexer = StringIndexer(inputCol="PathsStr", outputCol="encodedPaths")
# Represent each array as one string so StringIndexer can index it
df2 = df.withColumn("PathsStr", F.col("Paths").cast("string"))
# or: df2 = df.withColumn("PathsStr", F.concat_ws(",", "Paths"))
# Shift the 0-based index to start at 1 and keep the original columns plus the code
out = stringIndexer.fit(df2).transform(df2)\
.withColumn("encodedPaths", F.col("encodedPaths") + 1)\
.select(*df.columns, "encodedPaths")
out.show(truncate=False)
+---------------------+------------+
|Paths |encodedPaths|
+---------------------+------------+
|[link1, link2, link3]|1.0 |
|[link1, link2, link4]|2.0 |
|[link1, link2, link3]|1.0 |
|[link1, link2, link4]|2.0 |
+---------------------+------------+
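The cast is needed because StringIndexer works on string or numeric columns, not arrays. Conceptually, each array is turned into one key and the keys are indexed by frequency, then shifted by 1. A plain-Python sketch of that (encode_paths is hypothetical; the alphabetical tie-break is assumed to match recent Spark releases, and it is what puts the link3 path first here since both paths occur twice):

```python
from collections import Counter


def encode_paths(paths_column):
    """Give each distinct path list a 1-based code, most frequent first."""
    keys = [",".join(paths) for paths in paths_column]  # like concat_ws(",", ...)
    counts = Counter(keys)
    ordered = sorted(counts, key=lambda k: (-counts[k], k))
    codes = {k: float(i + 1) for i, k in enumerate(ordered)}  # +1 as in the answer
    return [codes[k] for k in keys]


paths = [
    ["link1", "link2", "link3"],
    ["link1", "link2", "link4"],
    ["link1", "link2", "link3"],
    ["link1", "link2", "link4"],
]
print(encode_paths(paths))  # [1.0, 2.0, 1.0, 2.0]
```

Keep in mind that element order matters ([a, b] and [b, a] get different codes), and with concat_ws a comma inside an element could make two different arrays collide.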