Rename nested field in spark dataframe
Python
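It is not possible to modify a single nested field; you have to recreate the whole structure. In this particular case the simplest solution is to use `cast`: a struct-to-struct cast matches fields by position, so casting to a type that differs only in field names renames those fields.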
First a bunch of imports:
from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
ArrayType, LongType, StringType, StructField, StructType)
and example data:
# Example data: one row whose only column, array_field, is an array holding a single (a, b, c) record.
Record = namedtuple("Record", "a b c")
df = sc.parallelize([
    ([Record("foo", 1, 3)],),
]).toDF(["array_field"])
Let's confirm that the schema is the same as in your case:
# Print the schema tree to check the nested field names before renaming.
df.printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
You can define the new schema, for example, as a string:
# Target type as a DDL-style string: same layout, first struct field renamed to a_renamed.
str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
(df
 .select(col("array_field").cast(str_schema))
 .printSchema())
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
or as a `DataType`:
# The same target type built from DataType objects; StructField defaults to nullable=True.
struct_schema = ArrayType(StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("a_renamed", StringType()),
        ("b", LongType()),
        ("c", LongType()),
    )
]))
(df
 .select(col("array_field").cast(struct_schema))
 .printSchema())
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
Scala
The same techniques can be used in Scala:
// Same example in Scala: a single row whose array_field holds one Record.
case class Record(a: String, b: Long, c: Long)
val df = Seq(Seq(Record("foo", 1L, 3L)))
  .map(Tuple1(_))
  .toDF("array_field")
val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
// Positional cast to a type that differs only in the first field's name.
df.select($"array_field".cast(strSchema))
or
import org.apache.spark.sql.types._
// Build the renamed element type field by field; add(name, type) creates a nullable field
// with empty metadata, the same as StructField(name, type).
val structSchema = ArrayType(
  new StructType()
    .add("a_renamed", StringType)
    .add("b", LongType)
    .add("c", LongType))
df.select($"array_field".cast(structSchema))
Possible improvements:
If you use an expressive data-manipulation or JSON-processing library, it can be easier to dump data types to a `dict` or JSON string and take it from there, for example (Python / toolz):
from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter
# Rename the field called "a" to "a_updated"; every other name is kept unchanged.
rename_field = update_in(
    keys=["name"],
    func=lambda field_name: "a_updated" if field_name == "a" else field_name)

updated_schema = pipe(
    # The StructField for array_field, serialised to a plain dict.
    df.schema["array_field"].jsonValue(),
    # Rewrite the list of struct fields inside the array's element type.
    update_in(
        keys=["type", "elementType", "fields"],
        func=lambda fields: list(map(rename_field, fields))),
    # Turn the edited dict back into a StructField and keep only its data type.
    StructField.fromJson,
    attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()
Rename nested struct columns in a Spark DataFrame
You can create a recursive method to traverse the DataFrame schema for renaming the columns:
import org.apache.spark.sql.types._
/** Returns a copy of `schema` in which every field name, at every nesting level, is passed through `rename`.
  *
  * Recurses into struct fields, array element types (including arrays of arrays) and map key/value
  * types. Nullability, `containsNull`, `valueContainsNull` and field metadata are preserved, so only
  * names change and the result can be applied to existing rows with
  * `spark.createDataFrame(df.rdd, newSchema)`.
  *
  * @param schema the schema to rename
  * @param rename function applied to each field name
  * @return a structurally identical schema with renamed fields
  */
def renameAllCols(schema: StructType, rename: String => String): StructType = {
  // Rebuilds a data type, renaming the fields of any struct nested anywhere inside it.
  def renameType(dataType: DataType): DataType = dataType match {
    case struct: StructType => renameStruct(struct)
    case ArrayType(elementType, containsNull) =>
      // Keep the original containsNull instead of forcing it to true.
      ArrayType(renameType(elementType), containsNull)
    case MapType(keyType, valueType, valueContainsNull) =>
      MapType(renameType(keyType), renameType(valueType), valueContainsNull)
    case other => other
  }

  // Renames each field of a struct and recurses into its data type.
  def renameStruct(struct: StructType): StructType =
    StructType(struct.fields.map { field =>
      field.copy(name = rename(field.name), dataType = renameType(field.dataType))
    })

  renameStruct(schema)
}
Testing it with the following example:
import org.apache.spark.sql.functions._
import spark.implicits._
// camelCase / leading-underscore names -> snake_case: drop every '_', insert '_' before each
// capital letter, lower-case the result, then strip any leading '_'.
val renameFcn: String => String = { name =>
  val withoutUnderscores = name.replace("_", "")
  val snakeCased = withoutUnderscores.replaceAll("([A-Z])", "_$1")
  snakeCased.toLowerCase.dropWhile(_ == '_')
}
case class C(A_Bc: Int, D_Ef: Int)
val df = Seq(
  (10, "a", C(1, 2), Seq(C(11, 12), C(13, 14)), Seq(101, 102)),
  (20, "b", C(3, 4), Seq(C(15, 16)), Seq(103))
).toDF("_VkjLmnVop", "_KaTasLop", "AbcDef", "ArrStruct", "ArrInt")
// Same rows, renamed schema.
val renamedSchema = renameAllCols(df.schema, renameFcn)
val newDF = spark.createDataFrame(df.rdd, renamedSchema)
newDF.printSchema
// root
// |-- vkj_lmn_vop: integer (nullable = false)
// |-- ka_tas_lop: string (nullable = true)
// |-- abc_def: struct (nullable = true)
// | |-- a_bc: integer (nullable = false)
// | |-- d_ef: integer (nullable = false)
// |-- arr_struct: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- a_bc: integer (nullable = false)
// | | |-- d_ef: integer (nullable = false)
// |-- arr_int: array (nullable = true)
// | |-- element: integer (containsNull = false)
Rename nested struct columns to all in lower case in a Spark DataFrame using PySpark
I guess this is what you wanted: the code below lowercases every column name, including the fields of nested structs. Hope it helps!
def get_column_wise_schema(df_string_schema, df_columns):
    """Map each top-level column name to its type string.

    ``df_string_schema`` is the DataFrame's string form, e.g.
    ``"DataFrame[a: bigint, b: struct<x:string>]"``. Each column's type is the text between
    ``"<name>: "`` (preceded by ``[`` for the first column, by a space otherwise) and the
    next column's ``", <next>: "`` marker, or the closing ``]`` for the last column.

    :param df_string_schema: the DataFrame rendered as a string.
    :param df_columns: the top-level column names, in order.
    :return: dict of column name -> type string, in column order.
    """
    last_index = len(df_columns) - 1
    schema_by_column = {}
    for index, name in enumerate(df_columns):
        start_marker = ('[' if index == 0 else ' ') + name + ': '
        if index == last_index:
            end_marker = ']'
        else:
            end_marker = ', ' + df_columns[index + 1] + ': '
        after_start = df_string_schema.split(start_marker)[1]
        schema_by_column[name] = after_start.split(end_marker)[0]
    return schema_by_column
def convert_colnames_to_lower(spark_df):
    """Return ``spark_df`` with every column name lower-cased, including nested struct fields.

    Top-level columns are renamed with ``alias``. Nested field names are renamed by casting each
    column to its own type string, lower-cased: a struct cast matches fields by position, so the
    values are unchanged and only the names differ.

    :param spark_df: the DataFrame whose column names should be lower-cased.
    :return: a new DataFrame with the same data and lower-case column/field names.
    """
    # Local import so the function works even if no module-level ``spf`` alias is defined.
    from pyspark.sql import functions as spf

    columns = spark_df.columns
    # str(spark_df) looks like "DataFrame[col1: type1, col2: type2]"; split it per column.
    column_wise_schema_dict = get_column_wise_schema(str(spark_df), columns)
    col_exprs = []
    for column_name in columns:
        # The type string still carries the original nested field names; lower-case it so the
        # cast actually renames them (previously the string was used unchanged).
        column_schema_lowercase = column_wise_schema_dict[column_name].lower()
        col_exprs.append(spf.col(column_name).cast(column_schema_lowercase)
                         .alias(column_name.lower()))
    return spark_df.select(*col_exprs)
# Sample record: AbcDef -> UvwXyz -> VkjLmnVop (two nested structs) plus a top-level HijKS string.
ds = {'AbcDef': {'UvwXyz': {'VkjLmnVop': 'abcd'}}, 'HijKS': 'fgds'}
# read.json is given an RDD holding this record; Spark infers the nested schema shown below.
df = spark.read.json(sc.parallelize([ds]))
df.printSchema()
"""
root
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- VkjLmnVop: string (nullable = true)
|-- HijKS: string (nullable = true)
"""
# Lower-case all names, top-level and nested.
converted_df = convert_colnames_to_lower(df)
converted_df.printSchema()
"""
root
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- vkjlmnvop: string (nullable = true)
|-- hijks: string (nullable = true)
"""
Rename nested column in array with spark DataFrame
Try this:
Load the test data:
// Test input: a single JSON document whose parentArray holds structs with a field named "child 1".
val data =
"""
|{
| "parentArray": [
| {
| "child 1": 0
| },
| {
| "child 1": 1
| }
| ]
|}
""".stripMargin
// The document spans several lines, so the multiLine option is required to parse it as one record.
val df = spark.read.option("multiLine", true)
.json(Seq(data).toDS())
df.show(false)
df.printSchema()
/**
* +-----------+
* |parentArray|
* +-----------+
* |[[0], [1]] |
* +-----------+
*
* root
* |-- parentArray: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- child 1: long (nullable = true)
*/
Change the column name inside the array:
// Cast to an array type whose struct differs only in the field name ("child 1" -> new_col);
// the struct cast matches fields by position, so the values are kept.
val p = df.withColumn("parentArray", col("parentArray").cast("array<struct<new_col: long>>"))
p.show(false)
p.printSchema()
/**
* +-----------+
* |parentArray|
* +-----------+
* |[[0], [1]] |
* +-----------+
*
* root
* |-- parentArray: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- new_col: long (nullable = true)
*/
How to rename column inside struct in spark scala
Since you ask about renaming inside structs, you can achieve this using the schema DSL:
import org.apache.spark.sql.types._
// Look up Col3's struct type. df.schema("Col3") throws IllegalArgumentException if the column
// is missing; a non-struct column is reported explicitly instead of failing in a cast.
val schema: StructType = df.schema("Col3").dataType match {
  case struct: StructType => struct
  case other => throw new IllegalArgumentException(s"Col3 is not a struct column: ${other.simpleString}")
}
// Prefix every nested field name with "Col3="; copy keeps each field's type, nullability and metadata.
val newSchema = StructType(schema.fields.map(sf => sf.copy(name = s"Col3=${sf.name}")))
df
  .withColumn("Col3", $"Col3".cast(newSchema))
  .printSchema()
gives
root
|-- Col1: string (nullable = true)
|-- Col2: string (nullable = true)
|-- Col3: struct (nullable = false)
| |-- Col3=513: long (nullable = true)
| |-- Col3=549: long (nullable = true)
Then you can unpack it using `select($"Col3.*")`.
You could also unpack the struct first and then rename all the columns that have a number as their column name...
Renaming nested elements in Scala Spark Dataframe
The simplest approach is to use type casting with a properly named schema string (or an equivalent `StructField` definition):
// Target type for _History written with the desired field names; casting to it renames the
// nested fields. NOTE(review): the cast is positional, so field order must match the existing struct.
val schema = """struct<
Article: array<struct<Id:string,Timestamp:bigint>>,
Channel: struct<Cultura: bigint, Deportes: array<bigint>>>"""
df.withColumn("_History", $"_History".cast(schema))
You could also model this with case classes:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{struct, udf}
// The case-class field names become the output struct's field names, so they must be spelled
// exactly as wanted in the result ("Deportes", matching the cast-based schema).
case class ChannelRecord(Cultura: Option[Long], Deportes: Option[Seq[Long]])
// Rebuild the channel struct positionally: field 0 -> Cultura, field 1 -> Deportes.
// Row.getLong throws on a null value, so nulls are mapped to None explicitly.
// NOTE(review): a null channel struct (row == null) is still not handled here.
val rename = udf((row: Row) =>
  ChannelRecord(
    if (row.isNullAt(0)) None else Some(row.getLong(0)),
    Option(row.getSeq[Long](1))))
df.withColumn("_History",
  struct($"_History.Article", rename($"_History.channel").alias("channel")))
Renaming columns recursively in a nested structure in Spark
Here's a recursive method that revises a DataFrame schema by using `replaceAll` to rename any column whose name contains a substring to be replaced:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
/** Returns a copy of `schema` in which `name.replaceAll(from, to)` is applied to every field name
  * at every nesting level (struct fields, array element types, map key/value types).
  *
  * `from` is a regular expression and `to` a replacement string, exactly as in `String.replaceAll`
  * (so `$1` refers to a capture group, and a literal `$` or `\` in `to` must be escaped).
  * All data types are otherwise preserved. In particular, an array of non-struct elements
  * stays an array; previously it was replaced by its element type. Nullability, `containsNull`,
  * `valueContainsNull` and field metadata are kept as-is.
  *
  * @param schema the schema to rewrite
  * @param from   regex matched against each field name
  * @param to     replacement for each match
  * @return the schema with renamed fields
  */
def renameAllColumns(schema: StructType, from: String, to: String): StructType = {
  // Rebuilds a data type, renaming the fields of any struct nested anywhere inside it.
  def renameType(dataType: DataType): DataType = dataType match {
    case struct: StructType => renameStruct(struct)
    case ArrayType(elementType, containsNull) =>
      // Keep the array itself and its containsNull; only structs inside it are renamed.
      ArrayType(renameType(elementType), containsNull)
    case MapType(keyType, valueType, valueContainsNull) =>
      MapType(renameType(keyType), renameType(valueType), valueContainsNull)
    case other => other
  }

  // Renames each field of a struct and recurses into its data type.
  def renameStruct(struct: StructType): StructType =
    StructType(struct.fields.map { field =>
      field.copy(name = field.name.replaceAll(from, to), dataType = renameType(field.dataType))
    })

  renameStruct(schema)
}
Testing the method on a sample DataFrame with a nested structure:
// Sample data with ':' in a top-level column name (c2:0) and in a nested field name (p:q).
case class M(i: Int, `p:q`: String)
case class N(j: Int, m: M)
val df = Seq(
(1, "a", Array(N(7, M(11, "x")), N(72, M(112, "x2")))),
(2, "b", Array(N(8, M(21, "y")))),
(3, "c", Array(N(9, M(31, "z"))))
).toDF("c1", "c2:0", "c3")
df.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2:0: string (nullable = true)
// |-- c3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- j: integer (nullable = false)
// | | |-- m: struct (nullable = true)
// | | | |-- i: integer (nullable = false)
// | | | |-- p:q: string (nullable = true)
// Replace every ':' with '_' in field names at all nesting levels.
val newSchema = renameAllColumns(df.schema, ":", "_")
// Re-apply the renamed schema to the unchanged rows.
spark.createDataFrame(df.rdd, newSchema).printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2_0: string (nullable = true)
// |-- c3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- j: integer (nullable = false)
// | | |-- m: struct (nullable = true)
// | | | |-- i: integer (nullable = false)
// | | | |-- p_q: string (nullable = true)
Note that since `replaceAll` supports regex patterns, the method can be applied with more versatile replacement conditions. For example, here's how to trim off the part of each column name starting from the ':' character:
// The regex ":.*" matches from the first ':' to the end of the name; replacing it with "" truncates the name.
val newSchema = renameAllColumns(df.schema, """:.*""", "")
spark.createDataFrame(df.rdd, newSchema).printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- j: integer (nullable = false)
// | | |-- m: struct (nullable = true)
// | | | |-- i: integer (nullable = false)
// | | | |-- p: string (nullable = true)
Related Topics
Matching Any Character Including Newlines in a Python Regex Subexpression, Not Globally
Access Data in Package Subdirectory
When Should I Be Using Classes in Python
Reference Requirements.Txt for the Install_Requires Kwarg in Setuptools Setup.Py File
How to Create a Slug in Django
Convert Pandas Series to Dataframe
Tab Completion in Python's Raw_Input()
How to Trace the Path in a Breadth-First Search
MySQL "Incorrect String Value" Error When Save Unicode String in Django
Regular Expression: Match Start or Whitespace
Convert Dictionary Entries into Variables
Rect Collision with List of Rects
Create a Main Loop with Tkinter