Running Out of Heap Space in Sparklyr, But Have Plenty of Memory


In looking into Sandeep's suggestions, I started digging into the sparklyr deployment notes. These mention that the driver might run out of memory at this stage, and suggest tweaking some settings to correct it.

These settings did not solve the problem, at least not initially. However, isolating the problem to the collect stage let me find reports of similar problems with SparkR on Stack Overflow.

These answers depended in part on setting the environment variable SPARK_MEM. Putting it all together, I got it to work as follows:

library(tidyverse)
library(sparklyr)

# Set memory allocation for whole local Spark instance
Sys.setenv("SPARK_MEM" = "13g")

# Set driver and executor memory allocations
config <- spark_config()
config$spark.driver.memory <- "4G"
config$spark.executor.memory <- "1G"

# Connect to Spark instance
sc <- spark_connect(master = "local", config = config)

# Load data into Spark
df_tbl <- copy_to(sc, df)

# Summarise data
uniques <- df_tbl %>%
  group_by(my_key) %>%
  summarise() %>%
  ungroup() %>%
  collect()

SparkR collect method crashes with OutOfMemory on Java heap space

This appears to be a combination of inefficient Java in-memory object representations and long-lived object references, which together prevent some collections from being garbage-collected in time for the new collect() call to overwrite the old one in place.

I experimented with some options. For my sample 256MB file containing ~4M lines, I reproduced your behavior: with SPARK_MEM=1g, collect is fine the first time but OOMs the second time. With SPARK_MEM=4g instead, I'm able to Ctrl+C and re-run test <- collect(lines) as many times as I want.
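For SPARK_MEM to take effect, it must be in the environment before the SparkRBackend JVM is launched. From a shell, that looks like this (a sketch of the shell form; the answer above sets it from R with Sys.setenv instead):

```shell
# Make 4g of heap available to the JVM that SparkR will launch;
# must be exported before starting R / sparkR
export SPARK_MEM=4g
echo "$SPARK_MEM"
```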

For one thing, even if references didn't leak, note that after the first run of test <- collect(lines), the variable test holds that gigantic array of lines. The second time you call it, collect(lines) executes fully before the result is assigned to test, so under any straightforward instruction ordering there is no way to garbage-collect the old contents of test first. This means the second run makes the SparkRBackend process hold two copies of the entire collection at the same time, leading to the OOM you saw.
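The evaluation ordering behind this can be sketched in two lines (my annotation of the calls from the question; `lines` is the SparkR dataset being collected):

```r
test <- collect(lines)  # run 1: the collected array is now referenced by `test`
test <- collect(lines)  # run 2: the right-hand side materializes the entire
                        # collection *before* the assignment replaces the old
                        # value of `test`, so both copies are live at once
```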

To diagnose, on the master I started SparkR and first ran

dhuo@dhuo-sparkr-m:~$ jps | grep SparkRBackend
8709 SparkRBackend

I also checked top and it was using around 22MB of memory. I fetched a heap profile with jmap:

jmap -heap:format=b 8709
mv heap.bin heap0.bin
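The -heap:format=b spelling is from older JDKs; on current JDKs the equivalent binary heap dump is taken with -dump, which also lets you name the output file directly (a sketch, assuming the same pid as above):

```shell
# Write a binary heap dump of the SparkRBackend JVM (pid 8709 in this
# session) straight to heap0.bin, with no separate mv step needed
jmap -dump:format=b,file=heap0.bin 8709
```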

Then I ran the first round of test <- collect(lines), at which point top showed the process using ~1.7g of RES memory, and I grabbed another heap dump. Finally, I tried test <- {} to drop the reference and allow garbage collection. After doing this, and printing test to confirm it was empty, I grabbed another heap dump and noticed RES still showed 1.7g. I used jhat heap0.bin to analyze the original heap dump, and got:

Heap Histogram

All Classes (excluding platform)

Class Instance Count Total Size
class [B 25126 14174163
class [C 19183 1576884
class [<other> 11841 1067424
class [Lscala.concurrent.forkjoin.ForkJoinTask; 16 1048832
class [I 1524 769384
...

After running collect, I had:

Heap Histogram

All Classes (excluding platform)

Class Instance Count Total Size
class [C 2784858 579458804
class [B 27768 70519801
class java.lang.String 2782732 44523712
class [Ljava.lang.Object; 2567 22380840
class [I 1538 8460152
class [Lscala.concurrent.forkjoin.ForkJoinTask; 27 1769904

Even after I nulled out test, it remained about the same. This shows 2784858 instances of char[], for a total size of 579MB, and 2782732 instances of String, presumably holding those char[]s. I followed the reference graph all the way up, and got something like:

char[] -> String -> String[] -> ... -> class scala.collection.mutable.DefaultEntry -> class [Lscala.collection.mutable.HashEntry; -> class scala.collection.mutable.HashMap -> class edu.berkeley.cs.amplab.sparkr.JVMObjectTracker$ -> java.util.Vector@0x785b48cd8 (36 bytes) -> sun.misc.Launcher$AppClassLoader@0x7855c31a8 (138 bytes)

AppClassLoader then had thousands of inbound references. So somewhere along that chain something should have been removing its reference but was failing to do so, causing the entire collected array to sit in memory while we tried to fetch a second copy of it.

Finally, to answer your question about hanging after the collect, it appears it has to do with the data not fitting in the R process's memory; here's a thread related to that issue: https://www.mail-archive.com/user@spark.apache.org/msg29155.html

I confirmed that using a smaller file with only a handful of lines, and then running collect indeed does not hang.

Spark Memory Issues with sparklyr

I figured this out by chance. It turns out that when you run operations on an external Spark cluster in client mode, Spark still runs a driver process locally as well. I believe the local driver did not have enough memory allocated, and that was causing the error. My fix was simple:

Instead of allocating memory via:

spark_conf <- spark_config()
spark_conf$`spark.driver.memory` <- "8G"
spark_conf$`spark.executor.memory` <- "12G"

I used:

spark_conf <- spark_config()
spark_conf$`sparklyr.shell.driver-memory` <- "8G"
spark_conf$`sparklyr.shell.executor-memory` <- "12G"

The former sets resources on the Spark context after the JVM has already launched. The latter passes the values to spark-submit as launch options, so they apply to the locally launched driver as well as the rest of the sparklyr application.
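For completeness, here is how the working settings above plug into a connection call (my sketch; the yarn-client master URL is an assumption based on the description of an external cluster in client mode):

```r
library(sparklyr)

config <- spark_config()
# Passed to spark-submit as --driver-memory / --executor-memory when the
# connection launches, so they take effect before the local driver JVM starts
config$`sparklyr.shell.driver-memory` <- "8G"
config$`sparklyr.shell.executor-memory` <- "12G"

sc <- spark_connect(master = "yarn-client", config = config)
```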


