PySpark - Compare DataFrames

Compare two DataFrames in PySpark

Assuming that we can use id to join these two datasets, I don't think there is a need for a UDF. This can be solved with an inner join plus the array and array_remove functions, among others.

First let's create the two datasets:

df1 = spark.createDataFrame([
    [1, "ABC", 5000, "US"],
    [2, "DEF", 4000, "UK"],
    [3, "GHI", 3000, "JPN"],
    [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
    [1, "ABC", 5000, "US"],
    [2, "DEF", 4000, "CAN"],
    [3, "GHI", 3500, "JPN"],
    [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])

First we do an inner join between the two datasets, then we generate the condition df1[col] != df2[col] for each column except id. When the columns aren't equal we return the column name, otherwise an empty string. The list of conditions becomes the items of an array, from which we finally remove the empty strings:

from pyspark.sql.functions import col, array, when, array_remove, lit

# get the mismatch conditions for all columns except id
conditions_ = [when(df1[c] != df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']

select_expr = [
    col("id"),
    *[df2[c] for c in df2.columns if c != 'id'],
    array_remove(array(*conditions_), "").alias("column_names")
]

df1.join(df2, "id").select(*select_expr).show()

# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# | 1| ABC|5000| US| []|
# | 3| GHI|3500| JPN| [sal]|
# | 2| DEF|4000| CAN| [Address]|
# | 4|JKL_M|4800| CHN| [name, sal]|
# +---+-----+----+-------+------------+
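
One caveat: if any of the compared columns can contain nulls, df1[c] != df2[c] evaluates to null rather than True, so a mismatch involving a null would silently end up as an empty string. A minimal null-safe sketch of the same conditions, using eqNullSafe (available since Spark 2.3):

# null-safe variant: a column is flagged whenever the two values differ,
# treating null as an ordinary value
conditions_ = [
    when(~df1[c].eqNullSafe(df2[c]), lit(c)).otherwise("")
    for c in df1.columns if c != 'id'
]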

Compare two dataframes and display the data that are different

Since your dataframes have the same schema, you can use subtract.

df1:

df1 = spark.createDataFrame([
    (1, 2, 3, 4),
    (5, 6, 7, 8),
], ['a', 'b', 'c', 'd'])

+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 1| 2| 3| 4|
| 5| 6| 7| 8|
+---+---+---+---+
df2:

df2 = spark.createDataFrame([
    (5, 6, 7, 8),
], ['a', 'b', 'c', 'd'])

+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 5| 6| 7| 8|
+---+---+---+---+
Use subtract to get the rows that exist in df1 but not in df2:

df1.subtract(df2).show()

+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 1| 2| 3| 4|
+---+---+---+---+
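
Note that subtract only covers one direction and behaves like SQL's EXCEPT DISTINCT, so duplicate rows are collapsed. If you also need the rows that exist only in df2, or a duplicate-preserving difference, a small sketch:

# rows that exist in df2 but not in df1
df2.subtract(df1).show()

# duplicate-preserving difference (Spark 2.4+)
df1.exceptAll(df2).show()

# rows present in only one of the two dataframes
df1.subtract(df2).union(df2.subtract(df1)).show()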

PySpark - Compare DataFrames

So I create a third DataFrame by joining DataFrame1 and DataFrame2, and then filter on the count fields to check whether they are equal or not:

Mismatch:

df3 = df1.join(df2, [df1.name == df2.name], how='inner')
df3.filter(df3.df1_count != df3.df2_count).show()

Match:

df3 = df1.join(df2, [df1.name == df2.name], how='inner')
df3.filter(df3.df1_count == df3.df2_count).show()
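
The input frames aren't shown above; assuming each one carries a name column and a pre-computed count column (df1_count / df2_count, as referenced in the snippets), a minimal hypothetical sketch would be:

# hypothetical inputs, only to make the snippets above runnable
df1 = spark.createDataFrame([("PROG1", 5), ("PROG2", 7)], ["name", "df1_count"])
df2 = spark.createDataFrame([("PROG1", 5), ("PROG2", 9)], ["name", "df2_count"])

df3 = df1.join(df2, [df1.name == df2.name], how='inner')
df3.filter(df3.df1_count != df3.df2_count).show()   # only the PROG2 row, where 7 != 9
df3.filter(df3.df1_count == df3.df2_count).show()   # only the PROG1 row, where the counts match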

Hope this comes in useful for someone

Compare a pyspark dataframe to another dataframe

For those who are looking for an answer: I transposed the data frames and then did a comparison.

from pyspark.sql.functions import array, col, explode, struct, lit
def Transposedf(df, by, colheader):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([struct(lit(c).alias("Field"), col(c).alias(colheader)) for c in cols])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.Field", "kvs." + colheader])

Then the comparison looks like this:

def Compare_df(df_Expected, df_Actual):
    # keep only the fields where the actual value differs from the expected value
    df_combined = (df_Actual
        .join(df_Expected, ((df_Actual.id == df_Expected.id)
                            & (df_Actual.Field == df_Expected.Field)
                            & (df_Actual.Actual_value != df_Expected.Expected_value)))
        .select([df_Actual.id, df_Actual.Field, df_Actual.Actual_value, df_Expected.Expected_value])
    )
    return df_combined

I called these two functions as follows:

df_Actual = Transposedf(df_Actual, ["id"], 'Actual_value')
df_Expected = Transposedf(df_Expected, ["id"], 'Expected_value')

# Compare the expected and actual values
df_result = Compare_df(df_Expected, df_Actual)
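
To make the shape of the transposed frames concrete, here is a small made-up example (the value columns must share a single type because of the assert above):

# hypothetical numeric-only frames
df_Expected = spark.createDataFrame([(1, 5000, 100)], ["id", "sal", "bonus"])
df_Actual = spark.createDataFrame([(1, 5500, 100)], ["id", "sal", "bonus"])

df_Actual = Transposedf(df_Actual, ["id"], 'Actual_value')
df_Expected = Transposedf(df_Expected, ["id"], 'Expected_value')

# only the 'sal' field differs, so a single row is returned:
# (id=1, Field='sal', Actual_value=5500, Expected_value=5000)
Compare_df(df_Expected, df_Actual).show()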

Compare two DataFrames and check for changes

Similar questions have been asked multiple times here on SO.

Use a simple join to get the rows that are in both df1 and df2, and filter on those that have different values in the two other columns:

from pyspark.sql.functions import col

df_final = df2.alias("new").join(
df1.alias("old"),
(col("new.PROGRAM_NAME") == col("old.PROGRAM_NAME")) & (col("new.ACTION") == col("old.ACTION"))
).filter(
(col("new.VALUE1") != col("old.VALUE1")) | (col("new.STATUS") != col("old.STATUS"))
).select("new.*")

df_final.show()

#+------------+-------+------+-----------+
#|PROGRAM_NAME| ACTION|VALUE1| STATUS|
#+------------+-------+------+-----------+
#| PROG3|ACTION1| 20| FINISHED|
#| PROG4|ACTION4| 14|IN PROGRESS|
#| PROG1|ACTION1| 11|IN PROGRESS|
#+------------+-------+------+-----------+

You can also add the filter condition directly to the join condition.
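
A minimal sketch of that variant, folding the value checks into the join predicate:

df_final = df2.alias("new").join(
    df1.alias("old"),
    (col("new.PROGRAM_NAME") == col("old.PROGRAM_NAME"))
    & (col("new.ACTION") == col("old.ACTION"))
    & ((col("new.VALUE1") != col("old.VALUE1")) | (col("new.STATUS") != col("old.STATUS")))
).select("new.*")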

How to compare two dataframes and calculate the differences in PySpark?

dataframe.subtract(dataframe) is logical subtraction (EXCEPT DISTINCT).
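
In other words, subtract returns whole rows of the first dataframe that have no exact match in the second; it does not compute per-column numeric differences. A small hypothetical illustration:

# made-up frames, only for illustration
df1 = spark.createDataFrame([("NY", 10), ("LA", 20)], ["City", "Sales"])
df2 = spark.createDataFrame([("NY", 12), ("LA", 20)], ["City", "Sales"])

df1.subtract(df2).show()   # returns the whole ("NY", 10) row, not the difference 10 - 12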

So, instead you can join the two dataframes and do arithmetic subtraction between the columns:

df = df1.join(df2, on='City').cache()

for col in df1.columns:
    if col != 'City':
        df = df.withColumn('diff_' + col, df2[col] - df1[col]).drop(col)
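
With the made-up City/Sales frames above, this leaves the join key plus one diff_* column per value column:

df.show()

# +----+----------+
# |City|diff_Sales|
# +----+----------+
# |  NY|         2|
# |  LA|         0|
# +----+----------+
# (row order may vary)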

