Using parallel's parLapply: Unable to Access Variables Within Parallel Code

You need to export those variables to the other R processes in the cluster:

cl <- makeCluster(mc <- getOption("cl.cores", 4))
clusterExport(cl=cl, varlist=c("text.var", "ntv", "gc.rate", "pos"))

R using variables in parLapply

You have to use clusterExport:

clusterExport(cl = NULL, varlist, envir = .GlobalEnv)

clusterExport assigns the values on the master R process of the variables named in varlist to variables of the same names in the global environment (aka ‘workspace’) of each node. The environment on the master from which variables are exported defaults to the global environment.

In your case:

Druckfaktor <- 1.3

no_cores <- detectCores()-1
cl <- makeCluster(no_cores)
clusterExport(cl, c("Druckfaktor"))
[...]
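
To make the effect of the export concrete, here is a minimal end-to-end sketch. The worker function (multiplying each input by Druckfaktor) and the input 1:3 are assumptions made up for illustration, not part of the original question, and the cluster is limited to two workers to keep the example light.

```r
library(parallel)

# Value defined on the master; workers do not see it until it is exported.
Druckfaktor <- 1.3

# Two workers for the sketch; the original uses detectCores() - 1.
cl <- makeCluster(2)

# Copy Druckfaktor into the global environment of every worker.
clusterExport(cl, c("Druckfaktor"))

# Hypothetical worker function: scale each input by the exported factor.
scaled <- parLapply(cl, 1:3, function(p) p * Druckfaktor)

stopCluster(cl)
print(unlist(scaled))
## [1] 1.3 2.6 3.9
```

Without the clusterExport() call, the workers would be expected to fail with an "object 'Druckfaktor' not found" error.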

Using parLapply in a sourced script results in memory leak

parLapply(cl, X, FUN, ...) applies FUN to each element of X. The worker needs to know FUN, so FUN is serialized and sent to the worker. What is an R function? It is the code that defines the function, together with the environment in which the function was defined. Why the environment? Because in R it is legal to reference variables defined outside of FUN, e.g.,

f = function(y) x + y
x = 1; f(1)
## [1] 2

As a second complexity, R allows the function to update variables outside the function:

f = function(y) { x <<- x + 1; x + y }
x = 1; f(1)
## [1] 3

In the above, we can imagine that we could figure out which parts of the environment of f() need to be seen (only the variable x), but in general this kind of analysis is not possible without actually evaluating the function, e.g., f = function(y, name) get(name) + y; x = 1; f(1, "x")

So for FUN to be evaluated on the worker, the worker needs to know both the definition of FUN and the content of the environment FUN was defined in. R lets the worker know about FUN by using serialize(). The consequence is easy to see:

f = function(n) { x = sample(n); length(serialize(function() {}, NULL)) }
f(1)
## [1] 754
f(10)
## [1] 1064
f(100)
## [1] 1424

Larger objects in the environment result in more information sent to / used by the worker.

If you think about it, the description so far would mean that the entire R session should be serialized to the worker (or to disk, if serialize() were being used to save objects). The environment of the anonymous function created inside f() is the evaluation environment of f(), which holds x; the parent of that is the environment of f(), which is the global environment; and the parent of the global environment is the search path... (check out environment(f) and parent.env(.GlobalEnv)). R has an arbitrary rule that serialization stops at the global environment: it is recorded only as a reference and its contents are not sent. So instead of using an anonymous function() {}, define the function in the .GlobalEnv:

g = function() {}
f = function(n) { x = sample(n); length(serialize(g, NULL)) }
f(1)
## [1] 592
f(1000)
## [1] 592
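
To see what a closure drags along, you can inspect its environment directly. The sketch below uses a made-up helper, make_closure(), that mirrors the f() above; the names are hypothetical and the byte counts are compared rather than printed, because they vary between R versions.

```r
# Hypothetical helper: returns a function defined inside another function.
make_closure <- function(n) {
  x <- sample(n)
  function() NULL
}

h <- make_closure(10)

# Names that live in h's environment and therefore travel with h when serialized.
captured <- ls(environment(h))
print(captured)
## [1] "n" "x"

# The parent of h's environment is the environment make_closure() was defined in.
chain_ok <- identical(parent.env(environment(h)), environment(make_closure))
print(chain_ok)
## [1] TRUE

# A bigger captured vector means a bigger serialized function.
bigger <- length(serialize(make_closure(1000), NULL)) >
  length(serialize(make_closure(1), NULL))
print(bigger)
## [1] TRUE
```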

Note also that this has consequences for what functions can be serialized. For instance, if g() were serialized in the code below, it would 'know' about x:

f = function(y) { x = 1; g = function(y) x + y; g(y) }
f(1)
## [1] 2

but here it does not: g() knows about the symbols in the environment(s) it was defined in, but not about the symbols in the environment it was called from.

rm(x)
g = function(y) x + y
f = function(y) { x = 1; g() }
f()
## Error in g() : object 'x' not found

In your script, you could compare

library(parallel)
cl = makeCluster(2)
f = function(n) {
    x = sample(n)
    parLapply(
        cl, 1,
        function(...)
            length(serialize(environment(), NULL))
    )
}
f(1)[[1]]
## [1] 256
f(1000)[[1]]
## [1] 4252

with

g = function(...) length(serialize(environment(), NULL))
f = function(n) {
    x = sample(n)
    parLapply(cl, 1, g)
}
f(1)[[1]]
## [1] 150
f(1000)[[1]]
## [1] 150

parallel parLapply setup

Since you're calling functions from NLP on the cluster workers, you should load it on each of the workers before calling parLapply. You can do that from the worker function, but I tend to use clusterCall or clusterEvalQ right after creating the cluster object:

clusterEvalQ(cl, {library(openNLP); library(NLP)})

Since as.String and Maxent_Word_Token_Annotator live in those packages, there is no need to export them once the packages are loaded on the workers.

Note that while running your example on my machine, I noticed that the PTA object doesn't work after being exported to the worker machines. Presumably there is something in that object that can't be safely serialized and unserialized. After I created that object on the workers using clusterEvalQ, the example ran successfully. Here it is, using openNLP 0.2-1:

library(parallel)

# Worker function; PTA is looked up in each worker's global environment.
tagPOS <- function(x, ...) {
    s <- as.String(x)
    word_token_annotator <- Maxent_Word_Token_Annotator()
    a2 <- Annotation(1L, "sentence", 1L, nchar(s))
    a2 <- annotate(s, word_token_annotator, a2)
    a3 <- annotate(s, PTA, a2)
    a3w <- a3[a3$type == "word"]
    POStags <- unlist(lapply(a3w$features, `[[`, "POS"))
    POStagged <- paste(sprintf("%s/%s", s[a3w], POStags), collapse = " ")
    list(POStagged = POStagged, POStags = POStags)
}

text.var <- c("I like it.", "This is outstanding soup!",
              "I really must get the recipe.")
cl <- makeCluster(mc <- getOption("cl.cores", detectCores()/2))

# Load the packages and create PTA on each worker instead of exporting it.
clusterEvalQ(cl, {
    library(openNLP)
    library(NLP)
    PTA <- Maxent_POS_Tag_Annotator()
})
m <- parLapply(cl, text.var, tagPOS)
print(m)
stopCluster(cl)

If clusterEvalQ fails because Maxent_POS_Tag_Annotator is not found, you might be loading the wrong version of openNLP on the workers. You can determine what package versions you're getting on the workers by executing sessionInfo with clusterEvalQ:

library(parallel)
cl <- makeCluster(2)
clusterEvalQ(cl, {library(openNLP); library(NLP)})
clusterEvalQ(cl, sessionInfo())

This will return the results of executing sessionInfo() on each of the cluster workers. Here is the version information for some of the packages that I'm using and that work for me:

other attached packages:
[1] NLP_0.1-0 openNLP_0.2-1

loaded via a namespace (and not attached):
[1] openNLPdata_1.5.3-1 rJava_0.9-4

Using parLapply and clusterExport inside a function

By default clusterExport looks in the .GlobalEnv for objects to export that are named in varlist. If your objects are not in the .GlobalEnv, you must tell clusterExport in which environment it can find those objects.

You can change your clusterExport call to the following (which I didn't test, but you said in the comments that it works):

clusterExport(cl=cl, varlist=c("text.var", "ntv", "gc.rate", "pos"), envir=environment())

This way, it will look in the function's environment for the objects to export.
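
Here is a small self-contained sketch of that pattern. The function names add_offset() and run_with_offset() and the variable offset are invented for illustration; the only point is that offset exists solely inside the calling function, so clusterExport needs envir = environment() to find it.

```r
library(parallel)

# Worker function defined at top level; 'offset' is a free variable that it
# expects to find in the worker's global environment.
add_offset <- function(v) v + offset

# Hypothetical wrapper: 'offset' is local to this function.
run_with_offset <- function(values, offset) {
  cl <- makeCluster(2)
  on.exit(stopCluster(cl))
  # Look up 'offset' in this function's environment, not in .GlobalEnv.
  clusterExport(cl, varlist = "offset", envir = environment())
  parLapply(cl, values, add_offset)
}

res <- unlist(run_with_offset(1:3, 10))
print(res)
## [1] 11 12 13
```

With the default envir = .GlobalEnv, the clusterExport call in this sketch would be expected to fail unless a variable called offset also happened to exist in the global environment.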

parLapply on 2 sets of lists

There is no need to calculate the (potentially large) CalcList. You can use either the list of centres or the list of destinations in the parLapply. The function you are calling would then apply your original function on each element of the other list, e.g. using lapply.
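
A rough sketch of that idea follows. The two small lists and the pair_distance() function are stand-ins I made up, since the original data and function are not shown; replace them with your own.

```r
library(parallel)

# Hypothetical stand-ins for the two lists in the question.
centres <- list(0, 10)
destinations <- list(1, 2, 3)

# Hypothetical pairwise function; replace with the real calculation.
pair_distance <- function(centre, destination) abs(centre - destination)

cl <- makeCluster(2)
clusterExport(cl, c("destinations", "pair_distance"))

# Parallelize over centres; each worker loops over all destinations itself,
# so the full list of pairs is never materialized on the master.
res <- parLapply(cl, centres, function(centre) {
  lapply(destinations, function(d) pair_distance(centre, d))
})
stopCluster(cl)

# Second centre (10) against third destination (3).
print(res[[2]][[3]])
## [1] 7
```

Which of the two lists goes into parLapply is a judgment call; putting the longer one there generally gives the workers more evenly sized chunks.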

Using parApply() inside a function

By default, clusterExport() pulls from the global environment. Your input variable is not there: it is an argument local to the function, so you need to specify clusterExport(clust, "input", envir = environment()).
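
As a sketch of how this could look with parApply, assuming a wrapper function shaped roughly like yours (row_share() and row_shares() are hypothetical names, and the calculation is only a placeholder):

```r
library(parallel)

# Worker function defined at top level; 'input' is a free variable that it
# expects to find in the worker's global environment.
row_share <- function(row) sum(row) / sum(input)

# Hypothetical wrapper: 'input' is an argument, so it is local to the function.
row_shares <- function(input) {
  clust <- makeCluster(2)
  on.exit(stopCluster(clust))
  # Export the local 'input' rather than searching .GlobalEnv for it.
  clusterExport(clust, "input", envir = environment())
  parApply(clust, input, 1, row_share)
}

m <- matrix(1:4, nrow = 2)
shares <- row_shares(m)
print(shares)
## [1] 0.4 0.6
```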


