How to Group by Time Interval in Spark SQL

How to group by time interval in Spark SQL

Spark >= 2.0

You can use window (not to be confused with window functions). Depending on the variant, it assigns each timestamp to one or more, potentially overlapping, buckets:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+
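
For overlapping buckets, pass a slide duration as well. A minimal sketch reusing the same df and columns as above:

// 5-minute windows computed every minute, so each timestamp
// falls into up to five overlapping buckets
df.groupBy($"KEY", window($"time", "5 minutes", "1 minute")).sum("metric")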

Spark < 2.0

Let's start with example data:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that event is identified by KEY. If this is not the case you can adjust GROUP BY / PARTITION BY clauses according to your requirements.

If you're interested in an aggregation with a static window independent of the data, convert the timestamps to a numeric data type and round:

import org.apache.spark.sql.functions.{round, sum}

// Cast the string to a timestamp, then to epoch seconds
val ts = $"Time".cast("timestamp").cast("long")

// Round to a 300-second interval
// In Spark >= 3.1 you can replace cast("timestamp") with timestamp_seconds
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+

If you're interested in a window relative to the current row use window functions:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+

For performance reasons this approach is useful only if the data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.

How do I group records that are within a specific time interval using Spark Scala or sql?

org.apache.spark.sql.functions provides the following overloaded window functions.

1. window(timeColumn: Column, windowDuration: String): Generates tumbling time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

The windows will look like:

{{{
09:00:00-09:01:00
09:01:00-09:02:00
09:02:00-09:03:00 ...
}}}
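
For illustration, a minimal sketch assuming a DataFrame df with a timestamp column ts:

import org.apache.spark.sql.functions.{col, window}

// One-minute tumbling windows: each row falls into exactly one bucket
df.groupBy(window(col("ts"), "1 minute")).count()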

2. window(timeColumn: Column, windowDuration: String, slideDuration: String):
Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).
slideDuration: parameter specifying the sliding interval of the window, e.g. 1 minute. A new window will be generated every slideDuration. Must be less than or equal to the windowDuration.

The windows will look like:

{{{
09:00:00-09:01:00
09:00:10-09:01:10
09:00:20-09:01:20 ...
}}}
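
Again a minimal sketch, assuming the same df with a timestamp column ts:

import org.apache.spark.sql.functions.{col, window}

// One-minute windows sliding every 10 seconds; a row can fall into several buckets
df.groupBy(window(col("ts"), "1 minute", "10 seconds")).count()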

3. window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

The windows will look like:

{{{
09:00:05-09:01:05
09:00:15-09:01:15
09:00:25-09:01:25 ...
}}}

For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15..., provide startTime as 15 minutes. This is the overloaded window function that suits your requirement.
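
A minimal sketch of that example, again assuming a df with a timestamp column ts:

import org.apache.spark.sql.functions.{col, window}

// Hourly tumbling windows that start 15 minutes past the hour, e.g. [12:15, 13:15)
df.groupBy(window(col("ts"), "1 hour", "1 hour", "15 minutes")).count()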

Please find working code below.

import org.apache.spark.sql.SparkSession

object SparkWindowTest extends App {

val spark = SparkSession
.builder()
.master("local")
.appName("File_Streaming")
.getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._

//Prepare Test Data
val df = Seq((1, 10, "2019-02-17 12:00:49"), (2, 20, "2019-02-17 11:10:46"),
(3, 30, "2019-02-17 13:23:34"),(2, 50, "2019-02-17 11:10:30"),
(1, 40, "2019-02-17 12:01:02"), (1, 60, "2019-02-17 12:01:57"))
.toDF("ID", "Volume", "TimeString")

df.show()
df.printSchema()

+---+------+-------------------+
| ID|Volume| TimeString|
+---+------+-------------------+
| 1| 10|2019-02-17 12:00:49|
| 2| 20|2019-02-17 11:10:46|
| 3| 30|2019-02-17 13:23:34|
| 2| 50|2019-02-17 11:10:30|
| 1| 40|2019-02-17 12:01:02|
| 1| 60|2019-02-17 12:01:57|
+---+------+-------------------+

root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)

//Converted String Timestamp into Timestamp
val modifiedDF = df.withColumn("Time", to_timestamp($"TimeString"))

//Dropped String Timestamp from DF
val modifiedDF1 = modifiedDF.drop("TimeString")

modifiedDF.show(false)
modifiedDF.printSchema()

+---+------+-------------------+-------------------+
|ID |Volume|TimeString |Time |
+---+------+-------------------+-------------------+
|1 |10 |2019-02-17 12:00:49|2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|2019-02-17 12:01:57|
+---+------+-------------------+-------------------+

root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)
|-- Time: timestamp (nullable = true)

modifiedDF1.show(false)
modifiedDF1.printSchema()

+---+------+-------------------+
|ID |Volume|Time |
+---+------+-------------------+
|1 |10 |2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|
+---+------+-------------------+

root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- Time: timestamp (nullable = true)

//Main logic
val modifiedDF2 = modifiedDF1.groupBy($"ID", window($"Time", "1 minutes","1 minutes","45 seconds")).sum("Volume")

//Renamed all columns of DF.
val newNames = Seq("ID", "WINDOW", "VOLUME")
val finalDF = modifiedDF2.toDF(newNames: _*)

finalDF.show(false)

+---+---------------------------------------------+------+
|ID |WINDOW |VOLUME|
+---+---------------------------------------------+------+
|2 |[2019-02-17 11:09:45.0,2019-02-17 11:10:45.0]|50 |
|1 |[2019-02-17 12:01:45.0,2019-02-17 12:02:45.0]|60 |
|2 |[2019-02-17 11:10:45.0,2019-02-17 11:11:45.0]|20 |
+---+---------------------------------------------+------+

}

Grouping data based on time interval in pyspark

You can use UDFs to convert the time to a range and then group by it:

from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
def getInterval(time):
    start = int(time.split(":")[0])
    return str(start) + "-" + str(start + 1) + " " + time.split(" ")[1]

getIntervalUdf = udf(getInterval, StringType())

spark = SparkSession.builder.appName("appName").getOrCreate()
df = spark.read.csv("emp",sep=",",header=True)
df.show()
df = df.withColumn("Interval",getIntervalUdf("Server_Time"))
df.show()
df = df.groupby("Server_Date","Interval","Source").count()
df.show()

Output

+-----------+--------------+------------------+-------------+-------------+
| EventID | AccessReason | Source | Server_Date | Server_Time |
+-----------+--------------+------------------+-------------+-------------+
| 847495004 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 7:25:52 AM |
| 847506432 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 8:53:38 AM |
| 847512725 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 10:18:50 AM |
| 847512768 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 10:19:32 AM |
| 847513357 | Granted | ORSB_GND_GYM_OUT | 10/1/2016 | 10:25:36 AM |
| 847513614 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 10:28:08 AM |
| 847515838 | Granted | ORSB_GND_GYM_OUT | 10/1/2016 | 10:57:41 AM |
| 847522522 | Granted | ORSB_GND_GYM_IN | 10/1/2016 | 11:57:10 AM |
+-----------+--------------+------------------+-------------+-------------+

+---------+------------+----------------+-----------+-----------+--------+
| EventID|AccessReason| Source|Server_Date|Server_Time|Interval|
+---------+------------+----------------+-----------+-----------+--------+
|847495004| Granted| ORSB_GND_GYM_IN| 10/1/2016| 7:25:52 AM| 7-8 AM|
|847506432| Granted| ORSB_GND_GYM_IN| 10/1/2016| 8:53:38 AM| 8-9 AM|
|847512725| Granted| ORSB_GND_GYM_IN| 10/1/2016|10:18:50 AM|10-11 AM|
|847512768| Granted| ORSB_GND_GYM_IN| 10/1/2016|10:19:32 AM|10-11 AM|
|847513357| Granted|ORSB_GND_GYM_OUT| 10/1/2016|10:25:36 AM|10-11 AM|
|847513614| Granted| ORSB_GND_GYM_IN| 10/1/2016|10:28:08 AM|10-11 AM|
|847515838| Granted|ORSB_GND_GYM_OUT| 10/1/2016|10:57:41 AM|10-11 AM|
|847522522| Granted| ORSB_GND_GYM_IN| 10/1/2016|11:57:10 AM|11-12 AM|
+---------+------------+----------------+-----------+-----------+--------+

+-----------+--------+----------------+-----+
|Server_Date|Interval| Source|count|
+-----------+--------+----------------+-----+
| 10/1/2016|10-11 AM| ORSB_GND_GYM_IN| 3|
| 10/1/2016| 8-9 AM| ORSB_GND_GYM_IN| 1|
| 10/1/2016|10-11 AM|ORSB_GND_GYM_OUT| 2|
| 10/1/2016|11-12 AM| ORSB_GND_GYM_IN| 1|
| 10/1/2016| 7-8 AM| ORSB_GND_GYM_IN| 1|
+-----------+--------+----------------+-----+

Group Count based on Specific Time Window for Spark Scala

By converting your timestamp value to the respective 00:00:00 - 11:59:59 or 12:00:00 - 23:59:59 time period, you may easily perform a group by on the resulting data. Use the HOUR function to retrieve the hour of day and determine which period each row belongs in; combining this with CONCAT to build the new time period and to_date to convert your timestamp to a date gives you the time period value.

You may try the following approaches:

Using scala api

import org.apache.spark.sql.functions.{col, concat, hour, to_date, when}

val outputDf =
  df.withColumn(
      "TimePeriod",
      concat(
        to_date(col("Timestamp")),
        when(hour(col("Timestamp")) < 12, " 00:00:00 - 11:59:59")
          .otherwise(" 12:00:00 - 23:59:59")
      )
    )
    .groupBy("TimePeriod", "User")
    .count()
    .withColumnRenamed("TimePeriod", "Timestamp")

outputDf.show(truncate = false)

Using pyspark api

from pyspark.sql import functions as F

output_df = (
    df.withColumn(
        "TimePeriod",
        F.concat(
            F.to_date("Timestamp"),
            F.when(F.hour("Timestamp") < 12, " 00:00:00 - 11:59:59")
             .otherwise(" 12:00:00 - 23:59:59")
        )
    )
    .groupBy("TimePeriod", "User")
    .count()
    .withColumnRenamed("TimePeriod", "Timestamp")
)
output_df.show(truncate=False)

Using spark sql

  1. Ensure your dataframe is available:
     df.createOrReplaceTempView("input_df2")

  2. Run the following SQL on the Spark session:
     outputDf = sparkSession.sql("<insert sql below here>")

SQL:

SELECT
CONCAT(
to_date(Timestamp),
CASE
WHEN HOUR(Timestamp) < 12 THEN ' 00:00:00 - 11:59:59'
ELSE ' 12:00:00 - 23:59:59'
END
) as Timestamp,
User,
COUNT(1) as COUNT
FROM
input_df2
GROUP BY 1,2

Output

+------------------------------+----+-----+
|Timestamp |User|COUNT|
+------------------------------+----+-----+
|2021-11-08 00:00:00 - 11:59:59|B |2 |
|2021-11-08 00:00:00 - 11:59:59|A |2 |
|2021-11-09 00:00:00 - 11:59:59|B |1 |
|2021-11-08 12:00:00 - 23:59:59|A |1 |
+------------------------------+----+-----+

NB: Your timestamp column should be of the timestamp data type, which can be verified using df.printSchema(). If your timestamp column is a string type, you may use to_timestamp to convert it to a timestamp type, e.g. to_timestamp(Timestamp, 'dd/MM/yyyy H:m:s').
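
For instance, a minimal sketch in the Scala API (the column name and format string are taken from the note above and assumed to match your data):

import org.apache.spark.sql.functions.{col, to_timestamp}

// Parse the string column into a proper timestamp before grouping
val dfTyped = df.withColumn("Timestamp", to_timestamp(col("Timestamp"), "dd/MM/yyyy H:m:s"))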

Edit 1: Question changed by OP

The following update provides the count for every user over all possible time periods. The approach below cross-joins all possible user/time-period combinations with the actual counts per time period.

Using scala api

import org.apache.spark.sql.functions.{coalesce, col, concat, hour, lit, to_date, when}

var timeperiods =
  df.select(to_date(col("Timestamp")).alias("TimestampDate"))
    .distinct()
    .withColumn("TimePeriodLower", concat(col("TimestampDate"), lit(" 00:00:00 - 11:59:59")))
    .withColumn("TimePeriodUpper", concat(col("TimestampDate"), lit(" 12:00:00 - 23:59:59")))

timeperiods =
  timeperiods.select("TimePeriodLower")
    .union(timeperiods.select("TimePeriodUpper"))
    .withColumnRenamed("TimePeriodLower", "TimePeriod")
    .crossJoin(df.select("User").distinct())

val outputDf =
  df.withColumn(
      "TimePeriod",
      concat(
        to_date(col("Timestamp")),
        when(hour(col("Timestamp")) < 12, " 00:00:00 - 11:59:59")
          .otherwise(" 12:00:00 - 23:59:59")
      )
    )
    .groupBy("TimePeriod", "User")
    .count()
    .join(timeperiods, Seq("TimePeriod", "User"), "right")
    .withColumn("count", coalesce(col("count"), lit(0)))
    .drop("Timestamp")
    .withColumnRenamed("TimePeriod", "Timestamp")


Using pyspark api

from pyspark.sql import functions as F

timeperiods = (
    df.select(F.to_date("Timestamp").alias("TimestampDate"))
    .distinct()
    .withColumn("TimePeriodLower", F.concat("TimestampDate", F.lit(" 00:00:00 - 11:59:59")))
    .withColumn("TimePeriodUpper", F.concat("TimestampDate", F.lit(" 12:00:00 - 23:59:59")))
)

timeperiods = (
    timeperiods.select("TimePeriodLower")
    .union(timeperiods.select("TimePeriodUpper"))
    .withColumnRenamed("TimePeriodLower", "TimePeriod")
    .crossJoin(df.select("User").distinct())
)

outputDf = (
    df.withColumn(
        "TimePeriod",
        F.concat(
            F.to_date("Timestamp"),
            F.when(F.hour("Timestamp") < 12, " 00:00:00 - 11:59:59")
             .otherwise(" 12:00:00 - 23:59:59")
        )
    )
    .groupBy("TimePeriod", "User")
    .count()
    .join(timeperiods, ["TimePeriod", "User"], "right")
    .withColumn("count", F.coalesce("count", F.lit(0)))
    .drop("Timestamp")
    .withColumnRenamed("TimePeriod", "Timestamp")
)

Using spark sql

  1. Ensure your dataframe is available:
     df.createOrReplaceTempView("input_df2")

  2. Run the following SQL on the Spark session:
     outputDf = sparkSession.sql("<insert sql below here>")

SQL:

WITH unique_users AS (
SELECT DISTINCT
User
FROM
input_df2
),
unique_dates AS (
SELECT DISTINCT
to_date(Timestamp) as TimestampDate
FROM
input_df2
),
timeperiods as (
SELECT CONCAT(TimestampDate,' 00:00:00 - 11:59:59') as TimePeriod FROM unique_dates
UNION ALL
SELECT CONCAT(TimestampDate,' 12:00:00 - 23:59:59') as TimePeriod FROM unique_dates
),
timeperiod_users as (
SELECT * FROM timeperiods CROSS JOIN unique_users
),
timeperiod_user_counts as (
SELECT
CONCAT(
to_date(Timestamp),
CASE
WHEN HOUR(Timestamp) < 12 THEN ' 00:00:00 - 11:59:59'
ELSE ' 12:00:00 - 23:59:59'
END
) as Timeperiod,
User,
COUNT(1) as COUNT
FROM
input_df2
GROUP BY 1,2
)
SELECT
tu.TimePeriod as Timestamp,
tu.User,
COALESCE(tuc.COUNT,0) as COUNT
FROM
timeperiod_user_counts tuc
RIGHT JOIN
timeperiod_users tu ON tu.TimePeriod = tuc.TimePeriod AND
tu.User = tuc.User

Outputs

outputDf.orderBy("Timestamp","User").show()
+------------------------------+----+-----+
|Timestamp |User|count|
+------------------------------+----+-----+
|2021-11-08 00:00:00 - 11:59:59|A |2 |
|2021-11-08 00:00:00 - 11:59:59|B |2 |
|2021-11-08 12:00:00 - 23:59:59|A |1 |
|2021-11-08 12:00:00 - 23:59:59|B |0 |
|2021-11-09 00:00:00 - 11:59:59|A |0 |
|2021-11-09 00:00:00 - 11:59:59|B |1 |
|2021-11-09 12:00:00 - 23:59:59|A |0 |
|2021-11-09 12:00:00 - 23:59:59|B |0 |
+------------------------------+----+-----+
timeperiods.show()
+------------------------------+----+
|TimePeriod |User|
+------------------------------+----+
|2021-11-08 00:00:00 - 11:59:59|B |
|2021-11-08 00:00:00 - 11:59:59|A |
|2021-11-09 00:00:00 - 11:59:59|B |
|2021-11-09 00:00:00 - 11:59:59|A |
|2021-11-08 12:00:00 - 23:59:59|B |
|2021-11-08 12:00:00 - 23:59:59|A |
|2021-11-09 12:00:00 - 23:59:59|B |
|2021-11-09 12:00:00 - 23:59:59|A |
+------------------------------+----+

Spark 1.5.2: Grouping DataFrame Rows over a Time Range

Generally speaking this is a relatively simple task. All you need is basic arithmetic on UNIX timestamps. First, let's cast all timestamps to numeric values:

import org.apache.spark.sql.functions.{lit, min}

val dfNum = df.withColumn("ts", $"timestamp".cast("long"))

Next, let's find the minimum timestamp over all rows:

val offset = dfNum.agg(min($"ts")).first.getLong(0)

and use it to compute groups:

val aDay = lit(60 * 60 * 24)
val group = (($"ts" - lit(offset)) / aDay).cast("long")
val dfWithGroups = dfNum.withColumn("group", group)

Finally you can use it as a grouping column:

dfWithGroups.groupBy($"group").agg(min($"value"))

If you want meaningful intervals (interpretable as timestamps) just multiply the group by aDay and add the offset back.
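
For example, a sketch reusing the names defined above:

// Bucket start as a timestamp: scale the group back to seconds and add the offset
val bucketStart = (group * aDay + lit(offset)).cast("timestamp").alias("bucket_start")
dfWithGroups.withColumn("bucket_start", bucketStart)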

Obviously this won't handle complex cases like daylight saving time or leap seconds, but it should be good enough most of the time. If you need to handle any of these properly, apply similar logic with Joda-Time inside a UDF.
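
A minimal sketch of that idea, using java.time instead of Joda-Time and an assumed Europe/Paris timezone, grouping by the local calendar day:

import java.time.{Instant, ZoneId}
import org.apache.spark.sql.functions.udf

// Map epoch seconds to the local calendar day so DST transitions are handled
val zone = ZoneId.of("Europe/Paris")  // assumed timezone
val toLocalDay = udf((ts: Long) => Instant.ofEpochSecond(ts).atZone(zone).toLocalDate.toString)

dfNum.groupBy(toLocalDay($"ts").alias("day")).agg(min($"value"))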

scala spark get the mean of each time interval

OK, here's my solution, thanks to other posts:
I created a Bucket column derived from EventTime (integer division by 900 seconds) to create categories, and then I group by bucket and take the mean.

val df = data_input.withColumn("Bucket", toBucketUDF(col("EventTime")))

val finalDF = df.groupBy("Bucket")
  .agg(mean("RSSI"))
  .withColumnRenamed("avg(RSSI)", "RSSI")
  .orderBy("Bucket")
  .withColumn("EventTime", getTimeUDF(col("Bucket")))
  .drop("Bucket")

def toBucket(input: Int): Int = {
  input / 900
}

def getTime(input: Int): Int = {
  (input + 1) * 900
}

val toBucketUDF = udf(toBucket _)
val getTimeUDF = udf(getTime _)

Pyspark GroupBy time span

You can use the sequence SQL function. sequence builds a date range from a start, an end, and an interval, and returns it as an array.

Then you can use explode to flatten that array and count the rows.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Make sure your spark session is set to UTC.
# This SQL won't work well with a month interval if timezone is set to a place that has a daylight saving.
spark = (SparkSession
.builder
.config('spark.sql.session.timeZone', 'UTC')
... # other config
.getOrCreate())

df = (df.withColumn('range', F.expr('sequence(to_date(`start`), to_date(`end`), interval 1 month) as date'))
.withColumn('observation', F.explode('range')))

df = df.groupby('observation').count()

GroupBy using Time Frequency on PySpark DataFrame Issue

You have two ways to address this: either cast your timestamps to the date granularity you want to group by, or (as you said in the comments) use the window SQL function to group by the interval you'd like.

Just know that monthly aggregations are not possible through the window SQL function in Spark.

Here is the code: the first three examples use the window SQL function, and the last example casts the timestamp to the month and then groups by every column.

df = spark.createDataFrame(
[
("20-05-09 03:06:21", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220034"),
("20-05-09 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-05-09 12:04:52", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:24:09", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-04-09 10:57:48", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 12:12:26", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 03:26:33", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220071")
],
['CAPTUREDTIME', 'NODE', 'CHANNEL', 'LOCATION', 'TACK']
)

from pyspark.sql.functions import col, count, date_format, date_sub, date_trunc, month, next_day, to_timestamp, weekofyear, window, year

Hourly

I keep the window logic just for this one, so every possibility in Spark is covered for reference. At the end I only select the start of the window before showing the dataframe.

hourly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.groupBy(window(col("captured_time"), "1 hour").alias("captured_time"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("captured_time_hour", col("captured_time.start"))
.drop("captured_time")
)
hourly.sort("captured_time_hour").show(100, False)

Daily

With the date_trunc function, I truncate the timestamp to day granularity:

daily = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_day", date_trunc("day", col("captured_time")))
.groupBy("captured_time_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
)
daily.sort("captured_time_day").show(100, False)

Weekly

This one is a bit more tricky. First I use the next_day function with "monday". If you consider Sunday the start of the week, update the code accordingly, but I treat Monday as the start of the week (this depends on SQL dialects and regions, I believe).

Then we can also add a weekofyear function to retrieve the week number, as you wanted:

weekly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("start_day", date_sub(next_day(col("captured_time"), "monday"), 7))
.groupBy("start_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("start_day", to_timestamp(col("start_day")))
.withColumn("week_of_year", weekofyear(col("start_day")))
)
weekly.sort("start_day").show(100, False)

Monthly

We just format the timestamp as a date and then cast it back to a timestamp. This is done only to show another way of doing it; we could also truncate the timestamp as in the daily use case. I also show two ways of extracting the month name and abbreviation. Just mind your Spark version, as this was tested in Spark 3.0.0.

monthly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_month", date_format(col('captured_time'), '1/M/yyyy'))
.groupBy(col("captured_time_month"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*").alias("Count TACK"))
.withColumn("captured_time_month", to_timestamp(col("captured_time_month"), '1/M/yyyy'))
.withColumn("month", month(col("captured_time_month")))
.withColumn("month_abbr", date_format(col("captured_time_month"),'MMM'))
.withColumn("full_month_name", date_format(col("captured_time_month"),'MMMM'))
)
monthly.sort("captured_time_month").show(100, False)

Ciao!

Scala Spark get sum by time bucket across team spans and key

In my opinion, you may need to preprocess your data by splitting each duration into per-minute (or five-minute) buckets before aggregating.
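
A minimal sketch of that idea, assuming columns key, value and span boundary timestamps named start_ts and end_ts; sequence and explode (Spark >= 2.4) expand each span into its per-minute buckets, which are then summed:

import org.apache.spark.sql.functions.{col, explode, expr, sum}

// Expand each [start_ts, end_ts] span into one row per minute bucket
val exploded = df.withColumn(
  "bucket",
  explode(expr("sequence(date_trunc('minute', start_ts), date_trunc('minute', end_ts), interval 1 minute)")))

// Sum the value per key and per minute bucket
exploded.groupBy(col("key"), col("bucket")).agg(sum(col("value")))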


