How to split Vector into columns - using PySpark
Spark >= 3.0.0
Since Spark 3.0.0 this can be done without using UDF.
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i] for i in range(3)]))
## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
Spark < 3.0.0
One possible approach is to convert to and from RDD:
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, {1: 2}))
]).toDF(["word", "vector"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())
df.rdd.map(extract).toDF(["word"]) # Vector values will be named _2, _3, ...
## +-------+---+---+---+
## | word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+
An alternative solution would be to create a UDF:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))
## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
For the Scala equivalent, see Spark Scala: How to convert Dataframe[vector] to DataFrame[f1: Double, ..., fn: Double].
unfold vector column into normal columns in DataFrame
The feature column contains values of type pyspark.ml.linalg.DenseVector, and the elements of the feature vector are of type numpy.float64.
To convert the numpy dtypes to native Python types, use value.item():
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

split1_udf = udf(lambda value: value[0].item(), DoubleType())
split2_udf = udf(lambda value: value[1].item(), DoubleType())
Using this fix results in the following output:
+---+----+---+----+----------+---+----+
| A| B| C| D| features| c1| c2|
+---+----+---+----+----------+---+----+
|0.2|53.3|0.2|53.3|[0.2,53.3]|0.2|53.3|
|1.1|43.3|0.3|51.3|[0.3,51.3]|0.3|51.3|
|2.6|22.4|0.4|43.3|[0.4,43.3]|0.4|43.3|
|3.7|25.6|0.2|23.4|[0.2,23.4]|0.2|23.4|
+---+----+---+----+----------+---+----+
Split & Map fields of array<string> in pyspark
We exchanged a couple of comments above, and I think there's nothing special about the array(array(string)) column, so I'm posting this answer to show the solution from How to explode multiple columns of a dataframe in pyspark.
from pyspark.sql import functions as f

df = spark.createDataFrame([
    (['1', '2', '3'], [['1'], ['2'], ['3']])
], ['col1', 'col2'])
df = (df
      .withColumn('zipped', f.arrays_zip(f.col('col1'), f.col('col2')))
      .withColumn('unzipped', f.explode(f.col('zipped')))
      .select(f.col('unzipped.col1'),
              f.col('unzipped.col2')))
df.show()
The input is:
+---------+---------------+
| col1| col2|
+---------+---------------+
|[1, 2, 3]|[[1], [2], [3]]|
+---------+---------------+
And the output is:
+----+----+
|col1|col2|
+----+----+
| 1| [1]|
| 2| [2]|
| 3| [3]|
+----+----+
PySpark - Split Array Column into smaller chunks
Another way is to combine transform and filter: use if with the mod operator to decide where each chunk starts, and slice (which slices an array) to extract it.
from pyspark.sql import functions as F
n = 2
df.withColumn("NewCol", F.expr(f"""
    filter(
        transform(arrayCol, (x, i) -> if(i % {n} = 0, slice(arrayCol, i + 1, {n}), null)),
        x -> x is not null)
""")).show(truncate=False)
+---------------+---------------------+
|arrayCol |NewCol |
+---------------+---------------------+
|[1, 2, 3, 4, 5]|[[1, 2], [3, 4], [5]]|
+---------------+---------------------+
How to split a list to multiple columns in Pyspark?
It depends on the type of your "list":
If it is of type ArrayType():

df = hc.createDataFrame(sc.parallelize([['a', [1,2,3]], ['b', [2,3,4]]]), ["key", "value"])
df.printSchema()
df.show()
root
 |-- key: string (nullable = true)
 |-- value: array (nullable = true)
 |    |-- element: long (containsNull = true)
+---+-------+
|key|  value|
+---+-------+
|  a|[1,2,3]|
|  b|[2,3,4]|
+---+-------+
you can access the values like you would with python using []:
df.select("key", df.value[0], df.value[1], df.value[2]).show()
+---+--------+--------+--------+
|key|value[0]|value[1]|value[2]|
+---+--------+--------+--------+
|  a|       1|       2|       3|
|  b|       2|       3|       4|
+---+--------+--------+--------+
If it is of type StructType(): (maybe you built your dataframe by reading a JSON)
import pyspark.sql.functions as psf

