Groupby Column and Filter Rows with Maximum Value in Pyspark

GroupBy column and filter rows with maximum value in Pyspark

You can do this without a udf using a Window.

Consider the following example:

import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  5|
#|  a|  8|
#|  a|  7|
#|  b|  1|
#|  b|  3|
#+---+---+

Create a Window partitioned by column A and use it to compute the maximum of each group. Then filter the rows so that the value in column B equals that group maximum (note that this keeps all rows tied for the max).

from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()
#+---+---+
#|  A|  B|
#+---+---+
#|  a|  8|
#|  b|  3|
#+---+---+

Or, equivalently, using Spark SQL:

df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#|  A|  B|
#+---+---+
#|  b|  3|
#|  a|  8|
#+---+---+
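
On Spark 2.x and later, registerTempTable is deprecated in favour of createOrReplaceTempView, and the SparkSession entry point replaces sqlCtx. A rough equivalent of the above (assuming a SparkSession named spark):

# same query, run through a temp view and the SparkSession
df.createOrReplaceTempView('table')
spark.sql(q).show()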

Groupby and return the row label of the maximum value in PySpark Dataframe

You can use a window function such as row_number:

from pyspark.sql import functions as F, Window as W

w = W.partitionBy('group').orderBy(F.desc('len_text'))
df = df.withColumn('_rn', F.row_number().over(w))
df = df.filter('_rn=1').drop('_rn')

df.show()
# +--------------------+----------+-----+--------+
# |                text|word_count|group|len_text|
# +--------------------+----------+-----+--------+
# |         I went home|         3|    1|      11|
# | I looked at the cat|         4|    2|      19|
# |The cat looked at me|         5|    3|      20|
# |        I went homes|         3|    4|      12|
# +--------------------+----------+-----+--------+
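
Note that row_number keeps exactly one row per group even if several rows tie for the largest len_text. If you want to keep all tied rows instead, rank can be swapped in; a small sketch using the same column names:

from pyspark.sql import functions as F, Window as W

w = W.partitionBy('group').orderBy(F.desc('len_text'))
df = df.withColumn('_rk', F.rank().over(w))   # rank assigns 1 to every row tied for the max
df = df.filter('_rk = 1').drop('_rk')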

Find maximum row per group in Spark DataFrame

Using join (this will return more than one row per group in case of ties):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs,
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

Using window functions (will drop ties):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .select("id_sa", "id_sb"))

Using struct ordering:

from pyspark.sql.functions import struct

(cnts
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb")))
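
The struct trick works because structs compare field by field from left to right, so the max struct belongs to the row with the largest cnt (ties broken by id_sb). A minimal toy illustration (hypothetical data, assuming a SparkSession named spark):

from pyspark.sql import functions as F
from pyspark.sql.functions import struct, col

toy = spark.createDataFrame(
    [("x", 1, "p"), ("x", 3, "q"), ("y", 2, "r")],
    ["id_sa", "cnt", "id_sb"])

(toy
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb"))
    .show())
# returns ("x", "q") and ("y", "r"): the id_sb of the row with the largest cnt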

See also How to select the first row of each group?

Create new column with max value based on filtered rows with groupby in pyspark

Use the when function for a conditional max aggregation:

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy('id')

foo = foo.withColumn(
    'max_value',
    F.max(F.when(F.col('col') == F.col('col_b'), F.col('value'))).over(w)
)
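
As a hypothetical illustration of what this produces (toy data and a SparkSession named spark assumed; the column names id, col, col_b and value are taken from the snippet above):

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy('id')
foo = spark.createDataFrame(
    [(1, 'x', 'x', 10), (1, 'y', 'x', 50), (2, 'z', 'z', 7)],
    ['id', 'col', 'col_b', 'value'])

foo.withColumn(
    'max_value',
    F.max(F.when(F.col('col') == F.col('col_b'), F.col('value'))).over(w)
).show()
# for id=1 only the first row satisfies col == col_b, so max_value is 10 on
# both rows of that group; for id=2 it is 7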

How to select rows with max values in categories?

The only solution that comes to my mind is to:

  • Get the highest day for each ID (using groupBy)
  • Append the value of the highest day to each line (with matching ID) using join
  • Then apply a simple filter keeping only the lines where day equals maxDay

# select the max value for each of the ID
maxDayForIDs = df.groupBy("ID").max("day").withColumnRenamed("max(day)", "maxDay")

# now add the max value of the day for each line (with matching ID)
df = df.join(maxDayForIDs, "ID")

# keep only the lines where it matches "day" equals "maxDay"
df = df.filter(df.day == df.maxDay)

Querying one column by max value on another column after groupBy

First use groupBy to identify the rows with the largest count for each category_code, then join with the original dataframe to retrieve the brand value corresponding to that max count:

df1 = df.groupBy("category_code").agg(F.max("count").alias("count"))

df2 = df.join(df1, ["count", "category_code"]).drop("count")

This will produce df2 as follows:

category_code         brand
---------------------------
electronics.smart...  samsung
electronics.video.tv  samsung
electronics.audio     apple
computers.notebook    acer
electronics.clocks    casio
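
If several brands tie for the maximum count within a category_code, the join keeps all of them. To keep a single (arbitrary) row per category in that case, one option is dropDuplicates, for example:

# hypothetical tie-breaking step: keep one row per category_code
df2 = df2.dropDuplicates(["category_code"])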

How to select all columns for rows with max value

In Spark there are two ways: either join the dataframe with its groupBy aggregation like this:

a = df.groupby(df['id']).agg({"date": "max"})
df = df.join(
    a,
    on="id",
    how="inner"
)
df.show()

or use a window partitioned by id like this:

from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy("id")
a = df.withColumn(
    "max",
    F.max(F.col("date")).over(window)
)
a.show()

I would prefer the first one, as it is less costly even with the join.
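
Note that both snippets only attach the per-id maximum as an extra column; to actually keep just the rows with the max date you still need an equality filter. A minimal sketch of the first approach with that step added (using an explicit alias instead of the default max(date) column name):

import pyspark.sql.functions as F

a = df.groupby("id").agg(F.max("date").alias("max_date"))
result = (df.join(a, on="id", how="inner")
            .filter(F.col("date") == F.col("max_date"))
            .drop("max_date"))
result.show()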

How to get a single row with the maximum value while keeping the whole row?

It is necessary to group the rows by the id column and then find the maximum value of the charge column in each group. If groupBy is used to achieve this, the name column will disappear, as you noticed. Another way is to use a window partitioned by id.

To make sure that both rows are kept when id and charge have the same values but the name differs, the best way is to add a new column maxCharge and then filter the dataframe.

Using the example dataframe from the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max
import spark.implicits._   // for the $"colname" syntax

val w = Window.partitionBy($"id")
val df2 = df.withColumn("maxCharge", max("charge").over(w))
  .filter($"maxCharge" === $"charge")
  .drop("charge")
  .withColumnRenamed("maxCharge", "charge")

Here, a new column is first added with the max charge for each id. Then the rows with a charge value less than this maximum are removed. Finally, the new column is renamed to charge to match the required output.

Final result:

+---+----+------+
| id|name|charge|
+---+----+------+
| 22|  aa|    40|
| 22|  bb|    40|
| 11|  mm|    20|
+---+----+------+
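
For readers working in PySpark rather than Scala, the same approach might look roughly like this (a sketch assuming the same id/name/charge columns):

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy('id')
df2 = (df.withColumn('maxCharge', F.max('charge').over(w))
         .filter(F.col('maxCharge') == F.col('charge'))
         .drop('charge')
         .withColumnRenamed('maxCharge', 'charge'))
df2.show()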

Get row with maximum value from groupby with several columns in PySpark

Based on your expected output, it seems you are only grouping by id and ship - since you already have distinct values in grouped - and consequently dropping duplicate elements based on the columns id, ship and count, sorted by type.

To accomplish this, we can use Window functions:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window = (Window
          .partitionBy(grouped['id'], grouped['ship'])
          .orderBy(grouped['count'].desc(), grouped['type']))

(grouped
    .select('*', rank().over(window).alias('rank'))
    .filter(col('rank') == 1)
    .orderBy(col('id'))
    .dropDuplicates(['id', 'ship', 'count'])
    .drop('rank')
    .show())
+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
|  0|   D|DOCK|    2|
|  0|   A|PORT|    3|
|  1|   A|DOCK|    1|
|  1|   B|PORT|    3|
|  2|   C|DOCK|    1|
|  2|   A|PORT|    1|
|  3|   A|DOCK|    1|
|  3|   C|PORT|    2|
+---+----+----+-----+

