SparkSQL - Lag function?

  1. The frame specification must start with the keyword ROWS, not ROW.
  2. The frame specification requires either a lower-bound value or the UNBOUNDED keyword.
  3. The LAG function doesn't accept a frame at all, so a correct SQL query with lag can look like this:

    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
    PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time
    ) as prev_amt from tx
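To make the semantics of that query concrete, here is a small plain-Scala sketch (standard library only, no Spark) of what LAG(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time) computes. The LagSemantics object and the Tx case class with its field names are hypothetical, chosen to mirror the query's columns, and the date and time strings are assumed to sort correctly as text (e.g. ISO format).

```scala
object LagSemantics {
  // Hypothetical row type mirroring the columns used in the query above.
  final case class Tx(ccNum: String, transDate: String, transTime: String, amt: Double)

  // Pairs each row with the amt of the preceding row in the same ccNum partition,
  // ordered by (transDate, transTime). The first row of a partition gets None,
  // just as LAG returns NULL when there is no preceding row.
  def withPrevAmt(rows: Seq[Tx]): Seq[(Tx, Option[Double])] =
    rows
      .groupBy(_.ccNum) // PARTITION BY cc_num
      .values
      .toSeq
      .flatMap { partition =>
        val ordered = partition.sortBy(t => (t.transDate, t.transTime)) // ORDER BY
        val prev: Seq[Option[Double]] = None +: ordered.init.map(t => Some(t.amt))
        ordered.zip(prev)
      }
}
```

Rows come back grouped by partition, in no particular partition order, which matches SQL: without an outer ORDER BY, the result order isn't guaranteed.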


Regarding SQL DSL usage:

  1. As the error message says:

    Note that, using window functions currently requires a HiveContext

    so be sure to initialize sqlContext using HiveContext, not SQLContext. (This applies to Spark 1.x; since Spark 2.0, window functions work without Hive support.)

  2. windowSpec.rowsBetween(-1, 0) has no effect here; again, a frame specification is not supported by the lag function.

You are doing it correctly; all you missed is the over(window expression) on lag:

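import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import spark.implicits._ // already imported automatically in spark-shell

val df = sc.parallelize(Seq((201601, 100.5),
  (201602, 120.6),
  (201603, 450.2),
  (201604, 200.7),
  (201605, 121.4))).toDF("date", "volume")

// Order by date; without partitionBy, all rows form a single window partition
val w = Window.orderBy("date")

// lag(column, offset, default): the previous row's volume, or 0 for the first row
val lagDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))
lagDf.show()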

+------+------+-------+
|  date|volume|new_col|
+------+------+-------+
|201601| 100.5|    0.0|
|201602| 120.6|  100.5|
|201603| 450.2|  120.6|
|201604| 200.7|  450.2|
|201605| 121.4|  200.7|
+------+------+-------+

This code was run in the Spark shell, version 2.0.2.
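As a cross-check of what lag(column, offset, default) does, here is a plain-Scala sketch (no Spark) over the same five rows: sort by the window's ordering key, then read the value offset rows earlier, falling back to the default when there is none. The LagWithDefault name is hypothetical, and the sketch covers only the orderBy-only window used above (no partitionBy).

```scala
object LagWithDefault {
  // Mimics lag(value, offset, default) over Window.orderBy(key) with no partitionBy:
  // for each row in key order, the value `offset` rows earlier, or `default` if that
  // position falls outside the data.
  def lag[K: Ordering, V](rows: Seq[(K, V)], offset: Int, default: V): Seq[(K, V, V)] = {
    val sorted = rows.sortBy(_._1).toVector
    sorted.indices.map { i =>
      val (key, value) = sorted(i)
      val j = i - offset
      val prev = if (j >= 0 && j < sorted.length) sorted(j)._2 else default
      (key, value, prev)
    }
  }

  // The same (date, volume) rows as in the spark-shell example.
  val volumes: Seq[(Int, Double)] = Seq(
    (201601, 100.5), (201602, 120.6), (201603, 450.2), (201604, 200.7), (201605, 121.4))
}
```

Calling LagWithDefault.lag(LagWithDefault.volumes, 1, 0.0) gives the new_col values 0.0, 100.5, 120.6, 450.2, 200.7, matching the table. In Spark itself, a window with orderBy but no partitionBy moves all rows into a single partition, so on large data you would normally add a partitionBy.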

Dynamic/Variable Offset in SparkSQL Lead/Lag function

You can add a row number column and do a left self-join on the row number and the offset, e.g.:

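import org.apache.spark.sql.functions.{expr, row_number}
// sampleData and window are the DataFrame and WindowSpec from the question
val df = sampleData.withColumn("rn", row_number().over(window))
// Left self-join: t2 is the row ColumnForOffset rows before t1; no match gives null
val df2 = df.alias("t1").join(
  df.alias("t2"),
  expr("t1.rn = t2.rn + t1.ColumnForOffset"),
  "left"
).selectExpr("t1.*", "t2.Salary as LastSalary")
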
+------+---------+------+---------------+--+----------+
|  Name|     Role|Salary|ColumnForOffset|rn|LastSalary|
+------+---------+------+---------------+--+----------+
|   bob|Developer|125000|              2| 1|      null|
|  mark|Developer|108000|              3| 2|      null|
| peter|Developer|185000|              2| 3|    125000|
| simon|Developer| 98000|              2| 4|    108000|
|  eric|Developer|144000|              3| 5|    108000|
| henry|Developer|110000|              2| 6|     98000|
|  carl|   Tester| 70000|              3| 7|     98000|
|   jon|   Tester| 65000|              1| 8|     70000|
| roman|   Tester| 82000|              1| 9|     65000|
|carlos|   Tester| 75000|              2|10|     65000|
+------+---------+------+---------------+--+----------+
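The same row-number trick can be sketched in plain Scala (no Spark) to show why the first two rows get null: number the rows 1..n in window order, then look up row rn - ColumnForOffset. The VariableOffsetLag object and the Emp case class are hypothetical, and the input is assumed to be already sorted in the window's order, since the question's window definition isn't shown here.

```scala
object VariableOffsetLag {
  // Hypothetical row type; the fields follow the example table's columns.
  final case class Emp(name: String, role: String, salary: Int, offset: Int)

  // Mirrors row_number() plus the left self-join on t1.rn = t2.rn + t1.offset:
  // number the rows 1..n, then fetch the salary of row (rn - offset).
  // A missing match yields None, like the NULLs produced by the left join.
  def lastSalary(ordered: Seq[Emp]): Seq[(Emp, Int, Option[Int])] = {
    val numbered = ordered.zipWithIndex.map { case (e, i) => (e, i + 1) } // row_number()
    val salaryByRn: Map[Int, Int] = numbered.map { case (e, rn) => rn -> e.salary }.toMap
    numbered.map { case (e, rn) => (e, rn, salaryByRn.get(rn - e.offset)) }
  }
}
```

Fed the ten rows above in the order shown, it returns the LastSalary column: None, None, 125000, 108000, 108000, 98000, 98000, 70000, 65000, 65000.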

If or case conditions in Spark SQL lag function to get last not null lag

The standard lag() function has an ignore nulls option:

select Id, date,
lag(date ignore nulls) over (PARTITION BY id order by date) as last_date
from mytable;

But not all databases support this. You can emulate it with a subquery:

select Id, date,
min(date) over (partition by id, grp order by date) as last_date
from (select t.*,
count(date) over (partition by id order by date) as grp
from mytable t
) t
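The grouping trick in that subquery can be sketched in plain Scala (no Spark) for a single partition that is already ordered, with Option standing in for a nullable column. The LastNonNull name is hypothetical. Like the SQL above, it returns the most recent non-null value up to and including the current row, so a non-null row maps to its own value rather than to the previous non-null one.

```scala
object LastNonNull {
  // Mirrors the subquery for one ordered partition:
  //   grp       = count(value) over (order by ...)   -- running count of non-null values
  //   last_value = min(value) over (partition by grp) -- the single non-null value in grp
  def fill[A](ordered: Seq[Option[A]]): Seq[Option[A]] = {
    // Running count of non-null values seen so far, including the current row.
    val grps = ordered.scanLeft(0)((n, v) => if (v.isDefined) n + 1 else n).tail
    // Each group holds at most one non-null value: the row that started it.
    val valueByGrp: Map[Int, A] =
      ordered.zip(grps).collect { case (Some(v), g) => g -> v }.toMap
    // Group 0 (leading nulls) has no value, so it stays None.
    grps.map(valueByGrp.get)
  }
}
```

For example, LastNonNull.fill(Seq(None, Some(1), None, None, Some(5), None)) returns Seq(None, Some(1), Some(1), Some(1), Some(5), Some(5)).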

Spark Conditional Lag Function over Window

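You need one window per comparison: dW (partitioned by id, bin and hour, ordered by date) looks back to the previous date at the same hour, and hW (partitioned by id, bin and date, ordered by hour) looks back to the previous hour on the same date. The yesterday-previous-hour value needs lag applied twice.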

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val dW = Window.partitionBy("id", "bin", "hour").orderBy("date")
val hW = Window.partitionBy("id", "bin", "date").orderBy("hour")

df.withColumn("yesterdaySameHour", lag("label", 1, 0.0).over(dW))   // previous date, same hour
  .withColumn("todayPreviousHour", lag("label", 1, 0.0).over(hW))   // same date, previous hour
  .withColumn("yesterdayPreviousHour", lag(lag("label", 1, 0.0).over(dW), 1, 0.0).over(hW))
  .orderBy("date", "hour", "bin")
  .show(false)

This gives the following result:

|date |hour|id |bin|label|yesterdaySameHour|todayPreviousHour|yesterdayPreviousHour|
|2019_12_19|7 |1 |0 |-1 |0 |0 |0 |
|2019_12_19|7 |1 |2 |-2 |0 |0 |0 |
|2019_12_19|7 |1 |3 |-3 |0 |0 |0 |
|2019_12_19|8 |1 |0 |1 |0 |-1 |0 |
|2019_12_19|8 |1 |2 |2 |0 |-2 |0 |
|2019_12_19|8 |1 |3 |3 |0 |-3 |0 |
|2019_12_20|7 |1 |0 |4 |-1 |0 |0 |
|2019_12_20|7 |1 |2 |5 |-2 |0 |0 |
|2019_12_20|7 |1 |3 |6 |-3 |0 |0 |
|2019_12_20|8 |1 |0 |7 |1 |4 |-1 |
|2019_12_20|8 |1 |2 |8 |2 |5 |-2 |
|2019_12_20|8 |1 |3 |9 |3 |6 |-3 |
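To see how the nested lag lines up with the two windows, here is a plain-Scala sketch (no Spark) that builds the three derived columns from lookups. The TwoWindowLag object, the Obs case class and the lag1 helper are hypothetical; lag1 mimics lag(value, 1, default) over a given partition key and ordering key, and each (date, hour, id, bin) row is assumed to be unique because rows are used as map keys.

```scala
object TwoWindowLag {
  // Hypothetical row type matching the columns in the table above.
  final case class Obs(date: String, hour: Int, id: Int, bin: Int, label: Double)

  // Mimics lag(value, 1, default).over(Window.partitionBy(...).orderBy(...)):
  // maps every row to the value of the previous row in its partition, or default.
  def lag1[R, P, O: Ordering](rows: Seq[R])(
      partitionKey: R => P, orderKey: R => O, value: R => Double, default: Double): Map[R, Double] =
    rows.groupBy(partitionKey).values.flatMap { p =>
      val sorted = p.sortBy(orderKey)
      sorted.zip(default +: sorted.init.map(value))
    }.toMap

  // Returns (yesterdaySameHour, todayPreviousHour, yesterdayPreviousHour) per row.
  def derive(rows: Seq[Obs]): Map[Obs, (Double, Double, Double)] = {
    // dW = partitionBy(id, bin, hour).orderBy(date)
    val yesterdaySameHour = lag1(rows)(o => (o.id, o.bin, o.hour), _.date, _.label, 0.0)
    // hW = partitionBy(id, bin, date).orderBy(hour)
    val todayPreviousHour = lag1(rows)(o => (o.id, o.bin, o.date), _.hour, _.label, 0.0)
    // The nested lag: lag over hW applied to the column produced by lag over dW
    val yesterdayPreviousHour =
      lag1(rows)(o => (o.id, o.bin, o.date), _.hour, yesterdaySameHour, 0.0)
    rows.map(o => o -> ((yesterdaySameHour(o), todayPreviousHour(o), yesterdayPreviousHour(o)))).toMap
  }
}
```

With the four bin-0 rows from the table, the 2019_12_20 hour-8 row maps to (1.0, 4.0, -1.0), matching the last three columns of the corresponding output row.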

Use lag in spark sql within case statement

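This can be done with a running sum instead of lag: the CASE expression flags each row where Stay > 8600, and summing those flags over the window gives every row the number of long stays seen so far, which serves as a sequence group.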

select Item,
sum(case when Stay > 8600 then 1 else 0 end) over(partition by item order by ts) as seq_group
from tableA
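The running sum can be sketched in plain Scala (no Spark) with scanLeft per item. The RunningSumGroups object and its Row case class are hypothetical stand-ins for tableA. In Spark SQL, a window with ORDER BY and no explicit frame defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so rows with equal ts would share a sum; this sketch assumes ts values are distinct within an item.

```scala
object RunningSumGroups {
  // Hypothetical row type with the columns used in the query above.
  final case class Row(item: String, ts: Long, stay: Int)

  // sum(case when Stay > 8600 then 1 else 0 end) over (partition by item order by ts):
  // a running count of long-stay rows per item, including the current row.
  def seqGroups(rows: Seq[Row]): Seq[(Row, Int)] =
    rows.groupBy(_.item).values.toSeq.flatMap { part =>
      val sorted = part.sortBy(_.ts)
      val flags = sorted.map(r => if (r.stay > 8600) 1 else 0) // the CASE expression
      sorted.zip(flags.scanLeft(0)(_ + _).tail)                  // running sum
    }
}
```

For an item with Stay values 100, 9000, 200 in ts order, seq_group comes out 0, 1, 1: the long stay opens group 1 and the following row stays in it.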

