Compare two dataframes in PySpark
Assuming that we can use id to join these two datasets, I don't think there is a need for a UDF. This can be solved with an inner join plus the array and array_remove functions, among others.
First let's create the two datasets:
df1 = spark.createDataFrame([
    [1, "ABC", 5000, "US"],
    [2, "DEF", 4000, "UK"],
    [3, "GHI", 3000, "JPN"],
    [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
    [1, "ABC", 5000, "US"],
    [2, "DEF", 4000, "CAN"],
    [3, "GHI", 3500, "JPN"],
    [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])
First we do an inner join between the two datasets, then we generate the condition df1[col] != df2[col] for each column except id. When the columns aren't equal we return the column name, otherwise an empty string. These conditions become the items of an array, from which we finally remove the empty strings:
from pyspark.sql.functions import col, lit, array, when, array_remove

# get conditions for all columns except id
conditions_ = [when(df1[c] != df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']

select_expr = [
    col("id"),
    *[df2[c] for c in df2.columns if c != 'id'],
    array_remove(array(*conditions_), "").alias("column_names")
]
df1.join(df2, "id").select(*select_expr).show()
# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# | 1| ABC|5000| US| []|
# | 3| GHI|3500| JPN| [sal]|
# | 2| DEF|4000| CAN| [Address]|
# | 4|JKL_M|4800| CHN| [name, sal]|
# +---+-----+----+-------+------------+
Compare two dataframes and display the data that are different
Since your dataframes have the same schema, you can use subtract:
df1
df1 = spark.createDataFrame([
(1, 2, 3, 4),
(5, 6, 7, 8),
], ['a', 'b', 'c', 'd'])
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 1| 2| 3| 4|
| 5| 6| 7| 8|
+---+---+---+---+
df2
df2 = spark.createDataFrame([
(5, 6, 7, 8),
], ['a', 'b', 'c', 'd'])
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 5| 6| 7| 8|
+---+---+---+---+
Use subtract to get the rows that exist in df1 but do not exist in df2:
df1.subtract(df2).show()
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 1| 2| 3| 4|
+---+---+---+---+
PySpark - Compare DataFrames
So I create a third DataFrame by joining DataFrame1 and DataFrame2, and then filter on the count fields to check whether they are equal:
Mismatch:
df3 = df1.join(df2, df1.name == df2.name, how='inner')
df3.filter(df3.df1_count != df3.df2_count).show()
Match:
df3 = df1.join(df2, df1.name == df2.name, how='inner')
df3.filter(df3.df1_count == df3.df2_count).show()
Hope this comes in useful for someone
Compare a pyspark dataframe to another dataframe
For those who are looking for an answer: I transposed the data frames and then did a comparison.
from pyspark.sql.functions import array, col, explode, struct, lit
def Transposedf(df, by, colheader):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([struct(lit(c).alias("Field"), col(c).alias(colheader)) for c in cols])).alias("kvs")
    return df.select(by + [kvs]).select(by + ["kvs.Field", "kvs." + colheader])
Then the comparison looks like this:
def Compare_df(df_Expected, df_Actual):
    df_combined = (df_Actual
        .join(df_Expected, ((df_Actual.id == df_Expected.id)
                            & (df_Actual.Field == df_Expected.Field)
                            & (df_Actual.Actual_value != df_Expected.Expected_value)))
        .select([df_Actual.id, df_Actual.Field, df_Actual.Actual_value, df_Expected.Expected_value])
    )
    return df_combined
I called these 2 functions as follows:
df_Actual = Transposedf(df_Actual, ["id"], 'Actual_value')
df_Expected = Transposedf(df_Expected, ["id"], 'Expected_value')
# Compare the expected and actual
df_result = Compare_df(df_Expected, df_Actual)
Compare two DataFrames and check for changes
Similar questions have been asked multiple times here on SO.
Use a simple join to get the rows that are in both df1 and df2, then filter on those that have different values for the two other columns:
from pyspark.sql.functions import col
df_final = df2.alias("new").join(
df1.alias("old"),
(col("new.PROGRAM_NAME") == col("old.PROGRAM_NAME")) & (col("new.ACTION") == col("old.ACTION"))
).filter(
(col("new.VALUE1") != col("old.VALUE1")) | (col("new.STATUS") != col("old.STATUS"))
).select("new.*")
df_final.show()
#+------------+-------+------+-----------+
#|PROGRAM_NAME| ACTION|VALUE1| STATUS|
#+------------+-------+------+-----------+
#| PROG3|ACTION1| 20| FINISHED|
#| PROG4|ACTION4| 14|IN PROGRESS|
#| PROG1|ACTION1| 11|IN PROGRESS|
#+------------+-------+------+-----------+
You can also add the filter condition directly to the join condition.
How to compare two dataframes and calculate the differences in PySpark?
dataframe.subtract(dataframe) is logical subtraction (EXCEPT DISTINCT).
So instead you can join and do arithmetic subtraction between the columns:
df = df1.join(df2, on='City').cache()

for c in df1.columns:
    if c != 'City':
        df = df.withColumn('diff_' + c, df2[c] - df1[c]).drop(c)