How to Flatten a Struct in a Spark Dataframe

How to flatten a struct in a Spark dataframe?

This should work in Spark 1.6 or later:

df.select(df.col("data.*"))

or

df.select(df.col("data.id"), df.col("data.keyNote"), df.col("data.details"))

Flatten dataframe with nested struct ArrayType using pyspark

Using inline function:

df2 = (df.selectExpr("AUTHOR_ID", "NAME", "inline(Books)")
.selectExpr("*", "inline(Chapters)")
.drop("Chapters")
)

Or explode:

from pyspark.sql import functions as F

df2 = (df.withColumn("Books", F.explode("Books"))
.select("*", "Books.*")
.withColumn("Chapters", F.explode("Chapters"))
.select("*", "Chapters.*")
)
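For context, here is a minimal, self-contained sketch both snippets would run against; the TITLE/CHAPTER/PAGES field names are assumptions for illustration, not from the original question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical nested input: one author with one book and two chapters
df = spark.createDataFrame(
    [(1, "Jane", [("Book A", [("Ch 1", 10), ("Ch 2", 20)])])],
    "AUTHOR_ID int, NAME string, "
    "Books array<struct<TITLE:string, Chapters:array<struct<CHAPTER:string, PAGES:int>>>>",
)

(df.selectExpr("AUTHOR_ID", "NAME", "inline(Books)")
   .selectExpr("*", "inline(Chapters)")
   .drop("Chapters")
   .show())
# one row per chapter, with AUTHOR_ID, NAME, TITLE, CHAPTER and PAGES as flat columns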

How to flatten array of struct?

Use a combination of explode and the * selector:

import pyspark.sql.functions as F

df_flat = (df.withColumn('device_exploded', F.explode('device'))
             .select('id', 'device_exploded.*'))

df_flat.printSchema()
# root
# |-- id: string (nullable = true)
# |-- device_vendor: string (nullable = true)
# |-- device_name: string (nullable = true)
# |-- device_manufacturer: string (nullable = true)

explode creates a separate record for each element of the array-valued column, repeating the value(s) of the other column(s). The column.* selector turns all fields of the struct-valued column into separate columns.
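Putting it together as a runnable sketch (the device field names are taken from the schema printed above; the sample values are made up):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: each id carries an array of device structs
df = spark.createDataFrame(
    [("1", [("vendorA", "phone", "acme"), ("vendorB", "tablet", "acme")])],
    "id string, device array<struct<device_vendor:string, device_name:string, device_manufacturer:string>>",
)

df_flat = (df.withColumn("device_exploded", F.explode("device"))
             .select("id", "device_exploded.*"))
df_flat.show()
# id "1" appears twice, once per device, with the struct fields as flat columns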

How to flatten nested struct using PySpark?

While I agree with Phantoms that flattening a df is fairly basic, if you still haven't figured it out you can use the function below to flatten your df:

def flattenNestedData(nestedDF):
    from pyspark.sql.functions import col, explode_outer
    from pyspark.sql.types import StructType, ArrayType
    try:
        # Fetch complex (struct/array) columns from the schema
        fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
        while len(fieldNames) != 0:
            fieldName = list(fieldNames.keys())[0]
            print("Processing: " + fieldName + " Type: " + str(type(fieldNames[fieldName])))
            if type(fieldNames[fieldName]) == StructType:
                # Promote each struct field to a top-level column named <parent>_<child>
                extractedFields = [col(fieldName + '.' + innerColName).alias(fieldName + "_" + innerColName)
                                   for innerColName in [colName.name for colName in fieldNames[fieldName]]]
                nestedDF = nestedDF.select("*", *extractedFields).drop(fieldName)

            elif type(fieldNames[fieldName]) == ArrayType:
                # Explode arrays into one row per element (explode_outer keeps rows with null/empty arrays)
                nestedDF = nestedDF.withColumn(fieldName, explode_outer(fieldName))

            # Re-scan the schema until no struct/array columns remain
            fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

        return nestedDF

    except Exception as err:
        raise Exception("Error occurred while flattening the dataframe: " + str(err))

You can remove the ArrayType check if you don't want to explode those columns.
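A quick usage sketch, with a made-up nested schema just to show what the function does:

# Hypothetical input: a struct column plus an array column
df = spark.createDataFrame(
    [(1, ("Alice", 30), [10, 20])],
    "id int, person struct<name:string, age:int>, scores array<int>",
)

flat = flattenNestedData(df)
flat.printSchema()
# person is replaced by person_name and person_age top-level columns,
# and scores is exploded to one row per element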

Pyspark - How do I Flatten Nested Struct Column preserving parent name

I'm afraid it's not as straightforward as it should be. You'll need to loop through the schema to build your desired column names, then rename the columns in bulk. Something like this:

Sample dataset
df = spark.createDataFrame([
    ((1, 2, 3),),
    ((4, 5, 6),),
], 'structA struct<a:int, b:int, c:int>')
df.show()
df.printSchema()

+---------+
|  structA|
+---------+
|{1, 2, 3}|
|{4, 5, 6}|
+---------+

root
 |-- structA: struct (nullable = true)
 |    |-- a: integer (nullable = true)
 |    |-- b: integer (nullable = true)
 |    |-- c: integer (nullable = true)

from pyspark.sql import functions as F

struct_col = 'structA'
struct_cols = [[F.col(b.name).alias(f'{a.name}_{b.name}') for b in a.dataType.fields] for a in df.schema if a.name == struct_col][0]
# [Column<'a AS structA_a'>, Column<'b AS structA_b'>, Column<'c AS structA_c'>]

df.select(f'{struct_col}.*').select(struct_cols).show()
+---------+---------+---------+
|structA_a|structA_b|structA_c|
+---------+---------+---------+
|        1|        2|        3|
|        4|        5|        6|
+---------+---------+---------+
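If you want to apply the same prefixing to every struct column at once, a rough generalization (not part of the original answer) can build the aliased columns straight from the schema:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType

# Expand every struct column into <parent>_<child> columns, keep the rest as-is
flat_cols = [
    F.col(f"{field.name}.{sub.name}").alias(f"{field.name}_{sub.name}")
    for field in df.schema.fields
    if isinstance(field.dataType, StructType)
    for sub in field.dataType.fields
]
other_cols = [F.col(field.name) for field in df.schema.fields
              if not isinstance(field.dataType, StructType)]

df.select(other_cols + flat_cols).show()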

Automatically and Elegantly flatten DataFrame in Spark SQL

The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].

Something like:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

You would then use it like this:

df.select(flattenSchema(df.schema):_*)
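If you need the same behaviour in PySpark, a rough port of the recursive idea might look like this; the underscore-joined aliases are an assumption to avoid duplicate leaf names, not something the original answer does:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType

def flatten_schema(schema, prefix=None):
    # Recursively collect one Column per leaf field, descending into structs
    cols = []
    for field in schema.fields:
        name = f"{prefix}.{field.name}" if prefix else field.name
        if isinstance(field.dataType, StructType):
            cols += flatten_schema(field.dataType, name)
        else:
            cols.append(F.col(name).alias(name.replace(".", "_")))
    return cols

df_flat = df.select(flatten_schema(df.schema))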

Flatten nested array in Spark DataFrame

You can use transform:

df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")

