How to flatten a struct in a Spark dataframe?
This should work in Spark 1.6 or later:
df.select(df.col("data.*"))
or
df.select(df.col("data.id"), df.col("data.keyNote"), df.col("data.details"))
Flatten dataframe with nested struct ArrayType using pyspark
Using the inline function:
df2 = (df.selectExpr("AUTHOR_ID", "NAME", "inline(Books)")
         .selectExpr("*", "inline(Chapters)")
         .drop("Chapters"))
Or explode:
from pyspark.sql import functions as F
df2 = (df.withColumn("Books", F.explode("Books"))
         .select("*", "Books.*")
         .withColumn("Chapters", F.explode("Chapters"))
         .select("*", "Chapters.*"))
How to flatten array of struct?
Use a combination of explode and the * selector:
import pyspark.sql.functions as F
df_flat = (df.withColumn('device_exploded', F.explode('device'))
             .select('id', 'device_exploded.*'))
df_flat.printSchema()
# root
# |-- id: string (nullable = true)
# |-- device_vendor: string (nullable = true)
# |-- device_name: string (nullable = true)
# |-- device_manufacturer: string (nullable = true)
explode creates a separate record for each element of the array-valued column, repeating the value(s) of the other column(s). The column.* selector turns all fields of the struct-valued column into separate columns.
How to flatten nested struct using PySpark?
While I agree with Phantoms that flattening a df is fairly basic, if you still haven't figured it out you can use the function below to flatten your df:
def flattenNestedData(nestedDF):
    from pyspark.sql.functions import col, explode_outer
    from pyspark.sql.types import StructType, ArrayType
    try:
        ## Fetch complex-datatype columns from the schema
        fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                           if isinstance(field.dataType, (ArrayType, StructType))])
        while len(fieldNames) != 0:
            fieldName = list(fieldNames.keys())[0]
            print("Processing :" + fieldName + " Type : " + str(type(fieldNames[fieldName])))
            if isinstance(fieldNames[fieldName], StructType):
                extractedFields = [col(fieldName + '.' + innerColName).alias(fieldName + "_" + innerColName)
                                   for innerColName in [colName.name for colName in fieldNames[fieldName]]]
                nestedDF = nestedDF.select("*", *extractedFields).drop(fieldName)
            elif isinstance(fieldNames[fieldName], ArrayType):
                nestedDF = nestedDF.withColumn(fieldName, explode_outer(fieldName))
            fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                               if isinstance(field.dataType, (ArrayType, StructType))])
        return nestedDF
    except Exception as err:
        raise Exception("Error occurred while flattening the dataframe: " + str(err))
You can remove the ArrayType check if you don't want to explode array columns.
Pyspark - How do I Flatten Nested Struct Column preserving parent name
I'm afraid it's not as straightforward as it should be. You'll need to loop through the schema to build your desired column names, then rename the columns in bulk. Something like this:
Sample dataset
df = spark.createDataFrame([
    ((1, 2, 3),),
    ((4, 5, 6),),
], 'structA struct<a:int, b:int, c:int>')
df.show()
df.printSchema()
+---------+
| structA|
+---------+
|{1, 2, 3}|
|{4, 5, 6}|
+---------+
root
|-- structA: struct (nullable = true)
| |-- a: integer (nullable = true)
| |-- b: integer (nullable = true)
| |-- c: integer (nullable = true)
from pyspark.sql import functions as F
struct_col = 'structA'
struct_cols = [[F.col(b.name).alias(f'{a.name}_{b.name}') for b in a.dataType.fields] for a in df.schema if a.name == struct_col][0]
# [Column<'a AS structA_a'>, Column<'b AS structA_b'>, Column<'c AS structA_c'>]
df.select(f'{struct_col}.*').select(struct_cols).show()
+---------+---------+---------+
|structA_a|structA_b|structA_c|
+---------+---------+---------+
| 1| 2| 3|
| 4| 5| 6|
+---------+---------+---------+
Automatically and Elegantly flatten DataFrame in Spark SQL
The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.
The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].
Something like:
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}
You would then use it like this:
df.select(flattenSchema(df.schema):_*)
Flatten nested array in Spark DataFrame
You can use transform:
df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")