(For more resources related to this topic, see here.)
Parallelizing processing with pmap
The easiest way to parallelize data is to take a loop we already have and handle each item in it in a thread.
That is essentially what pmap does. If we replace a call to map with pmap, it takes each call to the function argument and executes it in a thread pool. pmap is not completely lazy, but it’s not completely strict, either: it stays just ahead of the output consumed. So if the output is never used, it won’t be fully realized.
For this recipe, we’ll calculate the Mandelbrot set. Each point in the output takes enough time that this is a good candidate to parallelize. We can just swap map for pmap and immediately see a speedup.
How to do it…
The Mandelbrot set can be found by looking for points that don’t settle on a value after passing through the formula that defines the set quickly.

We need a function that takes a point and the maximum number of iterations to try and return the iteration that it escapes on. That just means that the value gets above 4.
(defn getescapepoint [scaledx scaledy maxiterations] (loop [x 0, y 0, iteration 0] (let [x2 (* x x), y2 (* y y)] (if (and (< (+ x2 y2) 4) (< iteration maxiterations)) (recur (+ ( x2 y2) scaledx) (+ (* 2 x y) scaledy) (inc iteration)) iteration))))

The scaled points are the pixel points in the output, scaled to relative positions in the Mandelbrot set. Here are the functions that handle the scaling. Along with a particular xy coordinate in the output, they’re given the range of the set and the number of pixels each direction.
(defn scaleto ([pixel maximum [lower upper]] (+ (* (/ pixel maximum) (Math/abs ( upper lower))) lower))) (defn scalepoint ([pixelx pixely maxx maxy setrange] [(scaleto pixelx maxx (:x setrange)) (scaleto pixely maxy (:y setrange))]))

