Rename Nested Field in Spark Dataframe

Python

It is not possible to modify a single nested field in place. You have to recreate the whole structure. In this particular case the simplest solution is to use cast. Since casting a struct to another struct matches fields by position rather than by name, the replacement schema just has to list the fields in their original order with the new names.

First a bunch of imports:

from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, LongType, StringType, StructField, StructType)

and example data:

Record = namedtuple("Record", ["a", "b", "c"])

df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])

Let's confirm that the schema is the same as in your case:

df.printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)

You can define a new schema for example as a string:

str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select(col("array_field").cast(str_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)

or a DataType:

struct_schema = ArrayType(StructType([
StructField("a_renamed", StringType()),
StructField("b", LongType()),
StructField("c", LongType())
]))

df.select(col("array_field").cast(struct_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)

Scala

The same techniques can be used in Scala:

case class Record(a: String, b: Long, c: Long)

val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select($"array_field".cast(strSchema))

or

import org.apache.spark.sql.types._

val structSchema = ArrayType(StructType(Seq(
StructField("a_renamed", StringType),
StructField("b", LongType),
StructField("c", LongType)
)))

df.select($"array_field".cast(structSchema))

Possible improvements:

If you use an expressive data-manipulation or JSON-processing library, it can be easier to dump the data types to a dict or a JSON string and take it from there. For example (Python / toolz):

from toolz.curried import pipe, update_in, map
from operator import attrgetter

# Update name to "a_updated" if name is "a"
rename_field = update_in(
    keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

updated_schema = pipe(
    # Get the schema of the field as a dict
    df.schema["array_field"].jsonValue(),
    # Rename the matching fields
    update_in(
        keys=["type", "elementType", "fields"],
        func=lambda x: pipe(x, map(rename_field), list)),
    # Load the schema from the dict
    StructField.fromJson,
    # Extract the data type
    attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()

Rename nested struct columns in a Spark DataFrame

You can create a recursive method to traverse the DataFrame schema and rename the columns:

import org.apache.spark.sql.types._

def renameAllCols(schema: StructType, rename: String => String): StructType = {
  def recurRename(schema: StructType): Seq[StructField] = schema.fields.map{
    case StructField(name, dtype: StructType, nullable, meta) =>
      StructField(rename(name), StructType(recurRename(dtype)), nullable, meta)
    case StructField(name, dtype: ArrayType, nullable, meta) if dtype.elementType.isInstanceOf[StructType] =>
      StructField(rename(name), ArrayType(StructType(recurRename(dtype.elementType.asInstanceOf[StructType])), true), nullable, meta)
    case StructField(name, dtype, nullable, meta) =>
      StructField(rename(name), dtype, nullable, meta)
  }
  StructType(recurRename(schema))
}

Testing it with the following example:

import org.apache.spark.sql.functions._
import spark.implicits._

// camelCase / leading-underscore names -> snake_case
val renameFcn = (s: String) =>
  s.replace("_", "").replaceAll("([A-Z])", "_$1").toLowerCase.dropWhile(_ == '_')

case class C(A_Bc: Int, D_Ef: Int)

val df = Seq(
  (10, "a", C(1, 2), Seq(C(11, 12), C(13, 14)), Seq(101, 102)),
  (20, "b", C(3, 4), Seq(C(15, 16)), Seq(103))
).toDF("_VkjLmnVop", "_KaTasLop", "AbcDef", "ArrStruct", "ArrInt")

val newDF = spark.createDataFrame(df.rdd, renameAllCols(df.schema, renameFcn))

newDF.printSchema
// root
// |-- vkj_lmn_vop: integer (nullable = false)
// |-- ka_tas_lop: string (nullable = true)
// |-- abc_def: struct (nullable = true)
// | |-- a_bc: integer (nullable = false)
// | |-- d_ef: integer (nullable = false)
// |-- arr_struct: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- a_bc: integer (nullable = false)
// | | |-- d_ef: integer (nullable = false)
// |-- arr_int: array (nullable = true)
// | |-- element: integer (containsNull = false)
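
Note that applying the renamed schema via spark.createDataFrame(df.rdd, ...) round-trips through the RDD API. As a sketch of an alternative (reusing renameAllCols and renameFcn from above), you could instead cast each top-level column, since struct-to-struct casts match fields by position:

import org.apache.spark.sql.functions.col

// Cast every top-level column to its renamed type; nested structs
// and arrays of structs are renamed by the cast itself.
val renamedSchema = renameAllCols(df.schema, renameFcn)
val viaCast = df.select(df.columns.zip(renamedSchema.fields).map {
  case (orig, field) => col(orig).cast(field.dataType).alias(field.name)
}: _*)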

Rename nested struct columns to all in lower case in a Spark DataFrame using PySpark

I guess this is what you wanted. Hope it helps!


from pyspark.sql import functions as spf

def get_column_wise_schema(df_string_schema, df_columns):
    # Returns a dictionary mapping each column name to its schema string,
    # parsed out of the DataFrame's string representation.
    column_schema_dict = {}
    i = 0
    while i < len(df_columns):
        current_col = df_columns[i]
        next_col = df_columns[i + 1] if i < len(df_columns) - 1 else None
        current_col_split_key = '[' + current_col + ': ' if i == 0 else ' ' + current_col + ': '
        next_col_split_key = ']' if i == len(df_columns) - 1 else ', ' + next_col + ': '
        column_schema_dict[current_col] = df_string_schema.split(current_col_split_key)[1].\
            split(next_col_split_key)[0]
        i += 1
    return column_schema_dict

def convert_colnames_to_lower(spark_df):
    columns = spark_df.columns
    column_wise_schema_dict = get_column_wise_schema(spark_df.__str__(), columns)
    col_exprs = []
    for column_name in columns:
        # Lower-casing the schema string renames every nested field,
        # since the cast takes its field names from the target type.
        column_schema_lowercase = column_wise_schema_dict[column_name].lower()
        col_exprs.append(spf.col(column_name).cast(column_schema_lowercase).
                         alias(column_name.lower()))
    return spark_df.select(*col_exprs)

ds = {'AbcDef': {'UvwXyz': {'VkjLmnVop': 'abcd'}}, 'HijKS': 'fgds'}
df = spark.read.json(sc.parallelize([ds]))
df.printSchema()
"""
root
|-- AbcDef: struct (nullable = true)
| |-- UvwXyz: struct (nullable = true)
| | |-- VkjLmnVop: string (nullable = true)
|-- HijKS: string (nullable = true)
"""
converted_df = convert_colnames_to_lower(df)
converted_df.printSchema()
"""
root
|-- abcdef: struct (nullable = true)
| |-- uvwxyz: struct (nullable = true)
| | |-- vkjlmnvop: string (nullable = true)
|-- hijks: string (nullable = true)
"""

Rename nested column in array with spark DataFrame

Try this:

Load the test data:

val data =
  """
    |{
    |  "parentArray": [
    |    {
    |      "child 1": 0
    |    },
    |    {
    |      "child 1": 1
    |    }
    |  ]
    |}
  """.stripMargin
val df = spark.read.option("multiLine", true)
  .json(Seq(data).toDS())
df.show(false)
df.printSchema()
/**
* +-----------+
* |parentArray|
* +-----------+
* |[[0], [1]] |
* +-----------+
*
* root
* |-- parentArray: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- child 1: long (nullable = true)
*/

Change the column name inside the array. Since a struct-to-struct cast matches fields by position, the renamed field keeps the original values:

val p = df.withColumn("parentArray", col("parentArray").cast("array<struct<new_col: long>>"))
p.show(false)
p.printSchema()

/**
* +-----------+
* |parentArray|
* +-----------+
* |[[0], [1]] |
* +-----------+
*
* root
* |-- parentArray: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- new_col: long (nullable = true)
*/
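
If you'd rather not hard-code the cast string, a small variation (my own sketch, not part of the answer) derives it from the existing schema:

// Take the DDL string of the current column type and rename
// "child 1" to "new_col" in it before casting.
val castType = df.schema("parentArray").dataType.simpleString
  .replace("child 1", "new_col")
val p2 = df.withColumn("parentArray", col("parentArray").cast(castType))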

How to rename column inside struct in spark scala

As you ask about renaming inside structs, you can achieve this using the schema DSL:

import org.apache.spark.sql.types._

val schema: StructType = df.schema.fields.find(_.name == "Col3").get.dataType.asInstanceOf[StructType]
val newSchema = StructType.apply(schema.fields.map(sf => StructField.apply("Col3=" + sf.name, sf.dataType)))

df
  .withColumn("Col3", $"Col3".cast(newSchema))
  .printSchema()

gives

root
|-- Col1: string (nullable = true)
|-- Col2: string (nullable = true)
|-- Col3: struct (nullable = false)
| |-- Col3=513: long (nullable = true)
| |-- Col3=549: long (nullable = true)

Then you can unpack it using select($"Col3.*").

You could also unpack the struct first and then rename all the columns that have a number as the column name, as sketched below.
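
A minimal sketch of that alternative, assuming the Col1/Col2/Col3 layout from the schema above (the prefixing rule is illustrative):

// Flatten the struct, then put the "Col3=" prefix back onto the
// purely numeric column names.
val unpacked = df.select($"Col1", $"Col2", $"Col3.*")
val renamed = unpacked.columns.foldLeft(unpacked) { (acc, c) =>
  if (c.nonEmpty && c.forall(_.isDigit)) acc.withColumnRenamed(c, s"Col3=$c")
  else acc
}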

Renaming nested elements in Scala Spark Dataframe

The simplest approach is type casting with a properly named schema string (or the equivalent DataType definition, shown below):

val schema = """struct<
Article: array<struct<Id:string,Timestamp:bigint>>,
Channel: struct<Cultura: bigint, Deportes: array<bigint>>>"""
df.withColumn("_History", $"_History".cast(schema))

You could also model this with case classes:

import org.apache.spark.sql.Row

case class ChannelRecord(Cultura: Option[Long], Deportes: Option[Seq[Long]])

val rename = udf((row: Row) =>
  ChannelRecord(Option(row.getLong(0)), Option(row.getSeq[Long](1))))

df.withColumn("_History",
  struct($"_History.Article", rename($"_History.Channel").alias("Channel")))

Renaming columns recursively in a nested structure in Spark

Here's a recursive method that revises a DataFrame schema, renaming via replaceAll any columns whose name contains the substring to be replaced:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

def renameAllColumns(schema: StructType, from: String, to: String): StructType = {
  def recurRename(schema: StructType, from: String, to: String): Seq[StructField] =
    schema.fields.map{
      case StructField(name, dtype: StructType, nullable, meta) =>
        StructField(name.replaceAll(from, to), StructType(recurRename(dtype, from, to)), nullable, meta)
      case StructField(name, dtype: ArrayType, nullable, meta) => dtype.elementType match {
        case struct: StructType => StructField(name.replaceAll(from, to), ArrayType(StructType(recurRename(struct, from, to)), true), nullable, meta)
        // keep the array type intact for arrays of non-struct elements
        case _ => StructField(name.replaceAll(from, to), dtype, nullable, meta)
      }
      case StructField(name, dtype, nullable, meta) =>
        StructField(name.replaceAll(from, to), dtype, nullable, meta)
    }

  StructType(recurRename(schema, from, to))
}

Testing the method on a sample DataFrame with a nested structure:

case class M(i: Int, `p:q`: String)
case class N(j: Int, m: M)

val df = Seq(
  (1, "a", Array(N(7, M(11, "x")), N(72, M(112, "x2")))),
  (2, "b", Array(N(8, M(21, "y")))),
  (3, "c", Array(N(9, M(31, "z"))))
).toDF("c1", "c2:0", "c3")

df.printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2:0: string (nullable = true)
// |-- c3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- j: integer (nullable = false)
// | | |-- m: struct (nullable = true)
// | | | |-- i: integer (nullable = false)
// | | | |-- p:q: string (nullable = true)

val newSchema = renameAllColumns(df.schema, ":", "_")

spark.createDataFrame(df.rdd, newSchema).printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2_0: string (nullable = true)
// |-- c3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- j: integer (nullable = false)
// | | |-- m: struct (nullable = true)
// | | | |-- i: integer (nullable = false)
// | | | |-- p_q: string (nullable = true)

Note that since replaceAll supports regex patterns, the method can be applied with more versatile replacement conditions. For example, here's how to trim off the part of a column name starting from the ':' character:

val newSchema = renameAllColumns(df.schema, """:.*""", "")

spark.createDataFrame(df.rdd, newSchema).printSchema
// root
// |-- c1: integer (nullable = false)
// |-- c2: string (nullable = true)
// |-- c3: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- j: integer (nullable = false)
// | | |-- m: struct (nullable = true)
// | | | |-- i: integer (nullable = false)
// | | | |-- p: string (nullable = true)

