PySpark/Spark Window Function First/Last Issue

Spark first Window function is taking much longer than last

The solution that doesn't answer the question

In trying various things to speed up my routine, it occurred to me to try rewriting my usages of first() to just be usages of last() with a reversed sort order.

So rewriting this:

win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
            .orderBy('rank').rowsBetween(0, Window.unboundedFollowing))

df_part2 = (df_part1
            .withColumn('next_rank', F.first(F.col('rank'), ignorenulls=True).over(win_next))
            .withColumn('next_sf', F.first(F.col('scale_factor'), ignorenulls=True).over(win_next))
            )

As this:

win_next = (Window.partitionBy('PORT_TYPE', 'loss_process')
            .orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))

df_part2 = (df_part1
            .withColumn('next_rank', F.last(F.col('rank'), ignorenulls=True).over(win_next))
            .withColumn('next_sf', F.last(F.col('scale_factor'), ignorenulls=True).over(win_next))
            )

Much to my amazement, this actually solved the performance problem, and now the entire dataframe is generated in just 3 seconds. I'm pleased, but still vexed.

As I somewhat predicted, the query plan now includes a new SORT step before these next two columns are created, and they've changed from Window to RunningWindowFunction, just like the first two. Here's the new plan (without the code broken up into 3 separate cached parts anymore, since that was only for troubleshooting performance):
[screenshot of the new query plan]

As for the question:

Why do my calls to first() over Window.unboundedFollowing take so much longer than last() over Window.unboundedPreceding?

I'm hoping someone can still answer this, for academic reasons.
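For reference, here is a minimal, self-contained sketch for reproducing the plan comparison with explain() (the data values are invented; only the column names come from the code above):

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for df_part1; values are made up, only the column names match.
df_part1 = spark.createDataFrame(
    [('A', 'p1', 1, None), ('A', 'p1', 2, 0.5), ('A', 'p1', 3, 0.7)],
    ['PORT_TYPE', 'loss_process', 'rank', 'scale_factor'])

win_first = (Window.partitionBy('PORT_TYPE', 'loss_process')
             .orderBy('rank').rowsBetween(0, Window.unboundedFollowing))
win_last = (Window.partitionBy('PORT_TYPE', 'loss_process')
            .orderBy(F.desc('rank')).rowsBetween(Window.unboundedPreceding, 0))

# explain() prints the physical plan; compare the window nodes chosen for each variant.
df_part1.withColumn('next_sf', F.first('scale_factor', ignorenulls=True).over(win_first)).explain()
df_part1.withColumn('next_sf', F.last('scale_factor', ignorenulls=True).over(win_last)).explain()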

How to use first and last function in pyspark?

Try inverting the sort order using .desc() and then first() will give the desired output.

w2 = Window().partitionBy("k").orderBy(df.v.desc())
df.select(F.col("k"), F.first("v", True).over(w2).alias('v')).show()

Outputs:

+---+---+
| k| v|
+---+---+
| b| 3|
| b| 3|
| a| 1|
| a| 1|
| a| 1|
+---+---+

You should also be careful about partitionBy vs. orderBy. Since you are partitioning by 'k', all of the values of k in any given window are the same. Sorting by 'k' does nothing.

The last function is not really the opposite of first, in terms of which item from the window it returns. It returns the last non-null value it has seen as it progresses through the ordered rows.

To compare their effects, here is a dataframe with both function/ordering combinations. Notice how in column 'last_w2', the null value has been replaced by -1.

df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)]).toDF(["k", "v"])

# Create two windows for comparison.
w = Window().partitionBy("k").orderBy('v')
w2 = Window().partitionBy("k").orderBy(df.v.desc())

df.select('k', 'v',
          F.first("v", True).over(w).alias('first_w1'),
          F.last("v", True).over(w).alias('last_w1'),
          F.first("v", True).over(w2).alias('first_w2'),
          F.last("v", True).over(w2).alias('last_w2')
          ).show()

Output:

+---+----+--------+-------+--------+-------+
| k| v|first_w1|last_w1|first_w2|last_w2|
+---+----+--------+-------+--------+-------+
| b| 1| 1| 1| 3| 1|
| b| 3| 1| 3| 3| 3|
| a|null| null| null| 1| -1|
| a| -1| -1| -1| 1| -1|
| a| 1| -1| 1| 1| 1|
+---+----+--------+-------+--------+-------+

Window function acts not as expected when I use Order By (PySpark)

The simple reason is that the default window frame is Window.unboundedPreceding to Window.currentRow, which means the max is taken from the first row in that partition up to the current row, NOT up to the last row of the partition.

This is a common gotcha. (You can replace max() with sum() and see what output you get; it also changes depending on how you order the partition.)
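Here is a minimal sketch of that gotcha, using a subset of the data from the table below; with the default frame, "max" is only a running max:

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('Tablet', 1500), ('Tablet', 2500), ('Tablet', 6500),
     ('Cell Phone', 3000), ('Cell Phone', 6000)],
    ['CATEGORY', 'REVENUE'])

# With orderBy and no explicit frame, the frame is unboundedPreceding -> currentRow,
# so max() is computed only over the rows seen so far in the ordering.
running_spec = Window.partitionBy('CATEGORY').orderBy('REVENUE')
df.withColumn('running_max', F.max('REVENUE').over(running_spec)).show()
# Ordered ascending, each row's running_max is just its own REVENUE.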

To solve this, you can specify that you want the max of each partition to always be calculated using the full window partition, like so:

window_spec = (Window.partitionBy(df['CATEGORY'])
               .orderBy(df['REVENUE'])
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

revenue_difference = F.max(df['REVENUE']).over(window_spec)

df.select(
    df['CATEGORY'],
    df['REVENUE'],
    revenue_difference.alias("revenue_difference")
).show()

Output:

+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
+----------+-------+------------------+

problem in using last function in pyspark

Change your window to this.

my_window = (Window.partitionBy('number').orderBy(df['date'])
             .rowsBetween(Window.currentRow, Window.unboundedFollowing))

Your original window spans the rows from the first row of the partition to the current row, and in that case the last row is the same as the current row.
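A small sketch of the difference (the data here is invented; only the column names number and date come from the question):

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, '2021-01-01'), (1, '2021-01-02'), (1, '2021-01-03')],
    ['number', 'date'])

default_w = Window.partitionBy('number').orderBy('date')  # default frame ends at the current row
full_w = default_w.rowsBetween(Window.currentRow, Window.unboundedFollowing)

df.select('number', 'date',
          F.last('date').over(default_w).alias('last_default'),  # equals the current row's date
          F.last('date').over(full_w).alias('last_full')         # equals the partition's last date
          ).show()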

Spark : need confirmation on approach in capturing first and last date : on dataset

The third approach you propose will work every time. You could also write it like this:

(df
 .groupBy('A', 'B', 'C', 'D')
 .agg(F.min('WEEK').alias('min_week'), F.max('WEEK').alias('max_week'),
      F.min('MONTH').alias('min_month'), F.max('MONTH').alias('max_month'))
 .show())

which yields:

+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D|min_week|max_week|min_month|max_month|
+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D| 201701| 201901| 2020001| 2020003|
+---+---+---+---+--------+--------+---------+---------+

It is interesting to understand why the first two approaches produce unpredictable results while the third always works.

The second approach is unpredictable because Spark is a parallel computation engine. When it aggregates a value, it first aggregates within each partition and then merges the partial results two by two. The order of these merges is not deterministic; it depends, among other things, on the order in which tasks complete, which can change from one attempt to the next, particularly when there is a lot of data.
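To see why, here is a small sketch (the data is invented; only the columns A and WEEK come from the question): first() in a groupBy aggregation picks whichever row a task happens to deliver first, whereas min() is always deterministic.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('A', 201701), ('A', 201901), ('A', 201801)],
    ['A', 'WEEK']).repartition(3)  # spread the rows over several partitions

# first_week is not guaranteed to be stable across runs; min_week always is.
df.groupBy('A').agg(F.first('WEEK').alias('first_week'),
                    F.min('WEEK').alias('min_week')).show()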

The first approach is not exactly what you want to do. Window functions will not aggregate the dataframe into one single row; they compute the aggregation and add it to every row. You are also making several mistakes. If you order the dataframe, by default Spark considers a window ranging from the start of the partition to the current row, so the maximum would simply be the current row's WEEK. In fact, to compute the min and the max, you do not need to order the dataframe at all. You can just do it like this:

w = Window.partitionBy('A','B', 'C', 'D')
df.select('A', 'B', 'C', 'D',
F.min('WEEK').over(w).alias('min_week'),
F.max('WEEK').over(w).alias('max_week'),
F.min('MONTH').over(w).alias('min_month'),
F.max('MONTH').over(w).alias('max_month')
).show()

which yields the correct values, just not in the shape you were expecting. But at least it shows the difference between window aggregations and regular aggregations.

+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D|min_week|max_week|min_month|max_month|
+---+---+---+---+--------+--------+---------+---------+
| A| B| C| D| 201701| 201901| 2020001| 2020003|
| A| B| C| D| 201701| 201901| 2020001| 2020003|
| A| B| C| D| 201701| 201901| 2020001| 2020003|
+---+---+---+---+--------+--------+---------+---------+

PySpark subtract last row from first row in a group

Code below

w = (Window.partitionBy('ID').orderBy('col1')
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn('out', F.last('col1').over(w) - F.first('col1').over(w)).show()
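For a self-contained run with made-up data (only the column names ID and col1 come from the question), the new column is the per-group range, i.e. last minus first:

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([('x', 1), ('x', 5), ('x', 9), ('y', 2), ('y', 7)],
                           ['ID', 'col1'])

w = (Window.partitionBy('ID').orderBy('col1')
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# 'out' is 9 - 1 = 8 for group 'x' and 7 - 2 = 5 for group 'y'.
df.withColumn('out', F.last('col1').over(w) - F.first('col1').over(w)).show()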

Get the last value using spark window function

If you want to propagate the last known value (this is not the same as the logic you would use with a join), you should:

  • ORDER BY timestamp.
  • Take last ignoring nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")

df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// | id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0| 2| Adam| Adam|
// |-1.0| 4|Steve| Steve|
// | 1.0| 1| Matt| Matt|
// | 1.0| 2| John| John|
// | 1.0| 3| null| John|
// +----+---------+-----+--------+

If you want to take the last value globally:

  • ORDER BY timestamp.
  • Set unbounded frame.
  • Take last ignoring nulls:
val partitionWindow = Window.partitionBy($"id").orderBy($"timestamp")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("lastName", last("name", true) over (partitionWindow)).show
// +----+---------+-----+--------+
// | id|timestamp| name|lastName|
// +----+---------+-----+--------+
// |-1.0| 2| Adam| Steve|
// |-1.0| 4|Steve| Steve|
// | 1.0| 1| Matt| John|
// | 1.0| 2| John| John|
// | 1.0| 3| null| John|
// +----+---------+-----+--------+
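For completeness, here is a PySpark sketch of the same two windows (the id/timestamp/name rows are reconstructed from the sample output above):

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(-1.0, 2, 'Adam'), (-1.0, 4, 'Steve'),
     (1.0, 1, 'Matt'), (1.0, 2, 'John'), (1.0, 3, None)],
    ['id', 'timestamp', 'name'])

running_w = Window.partitionBy('id').orderBy('timestamp')   # last known value so far
full_w = running_w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn('lastName_running', F.last('name', ignorenulls=True).over(running_w)) \
  .withColumn('lastName_global', F.last('name', ignorenulls=True).over(full_w)) \
  .show()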

