How to get other columns when using Spark DataFrame groupby?
Long story short: in general you have to join the aggregated results with the original table. Spark SQL follows the same pre-SQL:1999 convention as most of the major databases (PostgreSQL, Oracle, MS SQL Server), which doesn't allow additional columns in aggregation queries.
Since for aggregations like count the results are not well defined, and behavior tends to vary across systems which support this type of query, you can just include additional columns using an arbitrary aggregate like first or last.
In some cases you can replace agg with select plus window functions and a subsequent where, but depending on the context it can be quite expensive.
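For illustration, here is a minimal PySpark sketch of both approaches (the data and column names are made up):
from pyspark.sql import SparkSession, functions as F, Window as W

spark = SparkSession.builder.getOrCreate()

# Made-up data: we want the full row holding the max value per group.
df = spark.createDataFrame(
    [("a", 1, "x"), ("a", 3, "y"), ("b", 2, "z")],
    ["key", "value", "extra"])

# Approach 1: aggregate, then join the result back to the original table.
agg = df.groupBy("key").agg(F.max("value").alias("value"))
joined = df.join(agg, ["key", "value"])

# Approach 2: a window function plus a subsequent filter (can be expensive).
w = W.partitionBy("key")
windowed = (df
    .withColumn("max_value", F.max("value").over(w))
    .where(F.col("value") == F.col("max_value"))
    .drop("max_value"))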
Include other columns when doing groupBy and agg in SparkSQL
In the agg expression, also take first of the other fields:
dsJoin.groupBy("transID").agg(functions.max("subSeq"), functions.first("principal")).show();
Groupby column and create lists for other columns, preserving order
I don't think the order can be reliably preserved using groupBy aggregations, so window functions seem to be the way to go.
Setup:
from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('abc', 789, 0, 1),
     ('def', 456, 1, 0),
     ('abc', 123, 1, 0),
     ('def', 321, 0, 1)],
    ['Id', 'timestamp', 'col1', 'col2'])
Script:
w1 = W.partitionBy('Id').orderBy('timestamp')
w2 = W.partitionBy('Id').orderBy(F.desc('timestamp'))
df = df.select(
    'Id',
    *[F.collect_list(c).over(w1).alias(c) for c in df.columns if c != 'Id']
)
df = (df
    .withColumn('_rn', F.row_number().over(w2))
    .filter('_rn=1')
    .drop('_rn')
)
Result:
df.show()
# +---+----------+------+------+
# | Id| timestamp| col1| col2|
# +---+----------+------+------+
# |abc|[123, 789]|[1, 0]|[0, 1]|
# |def|[321, 456]|[0, 1]|[1, 0]|
# +---+----------+------+------+
You were also very close to what you needed. I've played around and this seems to be working too:
window_spec = W.partitionBy("Id").orderBy('timestamp')
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
df1 = (df
    .withColumn("timestamp", F.collect_list("timestamp").over(ranged_spec))
    .withColumn("col1", F.collect_list("col1").over(ranged_spec))
    .withColumn("col2", F.collect_list("col2").over(ranged_spec))
).drop_duplicates()
df1.show()
How to get all columns after groupby on Dataset Row in spark sql 2.1.0
For your solution you have to try a different approach. You were almost there, but let me help you understand.
Dataset<Row> resultset = studentDataSet.groupBy("name").max("age");
Now you can join the resultset with studentDataSet:
Dataset<Row> joinedDS = studentDataset.join(resultset, "name");
The thing with groupBy is that after applying it you get a RelationalGroupedDataset, so the result depends on which operation you perform next (sum, min, mean, max, etc.); the result of that operation is combined with the groupBy column. In your case the name column is combined with the max of age, so it returns only those two columns. If you instead apply groupBy on age and then max on the age column, you will also get two columns: one is age and the second is max(age).
Note: the code is not tested, so please make changes if needed.
Hope this clears up your query.
How to groupBy in Spark using two columns and in both directions
You can create a new array column with array containing your two group-by columns, sort this array, and group by this array column, as follows:
import org.apache.spark.sql.functions.{array, array_sort, col, count, first}
val result = dff.withColumn("group", array_sort(array(col("src"), col("dst"))))
.groupBy("group")
.agg(first("src").as("src"), first("dst").as("dst"), count("group").as("count"))
.drop("group")
The result DataFrame is as follows:
+---+---+-----+
|src|dst|count|
+---+---+-----+
|A |B |5 |
|C |A |4 |
|B |C |1 |
|C |D |2 |
+---+---+-----+
If you don't have the array_sort method (available since Spark 2.4), you can use a when condition to reorder your two columns src and dst, as follows:
import org.apache.spark.sql.functions.{col, when}
val result = dff
.withColumn("first", when(col("dst") < col("src"), col("src")).otherwise(col("dst")))
.withColumn("second", when(col("dst") >= col("src"), col("src")).otherwise(col("dst")))
.drop("src", "dst")
.withColumnRenamed("first", "src")
.withColumnRenamed("second", "dst")
.groupBy("src", "dst")
.count()
However, this second method only works for two columns.
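For reference, a rough PySpark equivalent of the first (array_sort) approach, which does generalize to more than two columns; it assumes Spark 2.4+ and a DataFrame dff with src and dst columns, as in the Scala code above:
from pyspark.sql import functions as F

result = (dff
    .withColumn("group", F.array_sort(F.array("src", "dst")))
    .groupBy("group")
    .agg(F.first("src").alias("src"),
         F.first("dst").alias("dst"),
         F.count("group").alias("count"))
    .drop("group"))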
How to apply groupby condition and get all the columns in the result?
With the modified expected output you can get arbitrary values with first:
from pyspark.sql.functions import avg, first
df.groupBy("id").agg(
first("Title"), first("Status"), first("Suite"), avg("Time")
).toDF("id", "Title", "Status", "Suite", "Time").show()
# +----+-----+------+-----+-----+
# | id|Title|Status|Suite| Time|
# +----+-----+------+-----+-----+
# | 113| XCV|Passed| GHY| 57.0|
# | 256| KJH|Passed| SNMP| 47.0|
# | 456| KJM|Passed| RTH| 70.5|
# | 115| KIM|Passed| ABC| 60.5|
# |8963| JY|Passed| JHJK| 74.0|
# | 123| KIM|Passed| ABC|30.75|
# +----+-----+------+-----+-----+
Original answer
It looks like you want drop_duplicates:
df.drop_duplicates(subset=["ID"]).show()
# +-----+------+-----+----+
# |Title|Status|Suite| ID|
# +-----+------+-----+----+
# | XCV|Passed| GHY| 113|
# | KJH|Passed| SNMP| 256|
# | KJM|Passed| RTH| 456|
# | KIM|Passed| ABC| 115|
# | JY|Passed| JHJK|8963|
# | KIM|Passed| ABC| 123|
# +-----+------+-----+----+
If you want to use a specific row, please refer to Find maximum row per group in Spark DataFrame.
Select corresponding value of not included column in groupBy in spark dataframe in java
You can use the rank window function partitioned on col1 and col2 and sorted by the timestamp, then select the records where rank = 1. The Spark SQL equivalent will be something like this (your_table stands for your input table):
select * from (select *, rank() over (partition by col1, col2 order by timestamp desc) as rnk from your_table) temp where rnk = 1
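The same logic in the DataFrame API looks roughly like this (shown in PySpark for brevity; df stands for your input DataFrame):
from pyspark.sql import functions as F, Window as W

# Rank rows within each (col1, col2) group by descending timestamp,
# then keep only the latest row per group.
w = W.partitionBy("col1", "col2").orderBy(F.desc("timestamp"))
result = (df
    .withColumn("rnk", F.rank().over(w))
    .filter("rnk = 1")
    .drop("rnk"))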
PySpark groupBy and aggregation functions with multiple columns
Try using the below code:
from pyspark.sql.functions import *

df = spark.createDataFrame(
    [('id11', 'id21', 1), ('id11', 'id22', 2), ('id11', 'id23', 3),
     ('id12', 'id21', 2), ('id12', 'id23', 1), ('id13', 'id23', 2),
     ('id13', 'id21', 8)],
    ["id1", "id2", "value"])
Aggregated Data -
df.groupBy("id1").agg(count("id2"),sum("value")).show()
Output -
+----+----------+----------+
| id1|count(id2)|sum(value)|
+----+----------+----------+
|id11| 3| 6|
|id12| 2| 3|
|id13| 2| 10|
+----+----------+----------+
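If you prefer friendlier column names than count(id2) and sum(value), you can alias each aggregate; this is a small variation on the code above:
# Same aggregation, with explicit names for the result columns.
df.groupBy("id1").agg(
    count("id2").alias("id2_count"),
    sum("value").alias("value_sum")
).show()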