df2 = df.select("key", psf.struct(
    df.value[0].alias("value1"),
    df.value[1].alias("value2"),
    df.value[2].alias("value3")
).alias("value"))
df2.printSchema()
df2.show()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = false)
| |-- value1: long (nullable = true)
| |-- value2: long (nullable = true)
| |-- value3: long (nullable = true)
+---+-------+
|key| value|
+---+-------+
| a|[1,2,3]|
| b|[2,3,4]|
+---+-------+
you can directly 'split' the column using *:
df2.select('key', 'value.*').show()
+---+------+------+------+
|key|value1|value2|value3|
+---+------+------+------+
| a| 1| 2| 3|
| b| 2| 3| 4|
+---+------+------+------+
Pyspark: Split a single column with multiple values into separate columns
What you could do is a chain of when
conditions based on a list of regexes to decide how to handle each row. Then, depending on the logic, you can extract a list of key-value pairs (column name and value).
The logic in the code below may not be exactly what you need (it produces the output you expect though), but the way it is written, you can easily add or modify conditions.
It could look like this:
from pyspark.sql import functions as F
# So let's first define the conditions and the associated logic
transfo=dict()
# List of pairs
transfo['^([a-zA-Z0-9]+\\s*:\\s*[a-zA-Z0-9]+\\s*)+$'] = F.split(
    F.regexp_replace(F.col('contextMap_ID1'), "\\s*:\\s*", ":"), "\\s+")
# 9 digit number
transfo['^[0-9]{9}$'] = F.array(F.concat_ws(':',
    F.lit("caseId"),
    F.col("contextMap_ID1")))
# Three letters and a number
transfo['^[A-Z]{3}[0-9]+$'] = F.array(F.concat_ws(':',
    F.regexp_extract(F.col("contextMap_ID1"), '[A-Z]+', 0),
    F.regexp_extract(F.col("contextMap_ID1"), '[0-9]+', 0)))
# let's combine the conditions into a chain of when/otherwise.
# the initialization of my_fun is meant to avoid discarding rows
# without key value pairs.
my_fun = F.array(F.lit('caseId'))
for x in transfo:
    my_fun = F.when(F.col('contextMap_ID1').rlike(x),
                    transfo[x]).otherwise(my_fun)
Once we have prepared the main transformation, we can wrap everything up. We explode the key-value pairs generated by my_fun, extract them, and pivot on the key to generate new columns.
Note that we add an id in case contextMap_ID1 is not unique. If it is unique, you may remove the id.
dfx\
    .select('contextMap_ID1', F.monotonically_increasing_id().alias('id'))\
    .select('contextMap_ID1', 'id', F.explode(my_fun).alias("keyvalue"))\
    .withColumn("key", F.split(F.col('keyvalue'), ":").getItem(0))\
    .withColumn("value", F.split(F.col('keyvalue'), ":").getItem(1))\
    .groupBy("id", "contextMap_ID1")\
    .pivot("key")\
    .agg(F.first(F.col('value')))\
    .show(truncate=False)
+-----------+----------------------------------------------------------------------+----------+---------+--------+
|id |contextMap_ID1 |ABC |caseId |personId|
+-----------+----------------------------------------------------------------------+----------+---------+--------+
|34359738368|caseId: 2345678 personId: 87654321 |null |2345678 |87654321|
|25769803776|123456789 |null |123456789|null |
|60129542144|ABC9876543210 |9876543210|null |null |
|8589934592 |blah blah blah createdTimeStamp=2020-08-11 15:31:37.458 blah blah blah|null |null |null |
|51539607552|CRON |null |null |null |
+-----------+----------------------------------------------------------------------+----------+---------+--------+