The function outputpoints returns a sequence of x, y values for each of the pixels in the final output.
(defn outputpoints ([maxx maxy] (let [rangey (range maxy)] (mapcat (fn [x] (map #(vector x %) rangey)) (range maxx)))))

For each output pixel, we need to scale it to a location in the range of the Mandelbrot set and then get the escape point for that location.
(defn mandelbrotpixel ([maxx maxy maxiterations setrange] (partial mandelbrotpixel maxx maxy maxiterations setrange)) ([maxx maxy maxiterations setrange [pixelx pixely]] (let [[x y] (scalepoint pixelx pixely maxx maxy setrange)] (getescapepoint x y maxiterations))))

At this point, we can simply map mandelbrotpixel over the results of outputpoints. We’ll also pass in the function to use (map or pmap).
(defn mandelbrot ([mapper maxiterations maxx maxy setrange] (doall (mapper (mandelbrotpixel maxx maxy maxiterations setrange) (outputpoints maxx maxy)))))

Finally, we have to define the range that the Mandelbrot set covers.
(def mandelbrotrange {:x [2.5, 1.0], :y [1.0, 1.0]})
How do these two compare? A lot depends on the parameters we pass them.
user=> (def m (time (mandelbrot map 500 1000 1000 mandelbrotrange))) "Elapsed time: 28981.112 msecs" #'user/m user=> (def m (time (mandelbrot pmap 500 1000 1000 mandelbrotrange))) "Elapsed time: 34205.122 msecs" #'user/m user=> (def m (time (mandelbrot map 1000 10001000 mandelbrotrange))) "Elapsed time: 85308.706 msecs" #'user/m user=> (def m (time (mandelbrot pmap 1000 10001000 mandelbrotrange))) "Elapsed time: 49067.584 msecs" #'user/m
Refer to the following chart:
If we only iterate at most 500 times for each point, it’s slightly faster to use map and work sequentially. However, if we iterate 1,000 times each, pmap is faster.
How it works…
This shows that parallelization is a balancing act. If each separate work item is small, the overhead of creating the threads, coordinating them, and passing data back and forth takes more time than doing the work itself. However, when each thread has enough to do to make it worth it, we can get nice speedups just by using pmap.
Behind the scenes, pmap takes each item and uses future to run it in a thread pool. It forces only a couple more items than you have processors, so it keeps your machine busy, without generating more work or data than you need.
There’s more…
For an indepth, excellent discussion of the nuts and bolts of pmap, along with pointers about things to watch out for, see David Liebke’s talk, From Concurrency to Parallelism (http://blip.tv/clojure/davidliebkefromconcurrencytoparallelism4663526).
See also

The Partitioning Monte Carlo Simulations for better pmap performance recipe
Parallelizing processing with Incanter
One of its nice features is that it uses the Parallel Colt Java library (http://sourceforge.net/projects/parallelcolt/) to actually handle its processing, so when you use a lot of the matrix, statistical, or other functions, they’re automatically executed on multiple threads.
For this, we’ll revisit the Virginia housingunit census data and we’ll fit it to a linear regression.
Getting ready
We’ll need to add Incanter to our list of dependencies in our Leiningen project.clj file:
:dependencies [[org.clojure/clojure "1.5.0"] [incanter "1.3.0"]]
We’ll also need to pull those libraries into our REPL or script:
(use '(incanter core datasets io optimize charts stats))
We can use the following filename:
(def datafile "data/all_160_in_51.P35.csv")
How to do it…
For this recipe, we’ll extract the data to analyze and perform the linear regression. We’ll then graph the data afterwards.

First, we’ll read in the data and pull the population and housing unit columns into their own matrix.
(def data (tomatrix (sel (readdataset datafile :header true) :cols [:POP100 :HU100])))

From this matrix, we can bind the population and the housing unit data to their own names.
(def population (sel data :cols 0)) (def housingunits (sel data :cols 1))

Now that we have those, we can use Incanter to fit the data.
(def lm (linearmodel housingunits population))

Incanter makes it so easy, it’s hard not to look at it.
(def plot (scatterplot population housingunits :legend true)) (addlines plot population (:fitted lm)) (view plot)
Here we can see that the graph of housing units to families makes a very straight line:
How it works…
Under the covers, Incanter takes the data matrix and partitions it into chunks. It then spreads those over the available CPUs to speed up processing. Of course, we don’t have to worry about this. That’s part of what makes Incanter so powerful.
Partitioning Monte Carlo simulations for better pmap performance
In the Parallelizing processing with pmap recipe, we found that while using pmap is easy enough, knowing when to use it is more complicated. Processing each task in the collection has to take enough time to make the costs of threading, coordinating processing, and communicating the data worth it. Otherwise, the program will spend more time concerned with how (parallelization) and not enough time with what (the task).
The way to get around this is to make sure that pmap has enough to do at each step that it parallelizes. The easiest way to do that is to partition the input collection into chunks and run pmap on groups of the input.
For this recipe, we’ll use Monte Carlo methods to approximate pi . We’ll compare a serial version against a naïve parallel version against a version that uses parallelization and partitions.
Getting ready
We’ll use Criterium to handle benchmarking, so we’ll need to include it as a dependency in our Leiningen project.clj file, shown as follows:
:dependencies [[org.clojure/clojure "1.5.0"] [criterium "0.3.0"]]
We’ll use these dependencies and the java.lang.Math class in our script or REPL.
(use 'criterium.core) (import [java.lang Math])
How to do it…
To implement this, we’ll define some core functions and then implement a Monte Carlo method for estimating pi that uses pmap.

We need to define the functions necessary for the simulation. We’ll have one that generates a random twodimensional point that will fall somewhere in the unit square.
(defn randpoint [] [(rand) (rand)])

Now, we need a function to return a point’s distance from the origin.
(defn centerdist [[x y]] (Math/sqrt (+ (* x x) (* y y))))

Next we’ll define a function that takes a number of points to process, and creates that many random points. It will return the number of points that fall inside a circle.
(defn countincircle [n] (>> (repeatedly n randpoint) (map centerdist) (filter #(<= % 1.0)) count))

That simplifies our definition of the base (serial) version. This calls countincircle to get the proportion of random points in a unit square that fall inside a circle. It multiplies this by 4, which should approximate pi.
(defn mcpi [n] (* 4.0 (/ (countincircle n) n)))

We’ll use a different approach for the simple pmap version. The function that we’ll parallelize will take a point and return 1 if it’s in the circle, or 0 if not. Then we can add those up to find the number in the circle.
(defn incircleflag [p] (if (<= (centerdist p) 1.0) 1 0)) (defn mcpipmap [n] (let [incircle (>> (repeatedly n randpoint) (pmap incircleflag) (reduce + 0))] (* 4.0 (/ incircle n))))

For the version that chunks the input, we’ll do something different again. Instead of creating the sequence of random points and partitioning that, we’ll have a sequence that tells how large each partition should be and have pmap walk across that, calling countincircle. This means that creating the larger sequences are also parallelized.
(defn mcpipart ([n] (mcpipart 512 n)) ([chunksize n] (let [step (int (Math/floor (float (/ n chunksize)))) remainder (mod n chunksize) parts (lazyseq (cons remainder (repeat step chunksize))) incircle (reduce + 0 (pmap countincircle parts))] (* 4.0 (/ incircle n)))))
Now, how do these work? We’ll bind our parameters to names, and then we’ll run one set of benchmarks before we look at a table of all of them. We’ll discuss the results in the next section.
user=> (def chunksize 4096) #'user/chunksize user=> (def inputsize 1000000) #'user/inputsize user=> (quickbench (mcpi inputsize)) WARNING: Final GC required 4.001679309213317 % of runtime Evaluation count : 6 in 6 samples of 1 calls. Execution time mean :634.387833 ms Execution time stddeviation : 33.222001 ms Execution time lower quantile : 606.122000 ms ( 2.5%) Execution time upper quantile : 677.273125 ms (97.5%) nil
Here’s all the information in the form of a table:
Function 
Input Size 
Chunk Size 
Mean 
Std Dev. 
GC Time 
mcpi 
1,000,000 
NA 
634.39ms 
33.22 ms 
4.0%

mcpipmap 
1,000,000 
NA 
1.92 sec 
888.52 ms 
2.60%

mcpipart 
1,000,000 
4,096 
455.94 ms 
4.19 ms 
8.75%

Here’s a chart with the same information:
How it works…
There are a couple of things we should talk about here. Primarily, we’ll need to look at chunking the inputs for pmap, but we should also discuss Monte Carlo methods.
Estimating with Monte Carlo simulations
Monte Carlo simulations work by throwing random data at a problem that is fundamentally deterministic, but when it’s practically infeasible to attempt a more straightforward solution. Calculating pi is one example of this. By randomly filling in points in a unit square, p/4 will be approximately the ratio of points that will fall within a circle centered on 0, 0. The more random points that we use, the better the approximation.
I should note that this makes a good demonstration of Monte Carlo methods, but it’s a terrible way to calculate pi. It tends to be both slower and less accurate than the other methods.
Although not good for this task, Monte Carlo methods have been used for designing heat shields, simulating pollution, ray tracing, financial option pricing, evaluating business or financial products, and many, many more things.
For a more indepth discussion, Wikipedia has a good introduction to Monte Carlo methods at http://en.wikipedia.org/wiki/Monte_Carlo_method.
Chunking data for pmap
The table we saw earlier makes it clear that partitioning helped: the partitioned version took just 72 percent of the time that the serial version did, while the naïve parallel version took more than three times longer. Based on the standard deviations, the results were also more consistent.
The speed up is because each thread is able to spend longer on each task. There is a performance penalty to spreading the work over multiple threads. Context switching (that is, switching between threads) costs time, and coordinating between threads does as well. But we expect to be able to make that time and more up by doing more things at once. However, if each task itself doesn’t take long enough, then the benefit won’t outweigh the costs. Chunking the input—and effectively creating larger individual tasks for each thread— gets around this by giving each thread more to do, and thereby spending less time context switching and coordinating, relative to the overall time spent running.