How to Get Other Columns When Using Spark Dataframe Groupby

How to get other columns when using Spark DataFrame groupby?

Long story short, in general you have to join aggregated results with the original table. Spark SQL follows the same pre-SQL:1999 convention as most major databases (PostgreSQL, Oracle, MS SQL Server), which doesn't allow additional columns in aggregation queries.

Since the value of the extra columns is not well defined for aggregations like count, and behavior tends to vary between systems that do support this type of query, you can simply pull the additional columns in with an arbitrary aggregate such as first or last.

In some cases you can replace agg with a select using window functions and a subsequent where, but depending on the context it can be quite expensive.
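For example, a minimal PySpark sketch of the join-back approach (the DataFrame and column names here are illustrative assumptions, not from the original question):

from pyspark.sql import functions as F

# Toy data: id is the grouping key, other is a column we want to keep.
df = spark.createDataFrame(
    [("a", 1, "x"), ("a", 3, "y"), ("b", 2, "z")],
    ["id", "value", "other"])

# Aggregate first, then join the result back to the original table
# to recover the non-aggregated columns.
agg = df.groupBy("id").agg(F.max("value").alias("max_value"))
df.join(agg, on="id", how="inner").show()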

Include other columns when doing groupBy and agg in SparkSQL

In the agg expression, also take first of the other fields:

dsJoin.groupBy("transID").agg(functions.max("subSeq"),functions.first("principal")).show();

Groupby column and create lists for other columns, preserving order

I don't think the order can be reliably preserved using groupBy aggregations, so window functions seem to be the way to go.

Setup:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('abc', 789, 0, 1),
     ('def', 456, 1, 0),
     ('abc', 123, 1, 0),
     ('def', 321, 0, 1)],
    ['Id', 'timestamp', 'col1', 'col2'])

Script:

w1 = W.partitionBy('Id').orderBy('timestamp')
w2 = W.partitionBy('Id').orderBy(F.desc('timestamp'))
df = df.select(
    'Id',
    *[F.collect_list(c).over(w1).alias(c) for c in df.columns if c != 'Id']
)
df = (df
    .withColumn('_rn', F.row_number().over(w2))
    .filter('_rn=1')
    .drop('_rn')
)

Result:

df.show()
# +---+----------+------+------+
# | Id| timestamp| col1| col2|
# +---+----------+------+------+
# |abc|[123, 789]|[1, 0]|[0, 1]|
# |def|[321, 456]|[0, 1]|[1, 0]|
# +---+----------+------+------+

You were also very close to what you needed. I've played around with it and this seems to work too:

window_spec = W.partitionBy("Id").orderBy('timestamp')
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

df1 = (df
    .withColumn("timestamp", F.collect_list("timestamp").over(ranged_spec))
    .withColumn("col1", F.collect_list("col1").over(ranged_spec))
    .withColumn("col2", F.collect_list("col2").over(ranged_spec))
).drop_duplicates()
df1.show()

How to get all columns after groupby on Dataset Row in spark sql 2.1.0

For this you have to take a slightly different approach. You were almost there, but let me help you understand.

Dataset<Row> resultset = studentDataSet.groupBy("name").max("age");

Now you can join the resultset with studentDataSet:

Dataset<Row> joinedDS = studentDataSet.join(resultset, "name");

The thing to understand about groupBy is that it returns a RelationalGroupedDataset, so the shape of the result depends on which aggregation you apply next (sum, min, mean, max, etc.): the result of that operation is combined with the groupBy columns.

In your case the name column is combined with the max of age, so only those two columns are returned. If you instead applied groupBy on age and then max on the age column, you would again get two columns: age and max(age).

Note: the code is not tested, please make changes if needed.
Hope this clears up your query.

How to groupBy in Spark using two columns and in both directions

You can create a new array column containing your two group-by columns, sort this array, and group by this array column, as follows:

import org.apache.spark.sql.functions.{array, array_sort, col, count, first}

val result = dff
  .withColumn("group", array_sort(array(col("src"), col("dst"))))
  .groupBy("group")
  .agg(first("src").as("src"), first("dst").as("dst"), count("group").as("count"))
  .drop("group")

The result DataFrame is as follows:

+---+---+-----+
|src|dst|count|
+---+---+-----+
|  A|  B|    5|
|  C|  A|    4|
|  B|  C|    1|
|  C|  D|    2|
+---+---+-----+

If you don't have the array_sort method (available since Spark 2.4), you can use a when condition to reorder your two columns src and dst, as follows:

import org.apache.spark.sql.functions.{col, when}

val result = dff
  .withColumn("first", when(col("dst") < col("src"), col("src")).otherwise(col("dst")))
  .withColumn("second", when(col("dst") >= col("src"), col("src")).otherwise(col("dst")))
  .drop("src", "dst")
  .withColumnRenamed("first", "src")
  .withColumnRenamed("second", "dst")
  .groupBy("src", "dst")
  .count()

Note, however, that this second approach only works for two columns.
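The same array_sort trick in PySpark would look roughly like this (a sketch assuming the same dff with src and dst columns):

from pyspark.sql import functions as F

result = (dff
    .withColumn("group", F.array_sort(F.array("src", "dst")))
    .groupBy("group")
    .agg(F.first("src").alias("src"),
         F.first("dst").alias("dst"),
         F.count("*").alias("count"))
    .drop("group"))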

How to apply groupby condition and get all the columns in the result?

With the modified expected output, you can get arbitrary values with first:

from pyspark.sql.functions import avg, first

df.groupBy("id").agg(
first("Title"), first("Status"), first("Suite"), avg("Time")
).toDF("id", "Title", "Status", "Suite", "Time").show()
# +----+-----+------+-----+-----+
# | id|Title|Status|Suite| Time|
# +----+-----+------+-----+-----+
# | 113| XCV|Passed| GHY| 57.0|
# | 256| KJH|Passed| SNMP| 47.0|
# | 456| KJM|Passed| RTH| 70.5|
# | 115| KIM|Passed| ABC| 60.5|
# |8963| JY|Passed| JHJK| 74.0|
# | 123| KIM|Passed| ABC|30.75|
# +----+-----+------+-----+-----+

Original answer

It looks like you want drop_duplicates:

df.drop_duplicates(subset=["ID"]).show()
# +-----+------+-----+----+
# |Title|Status|Suite| ID|
# +-----+------+-----+----+
# | XCV|Passed| GHY| 113|
# | KJH|Passed| SNMP| 256|
# | KJM|Passed| RTH| 456|
# | KIM|Passed| ABC| 115|
# | JY|Passed| JHJK|8963|
# | KIM|Passed| ABC| 123|
# +-----+------+-----+----+

If you want to pick a specific row, please refer to Find maximum row per group in Spark DataFrame.

Select corresponding value of not included column in groupBy in spark dataframe in java

You can use the rank window function, partitioned by col1 and col2 and ordered by the timestamp, then select the records where rank = 1. The Spark SQL equivalent would be something like this:

select * from (select *, rank() over (partition by col1, col2 order by timestamp desc) as rnk from <your_table>) temp where rnk = 1
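With the DataFrame API the same idea looks roughly like this (shown here as a PySpark sketch; df and the column names are assumed from the question):

from pyspark.sql import functions as F, Window as W

# Rank rows within each (col1, col2) group by descending timestamp,
# then keep only the top-ranked row per group.
w = W.partitionBy("col1", "col2").orderBy(F.desc("timestamp"))

result = (df
    .withColumn("rnk", F.rank().over(w))
    .filter("rnk = 1")
    .drop("rnk"))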

PySpark groupBy and aggregation functions with multiple columns

Try the code below:

from pyspark.sql.functions import *

df = spark.createDataFrame(
    [('id11', 'id21', 1), ('id11', 'id22', 2), ('id11', 'id23', 3),
     ('id12', 'id21', 2), ('id12', 'id23', 1), ('id13', 'id23', 2),
     ('id13', 'id21', 8)],
    ["id1", "id2", "value"])

Aggregated Data -

df.groupBy("id1").agg(count("id2"),sum("value")).show()

Output -

+----+----------+----------+
| id1|count(id2)|sum(value)|
+----+----------+----------+
|id11| 3| 6|
|id12| 2| 3|
|id13| 2| 10|
+----+----------+----------+
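If you want friendlier column names than count(id2) and sum(value), you can alias each aggregate (the alias names below are just examples):

df.groupBy("id1").agg(
    count("id2").alias("id2_count"),
    sum("value").alias("value_sum")
).show()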

