Pyspark: Split Multiple Array Columns into Rows

Spark >= 2.4

You can replace the zip_ UDF (defined further down for older Spark versions) with the built-in arrays_zip function:

from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))
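
To make the row-level effect concrete, here is a plain-Python model (not Spark code) of what arrays_zip followed by explode does to a single row. The helper `zip_then_explode` and the sample rows are made up for illustration. One assumption is modeled explicitly: when the arrays differ in length, arrays_zip pads the shorter one with null, which itertools.zip_longest imitates here.

```python
from itertools import zip_longest

def zip_then_explode(row):
    """Model of arrays_zip("b", "c") + explode for one row (a dict)."""
    # zip_longest fills missing positions with None, mirroring the
    # null padding that arrays_zip is assumed to apply.
    pairs = zip_longest(row["b"], row["c"], fillvalue=None)
    return [(row["a"], b, c, row["d"]) for b, c in pairs]

row = {"a": 1, "b": [1, 2, 3], "c": [7, 8, 9], "d": "foo"}
for out in zip_then_explode(row):
    print(out)

# Arrays of different lengths: the shorter one is padded with None
ragged = {"a": 2, "b": [1, 2], "c": [7], "d": "bar"}
print(zip_then_explode(ragged))
```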

Spark < 2.4

With DataFrames and UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect data types
        StructField("first", IntegerType()),
        StructField("second", IntegerType())
    ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
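
The function wrapped by udf is ordinary Python, so its behavior can be checked without a Spark session. The sketch below uses a hypothetical named version, `zip_pairs`, that returns None when either input is null; the plain lambda would raise a TypeError in that case. Note also that Python's zip stops at the shorter list.

```python
def zip_pairs(x, y):
    """Hypothetical null-safe body for the zip_ UDF."""
    if x is None or y is None:
        return None  # Spark would see this as a null array
    # zip truncates to the shorter input
    return list(zip(x, y))

print(zip_pairs([1, 2, 3], [7, 8, 9]))
print(zip_pairs([1, 2, 3], [7]))
print(zip_pairs(None, [7]))
```

A function like this could be passed to udf in place of the lambda, with the same return type.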

With RDDs:

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))
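
The lambda passed to flatMap can likewise be exercised on plain Python objects. In this sketch a namedtuple stands in for a Spark Row and a nested comprehension stands in for flatMap; the sample rows are invented.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row, for illustration only
Row = namedtuple("Row", ["a", "b", "c", "d"])

def split_row(row):
    # Same expression as the lambda passed to flatMap
    return [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]

rows = [Row(1, [1, 2, 3], [7, 8, 9], "foo"), Row(2, [4], [5], "bar")]

# flatMap = map each row to a list, then flatten the lists
flat = [out for row in rows for out in split_row(row)]
print(flat)
```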

Both solutions are inefficient because the data has to be moved between the JVM and the Python workers. If the array length is fixed and known in advance, you can do something like this:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
    DataFrame.unionAll,
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

or even:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

This should be significantly faster than the UDF or RDD versions. Generalized to support an arbitrary number of columns:

# This uses keyword-only arguments
# If you use legacy Python you'll have to change the signature
# The body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))
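
As a plain-Python model of what zip_and_explode builds for one row: for each index below n, it collects the i-th item of every named column into one record. The helper name `zip_and_explode_model` is hypothetical, and returning None for an out-of-range index is an assumption that mirrors getItem with ANSI mode off.

```python
def zip_and_explode_model(row, *colnames, n):
    """Model of zip_and_explode for a single row given as a dict."""
    def item(values, i):
        # Assumption: an out-of-range index yields None (null)
        return values[i] if i < len(values) else None

    return [
        {c: item(row[c], i) for c in colnames}
        for i in range(n)
    ]

row = {"a": 1, "b": [1, 2, 3], "c": [7, 8], "d": "foo"}
for record in zip_and_explode_model(row, "b", "c", n=3):
    print(record)
```

Because n is fixed, a row whose arrays are shorter than n still produces n records, so n should match the actual array length in the data.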

Pyspark DataFrame: Split column with multiple values into rows

You can use explode but first you'll have to convert the string representation of the array into an array.

One way is to use regexp_replace to remove the leading and trailing square brackets, followed by split on ", ".

from pyspark.sql.functions import col, explode, regexp_replace, split

df.withColumn(
    "col2",
    # Raw string so the backslashes reach the regex engine unchanged
    explode(split(regexp_replace(col("col2"), r"(^\[)|(\]$)", ""), ", "))
).show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#|  z1|  a1| foo|
#|  z1|  b2| foo|
#|  z1|  c3| foo|
#+----+----+----+
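
The same bracket-stripping pattern can be tried on a single value with Python's re module before running it on the DataFrame. This is only a local check of the pattern; `parse_list_string` is a hypothetical helper and the sample string is assumed to match the format of col2.

```python
import re

def parse_list_string(value):
    # Drop one leading "[" and one trailing "]", then split on ", "
    stripped = re.sub(r"(^\[)|(\]$)", "", value)
    return stripped.split(", ")

print(parse_list_string("[a1, b2, c3]"))

# Without the space after the comma, nothing is split
print(parse_list_string("[a1,b2]"))
```

If the separator may or may not include a space, a more forgiving split pattern such as ",\s*" is worth considering.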

Split multiple array columns into rows

The following Scala example may help:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

val arrayData = Seq(
Row(1,List(1,2,3),List(0,8,9),"foo"))
val arraySchema = new StructType().add("a",IntegerType).add("b", ArrayType(IntegerType)).add("c", ArrayType(IntegerType)).add("d",StringType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData),arraySchema)

// explode takes a single column, so explode($"b", $"c") does not compile.
// Zip the two arrays into one column first, then explode that column:

val zip = udf((x: Seq[Int], y: Seq[Int]) => x.zip(y))

df.withColumn("vars", explode(zip($"b", $"c"))).select($"a", $"d",$"vars._1".alias("b"), $"vars._2".alias("c")).show()

/*
+---+---+---+---+
|  a|  d|  b|  c|
+---+---+---+---+
|  1|foo|  1|  0|
|  1|foo|  2|  8|
|  1|foo|  3|  9|
+---+---+---+---+
*/

Pyspark Dataframe with multiple array columns into multiple rows with one value each

To cover the case where both list_a and list_b are null, I would add a fourth row to the dataset:

data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
("D", None, None),
]
df = spark.createDataFrame(data,["id","list_a","list_b"])

I would then split the original df into three parts (both lists null, list_a exploded, and list_b exploded) and then execute a unionByName:

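from pyspark.sql.functions import col, explode_outer, lit

# Rows where both lists are null
dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())\
    .withColumn("list_a", lit(None))\
    .withColumn("list_b", lit(None))

# One row per element of list_a, with list_b set to null
df1 = df\
    .withColumn("list_a", explode_outer(col("list_a")))\
    .withColumn("list_b", lit(None))\
    .filter(~col("list_a").isNull())

# One row per element of list_b, with list_a set to null
df2 = df\
    .withColumn("list_b", explode_outer(col("list_b")))\
    .withColumn("list_a", lit(None))\
    .filter(~col("list_b").isNull())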

merged_df = df1.unionByName(df2).unionByName(dfnulls)

merged_df.show()

+---+------+------+
| id|list_a|list_b|
+---+------+------+
|  A|     a|  null|
|  A|     c|  null|
|  B|     a|  null|
|  B|     b|  null|
|  A|  null|     1|
|  A|  null|     5|
|  C|  null|     1|
|  D|  null|  null|
+---+------+------+
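
A plain-Python model of the three branches makes it easier to see which input rows end up where. The function below is a sketch, not Spark code; it imitates explode_outer plus the not-null filter by skipping null and empty lists, and it assumes the lists contain no null elements. Row E is an extra case added here for illustration: a row with one empty list and one null list matches none of the three branches, so it disappears from the result.

```python
def split_and_union(rows):
    """Model of df1.unionByName(df2).unionByName(dfnulls)."""
    # df1: one output row per element of list_a (null/empty lists give none)
    df1 = [(i, a, None) for i, la, lb in rows for a in (la or [])]
    # df2: one output row per element of list_b
    df2 = [(i, None, b) for i, la, lb in rows for b in (lb or [])]
    # dfnulls: rows where both lists are null
    dfnulls = [(i, None, None) for i, la, lb in rows
               if la is None and lb is None]
    return df1 + df2 + dfnulls

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
    ("D", None, None),
    ("E", [], None),  # extra case, not in the original dataset
]
result = split_and_union(data)
for r in result:
    print(r)
```

If rows like E need to be kept, the null filter would have to treat empty arrays the same way as null ones.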

PySpark - Split Array Column into smaller chunks

Another approach combines transform and filter: use if with a modulo test on the element index to decide where each chunk starts, and slice to cut that chunk out of the array.

from pyspark.sql import functions as F

n = 2
df.withColumn("NewCol", F.expr(f"""
    filter(
        transform(arrayCol, (x, i) -> if(i % {n} = 0, slice(arrayCol, i + 1, {n}), null)),
        x -> x is not null
    )
""")).show(truncate=False)

+---------------+---------------------+
|arrayCol |NewCol |
+---------------+---------------------+
|[1, 2, 3, 4, 5]|[[1, 2], [3, 4], [5]]|
+---------------+---------------------+
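
The expression is easier to follow next to a plain-Python model of the same steps. The sketch below mirrors transform (map with index), the if on i % n, slice, and filter; `chunk_like_spark` is a hypothetical helper name. Spark's slice is 1-based, which is why the SQL uses i+1 while the Python slice starts at i.

```python
def chunk_like_spark(array_col, n):
    # transform: emit a chunk at every n-th index, None elsewhere
    transformed = [
        array_col[i:i + n] if i % n == 0 else None
        for i, _ in enumerate(array_col)
    ]
    # filter: drop the None placeholders
    return [x for x in transformed if x is not None]

print(chunk_like_spark([1, 2, 3, 4, 5], 2))
```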

Explode multiple columns to rows in pyspark

I did this by passing the columns as a list to a for loop and exploding the DataFrame once for every column in the list.
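
One caveat, sketched in plain Python under the assumption that the loop applied explode to each column in turn: exploding columns one after another yields every combination of their elements, not position-wise pairs. The sample arrays are invented.

```python
from itertools import product

b = [1, 2, 3]
c = [7, 8, 9]

# Exploding b, then c, behaves like a Cartesian product
exploded_in_loop = list(product(b, c))

# Zipping first keeps elements paired by position
zipped = list(zip(b, c))

print(len(exploded_in_loop))
print(zipped)
```

If position-wise pairing is what you need, zip the arrays (for example with arrays_zip) before a single explode.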

Split an array column into rows in PySpark

You can convert items to map:

from pyspark.sql.functions import col, udf

@udf("map<string, string>")
def as_map(vks):
    # Each element of vks is a (value, key) pair
    return {k: v for v, k in vks}

remapped = new_df.select("frequency", as_map("items").alias("items"))

Collect the keys:

keys = remapped.select("items").rdd \
.flatMap(lambda x: x[0].keys()).distinct().collect()

And select:

remapped.select([col("items")[key] for key in keys] + ["frequency"]) 

+------------+------------------+---------+
|items[color]|items[productcode]|frequency|
+------------+------------------+---------+
|         red|             hello|        7|
|        blue|                hi|        8|
|       black|               hoi|        7|
+------------+------------------+---------+
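
The body of as_map and the key collection can be checked on plain Python data. The sample records below are an assumption about the shape of new_df (each item is a (value, key) pair), chosen to match the output table above; `as_map_body` is a hypothetical name for the undecorated function.

```python
def as_map_body(vks):
    # Same comprehension as the UDF: pairs arrive as (value, key)
    return {k: v for v, k in vks}

# Hypothetical rows: (frequency, items)
records = [
    (7, [("red", "color"), ("hello", "productcode")]),
    (8, [("blue", "color"), ("hi", "productcode")]),
    (7, [("black", "color"), ("hoi", "productcode")]),
]

remapped = [(freq, as_map_body(items)) for freq, items in records]

# Distinct keys across all rows, sorted for a stable column order
keys = sorted({k for _, m in remapped for k in m})

# One column per key, then frequency; a missing key becomes None
table = [[m.get(k) for k in keys] + [freq] for freq, m in remapped]
print(keys)
print(table)
```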

Dataframe explode list columns in multiple rows

# This is not part of the solution, just creation of the data sample
# df = spark.sql("select stack(1, array(1, 2, 3, 4, 5, 6) ,array('x1', 'x2', 'x3', 'x4', 'x5', 'x6') ,array('y1', 'y2', 'y3', 'y4', 'y5', 'y6') ,array('v1', 'v2', 'v3', 'v4', 'v5', 'v6')) as (Country_1, Country_2,Country_3,Country_4)")

df.selectExpr('inline(arrays_zip(*))').show()


+---------+---------+---------+---------+
|Country_1|Country_2|Country_3|Country_4|
+---------+---------+---------+---------+
|        1|       x1|       y1|       v1|
|        2|       x2|       y2|       v2|
|        3|       x3|       y3|       v3|
|        4|       x4|       y4|       v4|
|        5|       x5|       y5|       v5|
|        6|       x6|       y6|       v6|
+---------+---------+---------+---------+

