Gather in sparklyr

Here's a function to mimic tidyr::gather in sparklyr. It gathers the given columns while keeping everything else intact, and it can easily be extended if required.

# Function
sdf_gather <- function(tbl, gather_cols){

  # Columns that are not gathered stay as identifiers
  other_cols <- colnames(tbl)[!colnames(tbl) %in% gather_cols]

  # Build a (key, value) slice per gathered column, then stack the slices
  lapply(gather_cols, function(col_nm){
    tbl %>%
      select(c(other_cols, col_nm)) %>%
      mutate(key = col_nm) %>%
      rename(value = col_nm)
  }) %>%
    sdf_bind_rows() %>%
    select(c(other_cols, 'key', 'value'))
}

# Example
spark_df %>%
  select(col_1, col_2, col_3, col_4) %>%
  sdf_gather(c('col_3', 'col_4'))
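
The answers further down this page call an sdf_gather with a tidyr-like signature, sdf_gather(tbl, key, value, ...), where the key/value column names come first. A sketch of such a variant (my own naming and rlang-based argument handling, not part of sparklyr) could look like this:

# Variant with a tidyr::gather-like signature:
#   sdf_gather(tbl, "key", "value", "col_a", "col_b", ...)
# Assumes dplyr and sparklyr are attached and `tbl` is a Spark tbl.
sdf_gather <- function(tbl, key, value, ...) {
  gather_cols <- c(...)
  other_cols  <- setdiff(colnames(tbl), gather_cols)

  lapply(gather_cols, function(col_nm) {
    tbl %>%
      select(c(other_cols, col_nm)) %>%
      mutate(!!key := col_nm) %>%          # key column holds the source column name
      rename(!!value := !!sym(col_nm))     # value column holds the original values
  }) %>%
    sdf_bind_rows() %>%
    select(c(other_cols, key, value))
}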

Spread in sparklyr / pivot in Spark

In this specific case (and in general whenever all the columns share the same type; if you're interested only in missing-data statistics this can be relaxed even further) you can use a much simpler structure than this.

With data defined like this:

df <- copy_to(sc, iris, overwrite = TRUE) 

gather the columns (below I assume a gather helper with the key/value signature, like the variant sketched at the end of the Gather in sparklyr section above):

long <- df %>%
  select(Sepal_Length, Sepal_Width) %>%
  sdf_gather("key", "value", "Sepal_Length", "Sepal_Width")

and then group and aggregate:

long %>%
  group_by(key) %>%
  summarise(n = n(), nmiss = sum(as.numeric(is.na(value)), na.rm = TRUE))

with the result:

# Source: spark<?> [?? x 3]
  key              n nmiss
  <chr>        <dbl> <dbl>
1 Sepal_Length   150     0
2 Sepal_Width    150     0

Given the reduced size of the output, it is also fine to collect the result after aggregation:

agg <- df %>%
  select(Sepal_Length, Sepal_Width) %>%
  summarize_all(funs(
    n = n(),
    nmiss = sum(as.numeric(is.na(.)))  # MissingCount
  )) %>%
  collect()

and apply your gather / spread logic to the result:

agg %>%
  tidyr::gather(variable, value) %>%
  tidyr::separate(variable, c("var", "stat"), sep = "_(?=[^_]*$)") %>%
  tidyr::spread(stat, value)

# A tibble: 2 x 3
  var              n nmiss
  <chr>        <dbl> <dbl>
1 Sepal_Length   150     0
2 Sepal_Width    150     0

In fact, the latter approach should be superior performance-wise in this particular case: everything is computed in a single aggregation pass over the wide data, and the reshaping happens locally on a tiny collected result.
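
If you want to verify this, it is easy to compare what actually gets sent to Spark in each case; an illustrative check with dbplyr::remote_query on the objects defined above:

# SQL generated for the gather-then-aggregate approach
dbplyr::remote_query(
  long %>%
    group_by(key) %>%
    summarise(n = n(), nmiss = sum(as.numeric(is.na(value)), na.rm = TRUE))
)

# SQL generated for the aggregate-first approach (before collect())
dbplyr::remote_query(
  df %>%
    select(Sepal_Length, Sepal_Width) %>%
    summarize_all(funs(n = n(), nmiss = sum(as.numeric(is.na(.)))))
)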

How to use sdf_pivot() in sparklyr and concatenate strings?

I dug into the tests for sdf_pivot, and it seems you can use invoke inside a custom fun.aggregate function to access Spark's collect_list function:

fun.aggregate <- function(gdf) {

  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "expr",
    "collect_list(y)"  # this is your own "y" variable
  )

  gdf %>% invoke("agg", expr, list())
}

that you can then use in sdf_pivot:

d_sdf_wide <- sdf_pivot(d_sdf, id ~ x, fun.aggregate)

this does do the job:

> d_sdf_wide
Source:   table<sparklyr_tmp_69c14424c5a4> [?? x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE

     id      `200`      `201`
  <chr>     <list>     <list>
1     1 <list [2]> <list [1]>
2     2 <list [1]> <list [2]>

Your data is now in list format, not a string, but you can concatenate the lists if you like, e.g.

d_sdf_wide %>% mutate(liststring = paste(`200`))

     id      `200`      `201` liststring
  <chr>     <list>     <list>      <chr>
1     1 <list [2]> <list [1]>  This That
2     2 <list [1]> <list [2]>        The

(Alternatively, you could write a complicated SQL query, but I haven't tried that.)
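
For instance, if you want control over the separator, one option (a sketch, not from the original answer) is to call Spark SQL's concat_ws directly; sparklyr passes functions it does not recognise through to Spark SQL, where concat_ws joins the elements of an array column:

# concat_ws is not an R function; it is passed through to Spark SQL,
# which joins the elements of each array column with the given separator.
d_sdf_wide %>%
  mutate(
    string_200 = concat_ws(" ", `200`),
    string_201 = concat_ws(" ", `201`)
  )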

Sparklyr: how to center a Spark table based on column?

You can just use mutate_each / mutate_all:

library(dplyr)

df <- data.frame(x=c(1, 2, 3), y = c(-4, 5, 6), z = c(42, 42, 42))
sdf <- copy_to(sc, df, overwrite=TRUE)

mutate_all(sdf, funs(. - mean(.)))

Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

but it looks like it is expanded into a really inefficient (unacceptable for large datasets) window-function application. You may be better off with a more verbose solution:

avgs <- summarize_all(sdf, funs(mean)) %>% as.data.frame()

exprs <- as.list(paste(colnames(sdf), "-", avgs))

sdf %>%
  spark_dataframe() %>%
  invoke("selectExpr", exprs) %>%
  invoke("toDF", as.list(colnames(sdf))) %>%
  invoke("registerTempTable", "centered")

tbl(sc, "centered")
Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

x y z
<dbl> <dbl> <dbl>
1 -1 -6.333333 0
2 0 2.666667 0
3 1 3.666667 0

It is not as pretty as the dplyr approach, but unlike the former it does a sensible thing.

If you want to skip all the invokes, you can use dplyr to do the same thing:

transmute_(sdf, .dots = setNames(exprs, colnames(sdf)))
Source:   query [3 x 3]
Database: spark connection master=local[*] app=sparklyr local=TRUE

      x         y     z
  <dbl>     <dbl> <dbl>
1    -1 -6.333333     0
2     0  2.666667     0
3     1  3.666667     0

Execution plans:

A helper function (see also dbplyr::remote_query for the physical plan):

optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%
    invoke("queryExecution") %>%
    invoke("optimizedPlan")
}

dplyr version:

mutate_all(sdf, funs(. - mean(.))) %>% optimizedPlan()
<jobj[190]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [x#2877, y#2878, (z#1123 - _we0#2894) AS z#2879]
  +- Window [avg(z#1123) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2894]
     +- Project [x#2877, (y#1122 - _we0#2892) AS y#2878, z#1123]
        +- Window [avg(y#1122) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2892]
           +- Project [(x#1121 - _we0#2890) AS x#2877, z#1123, y#1122]
              +- Window [avg(x#1121) windowspecdefinition(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#2890]
                 +- Project [y#1122, z#1123, x#1121]
                    +- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
                          : +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

Spark solution:

tbl(sc, "centered") %>% optimizedPlan()
<jobj[204]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [(x#1121 - 2.0) AS x#2339, (y#1122 - 2.33333333333333) AS y#2340, (z#1123 - 42.0) AS z#2341]
  +- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
        : +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

dplyr optimized:

transmute_(sdf, .dots = setNames(exprs, colnames(sdf))) %>% optimizedPlan()
<jobj[272]>
  class org.apache.spark.sql.catalyst.plans.logical.Project
  Project [(x#1121 - 2.0) AS x#4792, (y#1122 - 2.33333333333333) AS y#4793, (z#1123 - 42.0) AS z#4794]
  +- InMemoryRelation [x#1121, y#1122, z#1123], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
        : +- *Scan csv [x#1121,y#1122,z#1123] Format: CSV, InputPaths: file:/tmp/RtmpiEECCe/spark_serialize_f848ebf3e065c9a204092779c3e8f32ce6afdcb6e79bf6b9868ae9ff198a..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<x:double,y:double,z:double>

Notes:

Spark SQL is not that good at handling wide datasets. With core Spark you usually combine features into a single Vector column, and Spark provides a number of transformers which can be used to operate on Vector data.
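
As an illustration of that Vector-column approach (a sketch only; the column and output names are my own, and ft_standard_scaler subtracts the per-column mean when with_mean = TRUE):

# Assemble the numeric columns into a single Vector column, then center it
# with Spark ML's StandardScaler (mean subtraction on, std scaling off).
centered_vec <- sdf %>%
  ft_vector_assembler(input_cols = c("x", "y", "z"), output_col = "features") %>%
  ft_standard_scaler(input_col = "features", output_col = "features_centered",
                     with_mean = TRUE, with_std = FALSE)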

sparklyr sdf_collect and dplyr collect function on large tables in Spark takes ages to run?

That is expected behavior. dplyr::collect, sparklyr::sdf_collect and Spark's native collect all bring the full dataset to the driver node.

Even when it is feasible (you need at least 2-3 times more memory than the actual size of the data, depending on the scenario), it is bound to take a long time, with the driver's network interface being the most obvious bottleneck.

In practice, if you're going to collect all the data anyway, it typically makes more sense to skip the network and platform overhead and load the data directly using native tools (given the description, that would mean downloading the data to the driver and converting it to an R-friendly format file by file).
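
One common pattern along those lines (a sketch; the table name, paths and format are illustrative) is to have Spark write the result out to a shared location and read the files locally instead of collect()-ing them:

# Let Spark write the data out instead of pulling it through collect().
spark_write_parquet(large_sdf, "path/to/export")   # `large_sdf` is a placeholder

# Then read the exported files on the R side, e.g. with the arrow package.
local_df <- arrow::open_dataset("path/to/export") %>% dplyr::collect()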

How to use R function in Sparklyr

Your issue is related to the non-standard evaluation (NSE) that dplyr functions tend to use. When you reference a in your first call to point_dist, R attempts to evaluate it, which of course fails. (It's even more confusing when you do have a variable of that name in your calling environment or higher up ...)

NSE in dplyr means you can do something like select(mtcars, cyl), whereas with most standard-evaluation functions, you'll need myfunc(mtcars, "cyl"), since there isn't a variable named cyl in the calling environment.

In your case, try:

point_dist <- function(dta, vari) {
  vari <- enquo(vari)            # capture the unquoted column name
  dta %>%
    group_by(!!vari) %>%         # unquote it inside the dplyr verbs
    summarize(count = n()) %>%
    collect() %>%                # bring the (small) summary back to R
    ggplot(aes(x = !!vari, y = count)) +
    geom_point()
}
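
Usage then looks like this, with the column passed unquoted just as with dplyr verbs (the table and column names below are placeholders):

# `flights_tbl` and `carrier` stand in for your own Spark tbl and column.
flights_tbl %>% point_dist(carrier)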

This way of dealing with unquoted column names in your own functions can be confusing if you're used to normal R function definitions and/or unfamiliar with NSE. It can be a good template if that's as far as you're going to go with it; otherwise I strongly urge you to read a little more, starting with the first reference below.

Some good references for NSE, specifically in/around tidyverse stuff:

  • https://dplyr.tidyverse.org/articles/programming.html
  • http://adv-r.had.co.nz/Computing-on-the-language.html

How to use Sparklyr to summarize Categorical Variable Level

Reshape your data into long format using sdf_gather:

long <- diamonds_tbl %>%
  select(cut, color, clarity) %>%
  sdf_gather("variable", "level", "cut", "color", "clarity")

Aggregate by variable and level:

counts <- long %>% group_by(variable, level) %>% summarise(freq = n())

And finally apply the required window functions:

result <- counts %>%
  arrange(-freq) %>%
  mutate(
    rank = rank(),
    total = sum(freq, na.rm = TRUE),
    ratio = freq / total * 100)

This will give you:

result
# Source:     spark<?> [?? x 6]
# Groups:     variable
# Ordered by: -freq
   variable level      freq  rank total ratio
   <chr>    <chr>     <dbl> <int> <dbl> <dbl>
 1 cut      Ideal     21551     1 53940 40.0
 2 cut      Premium   13791     2 53940 25.6
 3 cut      Very Good 12082     3 53940 22.4
 4 cut      Good       4906     4 53940  9.10
 5 cut      Fair       1610     5 53940  2.98
 6 clarity  SI1       13065     1 53940 24.2
 7 clarity  VS2       12258     2 53940 22.7
 8 clarity  SI2        9194     3 53940 17.0
 9 clarity  VS1        8171     4 53940 15.1
10 clarity  VVS2       5066     5 53940  9.39
# … with more rows

with the following optimized plan:

optimizedPlan(result)
<jobj[165]>
  org.apache.spark.sql.catalyst.plans.logical.Project
  Project [variable#524, level#525, freq#1478L, rank#1479, total#1480L, ((cast(freq#1478L as double) / cast(total#1480L as double)) * 100.0) AS ratio#1481]
  +- Window [rank(_w1#1493L) windowspecdefinition(variable#524, _w1#1493L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#1479], [variable#524], [_w1#1493L ASC NULLS FIRST]
     +- Window [sum(freq#1478L) windowspecdefinition(variable#524, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total#1480L], [variable#524]
        +- Project [variable#524, level#525, freq#1478L, -freq#1478L AS _w1#1493L]
           +- Sort [-freq#1478L ASC NULLS FIRST], true
              +- Aggregate [variable#524, level#525], [variable#524, level#525, count(1) AS freq#1478L]
                 +- Generate explode(map(cut, cut#19, color, color#20, clarity, clarity#21)), [0, 1, 2], false, [variable#524, level#525]
                    +- Project [cut#19, color#20, clarity#21]
                       +- InMemoryRelation [carat#18, cut#19, color#20, clarity#21, depth#22, table#23, price#24, x#25, y#26, z#27], StorageLevel(disk, memory, deserialized, 1 replicas)
                          +- Scan ExistingRDD[carat#18,cut#19,color#20,clarity#21,depth#22,table#23,price#24,x#25,y#26,z#27]

and the corresponding query (sdf_gather component not included):

dbplyr::remote_query(result)
<SQL> SELECT `variable`, `level`, `freq`, `rank`, `total`, `freq` / `total` * 100.0 AS `ratio`
FROM (SELECT `variable`, `level`, `freq`, rank() OVER (PARTITION BY `variable` ORDER BY -`freq`) AS `rank`, sum(`freq`) OVER (PARTITION BY `variable`) AS `total`
FROM (SELECT *
FROM (SELECT `variable`, `level`, count(*) AS `freq`
FROM `sparklyr_tmp_ded2576b9f1`
GROUP BY `variable`, `level`) `dsbksdfhtf`
ORDER BY -`freq`) `obyrzsxeus`) `ekejqyjrfz`

