TryCatch with parLapply (Parallel package) in R
In your error handler function, cond is an error condition. Calling message(cond) signals this condition, which is caught on the workers and transmitted as an error to the master. Either remove the message calls or replace them with something like

message(conditionMessage(cond))

You won't see the output on the master anyway, so removing the calls is probably best.
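A minimal sketch of the difference, using a plain tryCatch (no cluster needed): conditionMessage(cond) extracts the condition's text as an ordinary character string instead of re-signalling the condition.

```r
# Sketch: extract the condition's text with conditionMessage() instead of
# calling message(cond), which would signal the condition itself.
res <- tryCatch(
  stop("something failed"),
  error = function(cond) conditionMessage(cond)
)
res
# [1] "something failed"
```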
Read many files in parallel and extract data
Here is an attempt to solve the problem in the question. Untested, since there is no data.
Step 1
First of all, rewrite the loop in the question as a function.
f <- function(i, path = "./data", cik_files) {
  filename <- file.path(path, cik_files[i])
  data1 <- fromJSON(filename, flatten = TRUE)
  if ("NetIncomeLoss" %in% names(data1$facts$`us-gaap`)) {
    data1 <- data1$facts$`us-gaap`$NetIncomeLoss$units$USD
    found <- grep("CY20[0-9]{2}$", data1$frame)
    if (length(found) > 0) {
      tryCatch({
        out <- data1[found, c(3, 9)]
        out$cik <- strtrim(cik_files[i], 13)
        out
      },
      error = function(e) e,
      warning = function(w) w)
    } else NULL
  } else NULL
}
Step 2
Now load the parallel package and run one of the following, depending on your OS.
library(parallel)
library(jsonlite)

# Not on Windows
json_list <- mclapply(seq_along(cik_files), f, cik_files = cik_files)

# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "cik_files")
clusterEvalQ(cl, library(jsonlite))
json_list <- parLapply(cl, seq_along(cik_files), f, cik_files = cik_files)
stopCluster(cl)
Step 3
Extract the data from the returned list json_list.
err <- sapply(json_list, inherits, "error")
warn <- sapply(json_list, inherits, "warning")
ok <- !(err | warn)
json_list[ok] # correctly read in
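As a small illustration of this filtering, here is a mock json_list (one error object, one NULL from a file with no matching frame, and one good data frame; the column names are made up). The ok results can then be stacked into a single data frame:

```r
# Mock results: an error, a NULL, and a successful data frame
json_list <- list(
  simpleError("bad file"),
  NULL,
  data.frame(val = c(1e6, 2e6), frame = c("CY2020", "CY2021"),
             cik = "CIK0000000001")
)

err  <- sapply(json_list, inherits, "error")
warn <- sapply(json_list, inherits, "warning")
ok   <- !(err | warn)

# Drop NULLs among the ok results, then bind into one data frame
good   <- Filter(Negate(is.null), json_list[ok])
result <- do.call(rbind, good)
nrow(result)
# [1] 2
```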
Error handling within parApply (in R, using parallel package)
There may be nothing wrong with your use of the try function. It may be that your function is causing a worker process to exit. In that case, the master process gets an error reading from the socket connection to that worker, resulting in the error message:

Error in unserialize(node$con) : error reading from connection

parApply doesn't catch this error but propagates it, causing your script to exit with the message "Execution halted".
I can reproduce this scenario with:
library(parallel)
cl <- makePSOCKcluster(4)
clusterApply(cl, 1:10, function(i) {
  tryCatch({
    quit(save = "no", status = 1)
  },
  error = function(e) {
    NULL
  })
})
When I execute it, I get the output:
Error in unserialize(node$con) : error reading from connection
Calls: clusterApply ... FUN -> recvData -> recvData.SOCKnode -> unserialize
Execution halted
Unfortunately, this tells us nothing about what is causing a worker process to exit, but I think that's where you should focus your efforts, rather than struggling with the try function.
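One way to gather clues is to create the cluster with outfile = "", which sends the workers' stdout and stderr to the master's console instead of discarding them; anything a worker prints (or a crash message from the OS or a native library) then becomes visible. A sketch:

```r
library(parallel)

# Redirect worker output to the master console for debugging
cl <- makePSOCKcluster(2, outfile = "")
res <- clusterApply(cl, 1:4, function(i) {
  cat(sprintf("worker handling element %d\n", i))  # visible thanks to outfile = ""
  i^2
})
stopCluster(cl)
unlist(res)
```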
parallel parLapply setup
Since you're calling functions from NLP on the cluster workers, you should load the package on each of the workers before calling parLapply. You can do that from the worker function, but I tend to use clusterCall or clusterEvalQ right after creating the cluster object:

clusterEvalQ(cl, {library(openNLP); library(NLP)})
Since as.String and Maxent_Word_Token_Annotator are in those packages, they shouldn't be exported.
Note that while running your example on my machine, I noticed that the PTA object doesn't work after being exported to the worker machines. Presumably there is something in that object that can't be safely serialized and unserialized. After I created the object on the workers using clusterEvalQ, the example ran successfully. Here it is, using openNLP 0.2-1:
library(parallel)

tagPOS <- function(x, ...) {
  s <- as.String(x)
  word_token_annotator <- Maxent_Word_Token_Annotator()
  a2 <- Annotation(1L, "sentence", 1L, nchar(s))
  a2 <- annotate(s, word_token_annotator, a2)
  a3 <- annotate(s, PTA, a2)
  a3w <- a3[a3$type == "word"]
  POStags <- unlist(lapply(a3w$features, `[[`, "POS"))
  POStagged <- paste(sprintf("%s/%s", s[a3w], POStags), collapse = " ")
  list(POStagged = POStagged, POStags = POStags)
}

text.var <- c("I like it.", "This is outstanding soup!",
              "I really must get the recipe.")

cl <- makeCluster(mc <- getOption("cl.cores", detectCores() / 2))
clusterEvalQ(cl, {
  library(openNLP)
  library(NLP)
  PTA <- Maxent_POS_Tag_Annotator()
})
m <- parLapply(cl, text.var, tagPOS)
print(m)
stopCluster(cl)
If clusterEvalQ fails because Maxent_POS_Tag_Annotator is not found, you might be loading the wrong version of openNLP on the workers. You can determine which package versions the workers are loading by executing sessionInfo with clusterEvalQ:
library(parallel)
cl <- makeCluster(2)
clusterEvalQ(cl, {library(openNLP); library(NLP)})
clusterEvalQ(cl, sessionInfo())
This returns the results of executing sessionInfo() on each of the cluster workers. Here is the version information for some of the packages that I'm using and that work for me:
other attached packages:
[1] NLP_0.1-0 openNLP_0.2-1
loaded via a namespace (and not attached):
[1] openNLPdata_1.5.3-1 rJava_0.9-4