Sparklyr: How to Center a Spark Table Based on Column

Sparklyr: how to center a Spark table based on column?

You just use mutate_each / muate_all

library(dplyr)

df <- data.frame(x=c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))
sdf <- copy_to(sc, df, overwrite=TRUE)

mutate_all(sdf, funs(. - mean(.)))

Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0

but it looks like it is expanded to a really inefficient (unacceptable for large datasets) window function application. You could be better with more verbose solution:

avgs <- summarize_all(sdf, funs(mean)) %>% as.data.frame()

exprs <- as.list(paste(colnames(sdf),"-", avgs))

sdf %>%
spark_dataframe() %>%
invoke("selectExpr", exprs) %>%
invoke("toDF", as.list(colnames(sdf))) %>%
invoke("registerTempTable", "centered")

tbl(sc, "centered")
Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0

It is not as pretty as dplyr approach but unlike the former one does a sensible thing.

If you want to skip all the invokes you can use dplyr to the same thing:

transmute_(sdf, .dots = setNames(exprs, colnames(sdf)))
Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0

Execution plans:

A helper function (see also dbplyr::remote_query for physical plan):

optimizedPlan <- function(df) {
df %>%
spark_dataframe() %>%
invoke("queryExecution") %>%
invoke("optimizedPlan")
}

dplyr version:

