Duplicate Columns in Spark Dataframe

Duplicate a column in a PySpark data frame

Just

df.withColumn("Rate2", df["Rate"])

or (in SQL)

SELECT *, Rate AS Rate2 FROM df
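
To actually run the SQL variant you first need to expose the DataFrame to Spark SQL. A minimal sketch, assuming a SparkSession named spark and a column named Rate:

# Minimal sketch: register the DataFrame as a temp view so the SQL variant can run
# (assumes an existing SparkSession named `spark` and a column named `Rate`)
df.createOrReplaceTempView("df")
df_with_copy = spark.sql("SELECT *, Rate AS Rate2 FROM df")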

How to merge duplicate columns in pyspark?

Edited to answer the OP's request to coalesce from a list.

Here's a reproducible example

import pyspark.sql.functions as F

df = spark.createDataFrame([
    ("z", "a", None, None),
    ("b", None, "c", None),
    ("c", "b", None, None),
    ("d", None, None, "z"),
], ["a", "c", "c", "c"])

df.show()

# fix duplicated column names
old_col = df.schema.names
running_list = []
new_col = []
i = 0
for column in old_col:
    if column in running_list:
        new_col.append(column + "_" + str(i))
        i = i + 1
    else:
        new_col.append(column)
        running_list.append(column)
print(new_col)

df1 = df.toDF(*new_col)

# coalesce columns to get one column from a list
a = ['c', 'c_0', 'c_1']
to_drop = ['c_0', 'c_1']
b = [df1[col] for col in a]

# coalesce the columns to get one column
df_merged = df1.withColumn('c', F.coalesce(*b)).drop(*to_drop)

df_merged.show()

Output:

+---+----+----+----+
|  a|   c|   c|   c|
+---+----+----+----+
|  z|   a|null|null|
|  b|null|   c|null|
|  c|   b|null|null|
|  d|null|null|   z|
+---+----+----+----+

['a', 'c', 'c_0', 'c_1']

+---+---+
|  a|  c|
+---+---+
|  z|  a|
|  b|  c|
|  c|  b|
|  d|  z|
+---+---+

How to find symmetrical duplicate columns (2 columns) using a Spark DataFrame in Scala?

You can call dropDuplicates on a sorted array column:

import org.apache.spark.sql.functions.{array, col, sort_array}

val df2 = df.withColumn(
  "arr",
  sort_array(array(col("col1"), col("col2")))
).dropDuplicates("arr").drop("arr")

df2.show
+----+----+
|col1|col2|
+----+----+
|   2|   3|
|   1|   2|
|   7|   0|
+----+----+
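
The same idea carries over to PySpark. A minimal sketch, assuming the columns are named col1 and col2:

import pyspark.sql.functions as F

# Sketch: deduplicate rows that are symmetrical across col1/col2 (column names assumed)
df2 = (df
    .withColumn("arr", F.sort_array(F.array(F.col("col1"), F.col("col2"))))
    .dropDuplicates(["arr"])
    .drop("arr"))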

Rename Duplicate Columns of a Spark DataFrame?

The simplest way I found to do this is:

val data = Seq((1,2),(3,4)).toDF("a","a")
val deduped = data.toDF("a","a_2")
deduped.show

+---+---+
|  a|a_2|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

For a more general solution:

val data = Seq(
  (1,2,3,4,5,6,7,8),
  (9,0,1,2,3,4,5,6)
).toDF("a","b","c","a","d","b","e","b")
data.show

+---+---+---+---+---+---+---+---+
|  a|  b|  c|  a|  d|  b|  e|  b|
+---+---+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|  7|  8|
|  9|  0|  1|  2|  3|  4|  5|  6|
+---+---+---+---+---+---+---+---+

import org.apache.spark.sql.DataFrame
import scala.annotation.tailrec

def dedupeColumnNames(df: DataFrame): DataFrame = {

  @tailrec
  def dedupe(fixed_columns: List[String], columns: List[String]): List[String] = {
    if (columns.isEmpty) fixed_columns
    else {
      val count = columns.groupBy(identity).mapValues(_.size)(columns.head)
      if (count == 1) dedupe(columns.head :: fixed_columns, columns.tail)
      else dedupe(s"${columns.head}_${count}" :: fixed_columns, columns.tail)
    }
  }

  val new_columns = dedupe(List.empty[String], df.columns.reverse.toList).toArray
  df.toDF(new_columns:_*)
}

data
  .transform(dedupeColumnNames)
  .show

+---+---+---+---+---+---+---+---+
|  a|  b|  c|a_2|  d|b_2|  e|b_3|
+---+---+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|  7|  8|
|  9|  0|  1|  2|  3|  4|  5|  6|
+---+---+---+---+---+---+---+---+

Duplicate columns in Spark Dataframe

The best way would be to change the column name upstream ;)

However, it seems that is not possible, so there are a couple of options:

  1. If the case of the columns is different ("email" vs "Email"), you can turn on case sensitivity (a PySpark equivalent of both options is sketched after this list):

         sql(sqlContext, "set spark.sql.caseSensitive=true")
  2. If the column names are exactly the same, you will need to manually specify the schema and skip the first row to avoid the headers:

    customSchema <- structType(
    structField("year", "integer"),
    structField("make", "string"),
    structField("model", "string"),
    structField("comment", "string"),
    structField("blank", "string"))

    df <- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", header="true", schema = customSchema)
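
For reference, a rough PySpark equivalent of both options, using the built-in CSV reader (a sketch; the file name cars.csv and the column names are taken from the example above):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Option 1: make column-name matching case sensitive
spark.conf.set("spark.sql.caseSensitive", "true")

# Option 2: supply the schema explicitly so the header row is not used for column names
customSchema = StructType([
    StructField("year", IntegerType()),
    StructField("make", StringType()),
    StructField("model", StringType()),
    StructField("comment", StringType()),
    StructField("blank", StringType()),
])

df = spark.read.csv("cars.csv", header=True, schema=customSchema)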

PySpark: duplicate a column on a data frame

It looks like you might have an extra space in the column name: instead of "age" you have " age" (with a leading space).

Please check the schema and reference the column as below:

df = df.withColumn('age2', F.col(' age'))
df.show()

Better yet, check the ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace read options to skip the leading and trailing spaces, as sketched below.
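
A minimal sketch of those options on a CSV read (the file path and header option are assumptions; both options belong to Spark's CSV source):

# Sketch: trim whitespace around values while reading a CSV (path is assumed)
df = (spark.read
    .option("header", "true")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .csv("people.csv"))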

Duplicate column in a JSON file throws an error when creating a PySpark dataframe in Databricks after upgrading the runtime from 7.3 LTS (Spark 3.0.1) to 9.1 LTS (Spark 3.1.2)

There have been several good suggestions, which may help depending on the case.

As pointed out by @ScootCork, defining the schema beforehand helps, since Spark then does not have to infer it on its own. However, my file was quite large and heavily nested, so defining the schema manually would have been cumbersome.

Finally I did use a schema, but found a workaround so that I did not have to create it manually.

Even with duplicate columns, I was able to create the dataframe on the 7.3 LTS runtime, as stated in the original question. So I read one file on that runtime and wrote it to ADLS Gen2 (you can store it anywhere). This is a one-time activity; from then on you can read this file back every time you run your code (multiline does not need to be true when reading it back), get its schema using .schema, and use that schema to read new JSON files. Since Spark does not have to infer the schema on its own, it does not throw an error for duplicate columns.

Note that the duplicate column still exists, and you WILL get an ambiguity error if you try to use it. However, this method is quite useful if manually defining the schema is impractical due to the sheer size and complexity of the JSON structure, and if the duplicated columns are of no use. Described below:

One-time activity on the 7.3 LTS runtime:

# A few columns were coming through as duplicates in the raw file, e.g. languages[0].groupingsets[0].element.attributes.tags[0] was repeated twice.
# This caused an error while creating the dataframe.
# However, we are able to read it on Databricks Runtime 7.3 LTS, hence this runtime is used to read one file and write it to ADLS as a ONE-TIME activity.
# For all further runs, this file can be read with multiline set to false; its schema is then used while reading the other new files (which in this case need multiline set to true). This way Spark does not have to create the schema on its own and so does not throw an error even on higher runtime versions.
# A historical file delivered initially was used, since it had a lot of records and therefore covers all possibilities.
# The schema file can be created again on a 7.3 LTS runtime cluster if it is ever deleted.

dfOldRuntime = spark.read.option("multiline","true").json(pathOneFile)  # Can take any file to create a sample schema.
dfOldRuntime.coalesce(1).write.mode('overwrite').format('json').save(pathSchema)

Now use this written file for all future runs even on higher runtimes.

# Read the sample file which was created using the 7.3 LTS runtime.
# multiline does NOT have to be true for this.
# Get its schema and use it to read the new files, even on a higher runtime, without the error caused by the duplicate columns.
dfSchema = spark.read.json(pathSchema)
schema = dfSchema.schema

# Read new JSON files with this schema via .schema(). Works on higher runtimes as well, since Spark now does not have to create the schema on its own.
intermediate_df = spark.read.option("multiline","true").schema(schema).json(f"{json_path}")

How to resolve duplicate column names while joining two dataframes in PySpark?

There is no shortcut here. PySpark expects the left and right dataframes to have distinct sets of field names (with the exception of the join key).

One solution would be to prefix each field name with either a "left_" or "right_" as follows:

# Obtain column lists
left_cols = df.columns
right_cols = df2.columns

# Prefix each dataframe's fields with "left_" or "right_"
df = df.selectExpr([col + ' as left_' + col for col in left_cols])
df2 = df2.selectExpr([col + ' as right_' + col for col in right_cols])

# Perform the join on the (now prefixed) key column
df3 = df.join(df2, df['left_c_0'] == df2['right_c_0'])
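
If you want the surviving columns back under their original names afterwards, you can strip the prefixes again after the join. A small sketch, assuming the prefixed names from above:

# Sketch: drop the duplicated key column and strip the "left_" prefix again (names assumed)
df3 = df3.drop('right_c_0')
for c in df3.columns:
    if c.startswith('left_'):
        df3 = df3.withColumnRenamed(c, c[len('left_'):])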

