Sparkr Window Function

SparkR window function

Spark 2.0.0+

SparkR provides DSL wrappers with over, window.partitionBy / partitionBy, window.orderBy / orderBy and rowsBetween / rangeBeteen functions.

Spark <= 1.6

Unfortunately it is not possible in 1.6.0. While some window functions, including lag, have been implemented SparkR doesn't support window definitions yet which renders these completely useless.

As long as SPARK-11395 is not resolved the only option is to use raw SQL:

set.seed(1)

hc <- sparkRHive.init(sc)
sdf <- createDataFrame(hc, data.frame(x=1:12, y=1:3, z=rnorm(12)))
registerTempTable(sdf, "sdf")

sql(hc, "SELECT x, y, z, LAG(z) OVER (PARTITION BY y ORDER BY x) FROM sdf") %>%
head()

## x y z _c3
## 1 1 1 -0.6264538 NA
## 2 4 1 1.5952808 -0.6264538
## 3 7 1 0.4874291 1.5952808
## 4 10 1 -0.3053884 0.4874291
## 5 2 2 0.1836433 NA
## 6 5 2 0.3295078 0.1836433

Assuming that the corresponding PR will be merged without significant changes window definition and example query should look as follows:

w <- Window.partitionBy("y") %>% orderBy("x")
select(sdf, over(lag(sdf$z), w))

Calling spark window functions in R using sparklyr

You can pipe the sql directly.

mtcars_spk <- copy_to(sc, mtcars,"mtcars_spk",overwrite = TRUE)
mtcars_spk2 <- mtcars_spk %>%
dplyr::mutate(test = paste0(gear, " ",carb)) %>%
dplyr::mutate(discnt = sql("approx_count_distinct(test) OVER (PARTITION BY cyl)"))

It is worth noting here that this is a rare case and other window functions are supported in sparklyr. If you wanted just the count or a min(gear) partitioned by cyl you could do that easily.

mtcars_spk <- copy_to(sc, mtcars,"mtcars_spk",overwrite = TRUE)
mtcars_spk <- mtcars_spk %>%
group_by(cyl) %>%
arrange(cyl) %>%
mutate(cnt = count()
,mindis= min(disp)

Linking in similar threads:

  • CountDistinct in sparklyr
  • CountDistinct vs approx_count_distinct

Why are Window functions (Lag) not working in SparkR?

Lets prepare an input data.frame using freeny dataset.

ldf <- freeny

# Extract year and quater
ldf$yr <- as.integer(rownames(ldf))
ldf$qr <- as.integer(4 * (as.numeric(rownames(ldf)) - ldf$yr))

# Clean column names
colnames(ldf) <- gsub("\\.", "_", colnames(ldf))

# Drop a couple of things so output fits nicely in the code box
row.names(ldf) <- NULL
ldf$market_potential <- NULL

head(ldf)

## y lag_quarterly_revenue price_index income_level yr qr
## 1 8.79236 8.79636 4.70997 5.82110 1962 1
## 2 8.79137 8.79236 4.70217 5.82558 1962 2
## 3 8.81486 8.79137 4.68944 5.83112 1962 3
## 4 8.81301 8.81486 4.68558 5.84046 1963 0
## 5 8.90751 8.81301 4.64019 5.85036 1963 1
## 6 8.93673 8.90751 4.62553 5.86464 1963 2

Another suggestion which I found was to use a Hivecontext rather than a SQLcontext as SQLcontext might not allow all functionalities.

This is correct, most of the advanced functions is supported only by the HiveContext, while a default one is SQLContext. First of all you have to make sure that your Spark version has been build with Hive support. It is true about binaries available from Spark downloads page but if you build from source you be sure to use -Phive flag.

hiveContext <- sparkRHive.init(sc)
sdf <- createDataFrame(hiveContext, ldf)
printSchema(sdf)

## root
## |-- y: double (nullable = true)
## |-- lag_quarterly_revenue: double (nullable = true)
## |-- price_index: double (nullable = true)
## |-- income_level: double (nullable = true)
## |-- yr: integer (nullable = true)
## |-- qr: integer (nullable = true)

initialize sqlContext and register the data frame as a temp table using Registertemptable

Thats right as well. To be able to use sql command you have register a table.

registerTempTable(sdf, "sdf")

Remember that DataFrame is bound to context which has been used to create it.

head(tables(hiveContext))

## tableName isTemporary
## 1 sdf TRUE

head(tables(sqlContext))

## [1] tableName isTemporary
## <0 rows> (or 0-length row.names)

Finally example query:

query <- "SELECT yr, qr, y, lag_quarterly_revenue AS old_lag,
LAG(y) OVER (ORDER BY yr, qr) AS new_lag
FROM sdf"

sql(hiveContext, query)

## yr qr y old_lag new_lag
## 1 1962 1 8.79236 8.79636 NA
## 2 1962 2 8.79137 8.79236 8.79236
## 3 1962 3 8.81486 8.79137 8.79137
## 4 1963 0 8.81301 8.81486 8.81486
## 5 1963 1 8.90751 8.81301 8.81301
## 6 1963 2 8.93673 8.90751 8.90751

Spark and SparkSQL: How to imitate window function?

You can do this with RDDs. Personally I find the API for RDDs makes a lot more sense - I don't always want my data to be 'flat' like a dataframe.

val df = sqlContext.sql("select 1, '2015-09-01'"
).unionAll(sqlContext.sql("select 2, '2015-09-01'")
).unionAll(sqlContext.sql("select 1, '2015-09-03'")
).unionAll(sqlContext.sql("select 1, '2015-09-04'")
).unionAll(sqlContext.sql("select 2, '2015-09-04'"))

// dataframe as an RDD (of Row objects)
df.rdd
// grouping by the first column of the row
.groupBy(r => r(0))
// map each group - an Iterable[Row] - to a list and sort by the second column
.map(g => g._2.toList.sortBy(row => row(1).toString))
.collect()

The above gives a result like the following:

Array[List[org.apache.spark.sql.Row]] = 
Array(
List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]),
List([2,2015-09-01], [2,2015-09-04]))

