Is There an Efficient Way to Parallelize Mapply

Is there an efficient way to parallelize mapply?

I'd use the parallel package that's built into R 2.14 and work with matrices. You could then simply use mclapply like this:

dfm <- as.matrix(df)
result <- mclapply(seq_len(nrow(dfm)),
function(x) do.call(get_uniroot,as.list(dfm[x,])),
mc.cores=4L
)
unlist(result)

This is basically doing the same mapply does, but in a parallel way.

But...

Mind you that parallelization always counts for some overhead as well. As I explained in the question you link to, going parallel only pays off if your inner function calculates significantly longer than the overhead involved. In your case, your uniroot function works pretty fast. You might then consider to cut your data frame in bigger chunks, and combine both mapply and mclapply. A possible way to do this is:

ncores <- 4
id <- floor(
quantile(0:nrow(df),
1-(0:ncores)/ncores
)
)
idm <- embed(id,2)

mapply_uniroot <- function(id){
tmp <- df[(id[1]+1):id[2],]
mapply(get_uniroot, tmp$P, tmp$B0, tmp$CF1, tmp$CF2, tmp$CF3)
}
result <-mclapply(nrow(idm):1,
function(x) mapply_uniroot(idm[x,]),
mc.cores=ncores)
final <- unlist(result)

This might need some tweaking, but it essentially breaks your df in exactly as many bits as there are cores, and run the mapply on every core. To show this works :

> x1 <- mapply(get_uniroot, df$P, df$B0, df$CF1, df$CF2, df$CF3)
> all.equal(final,x1)
[1] TRUE

How do I parallelize this lapply() function in R?

Parallel processing is not always faster than a single thread

There are two frequent situations in my experience where parallel processing ends up slower than using a single-thread:

  1. The data is large and copying it between workers is expensive.
  2. The task allocated to each worker is fast enough that the overhead of setting up workers impacts significantly.

1. Copying data between workers can cost more time than parallel processing saves

As the furrr docs note:

It’s important to remember that data has to be passed back and forth between the workers. This means that whatever performance gain you might have gotten from your parallelization can be crushed by moving large amounts of data around. For example, if you are moving large data frames to the workers, running models in parallel, and returning large model objects back, the shuffling of data can take a large chunk of that time.

A quick simulation

We can see an example of this if we define an inefficient function. This function calculates the mean of a data frame column, but then instead of returning this single value, creates a new data frame column with the value recycled. It then returns the entire data frame with this new column appended:

silly_fun <- function(dat, col_name) {
mean_col_name <- paste0(col_name, "_mean")
dat[[mean_col_name]] <- mean(dat[[col_name]])

return(dat)
}

It would be a bad idea to run this function over every column of a data frame, whether in parallel or not.

So, let's try it with only 100 rows and columns and see what happens:

library(future.apply)
plan(multisession)

nrows <- 100
ncols <- 100
dat <- data.frame(
matrix(rnorm(nrows * ncols), nrows, ncols)
)

res <- microbenchmark::microbenchmark(
single_thread = lapply(names(dat), \(col_name) silly_fun(dat, col_name)),
parallel = future_lapply(names(dat), \(col_name) silly_fun(dat, col_name)),
times = 100
)

Output:

Unit: milliseconds
expr min lq mean median uq max neval cld
single_thread 2.9771 3.26725 3.876938 3.43705 3.91215 9.6273 100 a
parallel 103.5295 114.23415 126.105709 123.41755 132.39925 235.1055 100 b

As you can see, the median for the parallel job is about 35 times that of the single-threaded.

If we try it with 1000 rows and columns, these are the results:

Unit: milliseconds
expr min lq mean median uq max neval
single_thread 168.5477 168.5477 168.5477 168.5477 168.5477 168.5477 1
parallel 29440.3962 29440.3962 29440.3962 29440.3962 29440.3962 29440.3962 1

Here it takes 175 times as long to run in parallel. You can see that I've only done one iteration here - a single-threaded iteration took 0.168 seconds but doing it in parallel took 29.4 seconds!

2. The overhead of setting up workers can be more expensive than the time saved

Let's take a more sensible example and just return the actual mean, rather than the entire data frame, with 1000 rows and columns. You might think that now we're not passing large amounts of data, parallel processing will be much quicker. Let's see:

nrows <- 1e3
ncols <- 1e3

dat <- data.frame(
matrix(rnorm(nrows * ncols), nrows, ncols)
)
sensible <- microbenchmark::microbenchmark(
single_thread = lapply(dat, mean),
parallel = future_lapply(dat, mean),
times = 10
)

Unit: milliseconds
expr min lq mean median uq max neval cld
single_thread 4.3159 4.65055 5.245647 4.88995 5.37955 10.3636 100 a
parallel 157.9709 163.17605 177.565840 169.55155 180.03720 513.5421 100 b

Both methods perform a lot better in absolute time (both cases with 1000 rows and columns look like their respective timings with 100 rows and columns using the silly method).

But it's still much faster to use a single thread, because of the high overhead of setting up the workers compared with the relatively quick operation of calculating a mean.

Overall the fact that a parallel job may not be quicker does not indicate that the code is not creating the sub-processes as intended. There are costs and benefits of parallel processing. It is a lot faster in the right situations, but if you're copying large datasets or creating processes to do very simple calculations, it can be slower than a single-threaded approach.

Parallel mapply() for Windows; like mcmapply()

(disclaimer: I'm the authur)

The future.apply package provides one-to-one parallel versions of all of R's apply functions. Since it operates on top of the future framework, it works with basically all known parallel backends in R.

library(future.apply)
plan(multicore) ## forked processing

list1 <- list(elem1 = 1:3, elem2 = 4:6, elem3 = 7:9)
list2 <- list(elem1 = 10:12, elem2 = 13:15, elem3 = 15:17)

(result <- future_mapply(FUN = function(x, y){
x + y
}, list1, list2, SIMPLIFY = FALSE))

The above uses forked processing, just like parallel::mcmapply(). To use PSOCK cluster workers, which is also supported on MS Windows, have the end-user set:

plan(multisession)

With the above, you don't have to do conditional coding like:

if (parallel == "forks") {
...
} else if (parallel == "this") {
...
} else if (parallel == "that") {
...
} else {
...
}

All you need is a single future_mapply() call.

How to write efficient nested functions for parallelization?

You asked this a while ago but I'll attempt an answer in case anyone else was wondering the same thing. First, I like to split up my task first and then loop over each part. This gives me more control over the process.

parts <- split(df, c(df$class, df$group))
mclapply(parts, some_function)

Second, distributing tasks to multiple cores takes a lot of computational overhead and can cancel out any gains your make from paralleizing your script. Here, mclapply splits the job into however many nodes you have and performs the fork once. This is much more efficient than nesting two mclapply loops.

Parallelize user-defined function using apply family in R

Using theparallel package in R you can use the mclapply() function. You will need to adjust your code a little bit to make it run in parallel.

library(parallel)
my.df = data.frame(id=1:9,value=11:19)

sumPrevious <- function(i,df){df.id = df$id[i]
sum(df[df$id<=df.id,"value"])
}

mclapply(X = 1:nrow(my.df),FUN = sumPrevious,my.df,mc.preschedule = T,mc.cores = no.of.cores)

This code will run the sumPrevious in parallel on no.of.cores in your machine.



Related Topics



Leave a reply



Submit