Cumulative sum in Spark
To get the cumulative sum using the DataFrame API, set the window frame with the rowsBetween
method. In Spark 2.1 and newer, create the window as follows:
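import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._ // for the $"col" syntax; assumes a SparkSession named spark
val w = Window.partitionBy($"product_id", $"ack")
.orderBy($"date_time")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)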
This tells Spark to use the values from the beginning of the partition up to and including the current row. On versions older than Spark 2.1, use rowsBetween(Long.MinValue, 0)
for the same effect.
Then apply the window to each aggregate with over():
val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
.withColumn("val2_sum", sum($"val2").over(w))
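The frame semantics can be made concrete with a plain-Python sketch (no Spark involved). It models what partitionBy + orderBy + rowsBetween(Window.unboundedPreceding, Window.currentRow) computes: rows are grouped by the partition keys and sorted, and each row receives the sum of every earlier row in its partition plus itself. The function name running_sums and the sample rows are hypothetical and serve only as an illustration.

```python
from collections import defaultdict
from itertools import accumulate

def running_sums(rows, keys, order, value):
    """Per-partition running total, modelling
    sum(value).over(Window.partitionBy(*keys).orderBy(order)
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow))."""
    partitions = defaultdict(list)
    for row in rows:                                   # partitionBy
        partitions[tuple(row[k] for k in keys)].append(row)
    result = []
    for part in partitions.values():
        part.sort(key=lambda r: r[order])              # orderBy
        totals = accumulate(r[value] for r in part)    # first row .. current row
        for row, total in zip(part, totals):
            result.append({**row, value + "_sum": total})
    return result

rows = [
    {"product_id": 1, "ack": 0, "date_time": 2, "val1": 5},
    {"product_id": 1, "ack": 0, "date_time": 1, "val1": 3},
    {"product_id": 1, "ack": 1, "date_time": 1, "val1": 7},
]
for r in running_sums(rows, ("product_id", "ack"), "date_time", "val1"):
    print(r)
```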
How to do cumulative sum based on conditions in spark scala
You can define a column group using a conditional sum on flag: a running count of the flag = 0 rows, so that each such row starts a new group. Then row_number over a Window partitioned by cola and group gives the result you want:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, sum, when}
val result = df.withColumn(
"group",
sum(when(col("flag") === 0, 1).otherwise(0)).over(Window.partitionBy("cola").orderBy("date"))
).withColumn(
"final_column",
row_number().over(Window.partitionBy("cola", "group").orderBy("date")) - 1
).drop("group")
result.show
//+----+-----+----+------------+
//|cola| date|flag|final_column|
//+----+-----+----+------------+
//| b|44201| 0| 0|
//| b|44202| 1| 1|
//| b|44203| 1| 2|
//| b|44204| 0| 0|
//| b|44205| 0| 0|
//| b|44206| 1| 1|
//| b|44207| 1| 2|
//| b|44208| 1| 3|
//| b|44209| 1| 4|
//| b|44210| 0| 0|
//| a|44201| 0| 0|
//| a|44202| 1| 1|
//| a|44203| 1| 2|
//| a|44204| 0| 0|
//| a|44205| 0| 0|
//| a|44206| 0| 0|
//| a|44207| 1| 1|
//| a|44208| 1| 2|
//| a|44209| 1| 3|
//| a|44210| 0| 0|
//+----+-----+----+------------+
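In this case, row_number() - 1 is equivalent to sum(col("flag")). The reason is that flag is always 0 or 1 and each group starts with its only flag = 0 row. This assumes, as in the sample data, that every partition begins with such a row. So the final_column above can also be written as: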
.withColumn(
"final_column",
sum(col("flag")).over(Window.partitionBy("cola", "group").orderBy("date"))
)
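The same logic can be traced in plain Python (no Spark), which may help when checking the window version. A running count of flag == 0 rows plays the role of the group column, and the position within each group plays the role of row_number() - 1. The function name reset_counter is hypothetical, and the input is assumed to be one cola partition already ordered by date.

```python
from itertools import accumulate

def reset_counter(flags):
    """Model both window steps for one cola partition, already ordered by date.
    Returns (row_number() - 1 per group, running sum(flag) per group)."""
    # group: running count of flag == 0 rows, i.e. sum(when(flag === 0, 1).otherwise(0))
    groups = list(accumulate(1 if f == 0 else 0 for f in flags))
    final, sum_flag = [], []
    seen = {}      # rows seen so far in each group -> row_number() - 1
    running = {}   # running sum of flag in each group -> sum(flag) over the group window
    for g, f in zip(groups, flags):
        final.append(seen.get(g, 0))
        seen[g] = seen.get(g, 0) + 1
        running[g] = running.get(g, 0) + f
        sum_flag.append(running[g])
    return final, sum_flag

flags_b = [0, 1, 1, 0, 0, 1, 1, 1, 1, 0]   # partition b from the output above
final, alt = reset_counter(flags_b)
print(final)         # [0, 1, 2, 0, 0, 1, 2, 3, 4, 0]
print(final == alt)  # True
```

If a partition began with flag == 1 rows, the two columns would differ for those leading rows: reset_counter([1, 0]) returns ([0, 0], [1, 0]).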
How to compute cumulative sum using Spark
Assuming rdd is an RDD of (key, value) pairs with integer values, compute partial results for each partition:
val partials = rdd.mapPartitionsWithIndex((i, iter) => {
val (keys, values) = iter.toSeq.unzip
val sums = values.scanLeft(0)(_ + _)
// one record per partition: (local prefix sums, partition total)
Iterator((keys.zip(sums.tail), sums.last))
})
Collect the partial sums:
val partialSums = partials.values.collect
Compute the cumulative sum over partitions and broadcast it:
val sumMap = sc.broadcast(
(0 until rdd.partitions.size)
.zip(partialSums.scanLeft(0)(_ + _))
.toMap
)
Compute the final results:
val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
val offset = sumMap.value(i)
if (iter.isEmpty) Iterator()
else iter.next.map{case (k, v) => (k, v + offset)}.toIterator
})
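The three RDD steps can be simulated with plain Python lists standing in for partitions, which shows why the offsets line up. Each partition computes local prefix sums plus its total. The totals are then turned into exclusive offsets with scanLeft (here itertools.accumulate with initial=0, which needs Python 3.8+), and each partition adds its offset. The function name distributed_cumsum and the partition layout are made up for illustration.

```python
from itertools import accumulate

def distributed_cumsum(partitions):
    """Simulate the mapPartitionsWithIndex / collect / broadcast approach.

    partitions: list of lists of (key, value) pairs, in partition order.
    """
    # Step 1: local prefix sums and the partition total (one record per partition)
    partials = []
    for part in partitions:
        keys = [k for k, _ in part]
        sums = list(accumulate((v for _, v in part), initial=0))  # scanLeft(0)(_ + _)
        partials.append((list(zip(keys, sums[1:])), sums[-1]))
    # Step 2: "collect" the totals and turn them into per-partition offsets (sumMap)
    totals = [total for _, total in partials]
    offsets = dict(enumerate(accumulate(totals, initial=0)))
    # Step 3: shift every local prefix sum by its partition's offset
    return [
        [(k, v + offsets[i]) for k, v in local]
        for i, (local, _) in enumerate(partials)
    ]

parts = [[("a", 1), ("b", 2)], [], [("c", 3), ("d", 4)]]
print(distributed_cumsum(parts))
# [[('a', 1), ('b', 3)], [], [('c', 6), ('d', 10)]]
```

Empty partitions still produce one record with a total of 0, just as in the Scala version, so every partition index has an offset.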
Pyspark Cumulative sum within Partition for moving last 2 (N) rows
You want to sum the last 2 rows (current row included), so specify the frame with rowsBetween
like this:
from pyspark.sql import functions as F, Window
w = Window.partitionBy('Period').orderBy('Month').rowsBetween(-1, Window.currentRow)
df = df.withColumn('CustLast2', F.sum('Cust').over(w))
You inverted the lower and upper bounds of the window frame in your attempt.
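For a general N, the corresponding frame would be rowsBetween(-(N - 1), Window.currentRow). That is the current row plus the N - 1 rows before it, truncated at the start of the partition. A plain-Python model of that frame for one already-ordered partition follows; the function name moving_sums is hypothetical.

```python
def moving_sums(values, n):
    """Sum over rowsBetween(-(n - 1), currentRow) within one ordered partition."""
    out = []
    for i in range(len(values)):
        start = max(0, i - (n - 1))  # the frame cannot start before the partition
        out.append(sum(values[start:i + 1]))
    return out

print(moving_sums([1, 2, 3, 4], 2))  # [1, 3, 5, 7]
```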
How to get aggregate by hour including missing hours and add cumulative sum?
You could do it by first creating a table with hours and then joining it with the rest of the data.
Setup:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('xx', '2011-08-15 14:47:02.617023', 'eventA', 1),
('xx', '2011-08-15 14:48:02.507053', 'eventA', 2),
('xx', '2011-08-15 16:47:02.512016', 'eventA', 100),
('yy', '2011-08-15 11:47:02.337019', 'eventA', 2),
('yy', '2011-08-15 12:47:02.617041', 'eventA', 1),
('yy', '2011-08-15 13:47:02.927040', 'eventA', 3)],
['GroupId', 'Event_time', 'Event_name', 'Event_value']
)
df = df.withColumn('Date', F.col('Event_time').cast('date'))
The following creates a DataFrame with one row per hour, from the earliest to the latest date, for every GroupId/Event_name pair:
min_date = df.agg(F.min('Date')).head()[0]
max_date = df.agg(F.max('Date')).head()[0]
df_hours = df.select(
'GroupId',
'Event_name',
F.explode(F.expr(f"sequence(to_timestamp('{min_date} 00:00:00'), to_timestamp('{max_date} 23:00:00'), interval 1 hour)")).alias('date_hour')
).distinct()
Then aggregate your original table by hour:
df_agg = (df
.groupBy('GroupId', 'Event_name', F.date_trunc('hour', 'Event_time').alias('date_hour'))
.agg(F.sum('Event_value').alias('Count'))
)
Join the two with a left join, so hours without events are kept:
df_joined = df_hours.join(df_agg, ['GroupId', 'Event_name', 'date_hour'], 'left')
Add the running total agg_count (0 for hours before the first event) and the other output columns:
w = W.partitionBy('GroupId', 'Event_name').orderBy('date_hour')
df2 = (df_joined
.select(
'GroupId',
'Event_name',
F.to_date('date_hour').alias('Date'),
F.date_format('date_hour', 'HH').alias('Hour'),
'Count',
F.coalesce(F.sum('Count').over(w), F.lit(0)).alias('agg_count')
)
)
Result:
+-------+----------+----------+----+-----+---------+
|GroupId|Event_name| Date|Hour|Count|agg_count|
+-------+----------+----------+----+-----+---------+
| xx| eventA|2011-08-15| 00| null| 0|
| xx| eventA|2011-08-15| 01| null| 0|
| xx| eventA|2011-08-15| 02| null| 0|
| xx| eventA|2011-08-15| 03| null| 0|
| xx| eventA|2011-08-15| 04| null| 0|
| xx| eventA|2011-08-15| 05| null| 0|
| xx| eventA|2011-08-15| 06| null| 0|
| xx| eventA|2011-08-15| 07| null| 0|
| xx| eventA|2011-08-15| 08| null| 0|
| xx| eventA|2011-08-15| 09| null| 0|
| xx| eventA|2011-08-15| 10| null| 0|
| xx| eventA|2011-08-15| 11| null| 0|
| xx| eventA|2011-08-15| 12| null| 0|
| xx| eventA|2011-08-15| 13| null| 0|
| xx| eventA|2011-08-15| 14| 3| 3|
| xx| eventA|2011-08-15| 15| null| 3|
| xx| eventA|2011-08-15| 16| 100| 103|
| xx| eventA|2011-08-15| 17| null| 103|
| xx| eventA|2011-08-15| 18| null| 103|
| xx| eventA|2011-08-15| 19| null| 103|
| xx| eventA|2011-08-15| 20| null| 103|
| xx| eventA|2011-08-15| 21| null| 103|
| xx| eventA|2011-08-15| 22| null| 103|
| xx| eventA|2011-08-15| 23| null| 103|
| yy| eventA|2011-08-15| 00| null| 0|
| yy| eventA|2011-08-15| 01| null| 0|
| yy| eventA|2011-08-15| 02| null| 0|
| yy| eventA|2011-08-15| 03| null| 0|
| yy| eventA|2011-08-15| 04| null| 0|
| yy| eventA|2011-08-15| 05| null| 0|
| yy| eventA|2011-08-15| 06| null| 0|
| yy| eventA|2011-08-15| 07| null| 0|
| yy| eventA|2011-08-15| 08| null| 0|
| yy| eventA|2011-08-15| 09| null| 0|
| yy| eventA|2011-08-15| 10| null| 0|
| yy| eventA|2011-08-15| 11| 2| 2|
| yy| eventA|2011-08-15| 12| 1| 3|
| yy| eventA|2011-08-15| 13| 3| 6|
| yy| eventA|2011-08-15| 14| null| 6|
| yy| eventA|2011-08-15| 15| null| 6|
| yy| eventA|2011-08-15| 16| null| 6|
| yy| eventA|2011-08-15| 17| null| 6|
| yy| eventA|2011-08-15| 18| null| 6|
| yy| eventA|2011-08-15| 19| null| 6|
| yy| eventA|2011-08-15| 20| null| 6|
| yy| eventA|2011-08-15| 21| null| 6|
| yy| eventA|2011-08-15| 22| null| 6|
| yy| eventA|2011-08-15| 23| null| 6|
+-------+----------+----------+----+-----+---------+
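The same pipeline can be sketched in plain Python for a single GroupId/Event_name pair: an hour grid, hourly sums, a left join, and a running total in which missing hours count as 0. The sketch assumes all events fall on one day, as in the sample. The function name hourly_running_totals is hypothetical.

```python
from collections import defaultdict
from datetime import datetime

def hourly_running_totals(events):
    """events: list of (timestamp_string, value) for one GroupId/Event_name pair on a single day."""
    # Aggregate by hour (date_trunc('hour', Event_time) + sum)
    per_hour = defaultdict(int)
    for ts, value in events:
        per_hour[datetime.fromisoformat(ts).hour] += value
    rows, running = [], 0
    for hour in range(24):              # the hour grid, 00..23
        count = per_hour.get(hour)      # None plays the role of null after the left join
        running += count or 0           # like coalesce: missing hours add nothing
        rows.append((f"{hour:02d}", count, running))
    return rows

xx = [("2011-08-15 14:47:02.617023", 1),
      ("2011-08-15 14:48:02.507053", 2),
      ("2011-08-15 16:47:02.512016", 100)]
for row in hourly_running_totals(xx)[13:18]:
    print(row)
```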
Calculate cumulative sum and average based on column values in spark dataframe
Because you want to populate a single column, chain the when()
clauses:
from pyspark.sql import functions as F, Window
windowval = (Window.partitionBy('Location', 'Brand').orderBy('month_in_timestamp')
.rangeBetween(Window.unboundedPreceding, 0))
df = df.withColumn('TotalSumValue',
F.when(F.col('Brand').isin('brand1', 'brand2'), F.sum('TrueValue').over(windowval))
.when(F.col('Brand').isin('brand3'), F.avg('TrueValue').over(windowval)))
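In plain Python, the per-row result for one Location/Brand partition ordered by month looks roughly like this. The sketch keeps a running sum and count, then emits the sum for brand1/brand2, the average for brand3, and None (null) for any other brand, since neither when() matches. It assumes each month appears at most once per partition; with tied months, rangeBetween would include all rows sharing that month. The function name running_metric is hypothetical.

```python
def running_metric(brand, values):
    """values: TrueValue for one Location/Brand partition, ordered by month."""
    out, total = [], 0
    for count, value in enumerate(values, start=1):
        total += value
        if brand in ("brand1", "brand2"):
            out.append(total)           # F.sum over the window
        elif brand == "brand3":
            out.append(total / count)   # F.avg over the window
        else:
            out.append(None)            # no when() matched -> null
    return out

print(running_metric("brand1", [10, 20, 30]))  # [10, 30, 60]
print(running_metric("brand3", [10, 20, 30]))  # [10.0, 15.0, 20.0]
```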
PySpark - Cumulative sum with limits
In such cases, one usually thinks of window functions, which compute a value for each row from the rows around it. This case is different. Each row's result depends on the previous row's already-clamped result, so the calculation has to carry its own state forward, and a window function cannot express that.
Main idea: instead of operating on rows, do the work on grouped/aggregated arrays. This works well here because there is a key to use in groupBy
(user_id). The table is therefore divided into chunks of data, and the calculations run in parallel across groups.
Input:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, '2022-01-01', 0),
(1, '2022-01-02', 0),
(1, '2022-01-03', 1),
(1, '2022-01-04', 1),
(1, '2022-01-05', 1),
(1, '2022-01-06', 0),
(1, '2022-01-07', 0),
(1, '2022-01-08', 0),
(1, '2022-01-09', 1),
(1, '2022-01-10', 1),
(1, '2022-01-11', 1),
(1, '2022-01-12', 0),
(1, '2022-01-13', 0),
(1, '2022-01-14', -1),
(1, '2022-01-15', -1),
(1, '2022-01-16', -1),
(1, '2022-01-17', -1),
(1, '2022-01-18', -1),
(1, '2022-01-19', -1),
(1, '2022-01-20', 0)],
['user_id', 'date', 'valor'])
Script:
df = df.groupBy('user_id').agg(
F.aggregate(
F.array_sort(F.collect_list(F.struct('date', 'valor'))),
F.expr("array(struct(cast(null as string) date, 0L valor, 3L cum))"),
lambda acc, x: F.array_union(
acc,
F.array(x.withField(
'cum',
F.greatest(F.lit(1), F.least(F.lit(5), x['valor'] + F.element_at(acc, -1)['cum']))
))
)
).alias("a")
)
df = df.selectExpr("user_id", "inline(slice(a, 2, size(a)))")
df.show()
# +-------+----------+-----+---+
# |user_id| date|valor|cum|
# +-------+----------+-----+---+
# | 1|2022-01-01| 0| 3|
# | 1|2022-01-02| 0| 3|
# | 1|2022-01-03| 1| 4|
# | 1|2022-01-04| 1| 5|
# | 1|2022-01-05| 1| 5|
# | 1|2022-01-06| 0| 5|
# | 1|2022-01-07| 0| 5|
# | 1|2022-01-08| 0| 5|
# | 1|2022-01-09| 1| 5|
# | 1|2022-01-10| 1| 5|
# | 1|2022-01-11| 1| 5|
# | 1|2022-01-12| 0| 5|
# | 1|2022-01-13| 0| 5|
# | 1|2022-01-14| -1| 4|
# | 1|2022-01-15| -1| 3|
# | 1|2022-01-16| -1| 2|
# | 1|2022-01-17| -1| 1|
# | 1|2022-01-18| -1| 1|
# | 1|2022-01-19| -1| 1|
# | 1|2022-01-20| 0| 1|
# +-------+----------+-----+---+
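Outside Spark, the rule being implemented is a simple stateful loop: start at 3, add each valor, and clamp the result to the range [1, 5] before moving on. The plain-Python sketch below reproduces the cum column above. It also shows why a window cannot express the rule, because each step needs the previous clamped value rather than a plain sum of earlier rows. The function name clamped_cumsum and its parameter defaults are hypothetical.

```python
def clamped_cumsum(values, start=3, low=1, high=5):
    out, cum = [], start
    for v in values:
        cum = max(low, min(high, cum + v))  # greatest(1, least(5, valor + previous cum))
        out.append(cum)
    return out

valor = [0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, -1, -1, -1, -1, -1, -1, 0]
print(clamped_cumsum(valor))
# [3, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 3, 2, 1, 1, 1, 1]
```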
Explanation
Groups are created based on "user_id". The aggregation for these groups lies in this line:
F.array_sort(F.collect_list(F.struct('date', 'valor')))
This creates an array (collect_list
) for every user_id. These arrays contain structs with 2 fields: date and valor.
+-------+-----------------------------------------------+
|user_id|a |
+-------+-----------------------------------------------+
|1 |[{2022-01-01, 0}, {2022-01-02, 0}, {...} ... ] |
+-------+-----------------------------------------------+
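array_sort
makes sure the structs are sorted, which the following steps depend on, because collect_list does not guarantee any order. Structs are compared field by field, so they are ordered by date first. That order is chronological because the dates are ISO-formatted strings.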
Everything else inside agg
transforms the result of this aggregation.
The main part of the code is aggregate. It takes an array, "loops" through every element and returns a single value (in this case, that value is itself an array). It works like this: take the initial value, array(struct(cast(null as string) date, 0L valor, 3L cum)), and merge it with the first element of the array using the provided function (the lambda). The result then replaces the initial value for the next run, where it is merged with the following element of the array, and so on.
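Since aggregate behaves like a left fold, a rough plain-Python analogue uses functools.reduce with the same seed array and the same merge step, followed by the slice that drops the seed. In this sketch, dicts stand in for structs and list concatenation stands in for array_union, which would additionally drop duplicate structs. It illustrates the idea and is not what Spark executes.

```python
from functools import reduce

rows = [{"date": "2022-01-03", "valor": 1},
        {"date": "2022-01-01", "valor": 0},
        {"date": "2022-01-02", "valor": 0}]

seed = [{"date": None, "valor": 0, "cum": 3}]     # array(struct(null date, 0L valor, 3L cum))

def merge(acc, x):
    # x.withField('cum', greatest(1, least(5, valor + element_at(acc, -1).cum)))
    cum = max(1, min(5, x["valor"] + acc[-1]["cum"]))
    return acc + [{**x, "cum": cum}]               # array_union(acc, array(...))

ordered = sorted(rows, key=lambda r: r["date"])    # array_sort
result = reduce(merge, ordered, seed)[1:]          # slice(a, 2, size(a)) drops the seed
print(result)
```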
In this case, the lambda
function performs array_union, which unions two arrays with identical schemas. It also drops duplicate elements, which is harmless here because every date is unique.
1. Take the initial value (an array of structs) as the acc variable: [{null, 0, 3}]. It is already in a form that can be used in array_union.
2. Take the first element of the 'a' column's array (i.e. {2022-01-01, 0}) as the x variable. It is a struct, so its schema differs from that of acc (an array of structs) and some processing is needed. The calculation also has to happen at this step, since both variables are available here.
3. Create an array of structs by enclosing the x struct inside F.array(). One more field also has to be added to the struct, because x currently has just 2 fields: F.array(x.withField('cum', ...))
4. Inside .withField(), provide the expression for the new field:
F.greatest(
F.lit(1),
F.least(
F.lit(5),
x['valor'] + F.element_at(acc, -1)['cum']
)
)
   - element_at(acc, -1) takes the last struct of the acc array
   - ['cum'] takes the field 'cum' from that struct
   - x['valor'] + adds the 'valor' field of the x struct
   - F.least() ensures that 'cum' never exceeds 5 (it takes the smaller of the new 'cum' and 5)
   - F.greatest() ensures that 'cum' never drops below 1
5. Both acc and the newly created array of structs now have identical schemas and proper data, so they can be combined with array_union.
6. The result is assigned to the acc variable, while the x variable gets the next value from the 'a' array.
The process continues from step 3.
Finally, the result of aggregate
looks like [{null, 0, 3}, {2022-01-01, 0, 3}, {2022-01-02, 0, 3}, {2022-01-03, 1, 4}, {...} ... ]
The first (seed) element is removed using slice(a, 2, size(a)), and inline
explodes the array of structs into rows, with one column per struct field.
Note: it's important to create the initial value of aggregate
with the proper schema (field/column names and types):
F.expr("array(struct(cast(null as string) date, 0L valor, 3L cum))")
The L suffixes mark 0 and 3 as bigint (long) literals (sql-ref-literals).
The same could have been written like this:
F.expr("array(struct(null, 0, 3))").cast('array<struct<date:string,valor:bigint,cum:bigint>>')
Python Spark Cumulative Sum by Group Using DataFrame
This can be done using a combination of a window function and the Window.unboundedPreceding value in the window's range as follows:
from pyspark.sql import Window
from pyspark.sql import functions as F
windowval = (Window.partitionBy('class').orderBy('time')
.rangeBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
+----+-----+-----+-------+
|time|value|class|cum_sum|
+----+-----+-----+-------+
| 1| 3| b| 3|
| 2| 3| b| 6|
| 1| 2| a| 2|
| 2| 2| a| 4|
| 3| 2| a| 6|
+----+-----+-----+-------+
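One detail is worth knowing here. rangeBetween(Window.unboundedPreceding, 0) is a RANGE frame, so rows that share the same time value within a class are peers and all receive the same cumulative sum. A ROWS frame (rowsBetween(Window.unboundedPreceding, Window.currentRow)) counts strictly row by row, although the order among tied rows is then not guaranteed. The difference only shows up with ties, which the sample data does not have. Below is a plain-Python model for one partition sorted by time; the function names are hypothetical.

```python
def range_cumsum(rows):
    """rows: (time, value) pairs for one partition, sorted by time.
    RANGE frame: include every row whose time <= the current row's time."""
    return [sum(v for t, v in rows if t <= cur) for cur, _ in rows]

def rows_cumsum(rows):
    """ROWS frame: include rows from the start up to the current position."""
    out, total = [], 0
    for _, v in rows:
        total += v
        out.append(total)
    return out

with_tie = [(1, 2), (2, 2), (2, 5)]
print(range_cumsum(with_tie))  # [2, 9, 9]
print(rows_cumsum(with_tie))   # [2, 4, 9]
```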