Explode (Transpose) Multiple Columns in Spark SQL Table


Spark >= 2.4

You can skip the zip UDF and use the arrays_zip function:

df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars.varA", $"vars.varB").show

Spark < 2.4

What you want is not possible without a custom UDF. In Scala you could do something like this:

val data = sc.parallelize(Seq(
  """{"userId": 1, "someString": "example1", "varA": [0, 2, 5], "varB": [1, 2, 9]}""",
  """{"userId": 2, "someString": "example2", "varA": [1, 20, 5], "varB": [9, null, 6]}"""
))

val df = spark.read.json(data)

df.printSchema
// root
// |-- someString: string (nullable = true)
// |-- userId: long (nullable = true)
// |-- varA: array (nullable = true)
// | |-- element: long (containsNull = true)
// |-- varB: array (nullable = true)
// | |-- element: long (containsNull = true)

Now we can define the zip UDF:

import org.apache.spark.sql.functions.{udf, explode}

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show

// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// | 1| example1| 0| 1|
// | 1| example1| 2| 2|
// | 1| example1| 5| 9|
// | 2| example2| 1| 9|
// | 2| example2| 20|null|
// | 2| example2| 5| 6|
// +------+----------+----+----+

With raw SQL:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")

sqlContext.sql(
  """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")

Explode multiple columns in Spark SQL table

The approach with the zip UDF looks fine, but you need to extend it for more collections. Unfortunately, there is no really nice way to zip 4 Seqs, but this should work:

def assertSameSize(arrs: Seq[_]*) = {
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")
}

val zip4 = udf((xa: Seq[Long], xb: Seq[Long], xc: Seq[String], xd: Seq[String]) => {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
})
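
Usage mirrors the two-column case; a sketch, assuming a DataFrame with array columns varA, varB, varC, varD matching the UDF's argument types:

df.withColumn("vars", explode(zip4($"varA", $"varB", $"varC", $"varD"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB"),
  $"vars._3".alias("varC"), $"vars._4".alias("varD")).show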

Spark: explode multiple columns into one

You could wrap the two arrays into one and flatten the nested array before exploding it, as shown below:

val df = Seq(
  (1, Seq(0, 2, 5), Seq(1, 2, 9)),
  (2, Seq(1, 3, 4), Seq(2, 3, 8))
).toDF("userId", "varA", "varB")

df.
  select($"userId", explode(flatten(array($"varA", $"varB"))).as("bothVars")).
  show
// +------+--------+
// |userId|bothVars|
// +------+--------+
// | 1| 0|
// | 1| 2|
// | 1| 5|
// | 1| 1|
// | 1| 2|
// | 1| 9|
// | 2| 1|
// | 2| 3|
// | 2| 4|
// | 2| 2|
// | 2| 3|
// | 2| 8|
// +------+--------+

Note that flatten is available on Spark 2.4+.
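
On earlier versions, one workaround (a minimal sketch, not tied to a specific release) is a small UDF that concatenates the two arrays before exploding:

import org.apache.spark.sql.functions.{udf, explode}

// concatenate the two integer arrays so a single explode covers both
val concatArrays = udf((a: Seq[Int], b: Seq[Int]) => a ++ b)

df.select($"userId", explode(concatArrays($"varA", $"varB")).as("bothVars")).show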

Spark: How to transpose and explode columns with dynamic nested arrays

stack requires that all stacked columns have the same type. The problem here is that the structs inside the arrays have different members. One approach is to add the missing members to all structs so that the approach from my previous answer works again.

cols = ['a', 'b', 'c']

# create a map containing all struct fields per column
existing_fields = {c: list(map(lambda field: field.name, df.schema.fields[i].dataType.elementType.fields))
                   for i, c in enumerate(df.columns) if c in cols}

# get a (unique) set of all fields that exist in any of the columns
all_fields = set(sum(existing_fields.values(), []))

# create a list of transform expressions to fill up the structs with null fields
transform_exprs = [f"transform({c}, e -> named_struct(" +
                   ",".join([f"'{f}', {('e.' + f) if f in existing_fields[c] else 'cast(null as long)'}" for f in all_fields])
                   + f")) as {c}" for c in cols]

# create a df where all columns contain arrays with the same struct
full_struct_df = df.selectExpr("id", *transform_exprs)

full_struct_df now has the schema

root
|-- id: long (nullable = true)
|-- a: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- val: long (nullable = true)
| | |-- val_dynamic: long (nullable = true)
| | |-- date: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- val: long (nullable = true)
| | |-- val_dynamic: long (nullable = true)
| | |-- date: long (nullable = true)
|-- c: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- val: long (nullable = true)
| | |-- val_dynamic: long (nullable = true)
| | |-- date: long (nullable = true)

From here the logic works as before:

stack_expr = f"stack({len(cols)}," + \
",".join([f"'{c}',{c}" for c in cols]) + \
")"

transpose_df = full_struct_df.selectExpr("id", stack_expr) \
.withColumnRenamed("col0", "cols") \
.withColumnRenamed("col1", "arrays") \
.filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')

The first part of this answer requires that

  • each column mentioned in cols is an array of structs
  • all members of all structs are longs. The reason for this restriction is the cast(null as long) when creating the transform expression.

Spark: Transpose Rows to Columns with Multiple Fields

This might not be the most efficient or cleanest approach, but it works as long as you provide aliases.

// Source data
val inputDF = Seq(
  ("100", "A", 10, 200),
  ("100", "B", 20, 300),
  ("101", "A", 30, 100)
).toDF("ID", "Type", "Value", "Allocation")

val valueColumn = inputDF.columns.tail(1)      // "Value"
val allocationColumn = inputDF.columns.tail(2) // "Allocation"

import org.apache.spark.sql.functions._
val outputDF = inputDF.groupBy("ID").pivot("Type")
  .agg(first(s"$valueColumn").as(s"$valueColumn"), first(s"$allocationColumn").as(s"$allocationColumn"))
display(outputDF)

The output (shown as an image in the original answer) has one row per ID, with the pivoted columns prefixed by the Type value, e.g. A_Value, A_Allocation, B_Value, B_Allocation.

I could not identify a way to make it more generic. Also, it prefixes the column names with the Type value, but that is actually helpful if you just want to distinguish the columns by that value.

See if this helps, or perhaps someone can come up with a more generic/dynamic approach based on this answer; a rough sketch of one such generalization follows.
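
One way to make it somewhat more dynamic (a sketch building on the same idea; keyCols is an assumed name for the grouping and pivot columns) is to derive the aggregation list from whatever columns remain:

import org.apache.spark.sql.functions._

// everything that is neither the grouping key nor the pivot column gets aggregated
val keyCols = Seq("ID", "Type")
val aggCols = inputDF.columns.filterNot(keyCols.contains).map(c => first(col(c)).as(c))

val outputDF = inputDF.groupBy("ID").pivot("Type").agg(aggCols.head, aggCols.tail: _*)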

Independently explode multiple columns in Spark

You can always generate the select programmatically:

import org.apache.spark.sql.functions.{explode, lit}
import org.apache.spark.sql.types.{ArrayType, StructField}

val df = Seq(
  (1, "example1", Seq(0, 2, 5), Seq(Some(1), Some(2), Some(9)), true),
  (2, "example2", Seq(1, 20, 5), Seq(Some(9), Option.empty[Int], Some(6)), false)
).toDF("userId", "someString", "varA", "varB", "someBool")

// collect the names of all array-typed columns
val arrayColumns = df.schema.fields.collect {
  case StructField(name, ArrayType(_, _), _, _) => name
}

// one DataFrame per array column: explode that column, null out the other arrays
val dfs = arrayColumns.map { expname =>
  val columns = df.schema.fields.map {
    case StructField(name, ArrayType(_, _), _, _) if expname == name => explode(df.col(name)) as name
    case StructField(name, ArrayType(_, _), _, _) => lit(null) as name
    case StructField(name, _, _, _) => df.col(name)
  }
  df.select(columns: _*)
}

dfs.reduce(_ union _).show()
+------+----------+----+----+--------+
|userId|someString|varA|varB|someBool|
+------+----------+----+----+--------+
| 1| example1| 0|null| true|
| 1| example1| 2|null| true|
| 1| example1| 5|null| true|
| 2| example2| 1|null| false|
| 2| example2| 20|null| false|
| 2| example2| 5|null| false|
| 1| example1|null| 1| true|
| 1| example1|null| 2| true|
| 1| example1|null| 9| true|
| 2| example2|null| 9| false|
| 2| example2|null|null| false|
| 2| example2|null| 6| false|
+------+----------+----+----+--------+

How to transpose data in pyspark for multiple different columns

A cleaned-up PySpark version of this:

from pyspark.sql import functions as F

df_a = spark.createDataFrame(
    [(1, 'xyz', 'MS', 'abc', 'Phd', 'pqr', 'BS'),
     (2, "POR", "MS", "ABC", "Phd", "", "")],
    ["id", "Education1CollegeName", "Education1Degree", "Education2CollegeName",
     "Education2Degree", "Education3CollegeName", "Education3Degree"])

+---+---------------------+----------------+---------------------+----------------+---------------------+----------------+
| id|Education1CollegeName|Education1Degree|Education2CollegeName|Education2Degree|Education3CollegeName|Education3Degree|
+---+---------------------+----------------+---------------------+----------------+---------------------+----------------+
|  1|                  xyz|              MS|                  abc|             Phd|                  pqr|              BS|
|  2|                  POR|              MS|                  ABC|             Phd|                     |                |
+---+---------------------+----------------+---------------------+----------------+---------------------+----------------+

Code:

df = df_a.selectExpr(
    "id",
    "stack(3, Education1CollegeName, Education1Degree, "
    "Education2CollegeName, Education2Degree, "
    "Education3CollegeName, Education3Degree) as (B, C)")

+---+---+---+
| id| B| C|
+---+---+---+
| 1|xyz| MS|
| 1|abc|Phd|
| 1|pqr| BS|
| 2|POR| MS|
| 2|ABC|Phd|
| 2| | |
+---+---+---+
