TryCatch with parLapply (Parallel package) in R

In your error handler function, cond is an error condition. Calling message(cond) signals the condition again; it is caught on the worker and transmitted to the master as an error. Either remove the message calls or replace them with something like

message(conditionMessage(cond))

Even then you won't see anything on the master, so removing the calls is probably best.
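To illustrate, here is a minimal sketch of a worker function whose handler returns the condition instead of re-signalling it. The function name safe_task and the simulated failure are made up for this example:

library(parallel)

# Hypothetical worker function for illustration.
safe_task <- function(i) {
  tryCatch({
    if (i == 2) stop("simulated failure")
    i^2
  },
  error = function(cond) {
    # Don't call message(cond) here: that re-signals the condition,
    # and it reaches the master as an error. Return the condition
    # (or conditionMessage(cond)) so the caller can inspect it.
    cond
  })
}

cl <- makeCluster(2)
res <- parLapply(cl, 1:4, safe_task)
stopCluster(cl)

sapply(res, inherits, "error") # FALSE TRUE FALSE FALSE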

Read many files in parallel and extract data

Here is an attempt to solve the problem in the question. It is untested, since no data were provided.

Step 1

First of all, rewrite the loop in the question as a function.

f <- function(i, path = "./data", cik_files){
  filename <- file.path(path, cik_files[i])
  data1 <- fromJSON(filename, flatten = TRUE)
  if("NetIncomeLoss" %in% names(data1$facts$`us-gaap`)){
    data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
    found <- grep("CY20[0-9]{2}$", data1$frame)
    if(length(found) > 0){
      tryCatch({
        out <- data1[found, c(3, 9)]
        out$cik <- strtrim(cik_files[i], 13)
        out
      },
      error = function(e) e,
      warning = function(w) w)
    } else NULL
  } else NULL
}

Step 2

Now load the parallel package and run one of the following, depending on your operating system.

library(parallel)


# Not on Windows
library(jsonlite)
json_list <- mclapply(seq_along(cik_files), f, cik_files = cik_files)


# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "cik_files")
clusterEvalQ(cl, library(jsonlite))

json_list <- parLapply(cl, seq_along(cik_files), f, cik_files = cik_files)

stopCluster(cl)

Step 3

Extract the data from the returned list json_list.

err <- sapply(json_list, inherits, "error")
warn <- sapply(json_list, inherits, "warning")
ok <- !(err | warn)
json_list[ok] # correctly read in
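If the successfully read elements are data frames with matching columns, they can be stacked into a single table; a minimal sketch, assuming that shape:

# Assuming each element of json_list[ok] is a data frame
# with the same columns, combine them into one data frame.
result <- do.call(rbind, json_list[ok])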

Error handling within parApply (in R, using parallel package)

There may be nothing wrong with your use of the try function. It may be that your function is causing a worker process to exit. In that case, the master process will get an error reading from the socket connection to that worker, resulting in the error message:

Error in unserialize(node$con) : error reading from connection

parApply doesn't catch this error, but propagates it, causing your script to exit with the message "Execution halted".

I can reproduce this scenario with:

library(parallel)
cl <- makePSOCKcluster(4)
clusterApply(cl, 1:10, function(i) {
  tryCatch({
    quit(save = 'no', status = 1)
  },
  error = function(e) {
    NULL
  })
})

When I execute it, I get the output:

Error in unserialize(node$con) : error reading from connection
Calls: clusterApply ... FUN -> recvData -> recvData.SOCKnode -> unserialize
Execution halted

Unfortunately, this tells us nothing about what is causing a worker process to exit, but I think that's where you should focus your efforts, rather than struggling with the try function.
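One way to investigate (a sketch, not part of the original answer) is to create the cluster with outfile = "", so that output and low-level error messages from the workers are printed on the master's console instead of being discarded:

library(parallel)

# outfile = "" sends the workers' stdout and stderr to the
# master's console, which often reveals why a worker died.
cl <- makePSOCKcluster(4, outfile = "")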

parallel parLapply setup

Since you're calling functions from the NLP and openNLP packages on the cluster workers, you should load those packages on each worker before calling parLapply. You can do that from the worker function, but I tend to use clusterCall or clusterEvalQ right after creating the cluster object:

clusterEvalQ(cl, {library(openNLP); library(NLP)})

Since as.String and Maxent_Word_Token_Annotator are defined in those packages, they don't need to be exported with clusterExport.

Note that while running your example on my machine, I noticed that the PTA object doesn't work after being exported to the worker machines. Presumably there is something in that object that can't be safely serialized and unserialized. After I created that object on the workers using clusterEvalQ, the example ran successfully. Here it is, using openNLP 0.2-1:

library(parallel)
tagPOS <- function(x, ...) {
  s <- as.String(x)
  word_token_annotator <- Maxent_Word_Token_Annotator()
  a2 <- Annotation(1L, "sentence", 1L, nchar(s))
  a2 <- annotate(s, word_token_annotator, a2)
  a3 <- annotate(s, PTA, a2)
  a3w <- a3[a3$type == "word"]
  POStags <- unlist(lapply(a3w$features, `[[`, "POS"))
  POStagged <- paste(sprintf("%s/%s", s[a3w], POStags), collapse = " ")
  list(POStagged = POStagged, POStags = POStags)
}
text.var <- c("I like it.", "This is outstanding soup!",
              "I really must get the recipe.")
cl <- makeCluster(mc <- getOption("cl.cores", detectCores()/2))
clusterEvalQ(cl, {
  library(openNLP)
  library(NLP)
  PTA <- Maxent_POS_Tag_Annotator()
})
m <- parLapply(cl, text.var, tagPOS)
print(m)
stopCluster(cl)

If clusterEvalQ fails because Maxent_POS_Tag_Annotator is not found, you might be loading the wrong version of openNLP on the workers. You can determine what package versions you're getting on the workers by executing sessionInfo with clusterEvalQ:

library(parallel)
cl <- makeCluster(2)
clusterEvalQ(cl, {library(openNLP); library(NLP)})
clusterEvalQ(cl, sessionInfo())

This will return the results of executing sessionInfo() on each of the cluster workers. Here is the version information for some of the packages that I'm using and that work for me:

other attached packages:
[1] NLP_0.1-0 openNLP_0.2-1

loaded via a namespace (and not attached):
[1] openNLPdata_1.5.3-1 rJava_0.9-4

