Spark first Window function is taking much longer than last
The solution that doesn't answer the question
In trying various things to speed up my routine, it occurred to me to try rewriting my usages of first() to just be usages of last() with a reversed sort order.
So rewriting this:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
            .orderBy('rank').rowsBetween(0, Window.unboundedFollowing))
df_part2 = (df_part1
            .withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
            .withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
            )
As this:
win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
            .orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))
df_part2 = (df_part1
            .withColumn('next_rank', F.last(F.col('rank'), ignorenulls=True).over(win_next))
            .withColumn('next_sf', F.last(F.col('scale_factor'), ignorenulls=True).over(win_next))
            )
Much to my amazement, this actually solved the performance problem, and now the entire dataframe is generated in just 3 seconds. I'm pleased, but still vexed.
As I somewhat predicted, the query plan now includes a new SORT step before creating these two columns, and they are now computed as RunningWindowFunction rather than Window, just like the first two. Here's the new plan (without the code broken up into 3 separate cached parts anymore, because that was just to troubleshoot performance):
As for the question:
Why do my calls to first() over Window.unboundedFollowing take so much longer than last() over Window.unboundedPreceding?
I'm hoping someone can still answer this, for academic reasons.
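For anyone who wants to reproduce the comparison, here is a rough sketch (reusing the df_part1 name and window definitions from the snippets above) of how to check which physical operator each formulation gets:
# Build both windows and compare the physical plans; per the observation above, the
# slow version shows a plain Window operator and the fast one a RunningWindowFunction.
win_following = (Window.partitionBy('PORT_TYPE', 'loss_process')
                 .orderBy('rank').rowsBetween(0, Window.unboundedFollowing))
win_preceding = (Window.partitionBy('PORT_TYPE', 'loss_process')
                 .orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))
df_part1.withColumn('next_rank', F.first('rank', ignorenulls=True).over(win_following)).explain()
df_part1.withColumn('next_rank', F.last('rank', ignorenulls=True).over(win_preceding)).explain()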
How to use first and last function in pyspark?
Try inverting the sort order using .desc(), and then first() will give the desired output.
w2 = Window().partitionBy("k").orderBy(df.v.desc())
df.select(F.col("k"), F.first("v", True).over(w2).alias('v')).show()
Outputs:
+---+---+
| k| v|
+---+---+
| b| 3|
| b| 3|
| a| 1|
| a| 1|
| a| 1|
+---+---+
You should also be careful about partitionBy vs. orderBy. Since you are partitioning by 'k', all of the values of k in any given window are the same. Sorting by 'k' does nothing.
The last function is not really the opposite of first, in terms of which item from the window it returns. It returns the last non-null value it has seen, as it progresses through the ordered rows.
To compare their effects, here is a dataframe with both function/ordering combinations. Notice how in column 'last_w2', the null value has been replaced by -1.
df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)]).toDF(["k", "v"])

# Create two windows for comparison.
w = Window().partitionBy("k").orderBy('v')
w2 = Window().partitionBy("k").orderBy(df.v.desc())

df.select('k', 'v',
          F.first("v", True).over(w).alias('first_w1'),
          F.last("v", True).over(w).alias('last_w1'),
          F.first("v", True).over(w2).alias('first_w2'),
          F.last("v", True).over(w2).alias('last_w2')
          ).show()
Output:
+---+----+--------+-------+--------+-------+
| k| v|first_w1|last_w1|first_w2|last_w2|
+---+----+--------+-------+--------+-------+
| b| 1| 1| 1| 3| 1|
| b| 3| 1| 3| 3| 3|
| a|null| null| null| 1| -1|
| a| -1| -1| -1| 1| -1|
| a| 1| -1| 1| 1| 1|
+---+----+--------+-------+--------+-------+
Window function acts not as expected when I use Order By (PySpark)
The simple reason is that the default window frame is Window.unboundedPreceding to Window.currentRow, which means that the max is taken from the first row in the partition to the current row, NOT the last row of the partition.
This is a common gotcha. (You can replace .max() with sum() and see what output you get. The result also changes depending on how you order the partition.)
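For illustration, here is a small sketch (assuming the same df with CATEGORY and REVENUE columns used below) of how the default frame turns sum() into a running total:
# No explicit frame: with an orderBy, the frame defaults to
# (unboundedPreceding, currentRow), so sum() is a running total within each category.
running_spec = Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE'])
df.select(df['CATEGORY'], df['REVENUE'],
          F.sum(df['REVENUE']).over(running_spec).alias('running_total')).show()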
To solve this, you can specify that you want the max of each partition to always be calculated using the full window partition, like so:
window_spec = (Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE'])
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
revenue_difference = F.max(df['REVENUE']).over(window_spec)
df.select(
    df['CATEGORY'],
    df['REVENUE'],
    revenue_difference.alias("revenue_difference")).show()
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
+----------+-------+------------------+
problem in using last function in pyspark
Change your window to this.
my_window = Window.partitionBy('number').orderBy(df['date']).rowsBetween(Window.currentRow, Window.unboundedFollowing)
Your original window spans the rows from the first row of the partition to the current row, so in that frame the last row is the same as the current row.
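With that frame, last() behaves as expected; here is a minimal sketch (the 'value' column name is an assumption, only 'number' and 'date' appear in the question):
# F.last with ignorenulls=True over the forward-looking frame returns the last
# non-null entry between the current row and the end of the partition.
df.withColumn('last_value', F.last('value', ignorenulls=True).over(my_window)).show()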
Spark : need confirmation on approach in capturing first and last date : on dataset
The third approach you propose will work every time. You could also write it like this:
(df
 .groupBy('A', 'B', 'C', 'D')
 .agg(F.min('WEEK').alias('min_week'), F.max('WEEK').alias('max_week'),
      F.min('MONTH').alias('min_month'), F.max('MONTH').alias('max_month'))
 .show())
which yields:
+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D|min_week|max_week|min_month|max_month|
+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D| 201701| 201901| 2020001| 2020003|
+---+---+---+---+--------+--------+---------+---------+
It is interesting to understand why the first two approaches produce unpredictable results while the third always works.
The second approach is unpredictable because Spark is a parallel computation engine. When it aggregates a value, it starts by aggregating it within each partition, and the partial results are then merged two by two. The order of these merges is not deterministic: it depends, among other things, on the order in which tasks complete, which can change on every attempt, particularly when there is a lot of data.
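To make that concrete, here is a sketch of the usual form of this mistake (assuming the second approach used first/last inside a plain groupBy):
# first/last in a groupBy aggregation carry no ordering guarantee: the result depends
# on which partition's partial aggregate is merged first, so it can differ between runs.
df.groupBy('A', 'B', 'C', 'D').agg(
    F.first('WEEK').alias('first_week'),   # non-deterministic
    F.last('WEEK').alias('last_week')      # non-deterministic
).show()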
The first approach is not exactly what you want to do. Window functions do not aggregate the dataframe into one single row; they compute the aggregation and add it to every row. You are also making several mistakes. If you order the dataframe, by default Spark uses a frame ranging from the start of the partition to the current row, so the maximum for the week would just be the current row's value. In fact, to compute the min and the max, you do not need to order the dataframe at all. You can just do it like this:
w = Window.partitionBy('A', 'B', 'C', 'D')
df.select('A', 'B', 'C', 'D',
          F.min('WEEK').over(w).alias('min_week'),
          F.max('WEEK').over(w).alias('max_week'),
          F.min('MONTH').over(w).alias('min_month'),
          F.max('MONTH').over(w).alias('max_month')
          ).show()
which yields the correct values, but not in the shape you were expecting. At least it shows the difference between window aggregations and regular aggregations.
+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D|min_week|max_week|min_month|max_month|
+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D| 201701| 201901| 2020001| 2020003|
| A| B| C| D| 201701| 201901| 2020001| 2020003|
| A| B| C| D| 201701| 201901| 2020001| 2020003|
+---+---+---+---+--------+--------+---------+---------+
PySpark subtract last row from first row in a group
Code below:
w = Window.partitionBy('ID').orderBy('col1').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('out', F.last('col1').over(w) - F.first('col1').over(w)).show()
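For example, a quick sketch with made-up data (the ID and col1 values are only illustrative):
# out = last(col1) - first(col1) per ID: 25 - 10 = 15 for ID 1, 40 - 5 = 35 for ID 2
df = spark.createDataFrame([(1, 10), (1, 25), (2, 5), (2, 40)], ['ID', 'col1'])
w = Window.partitionBy('ID').orderBy('col1').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('out', F.last('col1').over(w) - F.first('col1').over(w)).show()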
Get the last value using spark window function
If you want to propagate the last known value (it is not the same as the logic used with join) you should:
- Order by timestamp.
- Take last, ignoring nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// | id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0| 2| Adam| Adam|
// |-1.0| 4|Steve| Steve|
// | 1.0| 1| Matt| Matt|
// | 1.0| 2| John| John|
// | 1.0| 3| null| John|
// +----+---------+-----+--------+
If you want to take the last value globally:
- Order by timestamp.
- Set an unbounded frame.
- Take last, ignoring nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// | id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0| 2| Adam| Steve|
// |-1.0| 4|Steve| Steve|
// | 1.0| 1| Matt| John|
// | 1.0| 2| John| John|
// | 1.0| 3| null| John|
// +----+---------+-----+--------+
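For completeness, a rough PySpark equivalent of the two Scala windows above (keeping the id, timestamp, and name column names from the example):
from pyspark.sql import Window, functions as F

# Propagate the last known non-null name up to the current row (forward fill).
fill_window = Window.partitionBy('id').orderBy('timestamp')
df.withColumn('lastName', F.last('name', ignorenulls=True).over(fill_window)).show()

# Take the last non-null name over the whole partition.
global_window = fill_window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('lastName', F.last('name', ignorenulls=True).over(global_window)).show()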