data.table and parallel computing
First thing to check is that data.table FAQ 3.1 point 2 has sunk in:
One memory allocation is made for the largest group only, then that
memory is reused for the other groups. There is very little garbage
to collect.
That's one reason data.table grouping is quick. But this approach doesn't lend itself to parallelization: parallelizing means copying the data to the other threads instead, which costs time. My understanding is that data.table grouping is usually faster than plyr with .parallel on anyway. It depends on the computation time of the task for each group, and on whether that compute time can easily be reduced. Moving the data around often dominates (when benchmarking 1 or 3 runs of large data tasks).
More often, so far, it's actually some gotcha that's biting in the j expression of [.data.table. For example, recently we saw poor performance from data.table grouping, but the culprit turned out to be min(POSIXct) (see "Aggregating in R over 80K unique ID's"). Avoiding that gotcha yielded a speedup of over 50 times.
So the mantra is: Rprof, Rprof, Rprof.
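As a minimal sketch of that mantra, the snippet below profiles one grouped call with Rprof and prints the functions that took the most self time. The table DT, its columns id and ts, and the per-group expression are made up for illustration; they are not from the question being answered.

```r
library(data.table)

# Hypothetical data: 200k timestamps spread over roughly 20k ids.
set.seed(1)
n <- 2e5
DT <- data.table(
  id = sample(2e4, n, replace = TRUE),
  ts = as.POSIXct("2020-01-01", tz = "UTC") + sample(1e6, n, replace = TRUE)
)

prof_file <- tempfile(fileext = ".out")
Rprof(prof_file, interval = 0.01)   # start sampling the call stack
res <- DT[, .(span_secs = as.numeric(diff(range(ts)), units = "secs")), by = id]
Rprof(NULL)                         # stop profiling

# summaryRprof() can fail if the run was too short to collect any samples,
# so the call is guarded in this sketch.
prof <- tryCatch(summaryRprof(prof_file)$by.self, error = function(e) NULL)
if (!is.null(prof)) print(head(prof, 10))
```

If most of the self time sits in S3 methods for the column's class rather than in data.table itself, that points to a gotcha in j rather than to a need for more cores.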
Further, point 1 from the same FAQ might be significant :
Only that column is grouped, the other 19 are ignored because
data.table inspects the j expression and realises it doesn’t use the
other columns.
So, data.table really doesn't follow the split-apply-combine paradigm at all. It works differently. split-apply-combine lends itself to parallelization but it really doesn't scale to large data.
Also see footnote 3 in the data.table intro vignette :
We wonder how many people are deploying parallel techniques to code
that is vector scanning
That's trying to say "sure, parallel is significantly faster, but how long should it really take with an efficient algorithm?".
BUT if you've profiled (using Rprof), and the task per group really is compute intensive, then the 3 posts on datatable-help including the word "multicore" might help:
multicore posts on datatable-help
Of course there are many tasks where parallelization would be nice in data.table, and there is a way to do it. But it hasn't been done yet, since usually other factors bite, so it's been low priority. If you can post reproducible dummy data with benchmarks and Rprof results, that would help increase the priority.
Speed-up data.table group by using multiple cores and parallel programming
If you have multiple cores available to you, why not leverage the fact that you can quickly filter & group rows in a data.table using its key:
library(data.table)
library(doMC)  # also attaches foreach
registerDoMC(cores=4)
setkey(dt, "a")
finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
Note that if the number of unique groups (i.e. length(unique(a))) is relatively small, it will be faster to drop the .combine argument, get the results back in a list, then call rbindlist on the results. In my testing on two cores and 8GB RAM, the threshold was at about 9,000 unique values. Here is what I used to benchmark:
# (option a)
round(rowMeans(replicate(3, system.time({
# ------- #
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
# ------- #
}))), 3)
# [1] 1.243 elapsed for N == 1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000
# (option b)
round(rowMeans(replicate(3, system.time({
# ------- #
results <-
foreach(x=unique(dt[["a"]])) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
rbindlist(results)
# ------- #
}))), 3)
# [1] 1.117 elapsed for N == 1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000
## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
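For a baseline, it may be worth timing the same aggregation as a single grouped call with by, with no foreach at all. The sketch below is not part of the benchmark above; it uses a smaller table with shortened sample strings and checks that the grouped call returns the same values as a per-key loop.

```r
library(data.table)

# Smaller stand-in for the dt built above (assumed sizes and strings).
set.seed(1)
N <- 1000
dt <- data.table(
  a = sample(1:N, N * 2, replace = TRUE),
  b = sample(c("3m", "5m", "1m"), N * 2, replace = TRUE),
  d = sample(c("3m", "5m", "1m"), N * 2, replace = TRUE)
)
dt[, e := a]
setkey(dt, a)

# One grouped call: data.table handles the grouping internally.
grouped <- dt[, list(b = paste(b, collapse = ""),
                     d = paste(d, collapse = ""),
                     e = e[[1]]),
              by = a]

# The per-key approach, run sequentially here, for comparison.
looped <- rbindlist(lapply(unique(dt[["a"]]), function(x) {
  dt[.(x), list(a = a[[1]],
                b = paste(b, collapse = ""),
                d = paste(d, collapse = ""),
                e = e[[1]])]
}))

stopifnot(identical(grouped$b, looped$b), identical(grouped$d, looped$d))
```

Wrapping each variant in system.time() on your own data shows whether the parallel version actually beats the plain grouped call.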
Parallelization with data.table
First, calculate and store the alpha_j's.
Then, sort DT by x first and cut it into the relevant intervals before performing your linear interpolation:
alpha <- c(NA, diff(yPoints) / diff(xPoints))
DT[order(x),
y := alpha[.GRP] * (x - xPoints[.GRP-1L]) + yPoints[.GRP-1L],
by=cut(x, xPoints)]
Please let me know how it performs.
data:
library(data.table)
## Main program
set.seed(27L)
xPoints <- c(4, 9, 12, 15, 18, 21)
yPoints <- c(1, 2, 3, 4, 5, 6)
DT <- data.table(x=rnorm(1e4, mean=12, sd=5))
check:
f_pwl <- function(x) {
temp <- as.vector( rep(NA, length = length(x)), mode = "double" )
for (i in seq(from = 1, to = length(x), by = 1)) {
if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
# nothing to do, temp[i] <- NA
} else if (x[i] == max(xPoints)) {
# value equal max(yPoints)
temp[i] <- max(yPoints)
} else {
# value is f_pwl(x)
xIndexVector = as.logical( x[i] >= xPoints & c(xPoints[2:length(xPoints)], max(xPoints)) > x[i] )
xIndexVector_plus1 = shift( xIndexVector, n = 1, fill = FALSE, type = "lag" )
alpha_j = (xPoints[xIndexVector_plus1] - x[i])/(xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1-alpha_j) %*% yPoints[xIndexVector_plus1]
}
} # end for i
as.vector( temp, mode = "double" )
}
system.time({
DT[, yOP := f_pwl( x ) ]
})
DT[abs(y-yOP) > 1e-6]
#Empty data.table (0 rows) of 3 cols: x,y,yOP
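As an independent cross-check that is not part of the original answer, base R's approx() performs the same kind of piecewise-linear interpolation in one vectorised call and, with its default rule = 1, returns NA outside the range of xPoints. The column name yApprox is made up for this sketch.

```r
library(data.table)

xPoints <- c(4, 9, 12, 15, 18, 21)
yPoints <- c(1, 2, 3, 4, 5, 6)

set.seed(27L)
DT <- data.table(x = rnorm(1e4, mean = 12, sd = 5))

# Linear interpolation between the knots; NA for x outside [4, 21].
DT[, yApprox := approx(xPoints, yPoints, xout = x)$y]

# Midpoint of the first segment (4, 1) to (9, 2) should interpolate to 1.5.
mid_first <- approx(xPoints, yPoints, xout = 6.5)$y
```

Comparing yApprox against y with the same tolerance as in the check above gives a second opinion on the grouped solution.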
Parallelizing / Multithreading with data.table
I got answers from the data.table developers on the data.table GitHub.
Here's a summary:
Finding the groups of the by variable is itself always parallelized.
More importantly, if the function in j is generic (a user-defined function), there is no parallelization.
Operations in j are parallelized if the function is GForce-optimized (expressions in j which contain only the functions min, max, mean, median, var, sd, sum, prod, first, last, head, tail).
So, it is advised to parallelize manually if the function in j is generic, but that does not always guarantee a speed gain.
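One way to see which path a given j takes is to pass verbose = TRUE to [.data.table, which prints diagnostics about how j was optimized (the exact messages vary between data.table versions). In this sketch the table and the wrapper my_sum are hypothetical; my_sum stands in for any user-defined function.

```r
library(data.table)

set.seed(42)
DT <- data.table(g = sample(1:100, 1e5, replace = TRUE), v = rnorm(1e5))

# Hypothetical user-defined wrapper; data.table cannot see through it.
my_sum <- function(x) sum(x)

# sum() is on the GForce list, so this j can be optimized internally.
fast <- DT[, .(s = sum(v)), by = g, verbose = TRUE]

# A user-defined function in j is evaluated group by group in R.
slow <- DT[, .(s = my_sum(v)), by = g, verbose = TRUE]

# Both routes give the same sums up to floating point tolerance.
stopifnot(isTRUE(all.equal(fast$s, slow$s)))
```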
==Solution==
In my case, I ran into a "vector memory exhausted" error when I plainly used DT[, var := some_function(var2)], even though my server had 1TB of RAM and the data was taking 200GB of memory.
I used split(DT, by='grouper') to split my data.table into chunks, and used doFuture with foreach and %dopar% to do the job. It was pretty fast.
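A rough sketch of that split-then-process pattern follows. To stay within base R it swaps in parallel::mclapply for the doFuture/foreach setup described above, so it is an illustration of the idea rather than the exact code that was run. The toy data, the output column var_new, and the body of some_function are assumptions.

```r
library(data.table)
library(parallel)

# Toy stand-in for the real table.
DT <- data.table(grouper = rep(1:4, each = 5), var2 = 1:20)

# Placeholder for the expensive per-chunk computation.
some_function <- function(v) cumsum(v)

# One data.table per value of grouper.
chunks <- split(DT, by = "grouper")

# Forking is not available on Windows, so fall back to one core there.
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

res <- mclapply(chunks, function(chunk) {
  chunk[, var_new := some_function(var2)]
  chunk
}, mc.cores = n_cores)

out <- rbindlist(res)
```

Each worker only handles one chunk at a time, which is what keeps peak memory per process lower than a single call over the whole table.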
Is `Map()` when used in a `data.table` parallel? - R
You can use this rather explorative approach and see whether the time elapsed shrinks when more threads are used.
Note that on my machine the maximum number of usable threads is just one, so no difference is possible.
library(data.table)
dt <- data.table::data.table(a = 1:3,
b = 4:6)
dt
#> a b
#> 1: 1 4
#> 2: 2 5
#> 3: 3 6
data.table::getDTthreads()
#> [1] 1
# No Parallelisation ---------------------------------
data.table::setDTthreads(1)
system.time({
dt[, lapply(.SD,
function(x) {
Sys.sleep(2)
x}
)
]
})
#> user system elapsed
#> 0.009 0.001 4.017
# Parallel -------------------------------------------
# use multiple threads
data.table::setDTthreads(2)
data.table::getDTthreads()
#> [1] 1
# if parallel, elapsed should be below 4
system.time({
dt[, lapply(.SD,
function(x) {
Sys.sleep(2)
x}
)
]
})
#> user system elapsed
#> 0.001 0.000 4.007
# Map -----------------------------------------------
# if parallel, elapsed should be below 4
system.time({
dt[, Map(f = function(x, y) {
Sys.sleep(2)
x},
.SD,
1:2
)
]
})
#> user system elapsed
#> 0.002 0.000 4.005
Parallel For Each Over Data Tables in a Data Set
Yes, you can. A DataSet and a DataTable are in-memory collections. So if you can process them independently, it's a good candidate for a parallel For Each.
Parallel.ForEach(
dataSet.Tables.Cast(Of DataTable),
Sub(table)
ProcessTable(table)
End Sub
)
Are 'j'-expressions in 'data.table' automatically parallelised?
From ?setDTthreads:
Internally parallelized code is used in the following places:
between.c - between()
cj.c - CJ()
coalesce.c - fcoalesce()
fifelse.c - fifelse()
fread.c - fread()
forder.c, fsort.c, and reorder.c - forder() and related
froll.c, frolladaptive.c, and frollR.c - froll() and family
fwrite.c - fwrite()
gsumm.c - GForce in various places, see GForce
nafill.c - nafill()
subset.c - Used in [.data.table subsetting
types.c - Internal testing usage
My understanding is that you should not expect data.table to make use of multithreading outside of the above use cases. Note that [.data.table uses multithreading for subsetting only, i.e., in i-expressions but not j-expressions. That is presumably just to speed up relational and logical operations, as in x[!is.na(a) & a > 0].
In a j-expression, sum and sapply are still just base::sum and base::sapply. You can test this with a benchmark:
library("data.table")
setDTthreads(4L)
x <- data.table(a = rnorm(2^25))
microbenchmark::microbenchmark(sum(x$a), x[, sum(a)], times = 1000L)
Unit: milliseconds
expr min lq mean median uq max neval
sum(x$a) 51.61281 51.68317 51.95975 51.84204 52.09202 56.67213 1000
x[, sum(a)] 51.78759 51.89054 52.18827 52.07291 52.33486 61.11378 1000
x <- data.table(a = seq_len(1e+04L))
microbenchmark::microbenchmark(sapply(x$a, paste, "is a good number"), x[, sapply(a, paste, "is a good number")], times = 1000L)
Unit: milliseconds
expr min lq mean median uq max neval
sapply(x$a, paste, "is a good number") 14.07403 15.7293 16.72879 16.31326 17.49072 45.62300 1000
x[, sapply(a, paste, "is a good number")] 14.56324 15.9375 17.03164 16.48971 17.69045 45.99823 1000
where it is clear that simply putting code into a j-expression does not improve performance.
data.table does recognize and handle certain constructs exceptionally. For instance, data.table uses its own radix-based forder instead of base::order when it sees x[order(...)]. (This feature is somewhat redundant now that users of base::order can request data.table's radix sort by passing method = "radix".) I haven't seen a "master list" of such exceptions.
As for whether using, e.g., parallel::mclapply inside of a j-expression can have performance benefits, I think the answer (as usual) depends on what you are trying to do and the scale of your data. Ultimately, you'll have to do your own benchmarks and profiling to find out. For example:
library("parallel")
cl <- makePSOCKcluster(4L)
microbenchmark::microbenchmark(x[, sapply(a, paste, "is a good number")], x[, parSapply(cl, a, paste, "is a good number")], times = 1000L)
stopCluster(cl)
Unit: milliseconds
expr min lq mean median uq max neval
x[, sapply(a, paste, "is a good number")] 14.553934 15.982681 17.105667 16.585525 17.864623 48.81276 1000
x[, parSapply(cl, a, paste, "is a good number")] 7.675487 8.426607 9.022947 8.802454 9.334532 25.67957 1000
So it is possible to see speed-up, though sometimes you pay the price in memory usage. For small enough problems, the overhead associated with R-level parallelism can definitely outweigh the performance benefits.
You'll find a good thread about integrating parallel and data.table (including reasons not to) here.
Subset of data table when computing in parallel
The comment from @Roland above notwithstanding, I've actually found that this type of approach can be very effective, and I've used it to parallelize millions of computations over millions of rows on a 40-core EC2 instance.
The first thing I would do is ensure that your key is set to the column you'll be using as an index for subsetting. Your index is a list of arrays, and that's a little different than I usually do it, but it should still work.
Try the following:
out <-
foreach(i = indx, .packages = c('data.table'), .combine = rbind ) %dopar% {
Psubset<-DataP[i,]
# do some operations on Psubset
}
Or if for some reason .combine isn't working or needs additional arguments, you can do it after the fact.
out_list <-
foreach(i = indx, .packages = c('data.table') ) %dopar% {
Psubset<-DataP[i,]
# do some operations on Psubset
}
out <- rbindlist(out_list) #, fill=TRUE, etc.
If that doesn't work I would take a look at the index so that it works more like:
out <-
foreach(i = 1:max_indx, .packages = c('data.table'), .combine = rbind ) %dopar% {
Psubset<-DataP[indx==i,]
# do some operations on Psubset
}
But without a reproducible example it's hard to know which one would work best.