If you want the position within the 'group' as well, you can use zipWithIndex.

df.rdd.groupBy(r => r(0)).map(g => 
g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
List(([2,2015-09-01],0), ([2,2015-09-04],1)))

You could flatten this back to a simple List/Array of Row objects using FlatMap, but if you need to perform anything on the 'group' that won't be a great idea.

The downside to using RDD like this is that it's tedious to convert from DataFrame to RDD and back again.

How to use window functions in PySpark?

To be able to use window function you have to create a window first. Definition is pretty much the same as for normal SQL it means you can define either order, partition or both. First lets create some dummy data:

import numpy as np
np.random.seed(1)

keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])

df = sqlContext.createDataFrame([
{"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])

Make sure you're using HiveContext (Spark < 2.0 only):

from pyspark.sql import HiveContext

assert isinstance(sqlContext, HiveContext)

Create a window:

from pyspark.sql.window import Window

w = Window.partitionBy(df.k).orderBy(df.v)

which is equivalent to

(PARTITION BY k ORDER BY v) 

in SQL.

As a rule of thumb window definitions should always contain PARTITION BY clause otherwise Spark will move all data to a single partition. ORDER BY is required for some functions, while in different cases (typically aggregates) may be optional.

There are also two optional which can be used to define window span - ROWS BETWEEN and RANGE BETWEEN. These won't be useful for us in this particular scenario.

Finally we can use it for a query:

from pyspark.sql.functions import percentRank, ntile

df.select(
"k", "v",
percentRank().over(w).alias("percent_rank"),
ntile(3).over(w).alias("ntile3")
)

Note that ntile is not related in any way to the quantiles.

Moving Average in SparkR

I managed to solve this using SparkR window functions. I'm using Spark 2.0 btw.

set.seed(123)

#generate poisson distribution for easy checking, with lambda = 15
n <- 1000
orderingColumn = seq(1,n)
data = rpois(n, 15)
df <- data.frame(orderingColumn, data)

#Create sparkdf
sdf <- as.DataFrame(df);

#Moving average
ws <- windowOrderBy(sdf$orderingColumn)
frame <- rowsBetween(ws, -100, 0) #100 observations back included in average
sdfWithMa <- withColumn(sdf, "moving_average", over(avg(sdf$data), frame))

head(sdfWithMa, 100)

One thing to be aware of with above is that Spark will attempt to load all the data into a single partition so it can be slow over large data sets, unfortunately. I wish the underlying implementation was different, though I understand that calculating sliding windows on ordered data is difficult on any system where the data is distributed.

If you are lucky enough that your moving average can be run on partitions of the data then you can change your window:

ws <- orderBy(windowPartitionBy("my_partition_column"), sdf$orderingColumn)


Related Topics



Leave a reply



Submit