Explode (transpose?) multiple columns in Spark SQL table
Spark >= 2.4
You can skip the zip udf and use the arrays_zip function:
df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars.varA", $"vars.varB").show
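Conceptually, arrays_zip pairs the array elements by index and explode then emits one row per resulting struct, repeating the scalar columns. A minimal plain-Python sketch of that semantics (the sample rows are hypothetical, and this is not a Spark API):

```python
# Sketch of arrays_zip + explode: pair array elements by index,
# then emit one output row per pair, repeating the scalar columns.
def explode_zipped(rows):
    out = []
    for row in rows:
        for a, b in zip(row["varA"], row["varB"]):
            out.append({"userId": row["userId"],
                        "someString": row["someString"],
                        "varA": a, "varB": b})
    return out

rows = [{"userId": 1, "someString": "example1",
         "varA": [0, 2, 5], "varB": [1, 2, 9]}]
print(explode_zipped(rows))
```

One difference worth knowing: when the arrays have different lengths, Spark's arrays_zip pads the shorter one with null up to the longest, whereas Python's zip (used in the sketch) truncates to the shorter.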
Spark < 2.4
What you want is not possible without a custom UDF. In Scala you could do something like this:
val data = sc.parallelize(Seq(
"""{"userId": 1, "someString": "example1",
"varA": [0, 2, 5], "varB": [1, 2, 9]}""",
"""{"userId": 2, "someString": "example2",
"varA": [1, 20, 5], "varB": [9, null, 6]}"""
))
val df = spark.read.json(data)
df.printSchema
// root
// |-- someString: string (nullable = true)
// |-- userId: long (nullable = true)
// |-- varA: array (nullable = true)
// | |-- element: long (containsNull = true)
// |-- varB: array (nullable = true)
// | |-- element: long (containsNull = true)
Now we can define the zip udf:
import org.apache.spark.sql.functions.{udf, explode}
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show
// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// | 1| example1| 0| 1|
// | 1| example1| 2| 2|
// | 1| example1| 5| 9|
// | 2| example2| 1| 9|
// | 2| example2| 20|null|
// | 2| example2| 5| 6|
// +------+----------+----+----+
With raw SQL:
sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")
sqlContext.sql(
"""SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
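One caveat of this udf: Scala's Seq.zip truncates to the shorter sequence when the two arrays differ in length, silently dropping trailing elements rather than failing. Python's built-in zip behaves the same way, as this small sketch shows:

```python
# zip truncates to the shorter input, so mismatched array lengths
# silently drop trailing elements instead of raising an error.
pairs = list(zip([1, 20, 5], [9, None]))
print(pairs)  # [(1, 9), (20, None)] - the trailing 5 is dropped
```

If dropped elements would be a bug in your data, add an explicit length check inside the udf.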
Explode multiple columns in Spark SQL table
The approach with the zip udf seems fine, but you need to extend it for more collections. Unfortunately, there is no really nice way to zip four Seqs, but this should work:
def assertSameSize(arrs:Seq[_]*) = {
assert(arrs.map(_.size).distinct.size==1,"sizes differ")
}
val zip4 = udf((xa:Seq[Long],xb:Seq[Long],xc:Seq[String],xd:Seq[String]) => {
assertSameSize(xa,xb,xc,xd)
xa.indices.map(i=> (xa(i),xb(i),xc(i),xd(i)))
}
)
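The same index-based zipping of four sequences, including the size check, can be sketched in plain Python (the helper names are hypothetical):

```python
def assert_same_size(*seqs):
    # All input sequences must have the same length.
    assert len({len(s) for s in seqs}) == 1, "sizes differ"

def zip4(xa, xb, xc, xd):
    # Build one tuple per index, mirroring the Scala zip4 udf.
    assert_same_size(xa, xb, xc, xd)
    return [(xa[i], xb[i], xc[i], xd[i]) for i in range(len(xa))]

print(zip4([1, 2], [3, 4], ["a", "b"], ["c", "d"]))
```

Unlike plain zip, the explicit size assertion fails fast on ragged input instead of silently truncating.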
Spark: explode multiple columns into one
You could wrap the two arrays into one and flatten the nested array before exploding it, as shown below:
val df = Seq(
(1, Seq(0, 2, 5), Seq(1, 2, 9)),
(2, Seq(1, 3, 4), Seq(2, 3, 8))
).toDF("userId", "varA", "varB")
df.
select($"userId", explode(flatten(array($"varA", $"varB"))).as("bothVars")).
show
// +------+--------+
// |userId|bothVars|
// +------+--------+
// | 1| 0|
// | 1| 2|
// | 1| 5|
// | 1| 1|
// | 1| 2|
// | 1| 9|
// | 2| 1|
// | 2| 3|
// | 2| 4|
// | 2| 2|
// | 2| 3|
// | 2| 8|
// +------+--------+
Note that flatten is available in Spark 2.4+.
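What array followed by flatten does here is simply concatenate the two arrays per row, so explode walks all elements of both in order. The same per-row transformation sketched in plain Python (sample data is hypothetical):

```python
def explode_both(rows):
    # array(varA, varB) nests the two arrays; flatten concatenates them;
    # explode then yields one row per element of the combined array.
    out = []
    for user_id, var_a, var_b in rows:
        for v in var_a + var_b:  # flatten(array(varA, varB))
            out.append((user_id, v))
    return out

rows = [(1, [0, 2, 5], [1, 2, 9]), (2, [1, 3, 4], [2, 3, 8])]
print(explode_both(rows))
```

Note this differs from the zip-based answers: elements of varA and varB end up in separate rows rather than paired in one row.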
Spark: How to transpose and explode columns with dynamic nested arrays
stack requires that all stacked columns have the same type. The problem here is that the structs inside the arrays have different members. One approach is to add the missing members to all structs so that the approach from my previous answer works again.
cols = ['a', 'b', 'c']
#create a map containing all struct fields per column
existing_fields = {c:list(map(lambda field: field.name, df.schema.fields[i].dataType.elementType.fields))
for i,c in enumerate(df.columns) if c in cols}
#get a (unique) set of all fields that exist in all columns
all_fields = set(sum(existing_fields.values(),[]))
#create a list of transform expressions to fill up the structs with null fields
transform_exprs = [f"transform({c}, e -> named_struct(" +
",".join([f"'{f}', {('e.'+f) if f in existing_fields[c] else 'cast(null as long)'}" for f in all_fields])
+ f")) as {c}" for c in cols]
#create a df where all columns contain arrays with the same struct
full_struct_df = df.selectExpr("id", *transform_exprs)
full_struct_df now has the schema:
root
|-- id: long (nullable = true)
|-- a: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- val: long (nullable = true)
| | |-- val_dynamic: long (nullable = true)
| | |-- date: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- val: long (nullable = true)
| | |-- val_dynamic: long (nullable = true)
| | |-- date: long (nullable = true)
|-- c: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- val: long (nullable = true)
| | |-- val_dynamic: long (nullable = true)
| | |-- date: long (nullable = true)
From here the logic works as before:
stack_expr = f"stack({len(cols)}," + \
",".join([f"'{c}',{c}" for c in cols]) + \
")"
transpose_df = full_struct_df.selectExpr("id", stack_expr) \
.withColumnRenamed("col0", "cols") \
.withColumnRenamed("col1", "arrays") \
.filter("not arrays is null")
explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
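The generated stack expression is just a string; reproducing its construction for cols = ['a', 'b', 'c'] makes it easy to see exactly what selectExpr receives:

```python
cols = ['a', 'b', 'c']

# Build the SQL expression stack(3,'a',a,'b',b,'c',c):
# one (label, column) pair per stacked column.
stack_expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"
print(stack_expr)  # stack(3,'a',a,'b',b,'c',c)
```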
The first part of this answer requires that:
- each column mentioned in cols is an array of structs
- all members of all structs are longs
The reason for this restriction is the cast(null as long) used when creating the transform expression.
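The transform/named_struct step effectively pads each struct with null for any field it lacks, so all three columns end up with the same struct shape. The same padding logic in plain Python over dicts (the field names match the schema above; the sample struct is hypothetical):

```python
all_fields = ["val", "val_dynamic", "date"]

def pad_struct(struct):
    # Add missing fields as None so every struct shares the same shape,
    # mirroring named_struct(..., cast(null as long)) in the transform.
    return {f: struct.get(f) for f in all_fields}

print(pad_struct({"val": 1, "date": 2020}))
```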
Spark: Transpose Rows to Columns with Multiple Fields
This might not be the most efficient/clean approach but it works when you provide aliases.
//Source data
val inputDF = Seq(
("100","A", 10, 200),
("100","B", 20, 300),
("101","A", 30, 100)
).toDF("ID", "Type", "Value", "Allocation")
val valueColumn = inputDF.columns.tail(1)      // "Value"
val allocationColumn = inputDF.columns.tail(2) // "Allocation"
import org.apache.spark.sql.functions._
val outputDF = inputDF.groupBy("ID").pivot("Type").agg(first(s"$valueColumn").as(s"$valueColumn"), first(s"$allocationColumn").as(s"$allocationColumn"))
display(outputDF)
Running display(outputDF) shows the pivoted output: one row per ID, with the Value and Allocation columns prefixed by the Type value (e.g. A_Value, A_Allocation).
I could not identify a way to make it more generic. The prefixing of column names with the type value should work fine if you just want to distinguish the columns by type.
See if this helps, or perhaps someone can come up with a more generic/dynamic approach based on this answer.
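The groupBy/pivot/agg above effectively keys rows by (ID, Type) and keeps the first Value and Allocation per key, with the Type value prefixed onto the column names. A plain-Python sketch of that reshaping (column naming simplified to dict keys):

```python
rows = [("100", "A", 10, 200), ("100", "B", 20, 300), ("101", "A", 30, 100)]

# Pivot: one output record per ID, with <Type>_Value and <Type>_Allocation keys.
pivoted = {}
for id_, typ, value, alloc in rows:
    rec = pivoted.setdefault(id_, {"ID": id_})
    # first() semantics: keep the first value seen for each (ID, Type) pair
    rec.setdefault(f"{typ}_Value", value)
    rec.setdefault(f"{typ}_Allocation", alloc)

print(pivoted["100"])
```

A real pivot would also emit null B_Value/B_Allocation entries for ID 101; the sketch just omits missing keys for brevity.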
Independently explode multiple columns in Spark
You can always generate the select programmatically:
import org.apache.spark.sql.functions.{explode, lit}
import org.apache.spark.sql.types.{ArrayType, StructField}

val df = Seq(
(1, "example1", Seq(0,2,5), Seq(Some(1),Some(2),Some(9)), true),
(2, "example2", Seq(1,20,5), Seq(Some(9),Option.empty[Int],Some(6)), false)
).toDF("userId", "someString", "varA", "varB", "someBool")
val arrayColumns = df.schema.fields.collect {
case StructField(name, ArrayType(_, _), _, _) => name
}
val dfs = arrayColumns.map { expname =>
val columns = df.schema.fields.map {
case StructField(name, ArrayType(_, _), _, _) if expname == name => explode(df.col(name)) as name
case StructField(name, ArrayType(_, _), _, _) => lit(null) as name
case StructField(name, _, _, _) => df.col(name)
}
df.select(columns:_*)
}
dfs.reduce(_ union _).show()
+------+----------+----+----+--------+
|userId|someString|varA|varB|someBool|
+------+----------+----+----+--------+
| 1| example1| 0|null| true|
| 1| example1| 2|null| true|
| 1| example1| 5|null| true|
| 2| example2| 1|null| false|
| 2| example2| 20|null| false|
| 2| example2| 5|null| false|
| 1| example1|null| 1| true|
| 1| example1|null| 2| true|
| 1| example1|null| 9| true|
| 2| example2|null| 9| false|
| 2| example2|null|null| false|
| 2| example2|null| 6| false|
+------+----------+----+----+--------+
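The programmatic select explodes one array column at a time, nulling out the other array columns, and unions the per-column results. The same row-generation logic sketched in plain Python (sample rows are hypothetical):

```python
rows = [
    {"userId": 1, "someString": "example1",
     "varA": [0, 2, 5], "varB": [1, 2, 9], "someBool": True},
]
array_cols = ["varA", "varB"]

out = []
for exp in array_cols:              # one pass per array column (the union)
    for row in rows:
        for v in row[exp]:          # explode the chosen array column
            rec = {k: row[k] for k in row if k not in array_cols}
            for c in array_cols:    # null out the non-exploded arrays
                rec[c] = v if c == exp else None
            out.append(rec)

print(len(out))  # 3 rows from varA + 3 rows from varB
```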
How to transpose data in pyspark for multiple different columns
A cleaned-up PySpark version of this:
from pyspark.sql import functions as F
df_a = spark.createDataFrame([(1,'xyz','MS','abc','Phd','pqr','BS'),(2,"POR","MS","ABC","Phd","","")],[
"id","Education1CollegeName","Education1Degree","Education2CollegeName","Education2Degree","Education3CollegeName","Education3Degree"])
+---+---------------------+----------------+---------------------+----------------+---------------------+----------------+
| id|Education1CollegeName|Education1Degree|Education2CollegeName|Education2Degree|Education3CollegeName|Education3Degree|
+---+---------------------+----------------+---------------------+----------------+---------------------+----------------+
| 1| xyz| MS| abc| Phd| pqr| BS|
| 2| POR| MS| ABC| Phd| | |
+---+---------------------+----------------+---------------------+----------------+---------------------+----------------+
Code -
df = df_a.selectExpr("id", "stack(3, Education1CollegeName, Education1Degree,Education2CollegeName, Education2Degree,Education3CollegeName, Education3Degree) as (B, C)")
+---+---+---+
| id| B| C|
+---+---+---+
| 1|xyz| MS|
| 1|abc|Phd|
| 1|pqr| BS|
| 2|POR| MS|
| 2|ABC|Phd|
| 2| | |
+---+---+---+
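The stack(3, ...) call lays the three (CollegeName, Degree) column pairs out as three rows per id. The same reshaping in plain Python, for one sample row:

```python
row = {"id": 1,
       "Education1CollegeName": "xyz", "Education1Degree": "MS",
       "Education2CollegeName": "abc", "Education2Degree": "Phd",
       "Education3CollegeName": "pqr", "Education3Degree": "BS"}

# stack(3, c1, d1, c2, d2, c3, d3) as (B, C): one output row per pair
stacked = [(row["id"], row[f"Education{i}CollegeName"], row[f"Education{i}Degree"])
           for i in (1, 2, 3)]
print(stacked)
```

As in the DataFrame output above, empty strings pass through unchanged; filter them afterwards if you want to drop the blank Education3 row.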