How to Aggregate Over Rolling Time Window with Groups in Spark

Revised answer:

You can use a simple window function trick here. A bunch of imports:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
from pyspark.sql.window import Window

The window definition:

w = Window.partitionBy("group_by").orderBy("date")

Cast date to DateType:

df_ = df.withColumn("date", col("date").cast("date"))

Define the following expressions:

# Difference from the previous record or 0 if this is the first one
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))

# 0 if diff <= 30, 1 otherwise
indicator = (diff > 30).cast("integer")

# Cumulative sum of indicators over the window
subgroup = sum_(indicator).over(w).alias("subgroup")

Add the subgroup expression to the table and aggregate:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg").show()
+--------+--------+------------+
|group_by|subgroup|avg(get_avg)|
+--------+--------+------------+
| group1| 0| 5.0|
| group2| 0| 20.0|
| group2| 1| 8.0|
+--------+--------+------------+

first is not meaningful with aggregations, but if the column is monotonically increasing you can use min instead. Otherwise you'll have to use window functions as well.
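For example, to carry a "first" value per subgroup you can take min when the column is non-decreasing. A minimal sketch reusing df_ and subgroup from above (it assumes date is non-decreasing within each subgroup):

from pyspark.sql.functions import avg, min as min_

(df_
    .select("*", subgroup)
    .groupBy("group_by", "subgroup")
    # min("date") stands in for "first date of the subgroup"
    .agg(avg("get_avg").alias("avg_get_avg"), min_("date").alias("first_date"))
    .show())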

Tested using Spark 2.1. May require subqueries and a Window instance when used with an earlier Spark release.

The original answer (not relevant in the specified scope)

Since Spark 2.0 you should be able to use a window function:

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

from pyspark.sql.functions import window

df.groupBy(window("date", windowDuration="30 days")).count()

but as you can see from the result,

+---------------------------------------------+-----+
|window |count|
+---------------------------------------------+-----+
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1 |
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2 |
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1 |
+---------------------------------------------+-----+

you'll have to be a bit careful when it comes to timezones.
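One way to make those bucket boundaries predictable (a sketch, not part of the original answer) is to pin the session time zone before grouping and, if needed, shift the bucket origin with the optional startTime argument of window:

from pyspark.sql.functions import window

# Fix the session time zone so bucket boundaries don't depend on the cluster's local zone
spark.conf.set("spark.sql.session.timeZone", "UTC")

# startTime shifts the bucket origin relative to 1970-01-01 00:00:00 UTC
df.groupBy(window("date", windowDuration="30 days", startTime="0 minutes")).count()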

pyspark high performance rolling/window aggregations on timeseries data

NOTE: I'm going to mark this as the accepted answer for the time being. If someone finds a faster/better approach, please notify me and I'll switch it!

EDIT Clarification: The calculations shown here assume input dataframes pre-processed to the day level, with day-level rolling calculations

After I posted the question I tested several different options on my real dataset (and got some input from coworkers), and I believe the fastest way to do this (for large datasets) uses pyspark.sql.functions.window() with groupBy().agg() instead of pyspark.sql.window.Window().

A similar answer can be found here

The steps to make this work are:

  1. Sort the dataframe by name and date (as in the example dataframe)
  2. .persist() the dataframe
  3. Compute a grouped dataframe using F.window() and join it back to df for every window required.

The best/easiest way to see this in action is on the SQL diagram in the Spark UI. When Window() is used, the SQL execution is totally sequential. However, when F.window() is used, the diagram shows parallelization! NOTE: on small datasets Window() still seems faster.

In my tests with real data on 7-day windows, Window() was 3-5x slower than F.window(). The only downside is that F.window() is a bit less convenient to use. I've shown some code and screenshots below for reference.

Fastest Solution Found (F.window() with groupBy().agg())

from pyspark.sql import functions as F

# this turned out to be super important for getting Spark to parallelize things
df = df.orderBy("name", "date")
df.persist()

fwindow7 = F.window(
    F.col("date"),
    windowDuration="7 days",
    slideDuration="1 days",
).alias("window")

gb7 = (
    df
    .groupBy(F.col("name"), fwindow7)
    .agg(
        F.sum(F.col("value")).alias("sum7"),
        F.avg(F.col("value")).alias("mean7"),
        F.min(F.col("value")).alias("min7"),
        F.max(F.col("value")).alias("max7"),
        F.stddev(F.col("value")).alias("stddev7"),
        F.count(F.col("value")).alias("cnt7")
    )
    .withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
    .drop("window")
)
window_function_example = df.join(gb7, ["name", "date"], how="left")

fwindow14 = F.window(
    F.col("date"),
    windowDuration="14 days",
    slideDuration="1 days",
).alias("window")

gb14 = (
    df
    .groupBy(F.col("name"), fwindow14)
    .agg(
        F.sum(F.col("value")).alias("sum14"),
        F.avg(F.col("value")).alias("mean14"),
        F.min(F.col("value")).alias("min14"),
        F.max(F.col("value")).alias("max14"),
        F.stddev(F.col("value")).alias("stddev14"),
        F.count(F.col("value")).alias("cnt14")
    )
    .withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
    .drop("window")
)
window_function_example = window_function_example.join(gb14, ["name", "date"], how="left")
window_function_example.orderBy("name", "date").show(truncate=True)

SQL Diagram (Group By approach): screenshot omitted.

Option 2 from Original Question (Higher Order Functions applied to Window())

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window7 = (
    Window
    .partitionBy(F.col("name"))
    .orderBy(F.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-7 * 60 * 60 * 24 + 1, Window.currentRow)
)
window14 = (
    Window
    .partitionBy(F.col("name"))
    .orderBy(F.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-14 * 60 * 60 * 24 + 1, Window.currentRow)
)
hof_example = (
    df
    .withColumn("value_array", F.collect_list(F.col("value")).over(window7))
    .withColumn("sum7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
    .withColumn("mean7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
    .withColumn("max7", F.array_max(F.col("value_array")))
    .withColumn("min7", F.array_min(F.col("value_array")))
    .withColumn("std7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean7)*(x - mean7), acc -> sqrt(acc / (size(value_array) - 1)))'))
    .withColumn("count7", F.size(F.col("value_array")))
    .drop("value_array")
)
hof_example = (
    hof_example
    .withColumn("value_array", F.collect_list(F.col("value")).over(window14))
    .withColumn("sum14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
    .withColumn("mean14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
    .withColumn("max14", F.array_max(F.col("value_array")))
    .withColumn("min14", F.array_min(F.col("value_array")))
    .withColumn("std14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean14)*(x - mean14), acc -> sqrt(acc / (size(value_array) - 1)))'))
    .withColumn("count14", F.size(F.col("value_array")))
    .drop("value_array")
)

hof_example.show(truncate=True)

SQL Diagram Snippet (Higher Order Functions approach): screenshot omitted.

How to create a map column with rolling window aggregates per each key

Assuming that the state, ds, ds_num and region columns in your source dataframe are unique (they can be seen as a primary key), this snippet would do the job:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

days = lambda i: i * 86400

df.alias('a').join(df.alias('b'), 'state') \
    .where((F.col('a.ds_num') - F.col('b.ds_num')).between(0, days(27))) \
    .select('state', 'a.ds', 'a.ds_num', 'b.region', 'b.count') \
    .dropDuplicates() \
    .groupBy('state', 'ds', 'ds_num', 'region').sum('count') \
    .groupBy('state', 'ds', 'ds_num') \
    .agg(F.map_from_entries(F.collect_list(F.struct("region", "sum(count)"))).alias("count_rolling_4W")) \
    .orderBy('a.ds') \
    .show(truncate=False)

Results:

+-----+----------+----------+------------------------------------+
|state|ds |ds_num |count_rolling_4W |
+-----+----------+----------+------------------------------------+
|AK |2022-05-02|1651449600|{US -> 3} |
|AK |2022-05-03|1651536000|{US -> 3, ON -> 1} |
|AK |2022-05-04|1651622400|{US -> 3, ON -> 1, CO -> 1} |
|AK |2022-05-06|1651795200|{US -> 8, ON -> 1, CO -> 1, AK -> 1}|
+-----+----------+----------+------------------------------------+

It may seem complex, but it's just a rolling window rewritten as a self-join (effectively a per-state cross join) for better control over the results.

How to get aggregate by hour including missing hours and add cumulative sum?

You could do it by first creating a table with hours and then joining it with the rest of the data.

Setup:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('xx', '2011-08-15 14:47:02.617023', 'eventA', 1),
     ('xx', '2011-08-15 14:48:02.507053', 'eventA', 2),
     ('xx', '2011-08-15 16:47:02.512016', 'eventA', 100),
     ('yy', '2011-08-15 11:47:02.337019', 'eventA', 2),
     ('yy', '2011-08-15 12:47:02.617041', 'eventA', 1),
     ('yy', '2011-08-15 13:47:02.927040', 'eventA', 3)],
    ['GroupId', 'Event_time', 'Event_name', 'Event_value']
)
df = df.withColumn('Date', F.col('Event_time').cast('date'))

The following creates a dataframe with hours:

min_date = df.agg(F.min('Date')).head()[0]
max_date = df.agg(F.max('Date')).head()[0]
df_hours = df.select(
    'GroupId',
    'Event_name',
    F.explode(
        F.expr(f"sequence(to_timestamp('{min_date} 00:00:00'), to_timestamp('{max_date} 23:00:00'), interval 1 hour)")
    ).alias('date_hour')
).distinct()

Then, aggregating your first table by hours:

df_agg = (df
    .groupBy('GroupId', 'Event_name', F.date_trunc('hour', 'Event_time').alias('date_hour'))
    .agg(F.sum('Event_value').alias('Count'))
)

Joining them both together:

df_joined = df_hours.join(df_agg, ['GroupId', 'Event_name', 'date_hour'], 'left')

Adding column agg_count and others:

w = W.partitionBy('GroupId', 'Event_name').orderBy('date_hour')
df2 = (df_joined
    .select(
        'GroupId',
        'Event_name',
        F.to_date('date_hour').alias('Date'),
        F.date_format('date_hour', 'HH').alias('Hour'),
        'Count',
        F.coalesce(F.sum('Count').over(w), F.lit(0)).alias('agg_count')
    )
)

Result:

+-------+----------+----------+----+-----+---------+
|GroupId|Event_name| Date|Hour|Count|agg_count|
+-------+----------+----------+----+-----+---------+
| xx| eventA|2011-08-15| 00| null| 0|
| xx| eventA|2011-08-15| 01| null| 0|
| xx| eventA|2011-08-15| 02| null| 0|
| xx| eventA|2011-08-15| 03| null| 0|
| xx| eventA|2011-08-15| 04| null| 0|
| xx| eventA|2011-08-15| 05| null| 0|
| xx| eventA|2011-08-15| 06| null| 0|
| xx| eventA|2011-08-15| 07| null| 0|
| xx| eventA|2011-08-15| 08| null| 0|
| xx| eventA|2011-08-15| 09| null| 0|
| xx| eventA|2011-08-15| 10| null| 0|
| xx| eventA|2011-08-15| 11| null| 0|
| xx| eventA|2011-08-15| 12| null| 0|
| xx| eventA|2011-08-15| 13| null| 0|
| xx| eventA|2011-08-15| 14| 3| 3|
| xx| eventA|2011-08-15| 15| null| 3|
| xx| eventA|2011-08-15| 16| 100| 103|
| xx| eventA|2011-08-15| 17| null| 103|
| xx| eventA|2011-08-15| 18| null| 103|
| xx| eventA|2011-08-15| 19| null| 103|
| xx| eventA|2011-08-15| 20| null| 103|
| xx| eventA|2011-08-15| 21| null| 103|
| xx| eventA|2011-08-15| 22| null| 103|
| xx| eventA|2011-08-15| 23| null| 103|
| yy| eventA|2011-08-15| 00| null| 0|
| yy| eventA|2011-08-15| 01| null| 0|
| yy| eventA|2011-08-15| 02| null| 0|
| yy| eventA|2011-08-15| 03| null| 0|
| yy| eventA|2011-08-15| 04| null| 0|
| yy| eventA|2011-08-15| 05| null| 0|
| yy| eventA|2011-08-15| 06| null| 0|
| yy| eventA|2011-08-15| 07| null| 0|
| yy| eventA|2011-08-15| 08| null| 0|
| yy| eventA|2011-08-15| 09| null| 0|
| yy| eventA|2011-08-15| 10| null| 0|
| yy| eventA|2011-08-15| 11| 2| 2|
| yy| eventA|2011-08-15| 12| 1| 3|
| yy| eventA|2011-08-15| 13| 3| 6|
| yy| eventA|2011-08-15| 14| null| 6|
| yy| eventA|2011-08-15| 15| null| 6|
| yy| eventA|2011-08-15| 16| null| 6|
| yy| eventA|2011-08-15| 17| null| 6|
| yy| eventA|2011-08-15| 18| null| 6|
| yy| eventA|2011-08-15| 19| null| 6|
| yy| eventA|2011-08-15| 20| null| 6|
| yy| eventA|2011-08-15| 21| null| 6|
| yy| eventA|2011-08-15| 22| null| 6|
| yy| eventA|2011-08-15| 23| null| 6|
+-------+----------+----------+----+-----+---------+

Pyspark: aggregate mode (most frequent) value in a rolling window

You can use the collect_list function to get the stations from the last 3 rows using the defined window, then for each resulting array calculate the most frequent element.

To get the most frequent element of the array, you can explode it, then group by and count as in the linked post you already saw, or use a UDF like this:

import pyspark.sql.functions as F

test_df.withColumn(
    "rolling_mode_station",
    # rolling_w is the rolling window defined in the question (last 3 rows per device)
    F.collect_list("station").over(rolling_w)
).withColumn(
    "rolling_mode_station",
    F.udf(lambda x: max(set(x), key=x.count))(F.col("rolling_mode_station"))
).show()

#+------+----------+---------+--------------------+
#|device|start_time| station|rolling_mode_station|
#+------+----------+---------+--------------------+
#|Python| 1|station_1| station_1|
#|Python| 2|station_2| station_1|
#|Python| 3|station_1| station_1|
#|Python| 4|station_2| station_2|
#|Python| 5|station_2| station_2|
#|Python| 6| null| station_2|
#+------+----------+---------+--------------------+
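For reference, here is a sketch of the UDF-free, explode-then-count variant mentioned above. It assumes test_df and rolling_w are defined as in the question and that (device, start_time) identifies a row; ties between equally frequent stations are broken arbitrarily.

from pyspark.sql import functions as F, Window as W

# Explode the stations collected over the rolling window
exploded = (test_df
    .withColumn("stations", F.collect_list("station").over(rolling_w))
    .select("device", "start_time", F.explode("stations").alias("station_in_window")))

# Count occurrences of each station per original row
counts = exploded.groupBy("device", "start_time", "station_in_window").count()

# Keep the most frequent station for each (device, start_time)
best = W.partitionBy("device", "start_time").orderBy(F.col("count").desc())
modes = (counts
    .withColumn("rn", F.row_number().over(best))
    .filter("rn = 1")
    .select("device", "start_time", F.col("station_in_window").alias("rolling_mode_station")))

test_df.join(modes, ["device", "start_time"], "left").show()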

Group by and aggregate values from multiple time periods in python and pyspark

In pyspark you can use collect_list over a Window with frame boundaries specified as rows between [-n, currentRow] to collect the current month together with the n preceding months, and also calculate a running sum of amt over this same Window. Finally, filter only rows where the size of months equals n + 1:

from pyspark.sql import functions as F, Window

# create spark df from pandas dataframe
sdf = spark.createDataFrame(df)

n = 2
w = Window.partitionBy("id").orderBy("month").rowsBetween(-n, Window.currentRow)

result = sdf.withColumn("months", F.collect_list("month").over(w)) \
    .withColumn("amt", F.sum("amt").over(w)) \
    .filter(F.size("months") == n + 1) \
    .select(
        F.col("id"),
        F.element_at(F.col("months"), 1).alias("month_start"),
        F.element_at(F.col("months"), -1).alias("month_end"),
        F.col("amt")
    )

result.show()
#+---+-----------+---------+---+
#| id|month_start|month_end|amt|
#+---+-----------+---------+---+
#| A| 2020-01| 2020-03| 9|
#| A| 2020-02| 2020-04| 12|
#| B| 2020-01| 2020-03| 6|
#| B| 2020-02| 2020-04| 9|
#+---+-----------+---------+---+

How to do a rolling sum in PySpark?

It seems you're trying to do a rolling sum of A. You can do a sum over a window, e.g.

from pyspark.sql import functions as F, Window

df2 = df.withColumn('B', F.sum('A').over(Window.orderBy('ordering_col')))

But you would need a column to order by, otherwise the "previous record" is not well-defined because Spark dataframes are unordered.
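If you need a bounded rolling sum rather than a running total, you can add a row frame to the same window. A minimal sketch, assuming a suitable ordering_col exists:

from pyspark.sql import functions as F, Window

# Sum of A over the current row and the two preceding rows, ordered by ordering_col
w = Window.orderBy('ordering_col').rowsBetween(-2, Window.currentRow)
df3 = df.withColumn('B_rolling3', F.sum('A').over(w))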

Aggregate over time windows on a partitioned/grouped by window

First, you need to assign a group id:

from pyspark.sql import functions as F, Window as W

df2 = (
    df.withColumn(
        "id",
        F.when(
            F.lag("record_type").over(W.partitionBy("ID1", "ID2").orderBy("date"))
            == F.col("record_type"),
            0,
        ).otherwise(1),
    )
    .withColumn("id", F.sum("id").over(W.partitionBy("ID1", "ID2").orderBy("date")))
)

df2.show()
+-------+----+-------------------+--------+----------+-----------+---+
| ID1| ID2| date| type| name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital| null| IN| 1|
|ACC.PXP|7246|2018-10-18T16:20:00| null|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-11T00:00:00| null|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital| null| OUT| 2|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2020-12-04T15:00:00| Care| Betel| OUT| 4|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| null| OUT| 4|
+-------+----+-------------------+--------+----------+-----------+---+

Then, you fill in the columns where there are nulls:

df3 = df2.withColumn(
    "name",
    F.coalesce(
        F.col("name"),
        F.max("name").over(W.partitionBy("ID1", "ID2", "id"))
    )
).withColumn(
    "type",
    F.coalesce(
        F.col("type"),
        F.max("type").over(W.partitionBy("ID1", "ID2", "id"))
    )
)

df3.show()
+-------+----+-------------------+--------+----------+-----------+---+
| ID1| ID2| date| type| name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN| 1|
|ACC.PXP|7246|2018-11-11T00:00:00|Hospital|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington| OUT| 2|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN| 3|
|ACC.PXP|7246|2020-12-04T15:00:00| Care| Betel| OUT| 4|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| Betel| OUT| 4|
+-------+----+-------------------+--------+----------+-----------+---+

Finally, you select the "last" line for each tuple("ID1", "ID2", "id"):

df4 = df3.withColumn(
    "row",
    F.row_number().over(W.partitionBy("ID1", "ID2", "id").orderBy(F.col("date").desc()))
).where("row=1").drop("row", "id")

df4.show()
+-------+----+-------------------+--------+----------+-----------+
| ID1| ID2| date| type| name|record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation| IN|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington| OUT|
|ACC.PXP|7246|2018-11-15T04:00:00| Home| null| IN|
|ACC.PXP|7246|2020-13-04T15:00:00| Care| Betel| OUT|
+-------+----+-------------------+--------+----------+-----------+

How to group by time interval in Spark SQL

Spark >= 2.0

You can use window (not to be mistaken with window functions). Depending on the variant, it assigns each timestamp to one or more, potentially overlapping, buckets:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+

Spark < 2.0

Let's start with example data:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that event is identified by KEY. If this is not the case you can adjust GROUP BY / PARTITION BY clauses according to your requirements.

If you're interested in an aggregation with a static window independent of the data, convert timestamps to a numeric data type and round:

import org.apache.spark.sql.functions.{round, sum}

// Cast string to timestamp and then to Unix time in seconds
// In Spark >= 3.1 you can replace cast("timestamp") with timestamp_seconds
val ts = $"Time".cast("timestamp").cast("long")

// Round to a 300 seconds (5 minute) interval
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+

If you're interested in a window relative to the current row use window functions:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+

For performance reasons this approach is useful only if data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.


