Comparing Columns in Pyspark

Comparing columns in Pyspark

You can reduce over a list of columns using SQL expressions:

from pyspark.sql.functions import max as max_, col, when
from functools import reduce

def row_max(*cols):
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
.toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max"))
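
With the example DataFrame above, appending .show() should print the row-wise maximum of a, b and c, roughly:

df.select(row_max("a", "b", "c").alias("max")).show()

+---+
|max|
+---+
|  3|
|  2|
|  5|
+---+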

Spark 1.5+ also provides the least and greatest functions:

from pyspark.sql.functions import greatest

df.select(greatest("a", "b", "c"))

If you want to keep the name of the max you can use structs:

from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))
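
The result is a single struct column holding both the winning value and the name of the column it came from; a quick way to inspect it (a sketch, assuming the maxs DataFrame just built):

maxs.select(
    col("maxs")["value"].alias("value"),
    col("maxs")["col"].alias("col")
).show()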

And finally you can use the above to find and select the "top" column:

from pyspark.sql.functions import max

((_, c), ) = (maxs
.groupBy(col("maxs")["col"].alias("col"))
.count()
.agg(max(struct(col("count"), col("col"))))
.first())

df.select(c)
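
On the example DataFrame above, c should resolve to "c" (on the second row a and c tie, and the struct comparison falls through to the column-name field, which favors "c"), so the final line is effectively:

df.select("c").show()

+---+
|  c|
+---+
|  3|
|  2|
|  5|
+---+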

In Pyspark how do I compare two columns and use x whenever they are not the same

You can use a column expression in place of your literal. This would look very close to what you have:

SUMMARY = SUMMARY.withColumn(
"type_description",
F.when(SUMMARY.type_description != SUMMARY.rename_description, SUMMARY.x).otherwise(
SUMMARY.type_description
)
)
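
As a minimal, self-contained sketch of the same pattern (the column values and the spark session are assumptions, not from the original question):

import pyspark.sql.functions as F

SUMMARY = spark.createDataFrame(
    [("alpha", "alpha", "keep"), ("alpha", "beta", "fallback")],
    ["type_description", "rename_description", "x"],
)

SUMMARY = SUMMARY.withColumn(
    "type_description",
    F.when(
        SUMMARY.type_description != SUMMARY.rename_description, SUMMARY.x
    ).otherwise(SUMMARY.type_description),
)
SUMMARY.show()
# Rows where the two descriptions differ take the value of column x;
# the rest keep the original type_description.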

Comparing 3 columns in PySpark

If I understand correctly, you can compare with greatest and return the column names, then concat:
Example:

Input:

import numpy as np
import pandas as pd

np.random.seed(111)
df = spark.createDataFrame(pd.DataFrame(np.random.randint(0, 100, (5, 5)),
                                        columns=list('ABCDE')))
df.show()

+---+---+---+---+---+
| A| B| C| D| E|
+---+---+---+---+---+
| 84| 84| 84| 86| 19|
| 41| 66| 82| 40| 71|
| 57| 7| 12| 10| 65|
| 88| 28| 14| 34| 21|
| 54| 72| 37| 76| 58|
+---+---+---+---+---+

Proposed solution:

import pyspark.sql.functions as F

cols = ['A','B','C']
df.withColumn("max_of_ABC",F.concat_ws("",
*[F.when(F.col(i) == F.greatest(*cols),i) for i in cols])).show()

+---+---+---+---+---+----------+
| A| B| C| D| E|max_of_ABC|
+---+---+---+---+---+----------+
| 84| 84| 84| 86| 19| ABC|
| 41| 66| 82| 40| 71| C|
| 57| 7| 12| 10| 65| A|
| 88| 28| 14| 34| 21| A|
| 54| 72| 37| 76| 58| B|
+---+---+---+---+---+----------+
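
Note that when() without otherwise() yields null for the non-matching columns, and concat_ws skips nulls, which is why ties show up as concatenated names such as "ABC" in the first row. If you prefer a single name on ties, a hedged variant (not from the original answer) is to keep only the first match with coalesce:

df.withColumn(
    "max_of_ABC",
    F.coalesce(*[F.when(F.col(i) == F.greatest(*cols), F.lit(i)) for i in cols])
).show()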

Pyspark: Compare column value with another value

Use F.lit(1.5) inside F.least, because it requires a column and does not accept a float:

df2 = df.withColumn('new_col', F.when(cond, F.least(F.col('col1')*0.2, F.lit(1.5))).otherwise(F.lit(100)))
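
For context, here is a minimal sketch with an assumed DataFrame and condition (cond and the data are made up, not from the original question):

import pyspark.sql.functions as F

df = spark.createDataFrame([(4.0,), (10.0,), (None,)], ["col1"])
cond = F.col("col1").isNotNull()  # assumed condition

df2 = df.withColumn(
    "new_col",
    F.when(cond, F.least(F.col("col1") * 0.2, F.lit(1.5))).otherwise(F.lit(100)),
)
df2.show()
# 4.0  -> 0.8   (col1 * 0.2 is below the 1.5 cap)
# 10.0 -> 1.5   (capped by the literal)
# null -> 100.0 (condition fails, so the otherwise branch applies)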

Pyspark: compare one column with other columns and flag if similar

You can use the Spark built-in function isin:

from pyspark.sql import functions as F

df = (spark
.sparkContext
.parallelize([
('A', 'A', 'B', 'C'),
('A', 'B', 'C', 'A'),
('A', 'C', 'A', 'B'),
('A', 'X', 'Y', 'Z'),
])
.toDF(['ca', 'cb', 'cc', 'cd'])
)

(df
.withColumn('flag', F.col('ca').isin(
F.col('cb'),
F.col('cc'),
F.col('cd'),
))
.show()
)
# +---+---+---+---+-----+
# | ca| cb| cc| cd| flag|
# +---+---+---+---+-----+
# | A| A| B| C| true|
# | A| B| C| A| true|
# | A| C| A| B| true|
# | A| X| Y| Z|false|
# +---+---+---+---+-----+

Compare two pairs of columns from two different PySpark dataframes to display the data that are different

I think what you want is:

df2.subtract(df1.intersect(df2)).show()

This returns what is in df2 but not in both df1 and df2:

+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| c| j|4.2| 1|
| c| d|3.3| 5|
| c| d|7.3| 7|
+---+---+---+---+

I also agree with @pltc's call-out that you might have made a mistake in your output table.
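
As a rough, self-contained illustration of the subtract/intersect pattern (the data below is made up and does not reproduce the question's tables):

df1 = spark.createDataFrame([("a", 1), ("b", 2)], ["k", "v"])
df2 = spark.createDataFrame([("b", 2), ("c", 3)], ["k", "v"])

# rows of df2 minus the rows common to both DataFrames
df2.subtract(df1.intersect(df2)).show()
# +---+---+
# |  k|  v|
# +---+---+
# |  c|  3|
# +---+---+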


