SparkSQL - Lag function?
- A frame specification should start with the keyword ROWS, not ROW. A frame specification also requires either a lower bound value:
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
or the UNBOUNDED keyword:
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
The LAG function doesn't accept a frame at all, so a correct SQL query with lag can look like this:
SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt,
  LAG(tx.amt) OVER (
    PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
  ) AS prev_amt
FROM tx
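For window functions that do accept frames (unlike LAG), the frame clause slots in right after ORDER BY. A sketch, assuming a SparkSession named spark with the tx table registered as a view, using SUM as the frame-accepting function:

```scala
// Sketch only: assumes `spark` is a SparkSession and `tx` is a registered view.
// SUM accepts a frame, so either valid frame form from above is legal here.
val runningTotals = spark.sql("""
  SELECT cc_num, trans_date, trans_time, amt,
         SUM(amt) OVER (
           PARTITION BY cc_num ORDER BY trans_date, trans_time
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
         ) AS running_amt
  FROM tx
""")
```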
Edit:
Regarding SQL DSL usage: as you can read in the error message,
Note that, using window functions currently requires a HiveContext
Be sure to initialize sqlContext using HiveContext, not SQLContext.
windowSpec.rowsBetween(-1, 0) does nothing, because, once again, a frame specification is not supported by the lag function.
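On Spark 1.x that initialization looks like the following sketch (the SparkConf setup is just an assumed placeholder):

```scala
// Spark 1.x sketch: window functions need a HiveContext, not a plain SQLContext.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("lag-example"))
val sqlContext = new HiveContext(sc)  // `new SQLContext(sc)` would reject window functions
```

On Spark 2.x and later this is moot: the standard SparkSession supports window functions out of the box.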
spark sql window function lag
You are doing it correctly; all you missed is over(window expression) on lag:
val df = sc.parallelize(Seq((201601, 100.5),
(201602, 120.6),
(201603, 450.2),
(201604, 200.7),
(201605, 121.4))).toDF("date", "volume")
val w = org.apache.spark.sql.expressions.Window.orderBy("date")
import org.apache.spark.sql.functions.lag
val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w))
leadDf.show()
+------+------+-------+
| date|volume|new_col|
+------+------+-------+
|201601| 100.5| 0.0|
|201602| 120.6| 100.5|
|201603| 450.2| 120.6|
|201604| 200.7| 450.2|
|201605| 121.4| 200.7|
+------+------+-------+
This code was run in the Spark shell (Spark 2.0.2).
Dynamic/Variable Offset in SparkSQL Lead/Lag function
You can add a row number column, and do a self join based on the row number and offset, e.g.:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, expr}

// `window` is a Window specification that orders the rows, e.g. Window.orderBy(...)
val df = sampleData.withColumn("rn", row_number().over(window))
val df2 = df.alias("t1").join(
  df.alias("t2"),
  expr("t1.rn = t2.rn + t1.ColumnForOffset"),
  "left"
).selectExpr("t1.*", "t2.Salary as LastSalary")
df2.show
+------+---------+------+---------------+---+----------+
| Name| Role|Salary|ColumnForOffset| rn|LastSalary|
+------+---------+------+---------------+---+----------+
| bob|Developer|125000| 2| 1| null|
| mark|Developer|108000| 3| 2| null|
| peter|Developer|185000| 2| 3| 125000|
| simon|Developer| 98000| 2| 4| 108000|
| eric|Developer|144000| 3| 5| 108000|
| henry|Developer|110000| 2| 6| 98000|
| carl| Tester| 70000| 3| 7| 98000|
| jon| Tester| 65000| 1| 8| 70000|
| roman| Tester| 82000| 1| 9| 65000|
|carlos| Tester| 75000| 2| 10| 65000|
+------+---------+------+---------------+---+----------+
If or case conditions in Spark SQL lag function to get last not null lag
The standard lag() function has an ignore nulls option:
select Id, date,
lag(date ignore nulls) over (PARTITION BY id order by date) as last_date
from mytable;
But not all databases support this. You can emulate it with a subquery:
select Id, date,
min(date) over (partition by id, grp order by date) as last_date
from (select t.*,
count(date) over (partition by id order by date) as grp
from mytable t
) t
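In Spark specifically you can skip the emulation: the DataFrame API's last function takes an ignoreNulls flag. A minimal sketch, assuming mytable is a DataFrame with Id and date columns:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

// Frame covers all rows strictly before the current one, matching lag semantics.
val w = Window.partitionBy("Id").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, -1)

// last non-null date seen before the current row
val withLast = mytable.withColumn("last_date", last("date", ignoreNulls = true).over(w))
```

Recent Spark versions (3.x) also accept the IGNORE NULLS clause directly in SQL lag.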
Spark Conditional Lag Function over Window
You have to define the Window specification according to your purpose, and you may need to use the lag function twice.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
val dW = Window.partitionBy("id", "bin", "hour").orderBy("date")
val hW = Window.partitionBy("id", "bin", "date").orderBy("hour")
df.withColumn("yesterdaySameHour", lag("label", 1, 0.0).over(dW))
.withColumn("todayPreviousHour", lag("label", 1, 0.0).over(hW))
.withColumn("yestedayPreviousHour", lag(lag("label", 1, 0.0).over(dW), 1, 0.0).over(hW))
.orderBy("date", "hour", "bin")
.show(false)
This will give you the result:
+----------+----+---+---+-----+-----------------+-----------------+--------------------+
|date |hour|id |bin|label|yesterdaySameHour|todayPreviousHour|yestedayPreviousHour|
+----------+----+---+---+-----+-----------------+-----------------+--------------------+
|2019_12_19|7 |1 |0 |-1 |0 |0 |0 |
|2019_12_19|7 |1 |2 |-2 |0 |0 |0 |
|2019_12_19|7 |1 |3 |-3 |0 |0 |0 |
|2019_12_19|8 |1 |0 |1 |0 |-1 |0 |
|2019_12_19|8 |1 |2 |2 |0 |-2 |0 |
|2019_12_19|8 |1 |3 |3 |0 |-3 |0 |
|2019_12_20|7 |1 |0 |4 |-1 |0 |0 |
|2019_12_20|7 |1 |2 |5 |-2 |0 |0 |
|2019_12_20|7 |1 |3 |6 |-3 |0 |0 |
|2019_12_20|8 |1 |0 |7 |1 |4 |-1 |
|2019_12_20|8 |1 |2 |8 |2 |5 |-2 |
|2019_12_20|8 |1 |3 |9 |3 |6 |-3 |
+----------+----+---+---+-----+-----------------+-----------------+--------------------+
Use lag in spark sql within case statement
This can be accomplished with a running sum.
select Item,
Stay,
sum(case when Stay > 8600 then 1 else 0 end) over(partition by item order by ts) as seq_group
from tableA
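The same running-sum trick can be expressed in the DataFrame API. A sketch, assuming tableA is a DataFrame with the ordering column ts used in the query above:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, sum, when}

val w = Window.partitionBy("Item").orderBy("ts")

// Every row with Stay > 8600 bumps the running sum, starting a new group;
// the rows that follow inherit that group id until the next bump.
val withGroups = tableA.withColumn(
  "seq_group",
  sum(when(col("Stay") > 8600, lit(1)).otherwise(lit(0))).over(w)
)
```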