Data.Table and Parallel Computing

data.table and parallel computing

First thing to check is that data.table FAQ 3.1 point 2 has sunk in:

One memory allocation is made for the largest group only, then that
memory is reused for the other groups. There is very little garbage
to collect.

That's one reason data.table grouping is quick. But this approach doesn't lend itself to parallelization. Parallelizing means copying the data to the other threads instead, which costs time. But my understanding is that data.table grouping is usually faster than plyr with .parallel on anyway. It depends on the computation time of the task for each group, and whether that compute time can easily be reduced or not. Moving the data around often dominates (when benchmarking one or three runs of large data tasks).

More often, so far, it's actually some gotcha that's biting in the j expression of [.data.table. For example, recently we saw poor performance from data.table grouping but the culprit turned out to be min(POSIXct) (Aggregating in R over 80K unique ID's). Avoiding that gotcha yielded over 50 times speedup.

So the mantra is: Rprof, Rprof, Rprof.
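
For example, a minimal profiling sketch (dt, expensiveFun and the column names below are placeholders for whatever task you suspect is slow):

Rprof("grouping.out")                   # start collecting profiling samples
ans <- dt[, expensiveFun(v), by = id]   # the task being profiled (placeholder)
Rprof(NULL)                             # stop profiling
summaryRprof("grouping.out")$by.self    # see where the time actually goes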

Further, point 1 from the same FAQ might be significant:

Only that column is grouped, the other 19 are ignored because
data.table inspects the j expression and realises it doesn’t use the
other columns.

So, data.table really doesn't follow the split-apply-combine paradigm at all. It works differently. split-apply-combine lends itself to parallelization but it really doesn't scale to large data.

Also see footnote 3 in the data.table intro vignette :

We wonder how many people are deploying parallel techniques to code
that is vector scanning

That's trying to say "sure, parallel is significantly faster, but how long should it really take with an efficient algorithm?".

BUT if you've profiled (using Rprof), and the task per group really is compute intensive, then the 3 posts on datatable-help including the word "multicore" might help:

multicore posts on datatable-help

Of course there are many tasks where parallelization would be nice in data.table, and there is a way to do it. But it hasn't been done yet, since usually other factors bite, so it's been low priority. If you can post reproducible dummy data with benchmarks and Rprof results, that would help increase the priority.

Speed-up data.table group by using multiple cores and parallel programming

If you have multiple cores available to you, why not leverage the fact that you can quickly filter & group rows in a data.table using its key:

library(doMC)
registerDoMC(cores = 4)

setkey(dt, "a")

finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x = unique(dt[["a"]]), .combine = "rbind", .inorder = finalRowOrderMatters) %dopar%
  dt[.(x), list(b = paste(b, collapse = ""), d = paste(d, collapse = ""), e = e[[1]])]

Note that if the number of unique groups (i.e. length(unique(a))) is relatively small, it will be faster to drop the .combine argument, get the results back in a list, then call rbindlist on the results. In my testing on two cores and 8 GB of RAM, the threshold was at about 9,000 unique values. Here is what I used to benchmark:

# (option a)
round(rowMeans(replicate(3, system.time({
  # ------- #
  foreach(x = unique(dt[["a"]]), .combine = "rbind", .inorder = FALSE) %dopar%
    dt[.(x), list(b = paste(b, collapse = ""), d = paste(d, collapse = ""), e = e[[1]])]
  # ------- #
}))), 3)
# [1] 1.243 elapsed for N == 1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000



# (option b)
round(rowMeans(replicate(3, system.time({
  # ------- #
  results <-
    foreach(x = unique(dt[["a"]])) %dopar%
      dt[.(x), list(b = paste(b, collapse = ""), d = paste(d, collapse = ""), e = e[[1]])]
  rbindlist(results)
  # ------- #
}))), 3)
# [1] 1.117 elapsed for N == 1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000


## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")

Parallelization with data.table

First, calculate and store the alpha_j's.

Then sort DT by x and cut it into the relevant intervals before performing the linear interpolation:

alpha <- c(NA, diff(yPoints) / diff(xPoints))

DT[order(x),
   y := alpha[.GRP] * (x - xPoints[.GRP - 1L]) + yPoints[.GRP - 1L],
   by = cut(x, xPoints)]

Please let me know how it performs.

data:

library(data.table)

## Main program
set.seed(27L)
xPoints <- c(4, 9, 12, 15, 18, 21)
yPoints <- c(1, 2, 3, 4, 5, 6)
DT <- data.table(x=rnorm(1e4, mean=12, sd=5))

check:

## f_pwl() uses abind() from the abind package and shift() from data.table
library(abind)

f_pwl <- function(x) {
  temp <- as.vector(rep(NA, length = length(x)), mode = "double")
  for (i in seq(from = 1, to = length(x), by = 1)) {
    if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
      # nothing to do, temp[i] <- NA
    } else if (x[i] == max(xPoints)) {
      # value equals max(yPoints)
      temp[i] <- max(yPoints)
    } else {
      # value is f_pwl(x)
      xIndexVector = as.logical(x[i] >= xPoints & abind(xPoints[2:length(xPoints)], max(xPoints)) > x[i])
      xIndexVector_plus1 = shift(xIndexVector, n = 1, fill = FALSE, type = "lag")
      alpha_j = (xPoints[xIndexVector_plus1] - x[i]) / (xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
      temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1 - alpha_j) %*% yPoints[xIndexVector_plus1]
    }
  } # end for i
  as.vector(temp, mode = "double")
}
system.time({
DT[, yOP := f_pwl( x ) ]
})

DT[abs(y-yOP) > 1e-6]
#Empty data.table (0 rows) of 3 cols: x,y,yOP

Parallelizing / Multithreading with data.table

I got answers from the data.table developers on the data.table GitHub.

Here's a summary:

  • Finding the groups of the by variable is always parallelized, but more importantly,

  • if the function in j is generic (a user-defined function), there is no parallelization;

  • operations in j are parallelized if the function is GForce-optimized (expressions in j that contain only the functions min, max, mean, median, var, sd, sum, prod, first, last, head, tail).

So it is advised to parallelize manually when the function in j is generic, but that may not always guarantee a speed gain. Reference
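
As a quick way to see whether GForce kicks in, you can ask data.table for verbose output. A minimal sketch (the table and columns below are made up for illustration):

library(data.table)
DT <- data.table(g = rep(1:3, each = 4), v = rnorm(12))

# sum() in j is GForce-optimized; the verbose output reports the optimization
DT[, sum(v), by = g, verbose = TRUE]

# a user-defined wrapper is generic, so j is evaluated group by group without GForce
f <- function(x) sum(x)
DT[, f(v), by = g, verbose = TRUE]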

Solution

In my case, I encountered a "vector memory exhausted" error when I plainly used DT[, var := some_function(var2)], even though my server had 1 TB of RAM while the data was taking 200 GB of memory.

I used split(DT, by = 'grouper') to split my data.table into chunks, and used foreach with the doFuture backend and %dopar% to do the job. It was pretty fast.
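
Roughly, the pattern looks like the following. This is only a sketch: 'grouper', var, var2, some_function and the worker count stand in for the real names used above.

library(data.table)
library(foreach)
library(future)
library(doFuture)

registerDoFuture()
plan(multisession, workers = 4)       # worker count is arbitrary here

chunks <- split(DT, by = "grouper")   # one data.table per value of 'grouper'

res <- foreach(chunk = chunks, .packages = "data.table") %dopar% {
  out <- copy(chunk)                  # copy() so := works cleanly on the exported chunk
  out[, var := some_function(var2)]   # the expensive, non-GForce step
  out
}

DT_new <- rbindlist(res)              # stitch the chunks back together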

Is `Map()` when used in a `data.table` parallel? - R

You can use this rather exploratory approach and see whether the elapsed time shrinks when more threads are used.
Note that on my machine the maximum number of usable threads is just one, so no difference is possible.

library(data.table)

dt <- data.table::data.table(a = 1:3,
                             b = 4:6)
dt
#> a b
#> 1: 1 4
#> 2: 2 5
#> 3: 3 6

data.table::getDTthreads()
#> [1] 1

# No Parallelisation ----------------------------------
data.table::setDTthreads(1)
system.time({
  dt[, lapply(.SD,
              function(x) {
                Sys.sleep(2)
                x
              })]
})
#> user system elapsed
#> 0.009 0.001 4.017

# Parallel -------------------------------------------
# use multiple threads
data.table::setDTthreads(2)
data.table::getDTthreads()
#> [1] 1

# if parallel, elapsed should be below 4
system.time({
  dt[, lapply(.SD,
              function(x) {
                Sys.sleep(2)
                x
              })]
})
#> user system elapsed
#> 0.001 0.000 4.007

# Map -----------------------------------------------
# if parallel, elapsed should be below 4
system.time({
  dt[, Map(f = function(x, y) {
             Sys.sleep(2)
             x
           },
           .SD,
           1:2)]
})
#> user system elapsed
#> 0.002 0.000 4.005

Parallel For Each Over Data Tables in a Data Set

Yes, you can. A DataSet and a DataTable are in-memory collections, so if you can process the tables independently they are good candidates for a parallel For Each.

Parallel.ForEach(
dataSet.Tables.Cast(Of DataTable),
Sub(table)
ProcessTable(table)
End Sub
)

Are 'j'-expressions in 'data.table' automatically parallelised?

From ?setDTthreads:

Internally parallelized code is used in the following places:

  • between.c - between()
  • cj.c - CJ()
  • coalesce.c - fcoalesce()
  • fifelse.c - fifelse()
  • fread.c - fread()
  • forder.c, fsort.c, and reorder.c - forder() and related
  • froll.c, frolladaptive.c, and frollR.c - froll() and family
  • fwrite.c - fwrite()
  • gsumm.c - GForce in various places, see GForce
  • nafill.c - nafill()
  • subset.c - Used in [.data.table subsetting
  • types.c - Internal testing usage

My understanding is that you should not expect data.table to make use of multithreading outside of the above use cases. Note that [.data.table uses multithreading for subsetting only, i.e., in i-expressions but not j-expressions. That is presumably just to speed up relational and logical operations, as in x[!is.na(a) & a > 0].
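
If you want to see the i-side multithreading for yourself, one rough way (only a sketch; absolute timings will vary by machine) is to compare the same logical subset at different thread counts:

library(data.table)
d <- data.table(a = rnorm(5e7))

setDTthreads(1L)
system.time(d[!is.na(a) & a > 0])   # subset with a single thread

setDTthreads(4L)
system.time(d[!is.na(a) & a > 0])   # same subset with more threads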

In a j-expression, sum and sapply are still just base::sum and base::sapply. You can test this with a benchmark:

library("data.table")
setDTthreads(4L)

x <- data.table(a = rnorm(2^25))
microbenchmark::microbenchmark(sum(x$a), x[, sum(a)], times = 1000L)
Unit: milliseconds
expr min lq mean median uq max neval
sum(x$a) 51.61281 51.68317 51.95975 51.84204 52.09202 56.67213 1000
x[, sum(a)] 51.78759 51.89054 52.18827 52.07291 52.33486 61.11378 1000
x <- data.table(a = seq_len(1e+04L))
microbenchmark::microbenchmark(sapply(x$a, paste, "is a good number"), x[, sapply(a, paste, "is a good number")], times = 1000L)
Unit: milliseconds
expr min lq mean median uq max neval
sapply(x$a, paste, "is a good number") 14.07403 15.7293 16.72879 16.31326 17.49072 45.62300 1000
x[, sapply(a, paste, "is a good number")] 14.56324 15.9375 17.03164 16.48971 17.69045 45.99823 1000

where it is clear that simply putting code into a j-expression does not improve performance.

data.table does recognize and handle certain constructs exceptionally. For instance, data.table uses its own radix-based forder instead of base::order when it sees x[order(...)]. (This feature is somewhat redundant now that users of base::order can request data.table's radix sort by passing method = "radix".) I haven't seen a "master list" of such exceptions.
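
For example, with the x from the benchmark above, both of the following sort by a using a radix sort, but only the first goes through data.table's internal substitution (a small illustration, not from the original answer):

x[order(a)]                           # order() in i is optimized internally to forder()
x[base::order(a, method = "radix")]   # bypasses the optimization; radix sort requested explicitly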

As for whether using, e.g., parallel::mclapply inside of a j-expression can have performance benefits, I think the answer (as usual) depends on what you are trying to do and the scale of your data. Ultimately, you'll have to do your own benchmarks and profiling to find out. For example:

library("parallel")
cl <- makePSOCKcluster(4L)
microbenchmark::microbenchmark(x[, sapply(a, paste, "is a good number")], x[, parSapply(cl, a, paste, "is a good number")], times = 1000L)
stopCluster(cl)
Unit: milliseconds
expr min lq mean median uq max neval
x[, sapply(a, paste, "is a good number")] 14.553934 15.982681 17.105667 16.585525 17.864623 48.81276 1000
x[, parSapply(cl, a, paste, "is a good number")] 7.675487 8.426607 9.022947 8.802454 9.334532 25.67957 1000

So it is possible to see speed-up, though sometimes you pay the price in memory usage. For small enough problems, the overhead associated with R-level parallelism can definitely outweigh the performance benefits.

You'll find a good thread about integrating parallel and data.table (including reasons not to) here.

Subset of data table when computing in parallel

The comment from @Roland above notwithstanding, I've actually found that this type of approach can be very effective, and I've used it to parallelize millions of computations over millions of rows on a 40-core EC2 instance.

The first thing I would do is ensure that your key is set to the column you'll be using as an index for subsetting. Your index is a list of arrays, and that's a little different from how I usually do it, but it should still work.
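
That is, before the loop, something along these lines (the column name here is just a placeholder):

setkey(DataP, group_id)   # key DataP on the column used for the subsetting below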

Try the following:

out <-
  foreach(i = indx, .packages = c("data.table"), .combine = rbind) %dopar% {
    Psubset <- DataP[i, ]
    # do some operations on Psubset
  }

Or, if for some reason .combine isn't working or needs additional arguments, you can do it after the fact:

out_list <-
  foreach(i = indx, .packages = c("data.table")) %dopar% {
    Psubset <- DataP[i, ]
    # do some operations on Psubset
  }
out <- rbindlist(out_list) #, fill=TRUE, etc.

If that doesn't work, I would take a look at the index so that it works more like:

out <-
  foreach(i = 1:max_indx, .packages = c("data.table"), .combine = rbind) %dopar% {
    Psubset <- DataP[indx == i, ]
    # do some operations on Psubset
  }

But without a reproducible example it's hard to know which one would work best.


