

Parallelizing processing with pmap

The easiest way to parallelize data processing is to take a loop we already have and handle each item in it in a separate thread.

That is essentially what pmap does. If we replace a call to map with pmap, it takes each call to the function argument and executes it in a thread pool. pmap is not completely lazy, but it’s not completely strict, either: it stays just ahead of the output consumed. So if the output is never used, it won’t be fully realized.
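As a quick illustration of the swap (not part of the recipe that follows), here is a minimal sketch; simulate-work is just a made-up stand-in for an expensive per-item computation:

    (defn simulate-work
      "A hypothetical stand-in for an expensive per-item computation."
      [x]
      (Thread/sleep 10)
      (* x x))

    ;; doall forces the lazy result so that time measures the real work.
    (time (doall (map simulate-work (range 100))))   ; items run one at a time
    (time (doall (pmap simulate-work (range 100))))  ; items run on a thread pool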

For this recipe, we’ll calculate the Mandelbrot set. Each point in the output takes enough time that this is a good candidate to parallelize. We can just swap map for pmap and immediately see a speed-up.

How to do it…

The Mandelbrot set consists of the points that don't blow up when they're passed repeatedly through the formula that defines the set (z = z^2 + c). To draw it, we track how quickly each point escapes, if it ever does.

  1. We need a function that takes a point and the maximum number of iterations to try, and returns the iteration on which that point escapes. Escaping just means that the squared magnitude (x2 + y2 in the code below) gets above 4.

    (defn get-escape-point
      [scaled-x scaled-y max-iterations]
      (loop [x 0, y 0, iteration 0]
        (let [x2 (* x x), y2 (* y y)]
          (if (and (< (+ x2 y2) 4)
                   (< iteration max-iterations))
            (recur (+ (- x2 y2) scaled-x)
                   (+ (* 2 x y) scaled-y)
                   (inc iteration))
            iteration))))

  2. The scaled points are the pixel points in the output, scaled to relative positions in the Mandelbrot set. Here are the functions that handle the scaling. Along with a particular x-y coordinate in the output, they're given the range of the set and the number of pixels in each direction.

    (defn scale-to
      ([pixel maximum [lower upper]]
       (+ (* (/ pixel maximum)
             (Math/abs (- upper lower)))
          lower)))

    (defn scale-point
      ([pixel-x pixel-y max-x max-y set-range]
       [(scale-to pixel-x max-x (:x set-range))
        (scale-to pixel-y max-y (:y set-range))]))

  3. The function output-points returns a sequence of x, y values for each of the pixels in the final output.

    (defn output-points
      ([max-x max-y]
       (let [range-y (range max-y)]
         (mapcat (fn [x] (map #(vector x %) range-y))
                 (range max-x)))))

  4. For each output pixel, we need to scale it to a location in the range of the Mandelbrot set and then get the escape point for that location.

    (defn mandelbrot-pixel
      ([max-x max-y max-iterations set-range]
       (partial mandelbrot-pixel max-x max-y max-iterations set-range))
      ([max-x max-y max-iterations set-range [pixel-x pixel-y]]
       (let [[x y] (scale-point pixel-x pixel-y max-x max-y set-range)]
         (get-escape-point x y max-iterations))))

  5. At this point, we can simply map mandelbrot-pixel over the results of output-points. We'll also pass in the function to use (map or pmap).

    (defn mandelbrot
      ([mapper max-iterations max-x max-y set-range]
       (doall
         (mapper (mandelbrot-pixel max-x max-y max-iterations set-range)
                 (output-points max-x max-y)))))

  6. Finally, we have to define the range that the Mandelbrot set covers.

    (def mandelbrot-range {:x [-2.5, 1.0], :y [-1.0, 1.0]})

How do these two compare? A lot depends on the parameters we pass them.

user=> (def m (time (mandelbrot map 500 1000 1000 mandelbrot-range)))
"Elapsed time: 28981.112 msecs"
#'user/m
user=> (def m (time (mandelbrot pmap 500 1000 1000 mandelbrot-range)))
"Elapsed time: 34205.122 msecs"
#'user/m
user=> (def m (time (mandelbrot map 1000 1000 1000 mandelbrot-range)))
"Elapsed time: 85308.706 msecs"
#'user/m
user=> (def m (time (mandelbrot pmap 1000 1000 1000 mandelbrot-range)))
"Elapsed time: 49067.584 msecs"
#'user/m


If we only iterate at most 500 times for each point, it’s slightly faster to use map and work sequentially. However, if we iterate 1,000 times each, pmap is faster.

How it works…

This shows that parallelization is a balancing act. If each separate work item is small, the overhead of creating the threads, coordinating them, and passing data back and forth takes more time than doing the work itself. However, when each thread has enough to do to make it worth it, we can get nice speed-ups just by using pmap.

Behind the scenes, pmap takes each item and uses future to run it in a thread pool. It forces only a couple more items than you have processors, so it keeps your machine busy, without generating more work or data than you need.
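As a rough sketch of that idea (deliberately simplified, and not the actual clojure.core/pmap implementation), we could write something like the following; the real pmap additionally limits how many futures run ahead of the consumer:

    ;; A deliberately simplified sketch of the idea behind pmap, not the
    ;; real implementation: each call runs in a future on a thread pool,
    ;; and we deref the results in order. The real pmap is lazier and only
    ;; keeps a couple more futures in flight than you have processors.
    (defn naive-pmap
      [f coll]
      (->> coll
           (map #(future (f %)))  ; start each item on a pooled thread
           doall                  ; kick off all of the futures
           (map deref)))          ; block, in order, for each result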

There’s more…

For an in-depth, excellent discussion of the nuts and bolts of pmap, along with pointers about things to watch out for, see David Liebke’s talk, From Concurrency to Parallelism (http://blip.tv/clojure/david-liebke-from-concurrency-to-parallelism-4663526).

See also

  • The Partitioning Monte Carlo Simulations for better pmap performance recipe

Parallelizing processing with Incanter

One of Incanter's nice features is that it uses the Parallel Colt Java library (http://sourceforge.net/projects/parallelcolt/) to handle its processing, so when you use many of its matrix, statistical, or other functions, they're automatically executed on multiple threads.

For this recipe, we'll revisit the Virginia housing-unit census data and fit a linear regression to it.

Getting ready

We’ll need to add Incanter to our list of dependencies in our Leiningen project.clj file:

:dependencies [[org.clojure/clojure "1.5.0"] [incanter "1.3.0"]]

We’ll also need to pull those libraries into our REPL or script:

(use '(incanter core datasets io optimize charts stats))

We can use the following filename:

(def data-file "data/all_160_in_51.P35.csv")

How to do it…

For this recipe, we'll extract the data to analyze, perform the linear regression, and then graph the results.

  1. First, we’ll read in the data and pull the population and housing unit columns into their own matrix.

    (def data
      (to-matrix
        (sel (read-dataset data-file :header true)
             :cols [:POP100 :HU100])))

  2. From this matrix, we can bind the population and the housing unit data to their own names.

    (def population (sel data :cols 0))
    (def housing-units (sel data :cols 1))

  3. Now that we have those, we can use Incanter to fit the data.

    (def lm (linear-model housing-units population))

  4. Incanter makes plotting so easy that it's hard not to take a look at the data.

    (def plot (scatter-plot population housing-units :legend true))
    (add-lines plot population (:fitted lm))
    (view plot)

Here we can see that the plot of housing units against population makes a very straight line.
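If we want to check the fit numerically as well as visually, the map returned by linear-model carries more than the :fitted values we plotted above. Assuming the usual Incanter keys, we can look at the coefficients and the R-squared value:

    ;; Assuming the usual keys in Incanter's linear-model result map.
    (:coefs lm)     ; [intercept slope] of the fitted line
    (:r-square lm)  ; close to 1.0 for a tightly linear relationship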

How it works…

Under the covers, Incanter takes the data matrix and partitions it into chunks. It then spreads those over the available CPUs to speed up processing. Of course, we don’t have to worry about this. That’s part of what makes Incanter so powerful.

Partitioning Monte Carlo simulations for better pmap performance

In the Parallelizing processing with pmap recipe, we found that while using pmap is easy enough, knowing when to use it is more complicated. Processing each task in the collection has to take enough time to make the costs of threading, coordinating processing, and communicating the data worth it. Otherwise, the program will spend more time concerned with how (parallelization) and not enough time with what (the task).

The way to get around this is to make sure that pmap has enough to do at each step that it parallelizes. The easiest way to do that is to partition the input collection into chunks and run pmap on groups of the input.
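One generic way to do that is to partition the input and hand each chunk to pmap as a single task, as in this sketch (the recipe below takes a slightly different approach and generates the chunk sizes instead of chunking an existing sequence):

    ;; A generic sketch of chunking work for pmap: each chunk becomes one
    ;; task, so every thread does chunk-size items' worth of work for each
    ;; round of coordination. The recipe below does its chunking differently.
    (defn pmap-chunked
      [chunk-size f coll]
      (->> coll
           (partition-all chunk-size)
           (pmap #(doall (map f %)))  ; doall realizes the chunk in the worker
           (apply concat)))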

For this recipe, we'll use Monte Carlo methods to approximate pi. We'll compare a serial version, a naïve parallel version, and a version that uses both parallelization and partitioning.

Getting ready

We’ll use Criterium to handle benchmarking, so we’ll need to include it as a dependency in our Leiningen project.clj file, shown as follows:

:dependencies [[org.clojure/clojure "1.5.0"] [criterium "0.3.0"]]

We’ll use these dependencies and the java.lang.Math class in our script or REPL.

(use 'criterium.core)
(import [java.lang Math])

How to do it…

To implement this, we’ll define some core functions and then implement a Monte Carlo method for estimating pi that uses pmap.

  1. We need to define the functions necessary for the simulation. We’ll have one that generates a random two-dimensional point that will fall somewhere in the unit square.

    (defn rand-point [] [(rand) (rand)])

  2. Now, we need a function to return a point’s distance from the origin.

    (defn center-dist [[x y]] (Math/sqrt (+ (* x x) (* y y))))

  3. Next we’ll define a function that takes a number of points to process, and creates that many random points. It will return the number of points that fall inside a circle.

    (defn count-in-circle
      [n]
      (->> (repeatedly n rand-point)
           (map center-dist)
           (filter #(<= % 1.0))
           count))

  4. That simplifies our definition of the base (serial) version. This calls count-in-circle to get the number of random points in a unit square that fall inside a circle, divides by the total number of points to get a proportion, and multiplies that by 4, which should approximate pi.

    (defn mc-pi [n] (* 4.0 (/ (count-in-circle n) n)))

  5. We’ll use a different approach for the simple pmap version. The function that we’ll parallelize will take a point and return 1 if it’s in the circle, or 0 if not. Then we can add those up to find the number in the circle.

    (defn in-circle-flag
      [p]
      (if (<= (center-dist p) 1.0) 1 0))

    (defn mc-pi-pmap
      [n]
      (let [in-circle (->> (repeatedly n rand-point)
                           (pmap in-circle-flag)
                           (reduce + 0))]
        (* 4.0 (/ in-circle n))))

  6. For the version that chunks the input, we'll do something different again. Instead of creating the sequence of random points and partitioning that, we'll have a sequence that tells how large each partition should be and have pmap walk across that, calling count-in-circle. This means that creating the random points is also parallelized.

    (defn mc-pi-part
      ([n] (mc-pi-part 512 n))
      ([chunk-size n]
       (let [step (int (Math/floor (float (/ n chunk-size))))
             remainder (mod n chunk-size)
             parts (lazy-seq
                     (cons remainder (repeat step chunk-size)))
             in-circle (reduce + 0 (pmap count-in-circle parts))]
         (* 4.0 (/ in-circle n)))))

Now, how do these perform? We'll bind our parameters to names, run one set of benchmarks, and then look at a table of all of them. We'll discuss the results in the next section.

user=> (def chunk-size 4096)
#'user/chunk-size
user=> (def input-size 1000000)
#'user/input-size
user=> (quick-bench (mc-pi input-size))
WARNING: Final GC required 4.001679309213317 % of runtime
Evaluation count : 6 in 6 samples of 1 calls.
Execution time mean : 634.387833 ms
Execution time std-deviation : 33.222001 ms
Execution time lower quantile : 606.122000 ms ( 2.5%)
Execution time upper quantile : 677.273125 ms (97.5%)
nil

Here’s all the information in the form of a table:

Function    | Input Size | Chunk Size | Mean      | Std Dev.  | GC Time
mc-pi       | 1,000,000  | NA         | 634.39 ms | 33.22 ms  | 4.0%
mc-pi-pmap  | 1,000,000  | NA         | 1.92 sec  | 888.52 ms | 2.60%
mc-pi-part  | 1,000,000  | 4,096      | 455.94 ms | 4.19 ms   | 8.75%


How it works…

There are a couple of things we should talk about here. Primarily, we’ll need to look at chunking the inputs for pmap, but we should also discuss Monte Carlo methods.

Estimating with Monte Carlo simulations

Monte Carlo simulations work by throwing random data at a problem that is fundamentally deterministic but practically infeasible to solve in a more straightforward way. Calculating pi is one example of this. If we fill a unit square with random points, the ratio of points that fall within a circle of radius 1 centered on (0, 0) will be approximately pi/4. The more random points we use, the better the approximation.
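We can watch that happen with the mc-pi function from this recipe; the exact estimates will vary from run to run, but they should settle closer to pi as the number of points grows:

    ;; Estimates vary from run to run, but they should wander closer to
    ;; 3.14159... as the number of random points grows.
    (mc-pi 1000)
    (mc-pi 100000)
    (mc-pi 10000000)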

I should note that this makes a good demonstration of Monte Carlo methods, but it’s a terrible way to calculate pi. It tends to be both slower and less accurate than the other methods.

Although not good for this task, Monte Carlo methods have been used for designing heat shields, simulating pollution, ray tracing, financial option pricing, evaluating business or financial products, and many, many more things.

For a more in-depth discussion, Wikipedia has a good introduction to Monte Carlo methods at http://en.wikipedia.org/wiki/Monte_Carlo_method.

Chunking data for pmap

The table we saw earlier makes it clear that partitioning helped: the partitioned version took just 72 percent of the time that the serial version did, while the naïve parallel version took about three times as long as the serial version. Based on the standard deviations, the partitioned version's results were also the most consistent.

The speed-up comes from each thread being able to spend longer on each task. There is a performance penalty for spreading work over multiple threads: context switching (that is, switching between threads) costs time, and so does coordinating between threads. We expect to make up that time, and more, by doing several things at once. However, if each task doesn't take long enough on its own, the benefit won't outweigh the costs. Chunking the input, which effectively creates larger individual tasks for each thread, gets around this by giving each thread more to do, so less of the total running time goes to context switching and coordination.
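To make that concrete with this recipe's own parameters, an input of 1,000,000 points and a chunk size of 4,096 gives pmap only a couple of hundred tasks to coordinate instead of a million:

    ;; Working through the numbers used in the benchmarks above.
    (int (Math/floor (float (/ 1000000 4096))))  ; => 244 full chunks of 4,096 points
    (mod 1000000 4096)                           ; => 576 points in the remainder chunk
    ;; So mc-pi-part hands pmap 245 tasks, each worth thousands of points of
    ;; work, instead of the 1,000,000 single-point tasks that mc-pi-pmap uses.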
