Pyspark: Split multiple array columns into rows
Spark >= 2.4
You can replace the zip_ udf with the arrays_zip function:
from pyspark.sql.functions import arrays_zip, col, explode
(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))
Spark < 2.4
With DataFrames and UDF:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode
zip_ = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect data types
        StructField("first", IntegerType()),
        StructField("second", IntegerType())
    ]))
)
(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
With RDDs:
(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))
Both solutions are inefficient due to Python communication overhead. If the array size is fixed, you can do something like this:
from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
    DataFrame.unionAll,
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
     for i in range(n))
).toDF("a", "b", "c", "d")
or even:
from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:
# This uses keyword-only arguments
# If you use legacy Python you'll have to change the signature
# The body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))
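As with the fixed-size example above, a follow-up select can unpack the zipped struct (a sketch reusing the column names from this answer):

(df
    .withColumn("tmp", zip_and_explode("b", "c", n=3))
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))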
Pyspark DataFrame: Split column with multiple values into rows
You can use explode, but first you'll have to convert the string representation of the array into an actual array. One way is to use regexp_replace to remove the leading and trailing square brackets, followed by split on ", ".
from pyspark.sql.functions import col, explode, regexp_replace, split
df.withColumn(
    "col2",
    explode(split(regexp_replace(col("col2"), r"(^\[)|(\]$)", ""), ", "))
).show()
#+----+----+----+
#|col1|col2|col3|
#+----+----+----+
#| z1| a1| foo|
#| z1| b2| foo|
#| z1| c3| foo|
#+----+----+----+
Split multiple array columns into rows
The answer below might be helpful to you:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
val arrayData = Seq(Row(1, List(1, 2, 3), List(0, 8, 9), "foo"))
val arraySchema = new StructType()
  .add("a", IntegerType)
  .add("b", ArrayType(IntegerType))
  .add("c", ArrayType(IntegerType))
  .add("d", StringType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)

// explode accepts a single column, so zip the arrays into one column first
val zip = udf((x: Seq[Int], y: Seq[Int]) => x.zip(y))
df.withColumn("vars", explode(zip($"b", $"c")))
  .select($"a", $"d", $"vars._1".alias("b"), $"vars._2".alias("c"))
  .show()
/*
+---+---+---+---+
| a| d| b| c|
+---+---+---+---+
| 1|foo| 1| 0|
| 1|foo| 2| 8|
| 1|foo| 3| 9|
+---+---+---+---+
*/
Pyspark Dataframe with multiple array columns into multiple rows with one value each
In case both columns list_a and list_b could be empty, I would add a 4th case to the dataset:
data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
    ("D", None, None),
]
df = spark.createDataFrame(data, ["id", "list_a", "list_b"])
I would then split the original df into 3 (both nulls, list_a exploded, and list_b exploded) and then execute a unionByName:
from pyspark.sql.functions import col, explode_outer, lit

dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())\
    .withColumn("list_a", lit(None))\
    .withColumn("list_b", lit(None))

df1 = df\
    .withColumn("list_a", explode_outer(col("list_a")))\
    .withColumn("list_b", lit(None))\
    .filter(~col("list_a").isNull())

df2 = df\
    .withColumn("list_b", explode_outer(col("list_b")))\
    .withColumn("list_a", lit(None))\
    .filter(~col("list_b").isNull())
merged_df = df1.unionByName(df2).unionByName(dfnulls)
merged_df.show()
+---+------+------+
| id|list_a|list_b|
+---+------+------+
| A| a| null|
| A| c| null|
| B| a| null|
| B| b| null|
| A| null| 1|
| A| null| 5|
| C| null| 1|
| D| null| null|
+---+------+------+
PySpark - Split Array Column into smaller chunks
Another way, using transform and filter, is to use if with mod to decide the split points, together with slice (which slices an array):
from pyspark.sql import functions as F
n = 2
df.withColumn("NewCol",F.expr(f"""
filter(
transform(arrayCol,(x,i)-> if (i%{n}=0 ,slice(arrayCol,i+1,{n}), null)),x->
x is not null)
""")).show(truncate=False)
+---------------+---------------------+
|arrayCol |NewCol |
+---------------+---------------------+
|[1, 2, 3, 4, 5]|[[1, 2], [3, 4], [5]]|
+---------------+---------------------+
Explode multiple columns to rows in pyspark
I did this by passing the columns as a list to a for loop and exploding the dataframe for every element in the list, as sketched below.
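A minimal sketch of that loop, with hypothetical column names (they are assumptions, not from the original answer); note that exploding columns in sequence produces the Cartesian product of the arrays:

from pyspark.sql.functions import explode

# Hypothetical column names; adjust to your dataframe
cols_to_explode = ["b", "c"]
for c in cols_to_explode:
    # Each pass multiplies rows by the length of the exploded array
    df = df.withColumn(c, explode(c))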
Split array columns into rows in pyspark
You can convert items to a map:
from pyspark.sql.functions import col, udf

@udf("map<string, string>")
def as_map(vks):
    return {k: v for v, k in vks}

remapped = new_df.select("frequency", as_map("items").alias("items"))
Collect the keys:
keys = remapped.select("items").rdd \
    .flatMap(lambda x: x[0].keys()).distinct().collect()
And select:
remapped.select([col("items")[key] for key in keys] + ["frequency"]).show()
+------------+------------------+---------+
|items[color]|items[productcode]|frequency|
+------------+------------------+---------+
| red| hello| 7|
| blue| hi| 8|
| black| hoi| 7|
+------------+------------------+---------+
Dataframe explode list columns in multiple rows
# This is not part of the solution, just creation of the data sample
# df = spark.sql("""
#     select stack(1,
#         array(1, 2, 3, 4, 5, 6),
#         array('x1', 'x2', 'x3', 'x4', 'x5', 'x6'),
#         array('y1', 'y2', 'y3', 'y4', 'y5', 'y6'),
#         array('v1', 'v2', 'v3', 'v4', 'v5', 'v6')
#     ) as (Country_1, Country_2, Country_3, Country_4)
# """)
df.selectExpr('inline(arrays_zip(*))').show()
+---------+---------+---------+---------+
|Country_1|Country_2|Country_3|Country_4|
+---------+---------+---------+---------+
| 1| x1| y1| v1|
| 2| x2| y2| v2|
| 3| x3| y3| v3|
| 4| x4| y4| v4|
| 5| x5| y5| v5|
| 6| x6| y6| v6|
+---------+---------+---------+---------+
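If the star expansion inside arrays_zip isn't supported in your Spark version, the same inline can be written with an explicit column list (a sketch reusing the column names from the sample above):

df.selectExpr(
    "inline(arrays_zip(Country_1, Country_2, Country_3, Country_4))"
).show()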