Select First Row in Each GROUP BY Group

Select first row in each GROUP BY group?

On databases that support CTEs and window functions:

WITH summary AS (
    SELECT p.id,
           p.customer,
           p.total,
           ROW_NUMBER() OVER (PARTITION BY p.customer
                              ORDER BY p.total DESC) AS rank
    FROM PURCHASES p
)
SELECT *
FROM summary
WHERE rank = 1

Supported by any database, but you need to add logic to break ties:

SELECT MIN(x.id),  -- change to MAX if you want the highest
       x.customer,
       x.total
FROM PURCHASES x
JOIN (SELECT p.customer,
             MAX(total) AS max_total
      FROM PURCHASES p
      GROUP BY p.customer) y ON y.customer = x.customer
                            AND y.max_total = x.total
GROUP BY x.customer, x.total

Get top 1 row of each group

;WITH cte AS
(
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY DocumentID ORDER BY DateCreated DESC) AS rn
    FROM DocumentStatusLogs
)
SELECT *
FROM cte
WHERE rn = 1

If you expect 2 entries per day, then this will arbitrarily pick one. To get both entries for a day, use DENSE_RANK instead.
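For example, a minimal sketch of the DENSE_RANK variant, using the same table and columns as the query above; it keeps every row tied for the latest DateCreated per DocumentID:

;WITH cte AS
(
    SELECT *,
           DENSE_RANK() OVER (PARTITION BY DocumentID ORDER BY DateCreated DESC) AS rnk
    FROM DocumentStatusLogs
)
SELECT *
FROM cte
WHERE rnk = 1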

As for whether to normalise or not, it depends on whether you want to:

  • maintain status in 2 places
  • preserve status history
  • ...

As it stands, you preserve status history. If you want the latest status in the parent table too (which is denormalisation), you'd need a trigger to maintain "status" in the parent, or drop this status history table.
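If you go the trigger route, here is a rough sketch, assuming a parent Documents table with a Status column and that DocumentStatusLogs carries DocumentID and Status (names are illustrative; adjust to your schema):

CREATE TRIGGER trg_DocumentStatusLogs_SyncStatus
ON DocumentStatusLogs
AFTER INSERT
AS
BEGIN
    SET NOCOUNT ON;

    -- Assumed schema: copy the status just logged onto the parent row (the denormalised copy).
    UPDATE d
    SET d.Status = i.Status
    FROM Documents d
    JOIN inserted i ON i.DocumentID = d.DocumentID;
END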

Selecting first row per group

SELECT a, b, c
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, c) rn
    FROM mytable
) q
WHERE rn = 1
ORDER BY a

or

SELECT mi.*
FROM (
    SELECT DISTINCT a
    FROM mytable
) md
CROSS APPLY
(
    SELECT TOP 1 *
    FROM mytable mi
    WHERE mi.a = md.a
    ORDER BY b, c
) mi
ORDER BY a

Create a composite index on (a, b, c) for the queries to work faster.
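For example (the index name is arbitrary):

CREATE INDEX IX_mytable_a_b_c ON mytable (a, b, c);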

Which one is more efficient depends on your data distribution.

If you have few distinct values of a but lots of records within each a, the second query would be better.

You could improve it even more by creating an indexed view:

CREATE VIEW v_mytable_da
WITH SCHEMABINDING
AS
SELECT a, COUNT_BIG(*) cnt
FROM dbo.mytable
GROUP BY a

GO

CREATE UNIQUE CLUSTERED INDEX pk_vmytableda_a ON v_mytable_da (a)

GO

SELECT mi.*
FROM v_mytable_da md
CROSS APPLY
(
    SELECT TOP 1 *
    FROM mytable mi
    WHERE mi.a = md.a
    ORDER BY b, c
) mi
ORDER BY a

sql server select first row from a group

If, as you indicated, order doesn't matter, any aggregate function on b would be sufficient.

Example Using MIN

SELECT a, b = MIN(b)
FROM YourTable
GROUP BY a

Select first row in each group in sql

You can use distinct on directly with group by:

select distinct on ("Country") Sum("Price"), "In-app Product", "Country"
from cleandatase
group by "Country", "In-app Product"
order by "Country", Sum("Price") desc;

Note: As Thorsten points out, if there are ties and you want all the ties, then distinct on is not the simplest solution.
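If you do want all tied rows per country, one option is to rank the grouped rows in a subquery instead of using distinct on (a sketch against the same table and quoted column names as above):

select "Country", "In-app Product", total_price
from (
    select "Country",
           "In-app Product",
           sum("Price") as total_price,
           rank() over (partition by "Country" order by sum("Price") desc) as rnk
    from cleandatase
    group by "Country", "In-app Product"
) t
where rnk = 1
order by "Country";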

get first row in a group and assign values

Use df.groupby(...).cumcount() to get a counter of rows within each group, which you can then manipulate.

In [51]: df
Out[51]:
     a  b  c
0  def  1  0
1  abc  0  1
2  def  1  0
3  abc  0  1

In [52]: df2 = df.sort_values(['a','b','c'])

In [53]: df2['result'] = df2.groupby(['a', 'b', 'c']).cumcount()

In [54]: df2['result'] = np.where(df2['result'] == 0, 1, 0)

In [55]: df2
Out[55]:
     a  b  c  result
1  abc  0  1       1
3  abc  0  1       0
0  def  1  0       1
2  def  1  0       0

How to get the first row per group?

If your MySQL version supports ROW_NUMBER (window functions), you can use ROW_NUMBER to get the biggest num per category_id.

Query #1

SELECT num, business_id, category_id
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY category_id ORDER BY num desc) rn
    FROM (
        select count(1) num, business_id, category_id
        from mytable
        group by business_id, category_id
    ) t1
) t1
WHERE rn = 1

Result: one row per category_id, with columns num, business_id and category_id.
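If your MySQL version does not support window functions (pre-8.0), here is a sketch of the join-to-aggregate fallback, mirroring the "supported by any database" pattern shown earlier (ties on num are kept):

SELECT t.num, t.business_id, t.category_id
FROM (
    SELECT COUNT(1) AS num, business_id, category_id
    FROM mytable
    GROUP BY business_id, category_id
) t
JOIN (
    SELECT category_id, MAX(num) AS max_num
    FROM (
        SELECT COUNT(1) AS num, business_id, category_id
        FROM mytable
        GROUP BY business_id, category_id
    ) x
    GROUP BY category_id
) m ON m.category_id = t.category_id
   AND m.max_num = t.num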

How to select the first row of each group?

Window functions:

Something like this should do the trick:

import org.apache.spark.sql.functions.{row_number, max, broadcast, first, struct}
import org.apache.spark.sql.expressions.Window

val df = sc.parallelize(Seq(
  (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
  (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
  (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
  (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")

val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)

val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")

dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+

This method will be inefficient in case of significant data skew. This problem is tracked by SPARK-34775 and might be resolved in the future (SPARK-37099).

Plain SQL aggregation followed by join:

Alternatively you can join with aggregated data frame:

val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))

val dfTopByJoin = df.join(broadcast(dfMax),
    ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
  .drop("max_hour")
  .drop("max_value")

dfTopByJoin.show

// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+

It will keep duplicate values (if there is more than one category per hour with the same total value). You can remove these as follows:

dfTopByJoin
  .groupBy($"hour")
  .agg(
    first("category").alias("category"),
    first("TotalValue").alias("TotalValue"))

Using ordering over structs:

A neat, although not very well tested, trick which doesn't require joins or window functions:

val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
  .groupBy($"hour")
  .agg(max("vs").alias("vs"))
  .select($"Hour", $"vs.Category", $"vs.TotalValue")

dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+

With DataSet API (Spark 1.6+, 2.0+):

Spark 1.6:

case class Record(Hour: Integer, Category: String, TotalValue: Double)

df.as[Record]
  .groupBy($"hour")
  .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
  .show

// +---+--------------+
// | _1| _2|
// +---+--------------+
// |[0]|[0,cat26,30.9]|
// |[1]|[1,cat67,28.5]|
// |[2]|[2,cat56,39.6]|
// |[3]| [3,cat8,35.6]|
// +---+--------------+

Spark 2.0 or later:

df.as[Record]
  .groupByKey(_.Hour)
  .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)

The last two methods can leverage map-side combine and don't require a full shuffle, so most of the time they should exhibit better performance than window functions and joins. They can also be used with Structured Streaming in complete output mode.

Don't use:

df.orderBy(...).groupBy(...).agg(first(...), ...)

It may seem to work (especially in local mode) but it is unreliable (see SPARK-16207, credits to Tzach Zohar for linking the relevant JIRA issue, and SPARK-30335).

The same note applies to

df.orderBy(...).dropDuplicates(...)

which internally uses an equivalent execution plan.


