Cumulative Sum in Spark

Cumulative sum in Spark

To get the cumulative sum using the DataFrame API, you should use the rowsBetween window method. In Spark 2.1 and newer, create the window as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._ // for the $"..." column syntax

val w = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

This tells Spark to use the values from the beginning of the partition up to and including the current row. On older versions of Spark, use rowsBetween(Long.MinValue, 0) for the same effect.

With the window defined, apply the sums over it:

val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
  .withColumn("val2_sum", sum($"val2").over(w))
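
For reference, the same running total can be sketched in PySpark (an illustration only, assuming an input DataFrame named input_df with the same product_id, ack, date_time, val1 and val2 columns):

from pyspark.sql import functions as F, Window

w = (Window.partitionBy('product_id', 'ack')
     .orderBy('date_time')
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))

new_df = (input_df
    .withColumn('val_sum', F.sum('val1').over(w))
    .withColumn('val2_sum', F.sum('val2').over(w)))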

How to do cumulative sum based on conditions in spark scala

You can define a group column using a conditional sum on flag; then row_number over a Window partitioned by cola and group gives the result you want:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, sum, when}

val result = df.withColumn(
  "group",
  sum(when(col("flag") === 0, 1).otherwise(0)).over(Window.partitionBy("cola").orderBy("date"))
).withColumn(
  "final_column",
  row_number().over(Window.partitionBy("cola", "group").orderBy("date")) - 1
).drop("group")

result.show

//+----+-----+----+------------+
//|cola| date|flag|final_column|
//+----+-----+----+------------+
//|   b|44201|   0|           0|
//|   b|44202|   1|           1|
//|   b|44203|   1|           2|
//|   b|44204|   0|           0|
//|   b|44205|   0|           0|
//|   b|44206|   1|           1|
//|   b|44207|   1|           2|
//|   b|44208|   1|           3|
//|   b|44209|   1|           4|
//|   b|44210|   0|           0|
//|   a|44201|   0|           0|
//|   a|44202|   1|           1|
//|   a|44203|   1|           2|
//|   a|44204|   0|           0|
//|   a|44205|   0|           0|
//|   a|44206|   0|           0|
//|   a|44207|   1|           1|
//|   a|44208|   1|           2|
//|   a|44209|   1|           3|
//|   a|44210|   0|           0|
//+----+-----+----+------------+

In this case, row_number() - 1 is equivalent to sum(col("flag")), since flag values are always 0 or 1. So the above final_column can also be written as:

.withColumn(
  "final_column",
  sum(col("flag")).over(Window.partitionBy("cola", "group").orderBy("date"))
)
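
The same two-window idea can be sketched in PySpark as well (an illustration, not part of the original answer, assuming a DataFrame df with columns cola, date and flag):

from pyspark.sql import functions as F, Window

result = (df
    .withColumn('group',
        F.sum(F.when(F.col('flag') == 0, 1).otherwise(0))
         .over(Window.partitionBy('cola').orderBy('date')))
    .withColumn('final_column',
        F.row_number().over(Window.partitionBy('cola', 'group').orderBy('date')) - 1)
    .drop('group'))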

How to compute cumulative sum using Spark

  1. Compute partial results for each partition:

    val partials = rdd.mapPartitionsWithIndex((i, iter) => {
      val (keys, values) = iter.toSeq.unzip
      val sums = values.scanLeft(0)(_ + _)
      Iterator((keys.zip(sums.tail), sums.last))
    })
  2. Collect the partial sums:

    val partialSums = partials.values.collect
  3. Compute cumulative sum over partitions and broadcast it:

    val sumMap = sc.broadcast(
      (0 until rdd.partitions.size)
        .zip(partialSums.scanLeft(0)(_ + _))
        .toMap
    )
  4. Compute final results:

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
      val offset = sumMap.value(i)
      if (iter.isEmpty) Iterator()
      else iter.next.map { case (k, v) => (k, v + offset) }.toIterator
    })
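
The same four-step approach can be sketched in PySpark (an illustration only, assuming rdd is an RDD of (key, value) pairs with numeric values; the helper names are made up):

def partition_prefix_sums(index, it):
    # Running sum within one partition, plus the partition's total.
    pairs = list(it)
    out, running = [], 0
    for k, v in pairs:
        running += v
        out.append((k, running))
    yield (out, running)

partials = rdd.mapPartitionsWithIndex(partition_prefix_sums)

# Collect the per-partition totals and turn them into per-partition offsets.
totals = partials.map(lambda p: p[1]).collect()
offsets = [0]
for t in totals:
    offsets.append(offsets[-1] + t)
offset_map = sc.broadcast(dict(enumerate(offsets)))

def add_offset(index, it):
    # Shift every within-partition running sum by the offset of its partition.
    off = offset_map.value[index]
    for pairs, _ in it:
        for k, v in pairs:
            yield (k, v + off)

result = partials.mapPartitionsWithIndex(add_offset)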

Pyspark Cumulative sum within Partition for moving last 2 (N) rows

What you want is to sum the last 2 rows (current row included), so simply specify the rowsBetween like this:

from pyspark.sql import functions as F, Window

w = Window.partitionBy('Period').orderBy('Month').rowsBetween(-1, Window.currentRow)

df = df.withColumn('CustLast2', F.sum('Cust').over(w))

You inverted the lower and upper bounds of the window frame in your attempt.
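
For the general case of the last N rows (current row included), the lower bound simply becomes -(N - 1); for example, with a made-up N of 3:

N = 3
w = Window.partitionBy('Period').orderBy('Month').rowsBetween(-(N - 1), Window.currentRow)
df = df.withColumn(f'CustLast{N}', F.sum('Cust').over(w))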

How to get aggregate by hour including missing hours and add cumulative sum?

You could do it by first creating a table with hours and then joining it with the rest of the data.

Setup:

from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('xx', '2011-08-15 14:47:02.617023', 'eventA', 1),
     ('xx', '2011-08-15 14:48:02.507053', 'eventA', 2),
     ('xx', '2011-08-15 16:47:02.512016', 'eventA', 100),
     ('yy', '2011-08-15 11:47:02.337019', 'eventA', 2),
     ('yy', '2011-08-15 12:47:02.617041', 'eventA', 1),
     ('yy', '2011-08-15 13:47:02.927040', 'eventA', 3)],
    ['GroupId', 'Event_time', 'Event_name', 'Event_value']
)
df = df.withColumn('Date', F.col('Event_time').cast('date'))

The following creates a dataframe with hours:

min_date = df.agg(F.min('Date')).head()[0]
max_date = df.agg(F.max('Date')).head()[0]
df_hours = df.select(
    'GroupId',
    'Event_name',
    F.explode(F.expr(f"sequence(to_timestamp('{min_date} 00:00:00'), to_timestamp('{max_date} 23:00:00'), interval 1 hour)")).alias('date_hour')
).distinct()

Then, aggregating your first table by hours:

df_agg = (df
    .groupBy('GroupId', 'Event_name', F.date_trunc('hour', 'Event_time').alias('date_hour'))
    .agg(F.sum('Event_value').alias('Count'))
)

Joining them both together:

df_joined = df_hours.join(df_agg, ['GroupId', 'Event_name', 'date_hour'], 'left')

Adding column agg_count and others:

w = W.partitionBy('GroupId', 'Event_name').orderBy('date_hour')
df2 = (df_joined
    .select(
        'GroupId',
        'Event_name',
        F.to_date('date_hour').alias('Date'),
        F.date_format('date_hour', 'HH').alias('Hour'),
        'Count',
        F.coalesce(F.sum('Count').over(w), F.lit(0)).alias('agg_count')
    )
)

Result:

+-------+----------+----------+----+-----+---------+
|GroupId|Event_name|      Date|Hour|Count|agg_count|
+-------+----------+----------+----+-----+---------+
|     xx|    eventA|2011-08-15|  00| null|        0|
|     xx|    eventA|2011-08-15|  01| null|        0|
|     xx|    eventA|2011-08-15|  02| null|        0|
|     xx|    eventA|2011-08-15|  03| null|        0|
|     xx|    eventA|2011-08-15|  04| null|        0|
|     xx|    eventA|2011-08-15|  05| null|        0|
|     xx|    eventA|2011-08-15|  06| null|        0|
|     xx|    eventA|2011-08-15|  07| null|        0|
|     xx|    eventA|2011-08-15|  08| null|        0|
|     xx|    eventA|2011-08-15|  09| null|        0|
|     xx|    eventA|2011-08-15|  10| null|        0|
|     xx|    eventA|2011-08-15|  11| null|        0|
|     xx|    eventA|2011-08-15|  12| null|        0|
|     xx|    eventA|2011-08-15|  13| null|        0|
|     xx|    eventA|2011-08-15|  14|    3|        3|
|     xx|    eventA|2011-08-15|  15| null|        3|
|     xx|    eventA|2011-08-15|  16|  100|      103|
|     xx|    eventA|2011-08-15|  17| null|      103|
|     xx|    eventA|2011-08-15|  18| null|      103|
|     xx|    eventA|2011-08-15|  19| null|      103|
|     xx|    eventA|2011-08-15|  20| null|      103|
|     xx|    eventA|2011-08-15|  21| null|      103|
|     xx|    eventA|2011-08-15|  22| null|      103|
|     xx|    eventA|2011-08-15|  23| null|      103|
|     yy|    eventA|2011-08-15|  00| null|        0|
|     yy|    eventA|2011-08-15|  01| null|        0|
|     yy|    eventA|2011-08-15|  02| null|        0|
|     yy|    eventA|2011-08-15|  03| null|        0|
|     yy|    eventA|2011-08-15|  04| null|        0|
|     yy|    eventA|2011-08-15|  05| null|        0|
|     yy|    eventA|2011-08-15|  06| null|        0|
|     yy|    eventA|2011-08-15|  07| null|        0|
|     yy|    eventA|2011-08-15|  08| null|        0|
|     yy|    eventA|2011-08-15|  09| null|        0|
|     yy|    eventA|2011-08-15|  10| null|        0|
|     yy|    eventA|2011-08-15|  11|    2|        2|
|     yy|    eventA|2011-08-15|  12|    1|        3|
|     yy|    eventA|2011-08-15|  13|    3|        6|
|     yy|    eventA|2011-08-15|  14| null|        6|
|     yy|    eventA|2011-08-15|  15| null|        6|
|     yy|    eventA|2011-08-15|  16| null|        6|
|     yy|    eventA|2011-08-15|  17| null|        6|
|     yy|    eventA|2011-08-15|  18| null|        6|
|     yy|    eventA|2011-08-15|  19| null|        6|
|     yy|    eventA|2011-08-15|  20| null|        6|
|     yy|    eventA|2011-08-15|  21| null|        6|
|     yy|    eventA|2011-08-15|  22| null|        6|
|     yy|    eventA|2011-08-15|  23| null|        6|
+-------+----------+----------+----+-----+---------+
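
If you would rather show 0 than null in Count for the missing hours, one optional tweak (not part of the original answer) is to fill the column afterwards:

df2 = df2.fillna(0, subset=['Count'])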

Calculate cumulative sum and average based on column values in spark dataframe

You need to chain when() clauses, since you want to populate one single column:

from pyspark.sql import functions as F, Window

windowval = (Window.partitionBy('Location', 'Brand').orderBy('month_in_timestamp')
             .rangeBetween(Window.unboundedPreceding, 0))

df = df.withColumn('TotalSumValue',
    F.when(F.col('Brand').isin('brand1', 'brand2'), F.sum('TrueValue').over(windowval))
     .when(F.col('Brand').isin('brand3'), F.avg('TrueValue').over(windowval)))
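
Note that rows whose Brand matches neither condition get null, since no otherwise() is given. If you want an explicit default instead, you could add one (the 0 below is just a placeholder choice, not from the original answer):

df = df.withColumn('TotalSumValue',
    F.when(F.col('Brand').isin('brand1', 'brand2'), F.sum('TrueValue').over(windowval))
     .when(F.col('Brand').isin('brand3'), F.avg('TrueValue').over(windowval))
     .otherwise(F.lit(0)))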

PySpark - Cumulative sum with limits

In such cases, we usually think of window functions to carry a calculation from one row to the next. But this case is different, because the running total has to feed back into itself (it is clamped between limits at every step), so a window function cannot help.

Main idea: instead of operating on rows, do the work on grouped/aggregated arrays. That works very well in this case, because we have a key to use in groupBy, so the table is divided into chunks of data and the calculations on those chunks are parallelized.

Input:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, '2022-01-01', 0),
     (1, '2022-01-02', 0),
     (1, '2022-01-03', 1),
     (1, '2022-01-04', 1),
     (1, '2022-01-05', 1),
     (1, '2022-01-06', 0),
     (1, '2022-01-07', 0),
     (1, '2022-01-08', 0),
     (1, '2022-01-09', 1),
     (1, '2022-01-10', 1),
     (1, '2022-01-11', 1),
     (1, '2022-01-12', 0),
     (1, '2022-01-13', 0),
     (1, '2022-01-14', -1),
     (1, '2022-01-15', -1),
     (1, '2022-01-16', -1),
     (1, '2022-01-17', -1),
     (1, '2022-01-18', -1),
     (1, '2022-01-19', -1),
     (1, '2022-01-20', 0)],
    ['user_id', 'date', 'valor'])

Script:

df = df.groupBy('user_id').agg(
    F.aggregate(
        F.array_sort(F.collect_list(F.struct('date', 'valor'))),
        F.expr("array(struct(cast(null as string) date, 0L valor, 3L cum))"),
        lambda acc, x: F.array_union(
            acc,
            F.array(x.withField(
                'cum',
                F.greatest(F.lit(1), F.least(F.lit(5), x['valor'] + F.element_at(acc, -1)['cum']))
            ))
        )
    ).alias("a")
)
df = df.selectExpr("user_id", "inline(slice(a, 2, size(a)))")

df.show()
# +-------+----------+-----+---+
# |user_id|      date|valor|cum|
# +-------+----------+-----+---+
# |      1|2022-01-01|    0|  3|
# |      1|2022-01-02|    0|  3|
# |      1|2022-01-03|    1|  4|
# |      1|2022-01-04|    1|  5|
# |      1|2022-01-05|    1|  5|
# |      1|2022-01-06|    0|  5|
# |      1|2022-01-07|    0|  5|
# |      1|2022-01-08|    0|  5|
# |      1|2022-01-09|    1|  5|
# |      1|2022-01-10|    1|  5|
# |      1|2022-01-11|    1|  5|
# |      1|2022-01-12|    0|  5|
# |      1|2022-01-13|    0|  5|
# |      1|2022-01-14|   -1|  4|
# |      1|2022-01-15|   -1|  3|
# |      1|2022-01-16|   -1|  2|
# |      1|2022-01-17|   -1|  1|
# |      1|2022-01-18|   -1|  1|
# |      1|2022-01-19|   -1|  1|
# |      1|2022-01-20|    0|  1|
# +-------+----------+-----+---+

Explanation

Groups are created based on "user_id". The aggregation for these groups lies in this line:

F.array_sort(F.collect_list(F.struct('date', 'valor')))

This creates arrays (collect_list) for every "user_id". These arrays contain structs of 2 fields: date and valor.

+-------+-----------------------------------------------+
|user_id|a                                              |
+-------+-----------------------------------------------+
|      1|[{2022-01-01, 0}, {2022-01-02, 0}, {...} ... ] |
+-------+-----------------------------------------------+

array_sort is used to make sure all the structs inside are sorted (by the first field, date), because later steps depend on that order.

Everything else inside agg transforms the result of the above aggregation.

The main part of the code is aggregate. It takes an array, "loops" through every element and returns one value (in our case, this value is made to be an array too). It works like this: you take the initial value (array(struct(cast(null as string) date, 0L valor, 3L cum))) and merge it with the first element in the array using the provided function (lambda). The result is then used in place of the initial value for the next step, where it is merged with the following element of the array. And so on.
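
As a tiny side illustration of that fold (not from the original answer), aggregate can compute a plain running total over an array of literals:

demo = spark.range(1).select(F.array(F.lit(1), F.lit(2), F.lit(3)).alias('xs'))
demo.select(F.aggregate('xs', F.lit(0), lambda acc, x: acc + x).alias('total')).show()
# +-----+
# |total|
# +-----+
# |    6|
# +-----+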

In this case, the lambda function performs array_union, which makes a union of arrays having identical schemas.

  1. We take the initial value (array of structs) as acc variable

    [{null, 0, 3}]

    (it's already ready to be used in array_union)

  2. take the first element inside the 'a' column's array as the x variable

    {2022-01-01, 0}

    (it's a struct, so its schema differs from acc's, which is an array of structs; some processing is needed, and the calculation also has to happen at this step, since this is where we have access to both variables)

  3. we'll create an array of structs by enclosing the x struct inside F.array(); we also have to add one more field to the struct, as the x struct currently has just 2 fields

    F.array(x.withField('cum', ...))

  4. inside the .withField() we have to provide the expression for the field

    F.greatest(
        F.lit(1),
        F.least(
            F.lit(5),
            x['valor'] + F.element_at(acc, -1)['cum']
        )
    )

    element_at(acc, -1) takes the last struct of acc array

    ['cum'] takes the field 'cum' from the struct

    x['valor'] + adds 'valor' field from the x struct

    F.least() ensures that 'cum' never exceeds 5 (it takes the smaller of the new 'cum' and 5)

    F.greatest() ensures that 'cum' never drops below 1

  5. both acc and the newly created array of structs now have identical schemas and proper data, so they can be merged with

    array_union

    the result is then assigned to the acc variable, while x is assigned the next value from the 'a' array.

    The process continues from step 3.

Finally, the result of aggregate looks like

[{null, 0, 3}, {2022-01-01, 0, 3}, {2022-01-02, 0, 3}, {2022-01-03, 1, 4}, {...} ... ]

The first element is removed using slice(..., 2, size(a))

inline is used to explode the array of structs.


Note. It's important to create the initial value of aggregate so that it has the proper schema (column/field names and types):

F.expr("array(struct(cast(null as string) date, 0L valor, 3L cum))")

The L suffixes indicate that 0 and 3 are of bigint (long) data type (see sql-ref-literals).

The same could have been written like this:

F.expr("array(struct(null, 0, 3))").cast('array<struct<date:string,valor:bigint,cum:bigint>>')

Python Spark Cumulative Sum by Group Using DataFrame

This can be done using a combination of a window function and the Window.unboundedPreceding value in the window's range as follows:

from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
+----+-----+-----+-------+
|time|value|class|cum_sum|
+----+-----+-----+-------+
|   1|    3|    b|      3|
|   2|    3|    b|      6|
|   1|    2|    a|      2|
|   2|    2|    a|      4|
|   3|    2|    a|      6|
+----+-----+-----+-------+
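
One detail worth knowing: with rangeBetween, rows that share the same time value within a class are treated as peers and summed together. If you need strictly row-by-row accumulation even when the ordering column has duplicates, a rowsBetween frame does that (a small variant of the same code):

windowval_rows = (Window.partitionBy('class').orderBy('time')
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval_rows))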