mutate_all(sdf, funs(. - mean(.))) %>% optimizedPlan()
<jobj[190]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [x#2877, y#2878, (z#1123 - _we0#2894) AS z#2879]
+- Window [avg(z#1123) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2894]
+- Project [x#2877, (y#1122 - _we0#2892) AS y#2878, z#1123]
+- Window [avg(y#1122) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2892]
+- Project [(x#1121 - _we0#2890) AS x#2877, z#1123, y#1122]
+- Window [avg(x#1121) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2890]
+- Project [y#1122, z#1123, x#1121]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

Spark solution:

tbl(sc, "centered") %>% optimizedPlan()
<jobj[204]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [(x#1121 - 2.0) AS x#2339, (y#1122 - 2.33333333333333) AS y#2340, (z#1123 - 42.0) AS z#2341]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

dplyr optimized:

transmute_(sdf, .dots = setNames(exprs, colnames(sdf))) %>% optimizedPlan()
<jobj[272]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [(x#1121 - 2.0) AS x#4792, (y#1122 - 2.33333333333333) AS y#4793, (z#1123 - 42.0) AS z#4794]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

Notes:

Spark SQL is not that good in handling wide datasets. With core Spark you usually combine features into a single Vector Column and Spark provides a number of transformers which can be used to operate on Vector data.

Using mutate_each to scale and center variables in sparklyr

Thoughts on what's going wrong here.

It is essentially the same problem as described in Sparklyr: how to center a Spark table based on column? - aggregation functions used in mutate are expanded to global (without PARTITION BY clause) window functions making this approach completely useless in practice.

I'm surprised there isn't a build in function for centering and scaling.

Well, in general Spark operates using ML Transformers, a bunch of which, has been ported to sparklyr. These can be distinguished by ft_ prefix. Unfortunately StandardScaler and MinMaxScaler are not ported yet. It is not that hard to implement your own interface though.

If you want a quick that can work on the data as-is:

library(rlang)
library(glue)

# Compute all the stats at once
stats <- dat.out %>% summarise_all(funs(avg, min, max)) %>% collect()

# Separate stats into components
cols <- dat.out %>% colnames()
avgs <- stats %>% select(ends_with("avg")) %>% unlist
mins <- stats %>% select(ends_with("min")) %>% unlist
maxs <- stats %>% select(ends_with("max")) %>% unlist

# Create expressions
exprs <- glue("({cols} - {avgs}) / ({maxs} - {mins})") %>%
setNames(cols) %>%
lapply(parse_quosure)

dat.out %>% mutate(!!! exprs)

Credits once again go to Artem Sokolov (dplyr 0.7 equivalent for deprecated mutate_).

Note:

Don't use . with functions which are to be used with sparklyr. dplyr will try to match these as database functions in a "prefix" database and fail or produce unintended results.

Sparklyr: how to explode a list column into their own columns in Spark table?

Let's say your data look like this

library(dplyr)
library(sparklyr)

df <- data.frame(text = c("1.0,2.0,3.0", "4.0,5.0,6.0"))
sdf <- copy_to(sc, df, "df", overwrite = TRUE)

and you've already created a spark_connection you can do following

n <- 3

# There is no function syntax for array access in Hive
# so we have to build [] expressions
# CAST(... AS double) could be handled in sparklyr / dplyr with as.numeric
exprs <- lapply(
0:(n - 1),
function(i) paste("CAST(bits[", i, "] AS double) AS x", i, sep=""))

sdf %>%
# Convert to Spark DataFrame
spark_dataframe() %>%
# Use expression with split and explode
invoke("selectExpr", list("split(text, ',') AS bits")) %>%
# Select individual columns
invoke("selectExpr", exprs) %>%
# Register table in the metastore ("registerTempTable" in Spark 1.x)
invoke("createOrReplaceTempView", "exploded_df")

And use dplyr::tbl to get back sparklyr object:

tbl(sc, "exploded_df")
Source:   query [2 x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE

x0 x1 x2
<dbl> <dbl> <dbl>
1 1 2 3
2 4 5 6

In the latest versions you can also use sdf_separate_column:

sdf %>% 
mutate(text=split(text, ",")) %>%
sdf_separate_column("text", paste0("x", 0:2))
# Source:   table<sparklyr_tmp_87125f13b89> [?? x 4]
# Database: spark_connection
text x0 x1 x2
<list> <chr> <chr> <chr>
1 <list [3]> 1.0 2.0 3.0
2 <list [3]> 4.0 5.0 6.0

Gather in sparklyr

Here's a function to mimic gather in sparklyr. This would gather the given columns while keeping everything else intact, but it can easily be extended if required.

# Function
sdf_gather <- function(tbl, gather_cols){

other_cols <- colnames(tbl)[!colnames(tbl) %in% gather_cols]

lapply(gather_cols, function(col_nm){
tbl %>%
select(c(other_cols, col_nm)) %>%
mutate(key = col_nm) %>%
rename(value = col_nm)
}) %>%
sdf_bind_rows() %>%
select(c(other_cols, 'key', 'value'))
}

# Example
spark_df %>%
select(col_1, col_2, col_3, col_4) %>%
sdf_gather(c('col_3', 'col_4'))

Sparklyr: how to calculate correlation coefficient between 2 Spark tables?

Personally I would solve it by going back to the input dataset. Just for the record the input data has been loaded using CSV reader:

df <- spark_read_csv(
sc, path = path, name = "simData", delimiter = " ",
header = "false", infer_schema = "false"
) %>% rename(y = `_c0`, xs = `_c1`)

and looks more or less like this:

      y                                                   xs
<chr> <chr>
1 21.66 2.643227,1.2698358,2.6338573,1.8812188,3.8708665
2 35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823
3 15.22 2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842

Now instead of splitting data into mutlitple tables let's process both part together:

exprs <- lapply(
0:(n - 1),
function(i) paste("CAST(xs[", i, "] AS double) AS x", i, sep=""))

df %>%
# Convert to native Spark
spark_dataframe() %>%
# Split and select xs, but retain y
invoke("selectExpr", list("y", "split(xs, ',') AS xs")) %>%
invoke("selectExpr", c("CAST(y AS DOUBLE)", exprs)) %>%
# Register table so we can access it from dplyr
invoke("registerTempTable", "exploded_df")

and apply summarize_each:

tbl(sc, "exploded_df") %>% summarize_each(funs(corr(., y)), starts_with("x"))
Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

A quick sanity check (correlation between y and x0, y and x4):

cor(c(21.66, 35.15, 15.22), c(2.643227, 3.422151, 2.8302398))
[1] 0.8503358
cor(c(21.66, 35.15, 15.22), c(3.8708665, 4.0771823, 4.6600842))
[1] -0.5571591

You can of course center the data first:

exploded <- tbl(sc, "exploded_df")

avgs <- summarize_all(exploded, funs(mean)) %>% as.data.frame()
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))

transmute_(exploded, .dots = setNames(center_exprs, colnames(exploded))) %>%
summarize_each(funs(corr(., y)), starts_with("x"))

but it doesn't affect the result:

Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

If both the transmute_ and summarize_each causes some issue we can push the centering and correlation directly into Spark:

#Centering
center_exprs <- as.list(paste(colnames(exploded ),"-", avgs))

exploded %>%
spark_dataframe() %>%
invoke("selectExpr", center_exprs) %>%
invoke("toDF", as.list(colnames(exploded))) %>%
invoke("registerTempTable", "centered")

centered <- tbl(sc, "centered")

#Correlation
corr_exprs <- lapply(
0:(n - 1),
function(i) paste("corr(y, x", i, ") AS x", i, sep=""))

centered %>%
spark_dataframe() %>%
invoke("selectExpr", corr_exprs) %>%
invoke("registerTempTable", "corrs")

tbl(sc, "corrs")
Source:   query [1 x 5]
Database: spark connection master=local[*] app=sparklyr local=TRUE

x0 x1 x2 x3 x4
<dbl> <dbl> <dbl> <dbl> <dbl>
1 0.8503358 -0.9972426 0.7242708 -0.9975092 -0.5571591

Intermediate table is of course not necessary and this could be applied at the same time as we extract data from arrays.

How to use Sparklyr to summarize Categorical Variable Level

Flatten your data using sdf_gather:

long <- diamonds_tbl %>% 
select(cut, color, clarity) %>%
sdf_gather("variable", "level", "cut", "color", "clarity")

Aggregate by variable and level:

counts <- long %>% group_by(variable, level) %>% summarise(freq = n())

And finally apply required window functions:

result <- counts %>%
arrange(-freq) %>%
mutate(
rank = rank(),
total = sum(freq, na.rm = TRUE),
ratio = freq / total * 100)

Which will give you

result
# Source:     spark<?> [?? x 6]
# Groups: variable
# Ordered by: -freq
variable level freq rank total ratio
<chr> <chr> <dbl> <int> <dbl> <dbl>
1 cut Ideal 21551 1 53940 40.0
2 cut Premium 13791 2 53940 25.6
3 cut Very Good 12082 3 53940 22.4
4 cut Good 4906 4 53940 9.10
5 cut Fair 1610 5 53940 2.98
6 clarity SI1 13065 1 53940 24.2
7 clarity VS2 12258 2 53940 22.7
8 clarity SI2 9194 3 53940 17.0
9 clarity VS1 8171 4 53940 15.1
10 clarity VVS2 5066 5 53940 9.39
# … with more rows

with following optimized plan

optimizedPlan(result)
<jobj[165]>
org.apache.spark.sql.catalyst.plans.logical.Project
Project [variable#524, level#525, freq#1478L, rank#1479, total#1480L, ((cast(freq#1478L as double) / cast(total#1480L as double)) * 100.0) AS ratio#1481]
+- Window [rank(_w1#1493L) windowspecdefinition(variable#524, _w1#1493L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#1479], [variable#524], [_w1#1493L ASC NULLS FIRST]
+- Window [sum(freq#1478L) windowspecdefinition(variable#524, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total#1480L], [variable#524]
+- Project [variable#524, level#525, freq#1478L, -freq#1478L AS _w1#1493L]
+- Sort [-freq#1478L ASC NULLS FIRST], true
+- Aggregate [variable#524, level#525], [variable#524, level#525, count(1) AS freq#1478L]
+- Generate explode(map(cut, cut#19, color, color#20, clarity, clarity#21)), [0, 1, 2], false, [variable#524, level#525]
+- Project [cut#19, color#20, clarity#21]
+- InMemoryRelation [carat#18, cut#19, color#20, clarity#21, depth#22, table#23, price#24, x#25, y#26, z#27], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[carat#18,cut#19,color#20,clarity#21,depth#22,table#23,price#24,x#25,y#26,z#27]

and query (sdf_gather component not included):

dbplyr::remote_query(result)
<SQL> SELECT `variable`, `level`, `freq`, `rank`, `total`, `freq` / `total` * 100.0 AS `ratio`
FROM (SELECT `variable`, `level`, `freq`, rank() OVER (PARTITION BY `variable` ORDER BY -`freq`) AS `rank`, sum(`freq`) OVER (PARTITION BY `variable`) AS `total`
FROM (SELECT *
FROM (SELECT `variable`, `level`, count(*) AS `freq`
FROM `sparklyr_tmp_ded2576b9f1`
GROUP BY `variable`, `level`) `dsbksdfhtf`
ORDER BY -`freq`) `obyrzsxeus`) `ekejqyjrfz`

How to repartition a data frame in sparklyr

You can try something like this

library(dplyr)
library(stringi)


#' @param df tbl_spark
#' @param numPartitions numeric number of partitions
#' @param ... character column names
repartition <- function(df, numPartitions, ...) {
# Create output name
alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep="_")

# Convert to Spark DataFrame
sdf <- df %>% spark_dataframe

# Convert names to Columns
exprs <- lapply(
list(...),
function(x) invoke(sdf, "apply", x)
)

sdf %>%
invoke("repartition", as.integer(numPartitions), exprs) %>%
# Use "registerTempTable" with Spark 1.x
invoke("createOrReplaceTempView", alias)

tbl(sc, alias)
}

Example usage:

df <- copy_to(sc, iris)

repartition(df, 3, "Species") %>% optimizedPlan

## <jobj[182]>
## class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
## RepartitionByExpression [Species#775], 3
## +- InMemoryRelation [Sepal_Length#771, Sepal_Width#772, Petal_Length#773, Petal_Width#774, Species#775], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
## : +- *Scan csv [Sepal_Length#771,Sepal_Width#772,Petal_Length#773,Petal_Width#774,Species#775] Format: CSV, InputPaths: file:/tmp/Rtmpp150bt/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

repartition(df, 7) %>% optimizedPlan
## <jobj[69]>
## class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
## RepartitionByExpression 7
## +- InMemoryRelation [Sepal_Length#19, Sepal_Width#20, Petal_Length#21, Petal_Width#22, Species#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
## : +- *Scan csv [Sepal_Length#19,Sepal_Width#20,Petal_Length#21,Petal_Width#22,Species#23] Format: CSV, InputPaths: file:/tmp/RtmpSw6aPg/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

Function optimizedPlan as defined in Sparklyr: how to center a Spark table based on column?



Related Topics



Leave a reply



Submit