In this article, by Aloysius Lim and William Tjhi, authors of the book R High Performance Programming, we will learn how to write and execute a parallel R code, where different parts of the code run simultaneously. So far, we have learned various ways to optimize the performance of R programs running serially, that is in a single process. This does not take full advantage of the computing power of modern CPUs with multiple cores. Parallel computing allows us to tap into all the computational resources available and to speed up the execution of R programs by many times. We will examine the different types of parallelism and how to implement them in R, and we will take a closer look at a few performance considerations when designing the parallel architecture of R programs.
(For more resources related to this topic, see here.)
Data parallelism versus task parallelism
Many modern software applications are designed to run computations in parallel in order to take advantage of the multiple CPU cores available on almost any computer today. Many R programs can similarly be written in order to run in parallel. However, the extent of possible parallelism depends on the computing task involved. On one side of the scale are embarrassingly parallel tasks, where there are no dependencies between the parallel subtasks; such tasks can be made to run in parallel very easily. An example of this is, building an ensemble of decision trees in a random forest algorithm—randomized decision trees can be built independently from one another and in parallel across tens or hundreds of CPUs, and can be combined to form the random forest. On the other end of the scale are tasks that cannot be parallelized, as each step of the task depends on the results of the previous step. One such example is a depth-first search of a tree, where the subtree to search at each step depends on the path taken in previous steps. Most algorithms fall somewhere in between with some steps that must run serially and some that can run in parallel. With this in mind, careful thought must be given when designing a parallel code that works correctly and efficiently.
Often an R program has some parts that have to be run serially and other parts that can run in parallel. Before making the effort to parallelize any of the R code, it is useful to have an estimate of the potential performance gains that can be achieved. Amdahl’s law provides a way to estimate the best attainable performance gain when you convert a code from serial to parallel execution. It divides a computing task into its serial and potentially-parallel parts and states that the time needed to execute the task in parallel will be no less than this formula:
T(n) = T(1)(P + (1-P)/n), where:
- T(n) is the time taken to execute the task using n parallel processes
- P is the proportion of the whole task that is strictly serial
The theoretical best possible speed up of the parallel algorithm is thus:
S(n) = T(1) / T(n) = 1 / (P + (1-P)/n)
For example, given a task that takes 10 seconds to execute on one processor, where half of the task can be run in parallel, then the best possible time to run it on four processors is T(4) = 10(0.5 + (1-0.5)/4) = 6.25 seconds.
The theoretical best possible speed up of the parallel algorithm with four processors is 1 / (0.5 + (1-0.5)/4) = 1.6x .
The following figure shows you how the theoretical best possible execution time decreases as more CPU cores are added. Notice that the execution time reaches a limit that is just above five seconds. This corresponds to the half of the task that must be run serially, where parallelism does not help.
Best possible execution time versus number of CPU cores
In general, Amdahl’s law means that the fastest execution time for any parallelized algorithm is limited by the time needed for the serial portions of the algorithm. Bear in mind that Amdahl’s law provides only a theoretical estimate. It does not account for the overheads of parallel computing (such as starting and coordinating tasks) and assumes that the parallel portions of the algorithm are infinitely scalable. In practice, these factors might significantly limit the performance gains of parallelism, so use Amdahl’s law only to get a rough estimate of the maximum speedup possible.
There are two main classes of parallelism: data parallelism and task parallelism. Understanding these concepts helps to determine what types of tasks can be modified to run in parallel.
In data parallelism, a dataset is divided into multiple partitions. Different partitions are distributed to multiple processors, and the same task is executed on each partition of data. Take for example, the task of finding the maximum value in a vector dataset, say one that has one billion numeric data points. A serial algorithm to do this would look like the following code, which iterates over every element of the data in sequence to search for the largest value. (This code is intentionally verbose to illustrate how the algorithm works; in practice, the max() function in R, though also serial in nature, is much faster.)
serialmax <- function(data) {
max = -Inf
for (i in data) {
if (i > max)
max = i
}
return max
}
One way to parallelize this algorithm is to split the data into partitions. If we have a computer with eight CPU cores, we can split the data into eight partitions of 125 million numbers each. Here is the pseudocode for how to perform the same task in parallel:
# Run this in parallel across 8 CPU cores
part.results <- run.in.parallel(serialmax(data.part))
# Compute global max
global.max <- serialmax(part.results)
This pseudocode runs eight instances of serialmax()in parallel—one for each data partition—to find the local maximum value in each partition. Once all the partitions have been processed, the algorithm finds the global maximum value by finding the largest value among the local maxima. This parallel algorithm works because the global maximum of a dataset must be the largest of the local maxima from all the partitions.
The following figure depicts data parallelism pictorially. The key behind data parallel algorithms is that each partition of data can be processed independently of the other partitions, and the results from all the partitions can be combined to compute the final results. This is similar to the mechanism of the MapReduce framework from Hadoop. Data parallelism allows algorithms to scale up easily as data volume increases—as more data is added to the dataset, more computing nodes can be added to a cluster to process new partitions of data.
Data parallelism
Other examples of computations and algorithms that can be run in a data parallel way include:
- Element-wise matrix operations such as addition and subtraction: The matrices can be partitioned and the operations are applied to each pair of partitions.
- Means: The sums and number of elements in each partition can be added to find the global sum and number of elements from which the mean can be computed.
- K-means clustering: After data partitioning, the K centroids are distributed to all the partitions. Finding the closest centroid is performed in parallel and independently across the partitions. The centroids are updated by first, calculating the sums and the counts of their respective members in parallel, and then consolidating them in a single process to get the global means.
- Frequent itemset mining using the Partition algorithm: In the first pass, the frequent itemsets are mined from each partition of data to generate a global set of candidate itemsets; in the second pass, the supports of the candidate itemsets are summed from each partition to filter out the globally infrequent ones.
The other main class of parallelism is task parallelism, where tasks are distributed to and executed on different processors in parallel. The tasks on each processor might be the same or different, and the data that they act on might also be the same or different. The key difference between task parallelism and data parallelism is that the data is not divided into partitions. An example of a task parallel algorithm performing the same task on the same data is the training of a random forest model. A random forest is a collection of decision trees built independently on the same data. During the training process for a particular tree, a random subset of the data is chosen as the training set, and the variables to consider at each branch of the tree are also selected randomly. Hence, even though the same data is used, the trees are different from one another. In order to train a random forest of say 100 decision trees, the workload could be distributed to a computing cluster with 100 processors, with each processor building one tree. All the processors perform the same task on the same data (or exact copies of the data), but the data is not partitioned.
The parallel tasks can also be different. For example, computing a set of summary statistics on the same set of data can be done in a task parallel way. Each process can be assigned to compute a different statistic—the mean, standard deviation, percentiles, and so on.
Pseudocode of a task parallel algorithm might look like this:
# Run 4 tasks in parallel across 4 cores
for (task in tasks)
run.in.parallel(task)
# Collect the results of the 4 tasks
results <- collect.parallel.output()
# Continue processing after all 4 tasks are complete
Implementing data parallel algorithms
Several R packages allow code to be executed in parallel. The parallel package that comes with R provides the foundation for most parallel computing capabilities in other packages. Let’s see how it works with an example.
This example involves finding documents that match a regular expression. Regular expression matching is a fairly computational expensive task, depending on the complexity of the regular expression. The corpus, or set of documents, for this example is a sample of the Reuters-21578 dataset for the topic corporate acquisitions (acq) from the tm package. Because this dataset contains only 50 documents, they are replicated 100,000 times to form a corpus of 5 million documents so that parallelizing the code will lead to meaningful savings in execution times.
library(tm)
data("acq")
textdata <- rep(sapply(content(acq), content), 1e5)
The task is to find documents that match the regular expression d+(,d+)? mln dlrs, which represents monetary amounts in millions of dollars. In this regular expression, d+ matches a string of one or more digits, and (,d+)? optionally matches a comma followed by one more digits. For example, the strings 12 mln dlrs, 1,234 mln dlrs and 123,456,789 mln dlrs will match the regular expression. First, we will measure the execution time to find these documents serially with grepl():
pattern <- "\d+(,\d+)? mln dlrs"
system.time(res1 <- grepl(pattern, textdata))
## user system elapsed
## 65.601 0.114 65.721
Next, we will modify the code to run in parallel and measure the execution time on a computer with four CPU cores:
library(parallel)
detectCores()
## [1] 4
cl <- makeCluster(detectCores())
part <- clusterSplit(cl, seq_along(textdata))
text.partitioned <- lapply(part, function(p) textdata[p])
system.time(res2 <- unlist(
parSapply(cl, text.partitioned, grepl, pattern = pattern)
))
## user system elapsed
## 3.708 8.007 50.806
stopCluster(cl)
In this code, the detectCores() function reveals how many CPU cores are available on the machine, where this code is executed. Before running any parallel code, makeCluster() is called to create a local cluster of processing nodes with all four CPU cores. The corpus is then split into four partitions using the clusterSplit() function to determine the ideal split of the corpus such that each partition has roughly the same number of documents.
The actual parallel execution of grepl() on each partition of the corpus is carried out by the parSapply() function. Each processing node in the cluster is given a copy of the partition of data that it is supposed to process along with the code to be executed and other variables that are needed to run the code (in this case, the pattern argument). When all four processing nodes have completed their tasks, the results are combined in a similar fashion to sapply().
Finally, the cluster is destroyed by calling stopCluster().
It is good practice to ensure that stopCluster() is always called in production code, even if an error occurs during execution. This can be done as follows:
doSomethingInParallel <- function(...) {
cl <- makeCluster(...)
on.exit(stopCluster(cl))
# do something
}
In this example, running the task in parallel on four processors resulted in a 23 percent reduction in the execution time. This is not in proportion to the amount of compute resources used to perform the task; with four times as many CPU cores working on it, a perfectly parallelizable task might experience as much as a 75 percent runtime reduction. However, remember Amdahl’s law—the speed of parallel code is limited by the serial parts, which includes the overheads of parallelization. In this case, calling makeCluster() with the default arguments creates a socket-based cluster. When such a cluster is created, additional copies of R are run as workers. The workers communicate with the master R process using network sockets, hence the name. The worker R processes are initialized with the relevant packages loaded, and data partitions are serialized and sent to each worker process. These overheads can be significant, especially in data parallel algorithms where large volumes of data needs to be transferred to the worker processes.
Besides parSapply(), parallel also provides the parApply() and parLapply() functions; these functions are analogous to the standard sapply(), apply(), and lapply() functions, respectively. In addition, the parLapplyLB() and parSapplyLB() functions provide load balancing, which is useful when the execution of each parallel task takes variable amounts of time. Finally, parRapply() and parCapply() are parallel row and column apply() functions for matrices.
On non-Windows systems, parallel supports another type of cluster that often incurs less overheads — forked clusters. In these clusters, new worker processes are forked from the parent R process with a copy of the data. However, the data is not actually copied in the memory unless it is modified by a child process. This means that, compared to socket-based clusters, initializing child processes is quicker and the memory usage is often lower.
Another advantage of using forked clusters is that parallel provides a convenient and concise way to run tasks on them via the mclapply(), mcmapply(), and mcMap() functions. (These functions start with mc because they were originally a part of the multicore package) There is no need to explicitly create and destroy the cluster, as these functions do this automatically. We can simply call mclapply() and state the number of worker processes to fork via the mc.cores argument:
system.time(res3 <- unlist(
mclapply(text.partitioned, grepl, pattern = pattern,
mc.cores = detectCores())
))
## user system elapsed
## 127.012 0.350 33.264
This shows a 49 percent reduction in execution time compared to the serial version, and 35 percent reduction compared to parallelizing using a socket-based cluster. For this example, forked clusters provide the best performance.
Due to differences in system configuration, you might see very different results when you try the examples in your own environment. When you develop parallel code, it is important to test the code in an environment that is similar to the one that it will eventually run in.
Implementing task parallel algorithms
Let’s now see how to implement a task parallel algorithm using both socket-based and forked clusters. We will look at how to run the same task and different tasks on workers in a cluster.
Running the same task on workers in a cluster
To demonstrate how to run the same task on a cluster, the task for this example is to generate 500 million Poisson random numbers. We will do this by using L’Ecuyer’s combined multiple-recursive generator, which is the only random number generator in base R that supports multiple streams to generate random numbers in parallel. The random number generator is selected by calling the RNGkind() function.
We cannot just use any random number generator in parallel because the randomness of the data depends on the algorithm used to generate random data and the seed value given to each parallel task. Most other algorithms were not designed to produce random numbers in multiple parallel streams, and might produce multiple highly correlated streams of numbers, or worse, multiple identical streams!
First, we will measure the execution time of the serial algorithm:
RNGkind("L'Ecuyer-CMRG")
nsamples <- 5e8
lambda <- 10
system.time(random1 <- rpois(nsamples, lambda))
## user system elapsed
## 51.905 0.636 52.544
To generate the random numbers on a cluster, we will first distribute the task evenly among the workers. In the following code, the integer vector samples.per.process contains the number of random numbers that each worker needs to generate on a four-core CPU. The seq() function produces ncores+1 numbers evenly distributed between 0 and nsamples, with the first number being 0 and the next ncores numbers indicating the approximate cumulative number of samples across the worker processes. The round() function rounds off these numbers into integers and diff() computes the difference between them to give the number of random numbers that each worker process should generate.
cores <- detectCores()
cl <- makeCluster(ncores)
samples.per.process <-
diff(round(seq(0, nsamples, length.out = ncores+1)))
Before we can generate the random numbers on a cluster, each worker needs a different seed from which it can generate a stream of random numbers. The seeds need to be set on all the workers before running the task, to ensure that all the workers generate different random numbers.
For a socket-based cluster, we can call clusterSetRNGStream() to set the seeds for the workers, then run the random number generation task on the cluster. When the task is completed, we call stopCluster() to shut down the cluster:
clusterSetRNGStream(cl)
system.time(random2 <- unlist(
parLapply(cl, samples.per.process, rpois,
lambda = lambda)
))
## user system elapsed
## 5.006 3.000 27.436
stopCluster(cl)
Using four parallel processes in a socket-based cluster reduces the execution time by 48 percent. The performance of this type of cluster for this example is better than that of the data parallel example because there is less data to copy to the worker processes—only an integer that indicates how many random numbers to generate.
Next, we run the same task on a forked cluster (again, this is not supported on Windows). The mclapply() function can set the random number seeds for each worker for us, when the mc.set.seed argument is set to TRUE; we do not need to call clusterSetRNGStream(). Otherwise, the code is similar to that of the socket-based cluster:
system.time(random3 <- unlist(
mclapply(samples.per.process, rpois,
lambda = lambda,
mc.set.seed = TRUE, mc.cores = ncores)
))
## user system elapsed
## 76.283 7.272 25.052
On our test machine, the execution time of the forked cluster is slightly faster, but close to that of the socket-based cluster, indicating that the overheads for this task are similar for both types of clusters.
Running different tasks on workers in a cluster
So far, we have executed the same tasks on each parallel process. The parallel package also allows different tasks to be executed on different workers. For this example, the task is to generate not only Poisson random numbers, but also uniform, normal, and exponential random numbers. As before, we start by measuring the time to perform this task serially:
RNGkind("L'Ecuyer-CMRG")
nsamples <- 5e7
pois.lambda <- 10
system.time(random1 <- list(pois = rpois(nsamples,
pois.lambda),
unif = runif(nsamples),
norm = rnorm(nsamples),
exp = rexp(nsamples)))
## user system elapsed
## 14.180 0.384 14.570
In order to run different tasks on different workers on socket-based clusters, a list of function calls and their associated arguments must be passed to parLapply(). This is a bit cumbersome, but parallel unfortunately does not provide an easier interface to run different tasks on a socket-based cluster. In the following code, the function calls are represented as a list of lists, where the first element of each sublist is the name of the function that runs on a worker, and the second element contains the function arguments. The function do.call() is used to call the given function with the given arguments.
cores <- detectCores()
cl <- makeCluster(cores)
calls <- list(pois = list("rpois", list(n = nsamples,
lambda = pois.lambda)),
unif = list("runif", list(n = nsamples)),
norm = list("rnorm", list(n = nsamples)),
exp = list("rexp", list(n = nsamples)))
clusterSetRNGStream(cl)
system.time(
random2 <- parLapply(cl, calls,
function(call) {
do.call(call[[1]], call[[2]])
})
)
## user system elapsed
## 2.185 1.629 10.403
stopCluster(cl)
On forked clusters on non-Windows machines, the mcparallel() and mccollect() functions offer a more intuitive way to run different tasks on different workers. For each task, mcparallel() sends the given task to an available worker. Once all the workers have been assigned their tasks, mccollect() waits for the workers to complete their tasks and collects the results from all the workers.
mc.reset.stream()
system.time({
jobs <- list()
jobs[[1]] <- mcparallel(rpois(nsamples, pois.lambda),
"pois", mc.set.seed = TRUE)
jobs[[2]] <- mcparallel(runif(nsamples),
"unif", mc.set.seed = TRUE)
jobs[[3]] <- mcparallel(rnorm(nsamples),
"norm", mc.set.seed = TRUE)
jobs[[4]] <- mcparallel(rexp(nsamples),
"exp", mc.set.seed = TRUE)
random3 <- mccollect(jobs)
})
## user system elapsed
## 14.535 3.569 7.97
Notice that we also had to call mc.reset.stream() to set the seeds for random number generation in each worker. This was not necessary when we used mclapply(), which calls mc.reset.stream() for us. However, mcparallel() does not, so we need to call it ourselves.
Summary
In this article, we learned about two classes of parallelism: data parallelism and task parallelism. Data parallelism is good for tasks that can be performed in parallel on partitions of a dataset. The dataset to be processed is split into partitions and each partition is processed on a different worker processes. Task parallelism, on the other hand, divides a set of similar or different tasks to amongst the worker processes. In either case, Amdahl’s law states that the maximum improvement in speed that can be achieved by parallelizing code is limited by the proportion of that code that can be parallelized.
Resources for Article:
Further resources on this subject:
- Using R for Statistics, Research, and Graphics [Article]
- Learning Data Analytics with R and Hadoop [Article]
- Aspects of Data Manipulation in R [Article]