Aggregation with Group By date in Spark SQL
I solved the issue by adding this function:
def convert(time: Long): String = {
  val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd")
  sdf.format(new java.util.Date(time))
}
And registering it into the sqlContext like this:
sqlContext.registerFunction("convert", convert _)
Then I could finally group by date:
select * from table group by convert(time)
Group Spark dataframe by date
Since 1.5.0 Spark provides a number of functions like dayofmonth, hour, month or year which can operate on dates and timestamps. So if timestamp is a TimestampType, all you need is a correct expression. For example:
from pyspark.sql.functions import hour, mean

(df
    .groupBy(hour("timestamp").alias("hour"))
    .agg(mean("value").alias("mean"))
    .show())
## +----+------------------+
## |hour| mean|
## +----+------------------+
## | 0|508.05999999999995|
## | 1| 449.8666666666666|
## | 2| 524.9499999999999|
## | 3|264.59999999999997|
## +----+------------------+
Pre-1.5.0 your best option is to use HiveContext and Hive UDFs, either with selectExpr:
df.selectExpr("year(timestamp) AS year", "value").groupBy("year").sum()
## +----+---------+----------+
## |year|SUM(year)|SUM(value)|
## +----+---------+----------+
## |2015| 40300| 9183.0|
## +----+---------+----------+
or raw SQL:
df.registerTempTable("df")
sqlContext.sql("""
SELECT MONTH(timestamp) AS month, SUM(value) AS values_sum
FROM df
GROUP BY MONTH(timestamp)""")
Just remember that the aggregation is performed by Spark, not pushed down to the external source. Usually this is the desired behavior, but there are situations where you may prefer to perform the aggregation as a subquery to limit data transfer.
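If the source is, say, a JDBC database, one way to do that is to pass the aggregation as a subquery to the reader so that only the aggregated rows are transferred. A minimal Scala sketch (the connection URL, table and column names here are hypothetical):
// The subquery runs inside the external database, so Spark receives
// the already-aggregated rows instead of the full table
val aggregated = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")  // hypothetical URL
  .option("dbtable",
    "(SELECT YEAR(ts) AS year, SUM(value) AS value_sum " +
    "FROM records GROUP BY YEAR(ts)) AS tmp")      // hypothetical table and columns
  .load()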
Aggregate a Spark dataframe based on and before date
You can combine standard aggregation with a window function, but the second stage won't be distributed:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
df
  .groupBy($"start_date")
  .agg(approx_count_distinct($"column1").alias("count"))
  .withColumn(
    "cumulative_count", sum($"count").over(Window.orderBy($"start_date")))
String aggregation and group by in PySpark
You can collect a list of structs of Timestamp and Value (in that order) for each Id, sort it (sort_array will sort by the first field of the struct, i.e. Timestamp) and combine the Value fields into a string using concat_ws.
PySpark (Spark 3.1.2)
import pyspark.sql.functions as F

(df
    .groupBy("Id")
    .agg(F.expr("concat_ws(';', sort_array(collect_list(struct(Timestamp, Value))).Value) as Values"))
).show(truncate=False)
# +---+-----------+
# |Id |Values |
# +---+-----------+
# |Id1|100;300;200|
# |Id2|433 |
# +---+-----------+
In Spark SQL:
SELECT Id, concat_ws(';', sort_array(collect_list(struct(Timestamp, Value))).Value) as Values
FROM table
GROUP BY Id
How to group by time interval in Spark SQL
Spark >= 2.0
You can use window (not to be confused with window functions). Depending on the variant, it assigns each timestamp to one or more, potentially overlapping, buckets:
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+
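The overlapping variant takes an additional slide duration that is shorter than the window duration. A small sketch reusing the columns above (the 1 minute slide is just an illustrative value):
// Sliding windows: each record falls into several overlapping
// 5 minute buckets that start every minute
df.groupBy($"KEY", window($"time", "5 minutes", "1 minute")).sum("metric")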
Spark < 2.0
Let's start with example data:
import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0
val df = Seq(
("001", "event1", 10, "2016-05-01 10:50:51"),
("002", "event2", 100, "2016-05-01 10:50:53"),
("001", "event3", 20, "2016-05-01 10:50:55"),
("001", "event1", 15, "2016-05-01 10:51:50"),
("003", "event1", 13, "2016-05-01 10:55:30"),
("001", "event2", 12, "2016-05-01 10:57:00"),
("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")
I assume that an event is identified by KEY. If this is not the case, you can adjust the GROUP BY / PARTITION BY clauses according to your requirements.
If you're interested in an aggregation with a static window independent of the data, convert timestamps to a numeric data type and round:
import org.apache.spark.sql.functions.{round, sum}

// Cast string to timestamp, then to Unix time in seconds
val ts = $"Time".cast("timestamp").cast("long")

// Round to a 300 second (5 minute) interval
// In Spark >= 3.1 replace cast("timestamp") with timestamp_seconds
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")
df.groupBy($"KEY", interval).sum("metric")
// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+
If you're interested in a window relative to the current row, use window functions:
import org.apache.spark.sql.expressions.Window
// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))
// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+
For performance reasons this approach is useful only if the data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need a HiveContext to make it work.
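For reference, a minimal sketch of setting one up in Spark 1.x, assuming sc is an existing SparkContext:
import org.apache.spark.sql.hive.HiveContext

// HiveContext is a drop-in replacement for SQLContext and is required
// for window functions in Spark < 2.0.0
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._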
Aggregated at week start date (Monday) for the complete week
Windows default to start at 1970-01-01, which is a Thursday. You can use window("Time", "7 days", startTime="4 days") to shift that to Mondays.
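Putting that together, a Scala sketch (the Time and metric column names are assumptions; adjust them to your schema):
import org.apache.spark.sql.functions.window

// Tumbling 7 day windows shifted by 4 days so each bucket starts on a Monday
df.groupBy(window($"Time", "7 days", "7 days", "4 days"))
  .sum("metric")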
Group days into weeks with totals PySpark
Try this approach using the date_sub and next_day functions in Spark.
Explanation:
date_sub(
  next_day(col("day"), "sunday"),  // get the next Sunday date
  7)                               // subtract a week from that date
Example:
In PySpark:
from pyspark.sql.functions import *
df = sc.parallelize([("2009-01-03","1","0"),("2009-01-09","14","0"),("2009-01-10","61","0")]).toDF(["day","bitcoin_total","dash_total"])
(df
    .withColumn("week_strt_day", date_sub(next_day(col("day"), "sunday"), 7))
    .groupBy("week_strt_day")
    .agg(sum("bitcoin_total").cast("int").alias("bitcoin_total"),
         sum("dash_total").cast("int").alias("dash_total"))
    .orderBy("week_strt_day")
    .show())
Result:
+-------------+-------------+----------+
|week_strt_day|bitcoin_total|dash_total|
+-------------+-------------+----------+
| 2008-12-28| 1| 0|
| 2009-01-04| 75| 0|
+-------------+-------------+----------+
In Scala:
import org.apache.spark.sql.functions._
val df = Seq(("2009-01-03", "1", "0"), ("2009-01-09", "14", "0"), ("2009-01-10", "61", "0"))
  .toDF("day", "bitcoin_total", "dash_total")

df.withColumn("week_strt_day", date_sub(next_day('day, "sunday"), 7))
  .groupBy("week_strt_day")
  .agg(sum("bitcoin_total").cast("int").alias("bitcoin_total"),
       sum("dash_total").cast("int").alias("dash_total"))
  .orderBy("week_strt_day")
  .show()
Result:
+-------------+-------------+----------+
|week_strt_day|bitcoin_total|dash_total|
+-------------+-------------+----------+
| 2008-12-28| 1| 0|
| 2009-01-04| 75| 0|
+-------------+-------------+----------+
PySpark - Group by ID & Date, and Sum in mins by a time column
If only the two latest rows have to be compared, then the Window "lead" function can be used. In Scala:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

val df = Seq(
  ("2012-05-05", "A", "13:14:07.000000"),
  ("2012-05-05", "A", "13:54:08.000000"),
  ("2013-01-01", "B", "14:40:26.000000"),
  ("2013-01-01", "B", "14:48:27.000000"),
  ("2014-04-03", "C", "17:17:30.000000"),
  ("2014-04-03", "C", "17:47:31.000000")
).toDF("record_date", "Tag", "time")
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  .withColumn("timeDiffSeconds", $"unixTimestamp" - lead($"unixTimestamp", 1, 0).over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(recordTagWindow))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)
Output (it looks like your example is incorrect for the first row):
+-----------+---+-----------------+
|record_date|Tag|timeDiffFormatted|
+-----------+---+-----------------+
|2012-05-05 |A |00:40:01 |
|2013-01-01 |B |00:08:01 |
|2014-04-03 |C |00:30:01 |
+-----------+---+-----------------+
For more than two rows, the "first" and "last" functions can be used, with the Window modified to include all values (rowsBetween):
val recordTagWindow = Window.partitionBy("record_date", "Tag").orderBy(desc("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df
  .withColumn("time", substring($"time", 1, 8))
  .withColumn("unixTimestamp", unix_timestamp($"time", "HH:mm:ss"))
  .withColumn("timeDiffSeconds", first($"unixTimestamp").over(recordTagWindow) - last($"unixTimestamp").over(recordTagWindow))
  .withColumn("timeDiffFormatted", date_format($"timeDiffSeconds".cast(TimestampType), "HH:mm:ss"))
  .withColumn("rownum", row_number().over(Window.partitionBy("record_date", "Tag").orderBy(desc("time"))))
  .where($"rownum" === 1)
  .drop("rownum", "timeDiffSeconds", "time", "unixTimestamp")
  .show(false)