SparkSQL - Lag Function

SparkSQL - Lag function?

  1. Frame specification should start with the keyword ROWS, not ROW
  2. Frame specification requires either a lower bound value

    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

    or the UNBOUNDED keyword

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  3. The LAG function doesn't accept a frame at all, so a correct SQL query with LAG can look like this (for a case where a frame is valid, see the sketch after this list):

    SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt,
           LAG(tx.amt) OVER (
             PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
           ) AS prev_amt
    FROM tx
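
For contrast with point 3, a frame of the kind described in points 1 and 2 is accepted by aggregate window functions. A minimal sketch, assuming the same tx table is registered as a temporary table and sqlContext is available; the AVG query is illustrative, not from the original question:

// a running average over the previous and current row -- a ROWS frame is valid here
val runningAvg = sqlContext.sql("""
  SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt,
         AVG(tx.amt) OVER (
           PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
         ) AS avg_amt
  FROM tx
""")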

Edit:

Regarding SQL DSL usage:

  1. As you can read in the error message

    Note that, using window functions currently requires a HiveContext

    Be sure to initialize sqlContext using HiveContext, not SQLContext

  2. windowSpec.rowsBetween(-1, 0) does nothing; once again, a frame specification is not supported by the lag function (see the sketch after this list).
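
A minimal sketch of the two points above, assuming the tx data lives in a DataFrame named txDf (a hypothetical name) and sc is the SparkContext:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// window functions need a HiveContext here, not a plain SQLContext
val sqlContext = new HiveContext(sc)

// no rowsBetween -- lag does not take a frame specification
val windowSpec = Window.partitionBy("cc_num").orderBy("trans_date", "trans_time")
val withPrev = txDf.withColumn("prev_amt", lag("amt", 1).over(windowSpec))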

spark sql window function lag

You are doing it correctly; all you missed is over(window expression) on lag:

val df = sc.parallelize(Seq(
  (201601, 100.5),
  (201602, 120.6),
  (201603, 450.2),
  (201604, 200.7),
  (201605, 121.4))).toDF("date", "volume")

val w = org.apache.spark.sql.expressions.Window.orderBy("date")

import org.apache.spark.sql.functions.lag

val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))

leadDf.show()

+------+------+-------+
|  date|volume|new_col|
+------+------+-------+
|201601| 100.5|    0.0|
|201602| 120.6|  100.5|
|201603| 450.2|  120.6|
|201604| 200.7|  450.2|
|201605| 121.4|  200.7|
+------+------+-------+

This code was run on the Spark shell, version 2.0.2.

Dynamic/Variable Offset in SparkSQL Lead/Lag function

You can add a row number column and do a self-join based on the row number and the offset, e.g.:

import org.apache.spark.sql.functions.{expr, row_number}

// `window` is the row-numbering window spec from the question
val df = sampleData.withColumn("rn", row_number().over(window))

// join each row to the row that is ColumnForOffset positions before it
val df2 = df.alias("t1").join(
  df.alias("t2"),
  expr("t1.rn = t2.rn + t1.ColumnForOffset"),
  "left"
).selectExpr("t1.*", "t2.Salary as LastSalary")

df2.show
+------+---------+------+---------------+---+----------+
|  Name|     Role|Salary|ColumnForOffset| rn|LastSalary|
+------+---------+------+---------------+---+----------+
|   bob|Developer|125000|              2|  1|      null|
|  mark|Developer|108000|              3|  2|      null|
| peter|Developer|185000|              2|  3|    125000|
| simon|Developer| 98000|              2|  4|    108000|
|  eric|Developer|144000|              3|  5|    108000|
| henry|Developer|110000|              2|  6|     98000|
|  carl|   Tester| 70000|              3|  7|     98000|
|   jon|   Tester| 65000|              1|  8|     70000|
| roman|   Tester| 82000|              1|  9|     65000|
|carlos|   Tester| 75000|              2| 10|     65000|
+------+---------+------+---------------+---+----------+

If or case conditions in Spark SQL lag function to get last not null lag

The standard lag() function has an ignore nulls option:

select Id, date,
       lag(date ignore nulls) over (partition by id order by date) as last_date
from mytable;

But not all databases support this. You can emulate it with a subquery:

select Id, date,
       min(date) over (partition by id, grp order by date) as last_date
from (select t.*,
             count(date) over (partition by id order by date) as grp
      from mytable t
     ) t
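
In the Spark DataFrame API, one alternative to the subquery above (not part of the original answer) is last with ignoreNulls over a frame that ends one row before the current row. A sketch, assuming a DataFrame df with id and date columns:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

// the frame covers all rows before the current one; last(..., ignoreNulls = true)
// then picks the most recent non-null date, i.e. the last not-null lag
val w = Window.partitionBy("id").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, -1)

val withLastDate = df.withColumn("last_date", last("date", ignoreNulls = true).over(w))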

Spark Conditional Lag Function over Window

You have to define the window specifications to suit your purpose, and here you need to use the lag function twice.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// dW compares to the previous date (same hour), hW to the previous hour (same date)
val dW = Window.partitionBy("id", "bin", "hour").orderBy("date")
val hW = Window.partitionBy("id", "bin", "date").orderBy("hour")

df.withColumn("yesterdaySameHour", lag("label", 1, 0.0).over(dW))
  .withColumn("todayPreviousHour", lag("label", 1, 0.0).over(hW))
  .withColumn("yesterdayPreviousHour", lag(lag("label", 1, 0.0).over(dW), 1, 0.0).over(hW))
  .orderBy("date", "hour", "bin")
  .show(false)

This will give you the result:

+----------+----+---+---+-----+-----------------+-----------------+---------------------+
|date      |hour|id |bin|label|yesterdaySameHour|todayPreviousHour|yesterdayPreviousHour|
+----------+----+---+---+-----+-----------------+-----------------+---------------------+
|2019_12_19|7   |1  |0  |-1   |0                |0                |0                    |
|2019_12_19|7   |1  |2  |-2   |0                |0                |0                    |
|2019_12_19|7   |1  |3  |-3   |0                |0                |0                    |
|2019_12_19|8   |1  |0  |1    |0                |-1               |0                    |
|2019_12_19|8   |1  |2  |2    |0                |-2               |0                    |
|2019_12_19|8   |1  |3  |3    |0                |-3               |0                    |
|2019_12_20|7   |1  |0  |4    |-1               |0                |0                    |
|2019_12_20|7   |1  |2  |5    |-2               |0                |0                    |
|2019_12_20|7   |1  |3  |6    |-3               |0                |0                    |
|2019_12_20|8   |1  |0  |7    |1                |4                |-1                   |
|2019_12_20|8   |1  |2  |8    |2                |5                |-2                   |
|2019_12_20|8   |1  |3  |9    |3                |6                |-3                   |
+----------+----+---+---+-----+-----------------+-----------------+---------------------+

Use lag in spark sql within case statement

This can be accomplished with a running sum:

select Item,
       Stay,
       sum(case when Stay > 8600 then 1 else 0 end) over (partition by Item order by ts) as seq_group
from tableA
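
The same running sum expressed in the DataFrame API, as a sketch (tableA stands for a DataFrame with the Item, Stay, and ts columns used above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum, when}

val w = Window.partitionBy("Item").orderBy("ts")

// every row with Stay > 8600 increments the running sum, starting a new group
val withGroups = tableA.withColumn(
  "seq_group",
  sum(when(col("Stay") > 8600, 1).otherwise(0)).over(w))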

