Spark SQL How to Explode Without Losing Null Values

Spark 2.2+

You can use the explode_outer function:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+
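For a self-contained run, the df above can be built exactly as in the Spark <= 2.1 section below; a minimal sketch (the local[*] master is just for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode_outer

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._ // for toDF and the $ column syntax

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

// Lucy has no likes; explode_outer keeps her row with null instead of dropping it
df.withColumn("likes", explode_outer($"likes")).show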

Spark <= 2.1

The example below is in Scala, but the Java equivalent should be almost identical (to import individual functions, use import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}
import spark.implicits._ // assumes an active SparkSession named spark

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null, explode an array<string> containing a single null instead
    .otherwise(array(lit(null).cast("string")))))
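Calling .show on the result should print the same table as the Spark 2.2+ example above:

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+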

The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (a.k.a. structs) you have to provide the full schema:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st = StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

or

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))

Note:

If the array column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))

Spark Explode of Empty Column returns Empty Row

That is the behaviour of the explode API. If you want to get the desired output, use explode_outer:

df.withColumn("age",explode_outer(col("age")))

Explode map column in Pyspark without losing null values

So I have finally made it work. I replaced the empty map with some dummy values, then used explode and dropped the original column.

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import LongType, MapType, StringType, StructField, StructType

# Replace an empty map with a dummy entry so explode does not drop the row
replace_empty_map = udf(
    lambda x: {"key": [0, 1]} if len(x) == 0 else x,
    MapType(
        StringType(),
        StructType([StructField("first", LongType()), StructField("last", LongType())])
    )
)

df = df.withColumn("foo_replaced", replace_empty_map(df["foo"])).drop("foo")
df = df.select("*", explode("foo_replaced").alias("foo_key", "foo_val")).drop("foo_replaced")
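On Spark 2.2+ the dummy values are unnecessary: explode_outer also works on map columns and keeps rows whose map is null or empty. A sketch in Scala, matching the rest of this page (PySpark exposes the same explode_outer function):

import org.apache.spark.sql.functions.explode_outer

// explode_outer on a map yields key/value columns and emits a single
// all-null row when the map is null or empty, instead of dropping the row
df.select($"*", explode_outer($"foo").as(Seq("foo_key", "foo_val"))).drop("foo")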

Explode in Spark missing records

explode() will not emit any rows where the array being exploded is null. Therefore, you should use explode_outer() instead.
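A minimal sketch of the difference, assuming df has a nullable array column named xs (the names are illustrative):

import org.apache.spark.sql.functions.{col, explode, explode_outer}

// explode: rows where xs is null (or empty) disappear from the output
df.withColumn("x", explode(col("xs")))

// explode_outer: those rows are kept, with null in the exploded column
df.withColumn("x", explode_outer(col("xs")))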

Spark explode in Scala - Add exploded column to the row

You can use the stack function for this particular case.

df.selectExpr("Name", "stack(3, E1, 'E1', E2, 'E2', E3, 'E3')").toDF("Name", "value", "EType").show()

+----+-----+-----+
|Name|value|EType|
+----+-----+-----+
| abc|    4|   E1|
| abc|    5|   E2|
| abc|    6|   E3|
+----+-----+-----+
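For reference, an input that reproduces the output above (the values are assumed from the table); stack(3, ...) turns the three (value, label) pairs into three two-column rows:

import spark.implicits._

val df = Seq(("abc", 4, 5, 6)).toDF("Name", "E1", "E2", "E3")

df.selectExpr("Name", "stack(3, E1, 'E1', E2, 'E2', E3, 'E3')")
  .toDF("Name", "value", "EType")
  .show()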

