Spark sql how to explode without losing null values
Spark 2.2+
You can use the explode_outer function:
import org.apache.spark.sql.functions.explode_outer
import spark.implicits._  // for the $"colName" syntax

df.withColumn("likes", explode_outer($"likes")).show
// +---+----+--------+
// | id|name| likes|
// +---+----+--------+
// | 1|Luke|baseball|
// | 1|Luke| soccer|
// | 2|Lucy| null|
// +---+----+--------+
Spark <= 2.1
In Scala, but the Java equivalent should be almost identical (to import individual functions, use import static).
import org.apache.spark.sql.functions.{array, col, explode, lit, when}
import spark.implicits._  // for toDF

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null, explode an array<string> containing a single null
    .otherwise(array(lit(null).cast("string")))))
The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (a.k.a. structs) you have to provide the full schema:
import org.apache.spark.sql.types._

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st = StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))
or
dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
Note: If the array Column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):
df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
Spark Explode of Empty Column returns Empty Row
That is the behaviour of the explode API. If you want the desired output, use explode_outer instead:
df.withColumn("age", explode_outer(col("age")))
Explode map column in Pyspark without losing null values
So I have finally made it work. I replaced the empty map with some dummy values, then used explode and dropped the original column.
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import MapType, StringType, StructType, StructField, LongType

# Replace an empty map with a dummy entry so explode has something to emit
replace_empty_map = udf(
    lambda x: {"key": [0, 1]} if len(x) == 0 else x,
    MapType(StringType(),
            StructType([StructField("first", LongType()),
                        StructField("last", LongType())]))
)

df = df.withColumn("foo_replaced", replace_empty_map(df["foo"])).drop("foo")
df = df.select("*", explode("foo_replaced").alias("foo_key", "foo_val")).drop("foo_replaced")
Explode in Spark missing records
explode() will not emit any rows where the array being exploded is null. Therefore, you should use explode_outer() instead.
Spark explode in Scala - Add exploded column to the row
You can use the stack function for this particular case.
df.selectExpr('Name', "stack(3, E1, 'E1', E2, 'E2', E3, 'E3')").toDF('Name', 'value', 'EType').show()
+----+-----+-----+
|Name|value|EType|
+----+-----+-----+
| abc| 4| E1|
| abc| 5| E2|
| abc| 6| E3|
+----+-----+-----+