How to perform union on two DataFrames with different amounts of columns in spark?
In Scala you just have to append all missing columns as nulls.
import org.apache.spark.sql.functions._

// let df1 and df2 be the DataFrames to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union of the two column sets

// For every column in allCols, keep it if this frame has it;
// otherwise add a null column with that name
def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map {
    case x if myCols.contains(x) => col(x)
    case x => lit(null).as(x)
  }
}

df1.select(expr(cols1, total): _*)
  .union(df2.select(expr(cols2, total): _*)) // unionAll is the deprecated alias
  .show()
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50| 2| null| null|
| 34| 4| null| null|
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+
Update
Both intermediate DataFrames will have the same column order, because we map through total in both cases:
df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()
+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50| 2| null| null|
| 34| 4| null| null|
+---+--------+---------+------+
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+
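Setting Spark aside, the core of the `expr` helper above is just list manipulation: walk the full column set and substitute a null placeholder wherever a frame lacks a column. A minimal plain-Python sketch of that idea (the helper name `build_select_list` is illustrative, and it sorts the columns explicitly rather than relying on set iteration order):

```python
def build_select_list(my_cols, all_cols):
    # For each column in the full set, keep the column if this frame
    # has it; otherwise stand in for lit(null).as(name)
    return [c if c in my_cols else f"NULL AS {c}" for c in sorted(all_cols)]

cols1 = {"age", "children"}
cols2 = {"age", "education", "income"}
total = cols1 | cols2

print(build_select_list(cols1, total))
# ['age', 'children', 'NULL AS education', 'NULL AS income']
print(build_select_list(cols2, total))
# ['age', 'NULL AS children', 'education', 'income']
```

Because both frames are mapped through the same `total`, the resulting select lists always line up positionally, which is what makes the positional union safe.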
Merge two spark dataframes with different columns to get all columns
union: this function resolves columns by position (not by name).
That is why it looked like "The values are being swapped and one column from second dataframe is missing."
You should use unionByName, but this function requires both dataframes to have the same structure.
Here is a simple way to harmonize the structure of your dataframes and then do the union (by name).
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def add_missing_columns(df: DataFrame, ref_df: DataFrame) -> DataFrame:
    """Add missing columns from ref_df to df

    Args:
        df (DataFrame): dataframe with missing columns
        ref_df (DataFrame): reference dataframe

    Returns:
        DataFrame: df with additional columns from ref_df
    """
    for col in ref_df.schema:
        if col.name not in df.columns:
            df = df.withColumn(col.name, F.lit(None).cast(col.dataType))

    return df


df1 = add_missing_columns(df1, df2)
df2 = add_missing_columns(df2, df1)

df_result = df1.unionByName(df2)
Union of two Spark dataframes with different columns
I'd use built-in schema inference for this. It is considerably more expensive, but much simpler than matching complex structures with possible conflicts:
spark.read.json(df1.toJSON.union(df2.toJSON))
You can also import all files at the same time, and join
with information extracted from header, using input_file_name
.
import org.apache.spark.sql.functions.input_file_name

val metadata: DataFrame // just the metadata from the header
val data: DataFrame     // all files loaded together

metadata.withColumn("file", input_file_name)
  .join(data.withColumn("file", input_file_name), Seq("file"))
PySpark: dynamic union of DataFrames with different columns
There are probably plenty of better ways to do it, but maybe the below is useful to anyone in the future.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder\
    .appName("DynamicFrame")\
    .getOrCreate()

df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = spark.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = spark.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

dataframes = [df01, df02, df03]

# Create a list of all the column names and sort them
cols = set()
for df in dataframes:
    for x in df.columns:
        cols.add(x)
cols = sorted(cols)

# Create a dictionary with all the dataframes
dfs = {}
for i, d in enumerate(dataframes):
    new_name = 'df' + str(i)  # New name for the key, the dataframe is the value
    dfs[new_name] = d
    # Loop through all column names. Add the missing columns to the dataframe (with value 0)
    for x in cols:
        if x not in d.columns:
            dfs[new_name] = dfs[new_name].withColumn(x, lit(0))
    dfs[new_name] = dfs[new_name].select(cols)  # Use 'select' to get the columns sorted

# Now put it all together with a loop (union)
result = dfs['df0']  # Take the first dataframe, add the others to it
dfs_to_add = list(dfs.keys())  # keys() is a view in Python 3, so make a list
dfs_to_add.remove('df0')  # Remove the first one, because it is already in the result
for x in dfs_to_add:
    result = result.union(dfs[x])

result.show()
Output:
+---+---+---+---+
| C1| C2| C3| C4|
+---+---+---+---+
| 1| 2| 3| 0|
| 9| 5| 6| 0|
| 0| 11| 12| 13|
| 0| 10| 15| 16|
|111| 0| 0|112|
|110| 0| 0|115|
+---+---+---+---+
Merging two data frames with different number of columns with no similar column(s)
If both dataframes have the same number of rows, you can create a temporary column in each dataframe containing a generated ID and join the two dataframes on that column.
The example has two dataframes with identical values in each column but the column names differ. So the combined result should contain 8 columns with the corresponding values.
from pyspark.sql.functions import monotonically_increasing_id

test_df = spark.createDataFrame([
    (1, "2", 5, 1), (3, "4", 7, 8), (10, "11", 12, 13),
], ("col1", "col2", "col3", "col4"))

test_df2 = spark.createDataFrame([
    (1, "2", 5, 1), (3, "4", 7, 8), (10, "11", 12, 13),
], ("col5", "col6", "col7", "col8"))

test_df = test_df.withColumn("id", monotonically_increasing_id())
test_df2 = test_df2.withColumn("id", monotonically_increasing_id())

test_df.join(test_df2, "id", "inner").drop("id").show()
Result:
+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|
+----+----+----+----+----+----+----+----+
| 1| 2| 5| 1| 1| 2| 5| 1|
| 3| 4| 7| 8| 3| 4| 7| 8|
| 10| 11| 12| 13| 10| 11| 12| 13|
+----+----+----+----+----+----+----+----+