How to aggregate over rolling time window with groups in Spark
Revised answer:
You can use a simple window functions trick here. A bunch of imports:
from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
from pyspark.sql.window import Window
Window definition:
w = Window.partitionBy("group_by").orderBy("date")
Cast date to DateType:
df_ = df.withColumn("date", col("date").cast("date"))
Define the following expressions:
# Difference from the previous record or 0 if this is the first one
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))
# 0 if diff <= 30, 1 otherwise
indicator = (diff > 30).cast("integer")
# Cumulative sum of indicators over the window
subgroup = sum_(indicator).over(w).alias("subgroup")
Add the subgroup expression to the table:
df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg")
+--------+--------+------------+
|group_by|subgroup|avg(get_avg)|
+--------+--------+------------+
| group1| 0| 5.0|
| group2| 0| 20.0|
| group2| 1| 8.0|
+--------+--------+------------+
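To make the cumulative-sum trick concrete, here is a plain-Python sketch of the same logic for a single group, with no Spark required. The dates and the helper name assign_subgroups are made up for illustration; only the 30-day threshold comes from the answer above.

```python
from datetime import date

def assign_subgroups(dates, max_gap_days=30):
    """Return one subgroup id per date; a new subgroup starts after a gap > max_gap_days."""
    subgroups = []
    current = 0
    previous = None
    for d in sorted(dates):
        # diff is 0 for the first record, mirroring coalesce(..., lit(0))
        diff = (d - previous).days if previous is not None else 0
        # the indicator is 1 when the gap exceeds the threshold; its running sum is the id
        current += 1 if diff > max_gap_days else 0
        subgroups.append(current)
        previous = d
    return subgroups

# Hypothetical dates for one group
dates = [date(2016, 1, 1), date(2016, 1, 20), date(2016, 3, 15), date(2016, 3, 20)]
print(assign_subgroups(dates))  # [0, 0, 1, 1]
```

A gap of exactly 30 days stays in the same subgroup, because the indicator uses a strict comparison.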
first is not meaningful with aggregations, but if the column is monotonically increasing you can use min. Otherwise you'll have to use window functions as well.
Tested using Spark 2.1. May require subqueries and a Window instance when used with an earlier Spark release.
The original answer (not relevant in the specified scope)
Since Spark 2.0 you should be able to use a window function:
Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).
from pyspark.sql.functions import window
df.groupBy(window("date", windowDuration="30 days")).count()
but you can see from the result,
+---------------------------------------------+-----+
|window |count|
+---------------------------------------------+-----+
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1 |
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2 |
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1 |
+---------------------------------------------+-----+
you'll have to be a bit careful when it comes to timezones.
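The 01:00 and 02:00 offsets in the result are consistent with windows being aligned to 1970-01-01 00:00:00 UTC (the documented reference point for the startTime offset) and then displayed in the session timezone. A minimal plain-Python sketch of that alignment follows; window_bounds is a hypothetical helper and the timestamp is made up.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def window_bounds(ts, duration):
    """Return the [start, end) tumbling window containing ts, aligned to the epoch in UTC."""
    elapsed = ts - EPOCH
    # integer number of whole windows since the epoch
    start = EPOCH + (elapsed // duration) * duration
    return start, start + duration

ts = datetime(2016, 1, 15, tzinfo=timezone.utc)  # hypothetical timestamp
start, end = window_bounds(ts, timedelta(days=30))
print(start.isoformat(), end.isoformat())
# 2015-12-31T00:00:00+00:00 2016-01-30T00:00:00+00:00
```

Viewed from a UTC+1 session, that start prints as 2015-12-31 01:00:00, which matches the table above.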
pyspark high performance rolling/window aggregations on timeseries data
NOTE: I'm going to mark this as the accepted answer for the time being. If someone finds a faster/better way, please notify me and I'll switch it!
EDIT Clarification: The calculations shown here assume input dataframes pre-processed to the day level, with day-level rolling calculations.
After I posted the question I tested several different options on my real dataset (and got some input from coworkers), and I believe the fastest way to do this (for large datasets) uses pyspark.sql.functions.window() with groupby().agg instead of pyspark.sql.window.Window().
A similar answer can be found here
The steps to make this work are:
- sort the dataframe by name and date (in the example dataframe)
- .persist() the dataframe
- compute a grouped dataframe using F.window() and join it back to df for every window required
The best/easiest way to see this in action is on the SQL diagram in the Spark UI. When Window() is used, the SQL execution is totally sequential. However, when F.window() is used, the diagram shows parallelization! NOTE: on small datasets Window() still seems faster.
In my tests with real data on 7-day windows, Window() was 3-5x slower than F.window(). The only downside is that F.window() is a bit less convenient to use. I've shown some code and screenshots below for reference.
Fastest Solution Found (F.window() with groupby.agg())
from pyspark.sql import functions as F
# sorting and persisting turned out to be super important for tricking Spark into parallelizing things
df = df.orderBy("name", "date")
df.persist()
fwindow7 = F.window(
F.col("date"),
windowDuration="7 days",
slideDuration="1 days",
).alias("window")
gb7 = (
df
.groupBy(F.col("name"), fwindow7)
.agg(
F.sum(F.col("value")).alias("sum7"),
F.avg(F.col("value")).alias("mean7"),
F.min(F.col("value")).alias("min7"),
F.max(F.col("value")).alias("max7"),
F.stddev(F.col("value")).alias("stddev7"),
F.count(F.col("value")).alias("cnt7")
)
.withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
.drop("window")
)
window_function_example = df.join(gb7, ["name", "date"], how="left")
fwindow14 = F.window(
F.col("date"),
windowDuration="14 days",
slideDuration="1 days",
).alias("window")
gb14 = (
df
.groupBy(F.col("name"), fwindow14)
.agg(
F.sum(F.col("value")).alias("sum14"),
F.avg(F.col("value")).alias("mean14"),
F.min(F.col("value")).alias("min14"),
F.max(F.col("value")).alias("max14"),
F.stddev(F.col("value")).alias("stddev14"),
F.count(F.col("value")).alias("cnt14")
)
.withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
.drop("window")
)
window_function_example = window_function_example.join(gb14, ["name", "date"], how="left")
window_function_example.orderBy("name", "date").show(truncate=True)
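The reason date_sub(window.end, 1) works as a join key is that, with a one-day slide, each row lands in every window that contains its day, and labelling a window by its last included day turns the grouped result into a trailing aggregate. Below is a plain-Python sketch of that bookkeeping using integer day numbers; the data and the helper name are hypothetical.

```python
from collections import defaultdict

def rolling_sum_via_sliding_windows(values_by_day, duration=7):
    """Emulate a `duration`-day window sliding by 1 day, grouped and summed, keyed by last day."""
    sums = defaultdict(int)
    for day, value in values_by_day.items():
        # a row on `day` falls into `duration` overlapping windows [start, start + duration)
        for start in range(day - duration + 1, day + 1):
            end = start + duration      # exclusive window end
            label = end - 1             # last day inside the window, like date_sub(window.end, 1)
            sums[label] += value
    return dict(sums)

values = {1: 10, 2: 20, 5: 5, 9: 1}  # hypothetical day number -> value
windowed = rolling_sum_via_sliding_windows(values)

# keep only labels that exist in the input, like the left join back onto df
joined = {day: windowed[day] for day in values}
print(joined)  # {1: 10, 2: 30, 5: 35, 9: 6}
```

Each input row is duplicated once per overlapping window before aggregation, so the intermediate data grows with windowDuration / slideDuration.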
SQL Diagram
Option 2 from Original Question (Higher Order Functions applied to Window())
from pyspark.sql import functions as F, Window
window7 = (
Window
.partitionBy(F.col("name"))
.orderBy(F.col("date").cast("timestamp").cast("long"))
.rangeBetween(-7 * 60 * 60 * 24 + 1, Window.currentRow)
)
window14 = (
Window
.partitionBy(F.col("name"))
.orderBy(F.col("date").cast("timestamp").cast("long"))
.rangeBetween(-14 * 60 * 60 * 24 + 1, Window.currentRow)
)
hof_example = (
df
.withColumn("value_array", F.collect_list(F.col("value")).over(window7))
.withColumn("sum7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
.withColumn("mean7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
.withColumn("max7", F.array_max(F.col("value_array")))
.withColumn("min7", F.array_min(F.col("value_array")))
.withColumn("std7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean7)*(x - mean7), acc -> sqrt(acc / (size(value_array) - 1)))'))
.withColumn("count7", F.size(F.col("value_array")))
.drop("value_array")
)
hof_example = (
hof_example
.withColumn("value_array", F.collect_list(F.col("value")).over(window14))
.withColumn("sum14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
.withColumn("mean14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
.withColumn("max14", F.array_max(F.col("value_array")))
.withColumn("min14", F.array_min(F.col("value_array")))
.withColumn("std14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean14)*(x - mean14), acc -> sqrt(acc / (size(value_array) - 1)))'))
.withColumn("count14", F.size(F.col("value_array")))
.drop("value_array")
)
hof_example.show(truncate=True)
SQL Diagram Snippet
How to create a map column with rolling window aggregates per each key
Assuming that the state, ds, ds_num and region columns in your source dataframe are unique (they can be seen as a primary key), this snippet would do the work:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
days = lambda i: i * 86400
df.alias('a').join(df.alias('b'), 'state') \
.where((F.col('a.ds_num') - F.col('b.ds_num')).between(0, days(27))) \
.select('state', 'a.ds', 'a.ds_num', 'b.region', 'b.count') \
.dropDuplicates() \
.groupBy('state', 'ds', 'ds_num', 'region').sum('count') \
.groupBy('state', 'ds', 'ds_num') \
.agg(F.map_from_entries(F.collect_list(F.struct("region", "sum(count)"))).alias("count_rolling_4W")) \
.orderBy('a.ds') \
.show(truncate=False)
Results:
+-----+----------+----------+------------------------------------+
|state|ds |ds_num |count_rolling_4W |
+-----+----------+----------+------------------------------------+
|AK |2022-05-02|1651449600|{US -> 3} |
|AK |2022-05-03|1651536000|{US -> 3, ON -> 1} |
|AK |2022-05-04|1651622400|{US -> 3, ON -> 1, CO -> 1} |
|AK |2022-05-06|1651795200|{US -> 8, ON -> 1, CO -> 1, AK -> 1}|
+-----+----------+----------+------------------------------------+
It may seem complex, but it's just windowing rewritten as a self-join on state for better control over the results.
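The join condition can be read as: for every (state, ds) row, collect all rows of the same state whose date is between 0 and 27 days earlier, then sum count per region. Here is a plain-Python sketch of that logic. The input rows are an assumption, chosen only to be consistent with the result table above, and rolling_counts is a hypothetical helper.

```python
from collections import defaultdict

DAY = 86400

def rolling_counts(rows, days=27):
    """rows: (state, ds_num, region, count). Returns {(state, ds_num): {region: rolling sum}}."""
    result = defaultdict(lambda: defaultdict(int))
    anchors = {(state, ds_num) for state, ds_num, _, _ in rows}
    for a_state, a_ds in anchors:
        for b_state, b_ds, region, count in rows:
            # same state, and the b row is at most `days` days before the anchor date
            if a_state == b_state and 0 <= a_ds - b_ds <= days * DAY:
                result[(a_state, a_ds)][region] += count
    return {key: dict(regions) for key, regions in result.items()}

rows = [  # assumed input
    ("AK", 1651449600, "US", 3),
    ("AK", 1651536000, "ON", 1),
    ("AK", 1651622400, "CO", 1),
    ("AK", 1651795200, "US", 5),
    ("AK", 1651795200, "AK", 1),
]
print(rolling_counts(rows)[("AK", 1651795200)])
# {'US': 8, 'ON': 1, 'CO': 1, 'AK': 1}
```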
How to get aggregate by hour including missing hours and add cumulative sum?
You could do it by first creating a table with hours and then joining it with the rest of the data.
Setup:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('xx', '2011-08-15 14:47:02.617023', 'eventA', 1),
('xx', '2011-08-15 14:48:02.507053', 'eventA', 2),
('xx', '2011-08-15 16:47:02.512016', 'eventA', 100),
('yy', '2011-08-15 11:47:02.337019', 'eventA', 2),
('yy', '2011-08-15 12:47:02.617041', 'eventA', 1),
('yy', '2011-08-15 13:47:02.927040', 'eventA', 3)],
['GroupId', 'Event_time', 'Event_name', 'Event_value']
)
df = df.withColumn('Date', F.col('Event_time').cast('date'))
The following creates a dataframe with hours:
min_date = df.agg(F.min('Date')).head()[0]
max_date = df.agg(F.max('Date')).head()[0]
df_hours = df.select(
'GroupId',
'Event_name',
F.explode(F.expr(f"sequence(to_timestamp('{min_date} 00:00:00'), to_timestamp('{max_date} 23:00:00'), interval 1 hour)")).alias('date_hour')
).distinct()
Then, aggregating your first table by hours:
df_agg = (df
.groupBy('GroupId', 'Event_name', F.date_trunc('hour', 'Event_time').alias('date_hour'))
.agg(F.sum('Event_value').alias('Count'))
)
Joining them both together:
df_joined = df_hours.join(df_agg, ['GroupId', 'Event_name', 'date_hour'], 'left')
Adding column agg_count and others:
w = W.partitionBy('GroupId', 'Event_name').orderBy('date_hour')
df2 = (df_joined
.select(
'GroupId',
'Event_name',
F.to_date('date_hour').alias('Date'),
F.date_format('date_hour', 'HH').alias('Hour'),
'Count',
F.coalesce(F.sum('Count').over(w), F.lit(0)).alias('agg_count')
)
)
Result:
+-------+----------+----------+----+-----+---------+
|GroupId|Event_name| Date|Hour|Count|agg_count|
+-------+----------+----------+----+-----+---------+
| xx| eventA|2011-08-15| 00| null| 0|
| xx| eventA|2011-08-15| 01| null| 0|
| xx| eventA|2011-08-15| 02| null| 0|
| xx| eventA|2011-08-15| 03| null| 0|
| xx| eventA|2011-08-15| 04| null| 0|
| xx| eventA|2011-08-15| 05| null| 0|
| xx| eventA|2011-08-15| 06| null| 0|
| xx| eventA|2011-08-15| 07| null| 0|
| xx| eventA|2011-08-15| 08| null| 0|
| xx| eventA|2011-08-15| 09| null| 0|
| xx| eventA|2011-08-15| 10| null| 0|
| xx| eventA|2011-08-15| 11| null| 0|
| xx| eventA|2011-08-15| 12| null| 0|
| xx| eventA|2011-08-15| 13| null| 0|
| xx| eventA|2011-08-15| 14| 3| 3|
| xx| eventA|2011-08-15| 15| null| 3|
| xx| eventA|2011-08-15| 16| 100| 103|
| xx| eventA|2011-08-15| 17| null| 103|
| xx| eventA|2011-08-15| 18| null| 103|
| xx| eventA|2011-08-15| 19| null| 103|
| xx| eventA|2011-08-15| 20| null| 103|
| xx| eventA|2011-08-15| 21| null| 103|
| xx| eventA|2011-08-15| 22| null| 103|
| xx| eventA|2011-08-15| 23| null| 103|
| yy| eventA|2011-08-15| 00| null| 0|
| yy| eventA|2011-08-15| 01| null| 0|
| yy| eventA|2011-08-15| 02| null| 0|
| yy| eventA|2011-08-15| 03| null| 0|
| yy| eventA|2011-08-15| 04| null| 0|
| yy| eventA|2011-08-15| 05| null| 0|
| yy| eventA|2011-08-15| 06| null| 0|
| yy| eventA|2011-08-15| 07| null| 0|
| yy| eventA|2011-08-15| 08| null| 0|
| yy| eventA|2011-08-15| 09| null| 0|
| yy| eventA|2011-08-15| 10| null| 0|
| yy| eventA|2011-08-15| 11| 2| 2|
| yy| eventA|2011-08-15| 12| 1| 3|
| yy| eventA|2011-08-15| 13| 3| 6|
| yy| eventA|2011-08-15| 14| null| 6|
| yy| eventA|2011-08-15| 15| null| 6|
| yy| eventA|2011-08-15| 16| null| 6|
| yy| eventA|2011-08-15| 17| null| 6|
| yy| eventA|2011-08-15| 18| null| 6|
| yy| eventA|2011-08-15| 19| null| 6|
| yy| eventA|2011-08-15| 20| null| 6|
| yy| eventA|2011-08-15| 21| null| 6|
| yy| eventA|2011-08-15| 22| null| 6|
| yy| eventA|2011-08-15| 23| null| 6|
+-------+----------+----------+----+-----+---------+
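The same fill-then-accumulate idea can be sketched in plain Python for a single group and day. None stands in for the nulls produced by the left join, and hourly_with_cumsum is a hypothetical helper name.

```python
from itertools import accumulate

def hourly_with_cumsum(counts_by_hour):
    """counts_by_hour: {hour: count}. Returns (hour, count, running_total) for hours 0..23."""
    hours = range(24)
    # hours without events stay None, like the null Count after the left join
    counts = [counts_by_hour.get(h) for h in hours]
    # treat missing hours as 0 when accumulating, so the total starts at 0
    running = accumulate(c or 0 for c in counts)
    return list(zip(hours, counts, running))

rows = hourly_with_cumsum({14: 3, 16: 100})  # group xx from the example above
print(rows[13], rows[14], rows[15], rows[16], rows[23])
# (13, None, 0) (14, 3, 3) (15, None, 3) (16, 100, 103) (23, None, 103)
```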
Pyspark: aggregate mode (most frequent) value in a rolling window
You can use the collect_list function to get the stations from the last 3 rows using the defined window, then calculate the most frequent element of each resulting array.
To get the most frequent element of the array, you can explode it and then group by and count, as in the linked post you already saw, or use a UDF like this:
import pyspark.sql.functions as F
test_df.withColumn(
"rolling_mode_station",
F.collect_list("station").over(rolling_w)
).withColumn(
"rolling_mode_station",
F.udf(lambda x: max(set(x), key=x.count))(F.col("rolling_mode_station"))
).show()
#+------+----------+---------+--------------------+
#|device|start_time| station|rolling_mode_station|
#+------+----------+---------+--------------------+
#|Python| 1|station_1| station_1|
#|Python| 2|station_2| station_1|
#|Python| 3|station_1| station_1|
#|Python| 4|station_2| station_2|
#|Python| 5|station_2| station_2|
#|Python| 6| null| station_2|
#+------+----------+---------+--------------------+
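One caveat with max(set(x), key=x.count) is that ties are broken by set iteration order, which is not guaranteed. The plain-Python sketch below makes the tie-break explicit (earliest value in the frame wins). It assumes the window covers the current row and the two preceding rows; rolling_mode is a hypothetical helper.

```python
def rolling_mode(values, size=3):
    """Most frequent non-null value among the current and previous size-1 items."""
    modes = []
    for i in range(len(values)):
        # nulls are dropped from the frame, as collect_list does
        frame = [v for v in values[max(0, i - size + 1): i + 1] if v is not None]
        counts = {}
        for v in frame:
            counts[v] = counts.get(v, 0) + 1
        # max over a dict returns the first key with the highest count (insertion order)
        modes.append(max(counts, key=counts.get) if counts else None)
    return modes

stations = ["station_1", "station_2", "station_1", "station_2", "station_2", None]
print(rolling_mode(stations))
# ['station_1', 'station_1', 'station_1', 'station_2', 'station_2', 'station_2']
```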
Group by and aggregate values from multiple time periods in python and pyspark
In pyspark you can use collect_list over a Window with frame boundaries specified as rows between [-n, currentRow] to get the n consecutive months, and also calculate a running sum of amt over this same Window. Finally, keep only the rows where the size of months equals n + 1:
from pyspark.sql import functions as F, Window
# create spark df from pandas dataframe
sdf = spark.createDataFrame(df)
n = 2
w = Window.partitionBy("id").orderBy("month").rowsBetween(-n, Window.currentRow)
result = sdf.withColumn("months", F.collect_list("month").over(w)) \
.withColumn("amt", F.sum("amt").over(w)) \
.filter(F.size("months") == n + 1) \
.select(
F.col("id"),
F.element_at(F.col("months"), 1).alias("month_start"),
F.element_at(F.col("months"), -1).alias("month_end"),
F.col("amt")
)
result.show()
#+---+-----------+---------+---+
#| id|month_start|month_end|amt|
#+---+-----------+---------+---+
#| A| 2020-01| 2020-03| 9|
#| A| 2020-02| 2020-04| 12|
#| B| 2020-01| 2020-03| 6|
#| B| 2020-02| 2020-04| 9|
#+---+-----------+---------+---+
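For one id, the frame logic amounts to a fixed-size trailing slice over rows sorted by month. A plain-Python sketch follows; the amounts are hypothetical values picked to reproduce the totals for id A, and consecutive_windows is a made-up helper name.

```python
def consecutive_windows(rows, n=2):
    """rows: (month, amt) pairs sorted by month. Returns (month_start, month_end, total)."""
    out = []
    for i in range(len(rows)):
        frame = rows[max(0, i - n): i + 1]   # current row plus up to n preceding rows
        if len(frame) == n + 1:              # keep only full frames
            out.append((frame[0][0], frame[-1][0], sum(amt for _, amt in frame)))
    return out

rows_a = [("2020-01", 2), ("2020-02", 3), ("2020-03", 4), ("2020-04", 5)]  # hypothetical
print(consecutive_windows(rows_a))
# [('2020-01', '2020-03', 9), ('2020-02', '2020-04', 12)]
```

Note that a rows-based frame counts rows, not calendar months, so this only yields consecutive months if every id has one row per month with no gaps.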
How to do a rolling sum in PySpark?
It seems you're trying to do a rolling sum of A. You can do a sum over a window, e.g.
from pyspark.sql import functions as F, Window
df2 = df.withColumn('B', F.sum('A').over(Window.orderBy('ordering_col')))
But you would need a column to order by, otherwise the "previous record" is not well-defined because Spark dataframes are unordered.
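As a mental model, an ordered window with no explicit frame defaults to everything from the start of the partition up to the current row, so the sum behaves like a running total over the sorted rows. A plain-Python sketch with hypothetical (ordering_col, A) pairs:

```python
from itertools import accumulate

rows = [(3, 30), (1, 10), (2, 20)]  # hypothetical (ordering_col, A) pairs, deliberately unordered

ordered = sorted(rows)                              # the orderBy of the window
running = list(accumulate(a for _, a in ordered))   # cumulative sum of A
result = [(o, a, b) for (o, a), b in zip(ordered, running)]
print(result)  # [(1, 10, 10), (2, 20, 30), (3, 30, 60)]
```

This sketch assumes the ordering values are distinct; with the default range-based frame in Spark, rows that tie on the ordering column receive the same sum.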
Aggregate over time windows on a partitioned/grouped by window
First, you need to assign a group id:
from pyspark.sql import functions as F, Window as W
df2 = (
df.withColumn(
"id",
F.when(
F.lag("record_type").over(W.partitionBy("ID1", "ID2").orderBy("date"))
== F.col("record_type"),
0,
).otherwise(1),
)
.withColumn("id", F.sum("id").over(W.partitionBy("ID1", "ID2").orderBy("date")))
)
df2.show()
+-------+----+-------------------+--------+----------+-----------+---+
| ID1| ID2| date| type| name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital| null| IN| 1|
|ACC.PXP|7246|2018-10-18T16:20:00| null|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-11T00:00:00| null|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital| null| OUT| 2|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2020-12-04T15:00:00| Care| Betel| OUT| 4|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| null| OUT| 4|
+-------+----+-------------------+--------+----------+-----------+---+
Then, you fill in the columns where there are nulls:
df3 = df2.withColumn(
"name",
F.coalesce(
F.col("name"),
F.max("name").over(W.partitionBy("ID1", "ID2", "id"))
)
).withColumn(
"type",
F.coalesce(
F.col("type"),
F.max("type").over(W.partitionBy("ID1", "ID2", "id"))
)
)
df3.show()
+-------+----+-------------------+--------+----------+-----------+---+
| ID1| ID2| date| type| name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-11T00:00:00|Hospital|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2020-12-04T15:00:00| Care| Betel| OUT| 4|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| Betel| OUT| 4|
+-------+----+-------------------+--------+----------+-----------+---+
Finally, you select the "last" line for each tuple ("ID1", "ID2", "id"):
df4 = df3.withColumn(
"row",
F.row_number().over(W.partitionBy("ID1", "ID2", "id").orderBy(F.col("date").desc()))
).where("row=1").drop("row", "id")
df4.show()
+-------+----+-------------------+--------+----------+-----------+
| ID1| ID2| date| type| name|record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington| OUT|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| Betel| OUT|
+-------+----+-------------------+--------+----------+-----------+
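The group id in the first step is the usual "flag a change, then take a running sum" pattern. A plain-Python sketch for one (ID1, ID2) partition, using the record_type sequence from the table above; group_ids is a hypothetical helper.

```python
def group_ids(record_types):
    """Assign an increasing id to each run of consecutive equal record types."""
    ids, current, previous = [], 0, None
    for rt in record_types:
        # the first row has no previous value, so it also starts a new group
        if rt != previous:
            current += 1
        ids.append(current)
        previous = rt
    return ids

print(group_ids(["IN", "IN", "IN", "OUT", "OUT", "IN", "IN", "OUT", "OUT"]))
# [1, 1, 1, 2, 2, 3, 3, 4, 4]
```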
How to group by time interval in Spark SQL
Spark >= 2.0
You can use window (not to be mistaken with window functions). Depending on the variant, it assigns a timestamp to one or more, potentially overlapping, buckets:
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+
Spark < 2.0
Let's start with example data:
import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0
val df = Seq(
("001", "event1", 10, "2016-05-01 10:50:51"),
("002", "event2", 100, "2016-05-01 10:50:53"),
("001", "event3", 20, "2016-05-01 10:50:55"),
("001", "event1", 15, "2016-05-01 10:51:50"),
("003", "event1", 13, "2016-05-01 10:55:30"),
("001", "event2", 12, "2016-05-01 10:57:00"),
("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")
I assume that an event is identified by KEY. If this is not the case you can adjust the GROUP BY / PARTITION BY clauses according to your requirements.
If you're interested in an aggregation with a static window independent of the data, convert timestamps to a numeric data type and round:
import org.apache.spark.sql.functions.{round, sum}
// cast string to timestamp, then to seconds since the epoch
val ts = $"Time".cast("timestamp").cast("long")
// Round to 300 seconds interval
// In Spark >= 3.1 replace cast("timestamp") with timestamp_seconds
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")
df.groupBy($"KEY", interval).sum("metric")
// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+
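Note that rounding picks the nearest 300-second mark, not the start of the bucket. In the sample data every timestamp falls in the first half of its interval, so the two agree. The plain-Python sketch below shows where they would differ; the helper names and the sample value are hypothetical.

```python
import math

def nearest_interval(ts, width=300):
    """Round half up to the nearest multiple of width."""
    return math.floor(ts / width + 0.5) * width

def floor_interval(ts, width=300):
    """Start of the fixed-width bucket containing ts."""
    return ts // width * width

base = 1462092600   # a multiple of 300 (epoch seconds)
late = base + 180   # three minutes into the bucket
print(nearest_interval(late) - base, floor_interval(late) - base)  # 300 0
```

If bucket starts are what you want, flooring instead of rounding is the safer choice.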
If you're interested in a window relative to the current row use window functions:
import org.apache.spark.sql.expressions.Window
// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))
// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+
For performance reasons this approach is useful only if the data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.
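For intuition, the range frame can be sketched in plain Python as a sum over all rows of the same KEY whose epoch seconds lie within 150 seconds of the current row. The rows reuse the KEY, ts and metric values from the example; range_window_sums is a hypothetical helper.

```python
def range_window_sums(rows, before=150, after=150):
    """rows: (key, ts, metric). Sum metric over same-key rows with ts in [ts - before, ts + after]."""
    return [
        sum(m for k, t, m in rows if k == key and ts - before <= t <= ts + after)
        for key, ts, _ in rows
    ]

rows = [
    ("001", 1462092651, 10),
    ("002", 1462092653, 100),
    ("001", 1462092655, 20),
    ("001", 1462092710, 15),
    ("003", 1462092930, 13),
    ("001", 1462093020, 12),
    ("001", 1462093201, 11),
]
print(range_window_sums(rows))  # [45, 100, 45, 45, 13, 12, 11]
```

This quadratic loop is only for illustration; it compares every row against every other row.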