How to Perform Union on Two Dataframes With Different Amounts of Columns in Spark

How to perform union on two DataFrames with different amounts of columns in spark?

In Scala you just have to append all missing columns as nulls.

import org.apache.spark.sql.functions._

// let df1 and df2 be the DataFrames to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union of the two column sets

// For each column in allCols, select it if the DataFrame has it,
// otherwise fill it with nulls
def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map {
    case x if myCols.contains(x) => col(x)
    case x => lit(null).as(x)
  }
}

// unionAll is deprecated since Spark 2.0; use union instead
df1.select(expr(cols1, total): _*).union(df2.select(expr(cols2, total): _*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50| 2| null| null|
| 34| 4| null| null|
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+

Update

Both intermediate DataFrames will have the same column order, because we map over total in both cases.

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50| 2| null| null|
| 34| 4| null| null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+
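The column-alignment idea above does not depend on Spark at all. Here is a plain-Python sketch of the same logic (the helper name align_exprs is made up for illustration): for every column in the combined set, keep the column if the frame has it, otherwise emit a NULL placeholder. Unlike the Scala version, this sketch sorts the column set so the order is deterministic.

```python
def align_exprs(my_cols, all_cols):
    """Return SQL-like select expressions that pad missing columns with NULL."""
    return [c if c in my_cols else f"NULL AS {c}" for c in sorted(all_cols)]

cols1 = {"age", "children"}
cols2 = {"age", "education", "income"}
total = cols1 | cols2  # union of the two column sets

# Both frames are selected through the same ordered list, so a
# positional union lines the columns up correctly.
print(align_exprs(cols1, total))
# ['age', 'children', 'NULL AS education', 'NULL AS income']
print(align_exprs(cols2, total))
# ['age', 'NULL AS children', 'education', 'income']
```

Feeding each list to the corresponding DataFrame's select (via selectExpr in PySpark, for instance) reproduces the Scala approach.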

Merge two spark dataframes with different columns to get all columns

union: this function resolves columns by position (not by name).

That is why the values appear swapped and one column from the second dataframe seems to be missing.

You should use unionByName instead, but that function requires both dataframes to have the same set of columns (since Spark 3.1 you can pass allowMissingColumns=True to relax this).

Here is a simple helper to harmonize the structure of your dataframes before doing the unionByName.

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def add_missing_columns(df: DataFrame, ref_df: DataFrame) -> DataFrame:
    """Add missing columns from ref_df to df

    Args:
        df (DataFrame): dataframe with missing columns
        ref_df (DataFrame): referential dataframe

    Returns:
        DataFrame: df with additional columns from ref_df
    """
    for field in ref_df.schema:
        if field.name not in df.columns:
            df = df.withColumn(field.name, F.lit(None).cast(field.dataType))

    return df


df1 = add_missing_columns(df1, df2)
df2 = add_missing_columns(df2, df1)

df_result = df1.unionByName(df2)

Union of two Spark dataframes with different columns

I'd use built-in schema inference for this. It is way more expensive, but much simpler than matching complex structures, with possible conflicts:

spark.read.json(df1.toJSON.union(df2.toJSON))
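The reason this works can be seen with plain JSON, no Spark required. This is a minimal sketch with made-up rows: each row serializes only the columns it has, and a reader that scans all records infers the union of the keys, filling the ones a record lacks with null.

```python
import json

# Rows from two hypothetical frames with different columns,
# serialized to JSON lines, as df.toJSON would produce
json_rows = [
    json.dumps({"age": 50, "children": 2}),
    json.dumps({"age": 26, "education": True, "income": 60000.0}),
]

# A JSON reader infers the schema as the union of all keys seen...
inferred = sorted({k for r in json_rows for k in json.loads(r)})

# ...and fills the keys a record lacks with null (None)
records = [{k: json.loads(r).get(k) for k in inferred} for r in json_rows]
print(inferred)    # ['age', 'children', 'education', 'income']
print(records[0])  # {'age': 50, 'children': 2, 'education': None, 'income': None}
```

spark.read.json does the same scan over the unioned JSON strings, which is why it is expensive: every row is serialized, shipped, and parsed again.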

You can also import all files at the same time, and join with information extracted from the header, using input_file_name:

import org.apache.spark.sql.functions.input_file_name

val metadata: DataFrame // Just metadata from the header
val data: DataFrame // All files loaded together

metadata.withColumn("file", input_file_name)
.join(data.withColumn("file", input_file_name), Seq("file"))

PySpark: dynamic union of DataFrames with different columns

There are probably plenty of better ways to do it, but maybe the below is useful to anyone in the future.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder \
    .appName("DynamicFrame") \
    .getOrCreate()

df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = spark.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = spark.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

dataframes = [df01, df02, df03]

# Create a sorted list of all the column names
cols = set()
for df in dataframes:
    for x in df.columns:
        cols.add(x)
cols = sorted(cols)

# Create a dictionary with all the dataframes
dfs = {}
for i, d in enumerate(dataframes):
    new_name = 'df' + str(i)  # New name for the key; the dataframe is the value
    dfs[new_name] = d
    # Loop through all column names. Add the missing columns to the dataframe (with value 0)
    for x in cols:
        if x not in d.columns:
            dfs[new_name] = dfs[new_name].withColumn(x, lit(0))
    dfs[new_name] = dfs[new_name].select(cols)  # Use 'select' to get the columns sorted

# Now put it all together with a loop (union)
result = dfs['df0']  # Take the first dataframe, add the others to it
dfs_to_add = list(dfs.keys())  # list() is needed: dict_keys has no .remove() in Python 3
dfs_to_add.remove('df0')  # Remove the first one, because it is already in the result
for x in dfs_to_add:
    result = result.union(dfs[x])
result.show()

Output:

+---+---+---+---+
| C1| C2| C3| C4|
+---+---+---+---+
| 1| 2| 3| 0|
| 9| 5| 6| 0|
| 0| 11| 12| 13|
| 0| 10| 15| 16|
|111| 0| 0|112|
|110| 0| 0|115|
+---+---+---+---+
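The padding logic above can be sketched with plain Python dicts (no Spark) to show the intent: collect the sorted union of all column names, fill each row's gaps with 0, and concatenate. The helper name dynamic_union is made up for this sketch.

```python
def dynamic_union(tables):
    """Union a list of row-dict tables, padding missing columns with 0."""
    # Sorted union of all column names across the tables
    cols = sorted({c for rows in tables for row in rows for c in row})
    # Pad each row with 0 for any column it lacks, in the shared column order
    return [{c: row.get(c, 0) for c in cols} for rows in tables for row in rows]

t1 = [{"C1": 1, "C2": 2, "C3": 3}, {"C1": 9, "C2": 5, "C3": 6}]
t2 = [{"C2": 11, "C3": 12, "C4": 13}, {"C2": 10, "C3": 15, "C4": 16}]
print(dynamic_union([t1, t2]))
# First row: {'C1': 1, 'C2': 2, 'C3': 3, 'C4': 0}
```

Note that padding with 0 conflates "missing" with a real zero; the null-padding approaches earlier in this article avoid that ambiguity.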

Merging two data frames with different number of columns with no similar column(s)

If both dataframes have the same number of rows, you can create a temporary column in each dataframe, containing a generated ID, and join the two dataframes on that column.
The example has two dataframes with identical values in each column, but the column names differ. So the combined result should contain 8 columns with the corresponding values.

from pyspark.sql.functions import monotonically_increasing_id

test_df = spark.createDataFrame([
    (1, "2", 5, 1), (3, "4", 7, 8), (10, "11", 12, 13),
], ("col1", "col2", "col3", "col4"))

test_df2 = spark.createDataFrame([
    (1, "2", 5, 1), (3, "4", 7, 8), (10, "11", 12, 13),
], ("col5", "col6", "col7", "col8"))

test_df = test_df.withColumn("id", monotonically_increasing_id())
test_df2 = test_df2.withColumn("id", monotonically_increasing_id())

test_df.join(test_df2, "id", "inner").drop("id").show()

Result:

+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|
+----+----+----+----+----+----+----+----+
| 1| 2| 5| 1| 1| 2| 5| 1|
| 3| 4| 7| 8| 3| 4| 7| 8|
| 10| 11| 12| 13| 10| 11| 12| 13|
+----+----+----+----+----+----+----+----+
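Conceptually this is a positional zip of the two row sets, which a plain-Python sketch (hypothetical helper zip_join) makes explicit. One caveat on the Spark version: monotonically_increasing_id only guarantees increasing IDs within a partition, so the generated IDs line up across the two dataframes only when both are partitioned identically; a row_number over a window is the safer way to get matching consecutive IDs.

```python
def zip_join(rows_a, rows_b):
    """Pair two equal-length row-dict lists positionally, like the ID join above."""
    assert len(rows_a) == len(rows_b), "both tables must have the same row count"
    # Merge each positional pair of rows into one wide row
    return [{**a, **b} for a, b in zip(rows_a, rows_b)]

left = [{"col1": 1, "col2": "2"}, {"col1": 3, "col2": "4"}]
right = [{"col5": 1, "col6": "2"}, {"col5": 3, "col6": "4"}]
print(zip_join(left, right))
# [{'col1': 1, 'col2': '2', 'col5': 1, 'col6': '2'},
#  {'col1': 3, 'col2': '4', 'col5': 3, 'col6': '4'}]
```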

