SparkR window function
Spark 2.0.0+
SparkR provides DSL wrappers for window functions with over, window.partitionBy / partitionBy, window.orderBy / orderBy, and rowsBetween / rangeBetween.
Spark <= 1.6
Unfortunately it is not possible in 1.6.0. While some window functions, including lag, have been implemented, SparkR doesn't support window definitions yet, which renders them effectively useless.
As long as SPARK-11395 is not resolved the only option is to use raw SQL:
set.seed(1)
hc <- sparkRHive.init(sc)
sdf <- createDataFrame(hc, data.frame(x=1:12, y=1:3, z=rnorm(12)))
registerTempTable(sdf, "sdf")
sql(hc, "SELECT x, y, z, LAG(z) OVER (PARTITION BY y ORDER BY x) FROM sdf") %>%
head()
## x y z _c3
## 1 1 1 -0.6264538 NA
## 2 4 1 1.5952808 -0.6264538
## 3 7 1 0.4874291 1.5952808
## 4 10 1 -0.3053884 0.4874291
## 5 2 2 0.1836433 NA
## 6 5 2 0.3295078 0.1836433
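The semantics behind that query can be made concrete without Spark at all. Below is a minimal plain-Python sketch of what LAG(z) OVER (PARTITION BY y ORDER BY x) computes; the helper name lag_over_partition is illustrative, not part of any Spark API:

```python
from itertools import groupby

def lag_over_partition(rows, partition_key, order_key, value_key):
    """Emulate LAG(value) OVER (PARTITION BY partition_key ORDER BY order_key):
    within each partition, each row receives the previous row's value (None first)."""
    out = []
    keyed = sorted(rows, key=lambda r: (r[partition_key], r[order_key]))
    for _, group in groupby(keyed, key=lambda r: r[partition_key]):
        prev = None
        for r in group:
            out.append({**r, "lag": prev})
            prev = r[value_key]
    return out

rows = [{"x": 1, "y": 1, "z": -0.63}, {"x": 4, "y": 1, "z": 1.60},
        {"x": 7, "y": 1, "z": 0.49}, {"x": 2, "y": 2, "z": 0.18}]
result = lag_over_partition(rows, "y", "x", "z")
# The first row of each y-partition gets None, matching the NA values above.
```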
Assuming that the corresponding PR will be merged without significant changes, the window definition and example query should look as follows:
w <- Window.partitionBy("y") %>% orderBy("x")
select(sdf, over(lag(sdf$z), w))
Calling spark window functions in R using sparklyr
You can pipe the SQL directly.
mtcars_spk <- copy_to(sc, mtcars, "mtcars_spk", overwrite = TRUE)
mtcars_spk2 <- mtcars_spk %>%
  dplyr::mutate(test = paste0(gear, " ", carb)) %>%
  dplyr::mutate(discnt = sql("approx_count_distinct(test) OVER (PARTITION BY cyl)"))
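Semantically, the sql() escape hatch above attaches a distinct count of test within each cyl partition to every row. A plain-Python sketch of that semantics (an exact count here, whereas Spark's approx_count_distinct uses a HyperLogLog approximation; the helper name is illustrative):

```python
def count_distinct_over(rows, partition_key, value_key):
    """Attach COUNT(DISTINCT value) OVER (PARTITION BY partition_key) to each row."""
    distinct = {}
    for r in rows:
        distinct.setdefault(r[partition_key], set()).add(r[value_key])
    return [{**r, "discnt": len(distinct[r[partition_key]])} for r in rows]

rows = [{"cyl": 4, "test": "4 1"}, {"cyl": 4, "test": "4 2"},
        {"cyl": 4, "test": "4 1"}, {"cyl": 6, "test": "3 1"}]
result = count_distinct_over(rows, "cyl", "test")
# Every cyl == 4 row gets discnt = 2; the cyl == 6 row gets discnt = 1.
```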
It is worth noting that this is a rare case; most other window functions are supported in sparklyr. If you wanted just the count or a min(gear) partitioned by cyl, you could do that easily.
mtcars_spk <- copy_to(sc, mtcars, "mtcars_spk", overwrite = TRUE)
mtcars_spk <- mtcars_spk %>%
  group_by(cyl) %>%
  arrange(cyl) %>%
  mutate(cnt = n(),
         mindis = min(disp))
Linking in similar threads:
- CountDistinct in sparklyr
- CountDistinct vs approx_count_distinct
Why are Window functions (Lag) not working in SparkR?
Let's prepare an input data.frame using the freeny dataset.
ldf <- freeny
# Extract year and quarter
ldf$yr <- as.integer(rownames(ldf))
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr))
# Clean column names
colnames(ldf) <- gsub("\\.", "_", colnames(ldf))
# Drop a couple of things so output fits nicely in the code box
row.names(ldf) <- NULL
ldf$market_potential <- NULL
head(ldf)
## y lag_quarterly_revenue price_index income_level yr qr
## 1 8.79236 8.79636 4.70997 5.82110 1962 1
## 2 8.79137 8.79236 4.70217 5.82558 1962 2
## 3 8.81486 8.79137 4.68944 5.83112 1962 3
## 4 8.81301 8.81486 4.68558 5.84046 1963 0
## 5 8.90751 8.81301 4.64019 5.85036 1963 1
## 6 8.93673 8.90751 4.62553 5.86464 1963 2
Another suggestion I found was to use a HiveContext rather than a SQLContext, as a SQLContext might not allow all functionalities.
This is correct; most of the advanced functions are supported only by the HiveContext, while the default one is SQLContext. First of all, you have to make sure that your Spark version has been built with Hive support. This is true for the binaries available from the Spark downloads page, but if you build from source, be sure to use the -Phive flag.
hiveContext <- sparkRHive.init(sc)
sdf <- createDataFrame(hiveContext, ldf)
printSchema(sdf)
## root
## |-- y: double (nullable = true)
## |-- lag_quarterly_revenue: double (nullable = true)
## |-- price_index: double (nullable = true)
## |-- income_level: double (nullable = true)
## |-- yr: integer (nullable = true)
## |-- qr: integer (nullable = true)
initialize sqlContext and register the data frame as a temp table using registerTempTable
That's right as well. To be able to use the sql command you have to register a table.
registerTempTable(sdf, "sdf")
Remember that DataFrame is bound to context which has been used to create it.
head(tables(hiveContext))
## tableName isTemporary
## 1 sdf TRUE
head(tables(sqlContext))
## [1] tableName isTemporary
## <0 rows> (or 0-length row.names)
Finally, an example query:
query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag,
LAG(y) OVER (ORDER BY yr, qr) AS new_lag
FROM sdf"
sql(hiveContext, query)
## yr qr y old_lag new_lag
## 1 1962 1 8.79236 8.79636 NA
## 2 1962 2 8.79137 8.79236 8.79236
## 3 1962 3 8.81486 8.79137 8.79137
## 4 1963 0 8.81301 8.81486 8.81486
## 5 1963 1 8.90751 8.81301 8.81301
## 6 1963 2 8.93673 8.90751 8.90751
Spark and SparkSQL: How to imitate window function?
You can do this with RDDs. Personally I find the API for RDDs makes a lot more sense - I don't always want my data to be 'flat' like a dataframe.
val df = sqlContext.sql("select 1, '2015-09-01'"
).unionAll(sqlContext.sql("select 2, '2015-09-01'")
).unionAll(sqlContext.sql("select 1, '2015-09-03'")
).unionAll(sqlContext.sql("select 1, '2015-09-04'")
).unionAll(sqlContext.sql("select 2, '2015-09-04'"))
// dataframe as an RDD (of Row objects)
df.rdd
// grouping by the first column of the row
.groupBy(r => r(0))
// map each group - an Iterable[Row] - to a list and sort by the second column
.map(g => g._2.toList.sortBy(row => row(1).toString))
.collect()
The above gives a result like the following:
Array[List[org.apache.spark.sql.Row]] =
Array(
List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]),
List([2,2015-09-01], [2,2015-09-04]))
If you want the position within the 'group' as well, you can use zipWithIndex.
df.rdd.groupBy(r => r(0)).map(g =>
g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()
Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
List(([2,2015-09-01],0), ([2,2015-09-04],1)))
You could flatten this back to a simple List/Array of Row objects using flatMap, but if you need to perform anything on the 'group' that won't be a great idea.
The downside to using RDD like this is that it's tedious to convert from DataFrame to RDD and back again.
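The groupBy → sort → zipWithIndex pipeline above can also be sketched in plain Python to make the semantics concrete (rows modeled as simple tuples rather than Spark Row objects):

```python
rows = [(1, "2015-09-01"), (2, "2015-09-01"), (1, "2015-09-03"),
        (1, "2015-09-04"), (2, "2015-09-04")]

# groupBy(r => r(0)): collect rows into per-key lists
grouped = {}
for r in rows:
    grouped.setdefault(r[0], []).append(r)

# per group: sortBy(r => r(1)) then zipWithIndex; the comprehension
# flattens the per-group lists into one list of (row, index) pairs
indexed = [
    (row, i)
    for key in sorted(grouped)
    for i, row in enumerate(sorted(grouped[key], key=lambda r: r[1]))
]
# Each row is paired with its 0-based position within its ordered group.
```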
How to use window functions in PySpark?
To be able to use a window function you have to create a window first. The definition is pretty much the same as for normal SQL, meaning you can define order, partition, or both. First, let's create some dummy data:
import numpy as np
np.random.seed(1)
keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 10)])
df = sqlContext.createDataFrame([
{"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])
Make sure you're using HiveContext (Spark < 2.0 only):
from pyspark.sql import HiveContext
assert isinstance(sqlContext, HiveContext)
Create a window:
from pyspark.sql.window import Window
w = Window.partitionBy(df.k).orderBy(df.v)
which is equivalent to (PARTITION BY k ORDER BY v) in SQL.
As a rule of thumb, window definitions should always contain a PARTITION BY clause; otherwise Spark will move all data to a single partition. ORDER BY is required for some functions, while in other cases (typically aggregates) it may be optional.
There are also two optional clauses which can be used to define the window span, ROWS BETWEEN and RANGE BETWEEN. These won't be useful for us in this particular scenario.
Finally we can use it for a query:
from pyspark.sql.functions import percentRank, ntile
df.select(
"k", "v",
percentRank().over(w).alias("percent_rank"),
ntile(3).over(w).alias("ntile3")
)
Note that ntile is not in any way related to quantiles.
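To see why, here is a minimal plain-Python sketch of standard SQL NTILE semantics, which ntile follows: rows are dealt into n roughly equal buckets by order, with any extra rows going to the earliest buckets (the function name mirrors the SQL keyword, not a Spark API):

```python
def ntile(n, ordered_values):
    """Assign each ordered row to one of n buckets (1..n). When the row count
    does not divide evenly, the earlier buckets each take one extra row."""
    size, remainder = divmod(len(ordered_values), n)
    buckets = []
    for bucket in range(1, n + 1):
        width = size + (1 if bucket <= remainder else 0)
        buckets.extend([bucket] * width)
    return buckets

# 10 ordered rows into 3 buckets: sizes 4, 3, 3 - bucket boundaries fall at
# row counts, not at value quantiles.
```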
Moving Average in SparkR
I managed to solve this using SparkR window functions. I'm using Spark 2.0 btw.
set.seed(123)
#generate poisson distribution for easy checking, with lambda = 15
n <- 1000
orderingColumn = seq(1,n)
data = rpois(n, 15)
df <- data.frame(orderingColumn, data)
#Create sparkdf
sdf <- as.DataFrame(df)
#Moving average
ws <- windowOrderBy(sdf$orderingColumn)
frame <- rowsBetween(ws, -100, 0) #100 observations back included in average
sdfWithMa <- withColumn(sdf, "moving_average", over(avg(sdf$data), frame))
head(sdfWithMa, 100)
One thing to be aware of with above is that Spark will attempt to load all the data into a single partition so it can be slow over large data sets, unfortunately. I wish the underlying implementation was different, though I understand that calculating sliding windows on ordered data is difficult on any system where the data is distributed.
If you are lucky enough that your moving average can be run on partitions of the data then you can change your window:
ws <- orderBy(windowPartitionBy("my_partition_column"), sdf$orderingColumn)
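The frame rowsBetween(ws, -100, 0) means: for each row, average that row together with up to 100 preceding rows in window order. A plain-Python sketch of that trailing-window average (the helper name trailing_mean is illustrative):

```python
def trailing_mean(values, lookback):
    """Mean over the current value plus up to `lookback` preceding values,
    i.e. avg() over rowsBetween(-lookback, 0). Early rows use a shorter
    window, just as Spark does before the frame is fully populated."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - lookback) : i + 1]
        out.append(sum(window) / len(window))
    return out
```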