How to split Vector into columns - using PySpark

Spark >= 3.0.0

Since Spark 3.0.0 this can be done without using a UDF.

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
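If the vector length is not known up front, a minimal sketch (not part of the original answer) is to read it from the first converted row; the x{i} aliases are purely illustrative names:

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# Sketch: infer the number of elements from the first row (assumes df is
# non-empty and all vectors have the same length), then expand into columns.
arr_df = df.withColumn("xs", vector_to_array("vector"))
n = len(arr_df.first()["xs"])
arr_df.select(["word"] + [col("xs")[i].alias(f"x{i}") for i in range(n)]).show()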

Spark < 3.0.0

One possible approach is to convert to and from RDD:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, {1: 2}))
]).toDF(["word", "vector"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())

df.rdd.map(extract).toDF(["word"])  # Vector values will be named _2, _3, ...

## +-------+---+---+---+
## | word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+
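If you would rather not keep the autogenerated _2, _3, ... names, one option (a sketch, assuming the vectors all have length 3 as in the example) is to pass explicit names to toDF:

# Sketch: name the extracted values explicitly (x0, x1, x2 are illustrative names).
df.rdd.map(extract).toDF(["word", "x0", "x1", "x2"]).show()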

An alternative solution would be to create a UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+

For the Scala equivalent, see Spark Scala: How to convert Dataframe[vector] to DataFrame[f1:Double, ..., fn: Double)].

unfold vector column into normal columns in DataFrame

The feature column contains values of type pyspark.ml.linalg.DenseVector, and the vector elements are of type numpy.float64.

To convert the NumPy dtypes to native Python types, call value.item():

split1_udf = udf(lambda value: value[0].item(), DoubleType())
split2_udf = udf(lambda value: value[1].item(), DoubleType())

Using this fix produces the following output:

+---+----+---+----+----------+---+----+
| A| B| C| D| features| c1| c2|
+---+----+---+----+----------+---+----+
|0.2|53.3|0.2|53.3|[0.2,53.3]|0.2|53.3|
|1.1|43.3|0.3|51.3|[0.3,51.3]|0.3|51.3|
|2.6|22.4|0.4|43.3|[0.4,43.3]|0.4|43.3|
|3.7|25.6|0.2|23.4|[0.2,23.4]|0.2|23.4|
+---+----+---+----+----------+---+----+
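For completeness, a hedged sketch of how these UDFs would be applied to produce the c1 and c2 columns shown above (assuming the vector column is named features, as in the output):

from pyspark.sql.functions import col

# Sketch: apply the UDFs defined above to the (assumed) 'features' vector column.
df = df.withColumn("c1", split1_udf(col("features"))) \
       .withColumn("c2", split2_udf(col("features")))
df.show()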

Split & Map fields of array<string> in pyspark

We exchanged a couple of comments above, and I think there's nothing special about the array(array(string)) column, so I'm posting this answer to show the solution from How to explode multiple columns of a dataframe in pyspark (arrays_zip is available in Spark 2.4+).

from pyspark.sql import functions as f

df = spark.createDataFrame([
    (['1', '2', '3'], [['1'], ['2'], ['3']])
], ['col1', 'col2'])

df = (df
    .withColumn('zipped', f.arrays_zip(f.col('col1'), f.col('col2')))
    .withColumn('unzipped', f.explode(f.col('zipped')))
    .select(f.col('unzipped.col1'),
            f.col('unzipped.col2'))
)

df.show()

The input is:

+---------+---------------+
| col1| col2|
+---------+---------------+
|[1, 2, 3]|[[1], [2], [3]]|
+---------+---------------+

And the output is:

+----+----+
|col1|col2|
+----+----+
| 1| [1]|
| 2| [2]|
| 3| [3]|
+----+----+

PySpark - Split Array Column into smaller chunks

Another way is to use transform and filter with an if, using mod to decide where each split starts and slice (which slices an array) to take each chunk:

from pyspark.sql import functions as F

n = 2

df.withColumn("NewCol", F.expr(f"""
    filter(
      transform(arrayCol, (x, i) -> if(i % {n} = 0, slice(arrayCol, i + 1, {n}), null)),
      x -> x is not null)
    """)).show(truncate=False)


+---------------+---------------------+
|arrayCol |NewCol |
+---------------+---------------------+
|[1, 2, 3, 4, 5]|[[1, 2], [3, 4], [5]]|
+---------------+---------------------+
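The answer does not show how df was built; a minimal sketch of an input matching the output above (assuming a SparkSession named spark) could be:

# Sketch: one row with an integer array column named arrayCol, as in the output above.
df = spark.createDataFrame([([1, 2, 3, 4, 5],)], ["arrayCol"])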

How to split a list to multiple columns in Pyspark?

It depends on the type of your "list" (a self-contained sketch covering both cases follows the list):

  • If it is of type ArrayType():

    df = hc.createDataFrame(sc.parallelize([['a', [1,2,3]], ['b', [2,3,4]]]), ["key", "value"])
    df.printSchema()
    df.show()
    root
     |-- key: string (nullable = true)
     |-- value: array (nullable = true)
     |    |-- element: long (containsNull = true)

    +---+-------+
    |key|  value|
    +---+-------+
    |  a|[1,2,3]|
    |  b|[2,3,4]|
    +---+-------+

    you can access the values like you would with python using []:

    df.select("key", df.value[0], df.value[1], df.value[2]).show()
    +---+--------+--------+--------+
    |key|value[0]|value[1]|value[2]|
    +---+--------+--------+--------+
    |  a|       1|       2|       3|
    |  b|       2|       3|       4|
    +---+--------+--------+--------+
  • If it is of type StructType(): (maybe you built your dataframe by reading a JSON)

    import pyspark.sql.functions as psf

    df2 = df.select("key", psf.struct(
        df.value[0].alias("value1"),
        df.value[1].alias("value2"),
        df.value[2].alias("value3")
    ).alias("value"))
    df2.printSchema()
    df2.show()
    root
     |-- key: string (nullable = true)
     |-- value: struct (nullable = false)
     |    |-- value1: long (nullable = true)
     |    |-- value2: long (nullable = true)
     |    |-- value3: long (nullable = true)

    +---+-------+
    |key|  value|
    +---+-------+
    |  a|[1,2,3]|
    |  b|[2,3,4]|
    +---+-------+

    you can directly 'split' the column using *:

    df2.select('key', 'value.*').show()
    +---+------+------+------+
    |key|value1|value2|value3|
    +---+------+------+------+
    |  a|     1|     2|     3|
    |  b|     2|     3|     4|
    +---+------+------+------+
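As referenced above, here is a self-contained sketch of both cases, assuming a SparkSession named spark rather than the hc/sc shell variables used in the answer:

import pyspark.sql.functions as psf

# ArrayType case: build the DataFrame and index into the array column with [].
df = spark.createDataFrame([('a', [1, 2, 3]), ('b', [2, 3, 4])], ["key", "value"])
df.select("key", df.value[0], df.value[1], df.value[2]).show()

# StructType case: pack the values into a struct, then expand it with 'value.*'.
df2 = df.select("key", psf.struct(
    df.value[0].alias("value1"),
    df.value[1].alias("value2"),
    df.value[2].alias("value3")
).alias("value"))
df2.select("key", "value.*").show()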

Pyspark: Split a single column with multiple values into separate columns

What you could do is a chain of when conditions based on a list of regexes to decide how to handle each row. Then, depending on the logic, you can extract a list of key-value pairs (column name and value).

The logic in the code below may not be exactly what you need (it produces the output you expect though), but the way it is done, you can easily add or modify conditions.

It could look like this:

from pyspark.sql import functions as F

# So let's first define the conditions and the associated logic
transfo = dict()
# List of pairs
transfo['^([a-zA-Z0-9]+\\s*:\\s*[a-zA-Z0-9]+\\s*)+$'] = F.split(
    F.regexp_replace(F.col('contextMap_ID1'), "\\s*:\\s*", ":"), "\\s+")
# 9 digit number
transfo['^[0-9]{9}$'] = F.array(F.concat_ws(':',
    F.lit("caseId"),
    F.col("contextMap_ID1")))
# Three letters and a number
transfo['^[A-Z]{3}[0-9]+$'] = F.array(F.concat_ws(':',
    F.regexp_extract(F.col("contextMap_ID1"), '[A-Z]+', 0),
    F.regexp_extract(F.col("contextMap_ID1"), '[0-9]+', 0)))

# let's combine the conditions into a chain of when/otherwise.
# the initialization of my_fun is meant to avoid discarding rows
# without key value pairs.
my_fun = F.array(F.lit('caseId'))
for x in transfo:
    my_fun = F.when(F.col('contextMap_ID1').rlike(x),
                    transfo[x]).otherwise(my_fun)

Once we prepared the main transformation, we can wrap everything up. We explode the generated key value pairs using my_fun, extract them and pivot on the key to generate new columns.

Note that we add an id in case contextMap_ID1 is not unique. If it is, you may remove the id.

dfx\
    .select('contextMap_ID1', F.monotonically_increasing_id().alias('id'))\
    .select('contextMap_ID1', 'id', F.explode(my_fun).alias("keyvalue"))\
    .withColumn("key", F.split(F.col('keyvalue'), ":").getItem(0))\
    .withColumn("value", F.split(F.col('keyvalue'), ":").getItem(1))\
    .groupBy("id", "contextMap_ID1")\
    .pivot("key")\
    .agg(F.first(F.col('value')))\
    .show(truncate=False)
+-----------+----------------------------------------------------------------------+----------+---------+--------+
|id |contextMap_ID1 |ABC |caseId |personId|
+-----------+----------------------------------------------------------------------+----------+---------+--------+
|34359738368|caseId: 2345678 personId: 87654321 |null |2345678 |87654321|
|25769803776|123456789 |null |123456789|null |
|60129542144|ABC9876543210 |9876543210|null |null |
|8589934592 |blah blah blah createdTimeStamp=2020-08-11 15:31:37.458 blah blah blah|null |null |null |
|51539607552|CRON |null |null |null |
+-----------+----------------------------------------------------------------------+----------+---------+--------+
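The input DataFrame dfx is not shown in the answer; a sketch matching the rows in the output above (assuming a SparkSession named spark) would be:

# Sketch: rows taken from the contextMap_ID1 column of the output shown above.
dfx = spark.createDataFrame([
    ("caseId: 2345678 personId: 87654321",),
    ("123456789",),
    ("ABC9876543210",),
    ("blah blah blah createdTimeStamp=2020-08-11 15:31:37.458 blah blah blah",),
    ("CRON",),
], ["contextMap_ID1"])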

