Aggregation with Group By Date in Spark SQL

I solved the issue by adding this function:

def convert(time: Long): String = {
  val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd")
  sdf.format(new java.util.Date(time))
}

And registering it into the sqlContext like this:

sqlContext.registerFunction("convert", convert _)

Then I could finally group by date:

select convert(time) as day, count(1) as cnt from table group by convert(time)

Group spark dataframe by date

Since 1.5.0 Spark provides a number of functions like dayofmonth, hour, month or year which operate on dates and timestamps, so if timestamp is a TimestampType column, all you need is the right expression. For example:

from pyspark.sql.functions import hour, mean

(df
    .groupBy(hour("timestamp").alias("hour"))
    .agg(mean("value").alias("mean"))
    .show())

## +----+------------------+
## |hour|              mean|
## +----+------------------+
## |   0|508.05999999999995|
## |   1| 449.8666666666666|
## |   2| 524.9499999999999|
## |   3|264.59999999999997|
## +----+------------------+
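
If you want to group by the calendar date rather than a single component, to_date (also available since 1.5.0) works the same way. A minimal sketch, reusing the same df with its timestamp and value columns from the example above:

from pyspark.sql.functions import to_date, mean

# Group by the calendar date extracted from the timestamp column
(df
    .groupBy(to_date("timestamp").alias("date"))
    .agg(mean("value").alias("mean"))
    .show())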

Pre-1.5.0 your best option is to use HiveContext and Hive UDFs either with selectExpr:

df.selectExpr("year(timestamp) AS year", "value").groupBy("year").sum()

## +----+---------+----------+
## |year|SUM(year)|SUM(value)|
## +----+---------+----------+
## |2015|    40300|    9183.0|
## +----+---------+----------+

or raw SQL:

df.registerTempTable("df")

sqlContext.sql("""
SELECT MONTH(timestamp) AS month, SUM(value) AS values_sum
FROM df
GROUP BY MONTH(timestamp)""")

Just remember that the aggregation is performed by Spark and is not pushed down to the external source. Usually this is the desired behavior, but there are situations where you may prefer to run the aggregation as a subquery on the source side to limit data transfer.
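
For illustration, a minimal sketch of that subquery approach against a JDBC source; the connection string, table, and column names below are placeholders, not anything defined above:

# Hypothetical JDBC source: the database runs the aggregation and Spark only
# receives the already-aggregated rows.
monthly = (sqlContext.read
    .format("jdbc")
    .option("url", "jdbc:mysql://dbhost/sales")  # placeholder connection string
    .option("dbtable",
            "(SELECT MONTH(timestamp) AS month, SUM(value) AS values_sum "
            "FROM events GROUP BY MONTH(timestamp)) AS agg")
    .load())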

Aggregate a spark dataframe based on and before date

You can combine standard aggregation with a window function, but the second stage won't be distributed:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

df
  .groupBy($"start_date")
  .agg(approx_count_distinct($"column1").alias("count"))
  .withColumn(
    "cumulative_count", sum($"count").over(Window.orderBy($"start_date")))

String aggregation and group by in PySpark

You can collect lists of structs of Timestamp and Value (in that order) for each Id, sort them (sort_array sorts by the first field of the struct, i.e. Timestamp) and combine the Value entries into a string using concat_ws.

PySpark (Spark 3.1.2)

import pyspark.sql.functions as F

(df
    .groupBy("Id")
    .agg(F.expr("concat_ws(';', sort_array(collect_list(struct(Timestamp, Value))).Value) as Values"))
).show(truncate=False)

# +---+-----------+
# |Id |Values     |
# +---+-----------+
# |Id1|100;300;200|
# |Id2|433        |
# +---+-----------+

In Spark SQL:

SELECT Id, concat_ws(';', sort_array(collect_list(struct(Timestamp, Value))).Value) as Values
FROM table
GROUP BY Id

How to group by time interval in Spark SQL

Spark >= 2.0

You can use window (not to be confused with window functions). Depending on the variant, it assigns timestamps to one or more, potentially overlapping, buckets:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window                                       |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
// +---+---------------------------------------------+-----------+
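
The overlapping variant is the three-argument form with a slide duration shorter than the window duration. A PySpark sketch against the same hypothetical KEY, time and metric columns as the Scala example above:

from pyspark.sql.functions import window, sum as sum_

# 10-minute windows sliding every 5 minutes: each row lands in two buckets
(df
    .groupBy("KEY", window("time", "10 minutes", "5 minutes"))
    .agg(sum_("metric").alias("sum_metric"))
    .show(truncate=False))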

Spark < 2.0

Let's start with example data:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that event is identified by KEY. If this is not the case you can adjust GROUP BY / PARTITION BY clauses according to your requirements.

If you're interested in an aggregation with a static window independent of the data, convert timestamps to a numeric data type and round:

import org.apache.spark.sql.functions.{round, sum}

// Cast string to timestamp and then to Unix time in seconds
val ts = $"Time".cast("timestamp").cast("long")

// Round to a 300 seconds (5 minutes) interval
// In Spark >= 3.1 replace cast("timestamp") with timestamp_seconds
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval             |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11         |
// |001|2016-05-01 10:55:00.0|12         |
// |001|2016-05-01 10:50:00.0|45         |
// |003|2016-05-01 10:55:00.0|13         |
// |002|2016-05-01 10:50:00.0|100        |
// +---+---------------------+-----------+

If you're interested in a window relative to the current row, use window functions:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time               |ts        |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
// |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
// |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
// |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
// |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
// |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
// |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
// +---+----------+------+-------------------+----------+----------+

For performance reasons this approach is useful only if the data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.

Aggregated at week start date (Monday) for the complete week

Windows default to start at 1970-01-01, which is a Thursday. You can use

window("Time", "7 day", startTime="4 days")

to shift that to Mondays.
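
Putting that together, a PySpark sketch of a Monday-aligned weekly aggregation; df, Time and metric are assumed names, not columns from the question:

from pyspark.sql.functions import window, sum as sum_

# 7-day tumbling windows shifted by 4 days so they start on Mondays
# instead of the Thursday epoch (1970-01-01)
(df
    .groupBy(window("Time", "7 days", startTime="4 days"))
    .agg(sum_("metric").alias("weekly_total"))
    .select("window.start", "window.end", "weekly_total")
    .show(truncate=False))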

Group days into weeks with totals PySpark

Try this approach using the date_sub and next_day functions in Spark.

Explanation:

date_sub(
  next_day(col("day"), "sunday"),  // get the date of the next Sunday
  7)                               // subtract one week from that date

Example:

In PySpark:

from pyspark.sql.functions import *
df = sc.parallelize([("2009-01-03","1","0"),("2009-01-09","14","0"),("2009-01-10","61","0")]).toDF(["day","bitcoin_total","dash_total"])
(df.withColumn("week_strt_day", date_sub(next_day(col("day"), "sunday"), 7))
   .groupBy("week_strt_day")
   .agg(sum("bitcoin_total").cast("int").alias("bitcoin_total"), sum("dash_total").cast("int").alias("dash_total"))
   .orderBy("week_strt_day").show())

Result:

+-------------+-------------+----------+
|week_strt_day|bitcoin_total|dash_total|
+-------------+-------------+----------+
|   2008-12-28|            1|         0|
|   2009-01-04|           75|         0|
+-------------+-------------+----------+

In Scala:

import org.apache.spark.sql.functions._
val df = Seq(("2009-01-03","1","0"),("2009-01-09","14","0"),("2009-01-10","61","0")).toDF("day","bitcoin_total","dash_total")
df.withColumn("week_strt_day", date_sub(next_day('day, "sunday"), 7))
  .groupBy("week_strt_day")
  .agg(sum("bitcoin_total").cast("int").alias("bitcoin_total"), sum("dash_total").cast("int").alias("dash_total"))
  .orderBy("week_strt_day").show()

Result:

+-------------+-------------+----------+
|week_strt_day|bitcoin_total|dash_total|
+-------------+-------------+----------+
|   2008-12-28|            1|         0|
|   2009-01-04|           75|         0|
+-------------+-------------+----------+

PySpark - Group by ID & Date, and Sum in mins by a time column

If only the two latest rows have to be compared, then the window "lead" function can be used, in Scala:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

val df = Seq(
  ("2012-05-05", "A", "13:14:07.000000"),
  ("2012-05-05", "A", "13:54:08.000000"),
  ("2013-01-01", "B", "14:40:26.000000"),
  ("2013-01-01", "B", "14:48:27.000000"),
  ("2014-04-03", "C", "17:17:30.000000"),
  ("2014-04-03", "C", "17:47:31.000000")
).toDF("record_date", "Tag", "time")

// Rows within each record_date/Tag group, latest time first
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))

df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  // lead() fetches the next (earlier) row's timestamp within the group
  .withColumn("timeDiffSeconds", $"unixTimestamp" - lead($"unixTimestamp", 1, 0).over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(recordTagWindow))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)

Output (it looks like the expected output in your example is incorrect for the first row):

+-----------+---+-----------------+
|record_date|Tag|timeDiffFormatted|
+-----------+---+-----------------+
|2012-05-05 |A  |00:40:01         |
|2013-01-01 |B  |00:08:01         |
|2014-04-03 |C  |00:30:01         |
+-----------+---+-----------------+

For more than two rows, the functions "first" and "last" can be used, with the window modified to include all values (rowsBetween):

val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  .withColumn("timeDiffSeconds", first($"unixTimestamp").over(recordTagWindow) - last($"unixTimestamp").over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(Window.partitionBy("record_date", "Tag").orderBy(desc("time"))))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)

