Gather in sparklyr
Here's a function to mimic gather in sparklyr. It gathers the given columns while keeping everything else intact, and it can easily be extended if required. The gathered columns should share a type, since they all end up in a single value column.
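library(dplyr)
library(sparklyr)
# Function: stack gather_cols into key/value rows, keep all other columns
sdf_gather <- function(tbl, gather_cols){
  other_cols <- setdiff(colnames(tbl), gather_cols)
  lapply(gather_cols, function(col_nm){
    tbl %>%
      select(all_of(c(other_cols, col_nm))) %>%
      mutate(key = !!col_nm) %>%             # the column name becomes the key
      rename(value = all_of(col_nm))         # its contents become the value
  }) %>%
    sdf_bind_rows() %>%                      # union the per-column pieces
    select(all_of(c(other_cols, "key", "value")))
}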
# Example
spark_df %>%
select(col_1, col_2, col_3, col_4) %>%
sdf_gather(c('col_3', 'col_4'))
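To make the shape of the result concrete, here is a minimal local sketch of the same stacking logic in base R, with no Spark connection involved. gather_local is a hypothetical name, rbind stands in for sdf_bind_rows, and the toy data frame is made up for illustration.

```r
# Hypothetical local analogue of sdf_gather(); base R only, no Spark needed
gather_local <- function(df, gather_cols) {
  other_cols <- setdiff(colnames(df), gather_cols)
  pieces <- lapply(gather_cols, function(col_nm) {
    piece <- df[, other_cols, drop = FALSE]  # keep the untouched columns
    piece$key <- col_nm                      # name of the gathered column
    piece$value <- df[[col_nm]]              # its values
    piece
  })
  do.call(rbind, pieces)                     # stack the pieces, like sdf_bind_rows()
}

# Made-up toy data with the same column names as the example above
df <- data.frame(col_1 = 1:2, col_2 = c("a", "b"),
                 col_3 = c(10, 20), col_4 = c(30, 40))
long <- gather_local(df, c("col_3", "col_4"))
long
```

Each gathered column contributes one block of rows, so the output has nrow(df) * length(gather_cols) rows.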
Spread in SparklyR / pivot in Spark
In this specific case (and in general, when all columns have the same type; if you're only interested in missing-data statistics this can be relaxed further) you can use a much simpler structure than this.
With data defined like this:
df <- copy_to(sc, iris, overwrite = TRUE)
gather the columns (below I assume an sdf_gather(tbl, key, value, ...) function as defined in my answer to Gather in sparklyr; note that this signature differs from the function above):
long <- df %>%
select(Sepal_Length, Sepal_Width) %>%
sdf_gather("key", "value", "Sepal_Length", "Sepal_Width")
and then group and aggregate:
long %>%
group_by(key) %>%
summarise(n = n(), nmiss = sum(as.numeric(is.na(value)), na.rm=TRUE))
with the result:
# Source: spark<?> [?? x 3]
key n nmiss
<chr> <dbl> <dbl>
1 Sepal_Length 150 0
2 Sepal_Width 150 0
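For readers without a cluster at hand, a minimal base-R sketch of what the group-and-aggregate step computes: n counts all rows per key (missing ones included), while nmiss counts only the NA values. The long table is built by hand from the local iris data set and one NA is injected purely for illustration; this is not the Spark code path.

```r
# Local sketch: long format built from iris by hand
long <- data.frame(
  key = rep(c("Sepal_Length", "Sepal_Width"), each = nrow(iris)),
  value = c(iris$Sepal.Length, iris$Sepal.Width)
)
long$value[1] <- NA  # hypothetical missing value so nmiss is non-zero

# n() counts rows per key; nmiss sums the is.na() indicator per key
stats <- data.frame(
  key = names(tapply(long$value, long$key, length)),
  n = as.vector(tapply(long$value, long$key, length)),
  nmiss = as.vector(tapply(is.na(long$value), long$key, sum))
)
stats
```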
Given the reduced size of the output, it is also fine to collect the result after aggregation:
agg <- df %>%
select(Sepal_Length,Sepal_Width) %>%
summarize_all(funs(
n = n(),
nmiss=sum(as.numeric(is.na(.))) # MissingCount
)) %>% collect()
and apply your gather-spread logic on the result:
agg %>%
tidyr::gather(variable, value) %>%
tidyr::separate(variable, c("var", "stat"), sep = "_(?=[^_]*$)") %>%
tidyr::spread(stat, value)
# A tibble: 2 x 3
var n nmiss
<chr> <dbl> <dbl>
1 Sepal_Length 150 0
2 Sepal_Width 150 0
In fact, the latter approach should perform better in this particular case, since it aggregates the original columns in a single pass instead of first building the long table.
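The separate step relies on the regular expression _(?=[^_]*$), which matches only the last underscore, so a name like Sepal_Length_nmiss splits into the variable and the statistic. A base-R sketch of the same reshaping on a hand-made named vector (values copied from the output above; agg_values is a hypothetical name):

```r
# Hypothetical stand-in for the collected one-row agg result
agg_values <- c(Sepal_Length_n = 150, Sepal_Length_nmiss = 0,
                Sepal_Width_n = 150, Sepal_Width_nmiss = 0)

# Split each name at its last underscore (perl = TRUE enables the lookahead)
parts <- do.call(rbind, strsplit(names(agg_values), "_(?=[^_]*$)", perl = TRUE))

# Rows: variable, columns: statistic (the "spread" step)
wide <- tapply(unname(agg_values), list(var = parts[, 1], stat = parts[, 2]), sum)
wide
```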
How to use sdf_pivot() in sparklyr and concatenate strings?
I dug into the tests for sdf_pivot and it seems you can use invoke inside a custom fun.aggregate function to access Spark's collect_list function (sc below is your Spark connection):
fun.aggregate <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"expr",
"collect_list(y)" #this is your own "y" variable
)
gdf %>% invoke("agg", expr, list())
}
that you can then use in sdf_pivot:
d_sdf_wide <- sdf_pivot(d_sdf, id ~ x, fun.aggregate)
This does the job:
> d_sdf_wide
Source: table<sparklyr_tmp_69c14424c5a4> [?? x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE
id `200` `201`
<chr> <list> <list>
1 1 <list [2]> <list [1]>
2 2 <list [1]> <list [2]>
Your data is now in list format, not a string, but you can concatenate the lists if you like, e.g.:
d_sdf_wide %>% mutate(liststring = paste(`200`))
id `200` `201` liststring
<chr> <list> <list> <chr>
1 1 <list [2]> <list [1]> This That
2 2 <list [1]> <list [2]> The
Alternatively, you could write a more complicated SQL query, but I haven't tried that.
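Conceptually, each cell of the pivoted table holds all y values for one (id, x) combination. A base-R sketch that does the grouping and the concatenation in one step, on hypothetical toy data laid out like d_sdf (the values are made up; the original data is not shown):

```r
# Hypothetical toy data with the same id / x / y layout as d_sdf
d <- data.frame(
  id = c("1", "1", "1", "2", "2", "2"),
  x  = c("200", "200", "201", "200", "201", "201"),
  y  = c("a", "b", "c", "d", "e", "f")
)

# One cell per (id, x): all y values in that group joined with a space
wide <- tapply(d$y, list(id = d$id, x = d$x), paste, collapse = " ")
wide
```

Locally the values are joined in row order; in Spark, collect_list gives no ordering guarantee after a shuffle, so sort explicitly if the order matters.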
Sparklyr: how to center a Spark table based on column?
You can just use mutate_each / mutate_all:
library(dplyr)
df <- data.frame(x=c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))
sdf <- copy_to(sc, df, overwrite=TRUE)
mutate_all(sdf, funs(. - mean(.)))
Source: query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0
but it is expanded into a really inefficient window function application (unacceptable for large datasets). You'll be better off with a more verbose solution:
avgs <- summarize_all(sdf, funs(mean)) %>% as.data.frame()
exprs <- as.list(paste(colnames(sdf),"-", avgs))
sdf %>%
spark_dataframe() %>%
invoke("selectExpr", exprs) %>%
invoke("toDF", as.list(colnames(sdf))) %>%
invoke("createOrReplaceTempView", "centered")
tbl(sc, "centered")
Source: query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0
It is not as pretty as the dplyr approach but, unlike the former, it does a sensible thing.
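The idea behind the verbose version is to aggregate once and then subtract plain literals, so no window is needed. A local base-R sketch of the same two steps on the small data frame from above (eval/parse is only a local stand-in for selectExpr here, not what Spark does):

```r
df <- data.frame(x = c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))

# Step 1: one aggregation pass computes every column mean
avgs <- colMeans(df)

# Step 2: build "col - literal" strings, as exprs does above
exprs <- paste(colnames(df), "-", avgs)

# Step 3: evaluate each expression against the data, keeping the column names
centered <- as.data.frame(
  lapply(setNames(exprs, colnames(df)), function(e) eval(parse(text = e), df))
)
centered
```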
If you want to skip all the invokes, you can use dplyr to do the same thing:
transmute(sdf, !!!setNames(rlang::parse_exprs(unlist(exprs)), colnames(sdf)))
Source: query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE
x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0
Execution plans:
A helper function (see also dbplyr::remote_query, which shows the generated SQL rather than the Spark plan):
optimizedPlan <- function(df) {
df %>%
spark_dataframe() %>%
invoke("queryExecution") %>%
invoke("optimizedPlan")
}
dplyr version:
mutate_all(sdf, funs(. - mean(.))) %>% optimizedPlan()
<jobj[190]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [x#2877, y#2878, (z#1123 - _we0#2894) AS z#2879]
+- Window [avg(z#1123) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2894]
+- Project [x#2877, (y#1122 - _we0#2892) AS y#2878, z#1123]
+- Window [avg(y#1122) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2892]
+- Project [(x#1121 - _we0#2890) AS x#2877, z#1123, y#1122]
+- Window [avg(x#1121) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2890]
+- Project [y#1122, z#1123, x#1121]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>
Spark solution:
tbl(sc, "centered") %>% optimizedPlan()
<jobj[204]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [(x#1121 - 2.0) AS x#2339, (y#1122 - 2.33333333333333) AS y#2340, (z#1123 - 42.0) AS z#2341]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>
Optimized dplyr version:
transmute(sdf, !!!setNames(rlang::parse_exprs(unlist(exprs)), colnames(sdf))) %>% optimizedPlan()
<jobj[272]>
class org.apache.spark.sql.catalyst.plans.logical.Project
Project [(x#1121 - 2.0) AS x#4792, (y#1122 - 2.33333333333333) AS y#4793, (z#1123 - 42.0) AS z#4794]
+- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
: +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>
Notes:
Spark SQL is not that good at handling wide datasets. With core Spark you usually combine features into a single Vector column, and Spark provides a number of transformers which can be used to operate on Vector data.
sparklyr sdf_collect and dplyr collect function on large tables in Spark takes ages to run?
That is the expected behavior. dplyr::collect, sparklyr::sdf_collect and Spark's native collect all bring all the data to the driver node.
Even if feasible (you need at least 2-3 times more memory than the actual size of the data, depending on the scenario), it is bound to take a long time, with the driver's network interfaces being the most obvious bottleneck.
In practice, if you're going to collect all the data, it typically makes more sense to skip the network and platform overhead and load the data directly using native tools (given the description, that would mean downloading the data to the driver and converting it to an R-friendly format file by file).
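A back-of-envelope estimate makes the memory point concrete. The table size below is hypothetical, and the 2-3x factor is the rule of thumb from the answer; the object.size call only checks the 8-bytes-per-double assumption on a small vector.

```r
# Hypothetical table size: 100 million rows, 10 double columns
n_rows <- 1e8
n_cols <- 10
bytes_per_double <- 8  # R numeric vectors store 8 bytes per element

raw_gb <- n_rows * n_cols * bytes_per_double / 1e9  # size once loaded in R
needed_gb <- raw_gb * c(low = 2, high = 3)          # the 2-3x rule of thumb
raw_gb
needed_gb

# Sanity check of the 8-bytes-per-value assumption (plus a small header)
as.numeric(object.size(numeric(1e6)))
```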
How to use R function in Sparklyr
Your issue is related to the non-standard evaluation (NSE) that dplyr functions use. When you reference a in your first call to point_dist, R attempts to evaluate it, which of course fails. (It's even more confusing when you have some variable with that name in your calling environment or higher ...)
NSE in dplyr means you can do something like select(mtcars, cyl), whereas with most standard-evaluation functions you'd need myfunc(mtcars, "cyl"), since there isn't a variable named cyl in the calling environment.
In your case, try:
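library(dplyr)
library(ggplot2)
point_dist <- function(dta, vari) {
  vari <- enquo(vari)          # capture the unquoted column name
  dta %>%
    group_by(!!vari) %>%       # splice it back in with !!
    summarize(count = n()) %>%
    collect() %>%
    ggplot(aes(x = !!vari, y = count)) +
    geom_point()
}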
This method of dealing with unquoted column names in your functions can be confusing if you're used to normal R function definitions or are new to NSE. It can be a good template if that's as far as you're going to go with it; otherwise I strongly urge you to read more at the first reference below.
Some good references for NSE, specifically in and around the tidyverse:
- https://dplyr.tidyverse.org/articles/programming.html
- http://adv-r.had.co.nz/Computing-on-the-language.html
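The pattern can be tried locally without Spark or ggplot2. A minimal sketch, assuming dplyr is installed; count_by and count_by2 are hypothetical names, and the second version uses the {{ }} shorthand (rlang 0.4.0 and later) instead of the explicit enquo()/!! pair:

```r
library(dplyr)

# enquo() captures the unevaluated argument; !! splices it back in
count_by <- function(dta, vari) {
  vari <- enquo(vari)
  dta %>%
    group_by(!!vari) %>%
    summarize(count = n())
}

# {{ }} is shorthand for enquo() followed by !!
count_by2 <- function(dta, vari) {
  dta %>%
    group_by({{ vari }}) %>%
    summarize(count = n())
}

res <- count_by(mtcars, cyl)
res2 <- count_by2(mtcars, cyl)
res
```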
How to use Sparklyr to summarize Categorical Variable Level
Flatten your data using sdf_gather:
long <- diamonds_tbl %>%
select(cut, color, clarity) %>%
sdf_gather("variable", "level", "cut", "color", "clarity")
Aggregate by variable and level:
counts <- long %>% group_by(variable, level) %>% summarise(freq = n())
And finally apply the required window functions:
result <- counts %>%
arrange(-freq) %>%
mutate(
rank = rank(),
total = sum(freq, na.rm = TRUE),
ratio = freq / total * 100)
which gives you:
result
# Source: spark<?> [?? x 6]
# Groups: variable
# Ordered by: -freq
variable level freq rank total ratio
<chr> <chr> <dbl> <int> <dbl> <dbl>
1 cut Ideal 21551 1 53940 40.0
2 cut Premium 13791 2 53940 25.6
3 cut Very Good 12082 3 53940 22.4
4 cut Good 4906 4 53940 9.10
5 cut Fair 1610 5 53940 2.98
6 clarity SI1 13065 1 53940 24.2
7 clarity VS2 12258 2 53940 22.7
8 clarity SI2 9194 3 53940 17.0
9 clarity VS1 8171 4 53940 15.1
10 clarity VVS2 5066 5 53940 9.39
# … with more rows
with the following optimized plan:
optimizedPlan(result)
<jobj[165]>
org.apache.spark.sql.catalyst.plans.logical.Project
Project [variable#524, level#525, freq#1478L, rank#1479, total#1480L, ((cast(freq#1478L as double) / cast(total#1480L as double)) * 100.0) AS ratio#1481]
+- Window [rank(_w1#1493L) windowspecdefinition(variable#524, _w1#1493L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#1479], [variable#524], [_w1#1493L ASC NULLS FIRST]
+- Window [sum(freq#1478L) windowspecdefinition(variable#524, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total#1480L], [variable#524]
+- Project [variable#524, level#525, freq#1478L, -freq#1478L AS _w1#1493L]
+- Sort [-freq#1478L ASC NULLS FIRST], true
+- Aggregate [variable#524, level#525], [variable#524, level#525, count(1) AS freq#1478L]
+- Generate explode(map(cut, cut#19, color, color#20, clarity, clarity#21)), [0, 1, 2], false, [variable#524, level#525]
+- Project [cut#19, color#20, clarity#21]
+- InMemoryRelation [carat#18, cut#19, color#20, clarity#21, depth#22, table#23, price#24, x#25, y#26, z#27], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[carat#18,cut#19,color#20,clarity#21,depth#22,table#23,price#24,x#25,y#26,z#27]
and query (sdf_gather component not included):
dbplyr::remote_query(result)
<SQL> SELECT `variable`, `level`, `freq`, `rank`, `total`, `freq` / `total` * 100.0 AS `ratio`
FROM (SELECT `variable`, `level`, `freq`, rank() OVER (PARTITION BY `variable` ORDER BY -`freq`) AS `rank`, sum(`freq`) OVER (PARTITION BY `variable`) AS `total`
FROM (SELECT *
FROM (SELECT `variable`, `level`, count(*) AS `freq`
FROM `sparklyr_tmp_ded2576b9f1`
GROUP BY `variable`, `level`) `dsbksdfhtf`
ORDER BY -`freq`) `obyrzsxeus`) `ekejqyjrfz`
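To check the window logic without Spark, here is a local dplyr sketch of the same steps, using two numeric-coded columns of mtcars treated as categories (a hypothetical stand-in for diamonds, which would require ggplot2). min_rank(desc(freq)) plays the role of rank() above: judging by the generated SQL, sparklyr's rank() takes its ordering from the preceding arrange(), while the local version states the order explicitly.

```r
library(dplyr)

# Hypothetical local stand-in for the gathered categorical columns
long <- data.frame(
  variable = rep(c("cyl", "gear"), each = nrow(mtcars)),
  level = as.character(c(mtcars$cyl, mtcars$gear))
)

result <- long %>%
  count(variable, level, name = "freq") %>%   # one row per (variable, level)
  group_by(variable) %>%                       # windows are per variable
  arrange(desc(freq), .by_group = TRUE) %>%
  mutate(
    rank = min_rank(desc(freq)),               # like SQL RANK() ordered by freq desc
    total = sum(freq),
    ratio = freq / total * 100
  ) %>%
  ungroup()
result
```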