GroupBy column and filter rows with maximum value in Pyspark
You can do this without a udf using a Window.
Consider the following example:
import pyspark.sql.functions as f
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
    ('b', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#| A| B|
#+---+---+
#| a| 5|
#| a| 8|
#| a| 7|
#| b| 1|
#| b| 3|
#+---+---+
Create a Window to partition by column A and use it to compute the maximum of each group. Then filter out the rows such that the value in column B is equal to the max.
from pyspark.sql import Window
w = Window.partitionBy('A')
df.withColumn('maxB', f.max('B').over(w))\
    .where(f.col('B') == f.col('maxB'))\
    .drop('maxB')\
    .show()
#+---+---+
#| A| B|
#+---+---+
#| a| 8|
#| b| 3|
#+---+---+
Or equivalently using pyspark-sql:
df.registerTempTable('table')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#| A| B|
#+---+---+
#| b| 3|
#| a| 8|
#+---+---+
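Note that registerTempTable is deprecated since Spark 2.0; assuming a SparkSession named spark (an assumption, since the snippets above use sqlCtx), the modern equivalents are:
df.createOrReplaceTempView('table')
spark.sql(q).show()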
Groupby and return the row label of the maximum value in PySpark Dataframe
You can use window functions, i.e. row_number:
from pyspark.sql import functions as F, Window as W
w = W.partitionBy('group').orderBy(F.desc('len_text'))
df = df.withColumn('_rn', F.row_number().over(w))
df = df.filter('_rn=1').drop('_rn')
df.show()
# +--------------------+----------+-----+--------+
# | text|word_count|group|len_text|
# +--------------------+----------+-----+--------+
# | I went home| 3| 1| 11|
# | I looked at the cat| 4| 2| 19|
# |The cat looked at me| 5| 3| 20|
# | I went homes| 3| 4| 12|
# +--------------------+----------+-----+--------+
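If two rows in a group tie on len_text, row_number still keeps only one of them, chosen arbitrarily. A hedged variant that keeps all tied rows swaps rank in for row_number:
df = df.withColumn('_rn', F.rank().over(w))
df = df.filter('_rn=1').drop('_rn')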
Find maximum row per group in Spark DataFrame
Using join (it will result in more than one row per group in case of ties):
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")
cnts.join(maxs,
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
Using window functions (will drop ties):
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())
(cnts
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .select("id_sa", "id_sb"))
Using struct ordering:
from pyspark.sql.functions import struct
(cnts
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb")))
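The struct ordering works because Spark compares structs field by field in the order they are declared, so cnt decides the maximum and id_sb simply rides along. A minimal self-contained sketch with made-up data (assuming a SparkSession named spark; row order in the output may differ):
demo = spark.createDataFrame(
    [("s1", "b1", 2), ("s1", "b2", 5), ("s2", "b3", 1)],
    ["id_sa", "id_sb", "cnt"])
(demo
    .groupBy("id_sa")
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
    .select(col("id_sa"), col("max.id_sb"))
    .show())
# +-----+-----+
# |id_sa|id_sb|
# +-----+-----+
# |   s1|   b2|
# |   s2|   b3|
# +-----+-----+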
See also How to select the first row of each group?
Create new column with max value based on filtered rows with groupby in pyspark
Use the when function for conditional aggregation with max:
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy('id')
foo = foo.withColumn('max_value', F.max(F.when(F.col('col') == F.col('col_b'), F.col('value'))).over(w))
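As a minimal sketch of what this computes, with made-up data (assuming a SparkSession named spark; the column names follow the snippet above): for each id the max is taken only over rows where col equals col_b, and that per-group value is attached to every row of the group.
foo = spark.createDataFrame(
    [(1, 'x', 'x', 10), (1, 'y', 'x', 20), (2, 'x', 'x', 5)],
    ['id', 'col', 'col_b', 'value'])
foo = foo.withColumn('max_value', F.max(F.when(F.col('col') == F.col('col_b'), F.col('value'))).over(w))
foo.show()
# +---+---+-----+-----+---------+
# | id|col|col_b|value|max_value|
# +---+---+-----+-----+---------+
# |  1|  x|    x|   10|       10|
# |  1|  y|    x|   20|       10|
# |  2|  x|    x|    5|        5|
# +---+---+-----+-----+---------+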
How to select rows with max values in categories?
The only solution that comes to my mind is to:
- Get the highest day for each ID (using groupBy)
- Append the value of the highest day to each row (with matching ID) using join
- Then apply a simple filter keeping the rows where the two values match
# select the max value for each of the ID
maxDayForIDs = df.groupBy("ID").max("day").withColumnRenamed("max(day)", "maxDay")
# now add the max value of the day for each line (with matching ID)
df = df.join(maxDayForIDs, "ID")
# keep only the lines where it matches "day" equals "maxDay"
df = df.filter(df.day == df.maxDay)
Querying one column by max value on another column after groupBy
First use groupBy to identify the largest count for each category_code, then join with the original dataframe to retrieve the brand value corresponding to the max count:
df1 = df.groupBy("category_code").agg(F.max("count").alias("count"))
df2 = df.join(df1, ["count", "category_code"]).drop("count")
This will produce df2 as follows:
category_code         brand
--------------------  -------
electronics.smart...  samsung
electronics.video.tv  samsung
electronics.audio     apple
computers.notebook    acer
electronics.clocks    casio
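Note that if two brands tie for the maximum count within a category_code, the join keeps both rows. A hedged alternative that keeps exactly one row per category uses row_number over a window:
from pyspark.sql import Window
w = Window.partitionBy("category_code").orderBy(F.desc("count"))
df2 = df.withColumn("_rn", F.row_number().over(w)).filter("_rn = 1").drop("_rn", "count")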
How to select all columns for rows with max value
In Spark there are two ways: either join it with the previous dataframe like this:
a = df.groupby(df['id']).agg({"date": "max"})
df = df.join(
    a,
    on="id",
    how="inner"
)
df.show()
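Note that this only attaches the group maximum (in a column auto-named max(date)); to actually keep just the max-date rows a filter is still needed. A hedged completion, giving the aggregated column an explicit alias for easier filtering:
from pyspark.sql import functions as F
a = df.groupby("id").agg(F.max("date").alias("max_date"))
df_max = df.join(a, on="id", how="inner").filter("date = max_date").drop("max_date")
df_max.show()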
or use a window partition like this:
from pyspark.sql import Window
import pyspark.sql.functions as F
window = Window.partitionBy("id")
a = df.withColumn(
    "max",
    F.max(F.col("date")).over(window)
)
a.show()
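Likewise, the window version only adds the per-id maximum as a column; a hedged follow-up filter keeps only the rows whose date equals that maximum:
a.filter(F.col("date") == F.col("max")).drop("max").show()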
I would say prefer the first one, as it is less costly even with the join.
How to get a single row with the maximum value while keeping the whole row?
It is necessary to group the rows by the id column and then find the maximum value of the charge column in each group. If groupBy is used to achieve this, the name column will disappear, as you noticed. Another way is to use a window and partition by id.
To make sure that both rows are kept when the id and charge have the same value but the name is different, the best way is to add a new column maxCharge and then filter the dataframe.
Using the example dataframe from the question:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// assumes the usual `import spark.implicits._` is in scope for the $"..." column syntax
val w = Window.partitionBy($"id")
val df2 = df.withColumn("maxCharge", max("charge").over(w))
  .filter($"maxCharge" === $"charge")
  .drop("charge")
  .withColumnRenamed("maxCharge", "charge")
Here, first a new column is added with the max value for each id. Then the rows with a charge value less than this are removed. Finally, the new column is renamed charge to match the required output.
Final result:
+---+----+------+
| id|name|charge|
+---+----+------+
| 22| aa| 40|
| 22| bb| 40|
| 11| mm| 20|
+---+----+------+
Get row with maximum value from groupby with several columns in PySpark
Based on your expected output, it seems you are only grouping by id and ship - since you already have distinct values in grouped - and consequently drop duplicate elements based on the columns id, ship and count, sorted by type.
To accomplish this, we can use Window functions:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
window = (Window
          .partitionBy(grouped['id'], grouped['ship'])
          .orderBy(grouped['count'].desc(), grouped['type']))
(grouped
    .select('*', rank().over(window).alias('rank'))
    .filter(col('rank') == 1)
    .orderBy(col('id'))
    .dropDuplicates(['id', 'ship', 'count'])
    .drop('rank')
    .show())
+---+----+----+-----+
| id|type|ship|count|
+---+----+----+-----+
| 0| D|DOCK| 2|
| 0| A|PORT| 3|
| 1| A|DOCK| 1|
| 1| B|PORT| 3|
| 2| C|DOCK| 1|
| 2| A|PORT| 1|
| 3| A|DOCK| 1|
| 3| C|PORT| 2|
+---+----+----+-----+