I’ve been working on a graph-layout algorithm and needed a way to test the performance of different parameterizations of the algorithm.

For performance testing, I generate a bunch of random graph layouts and then record the improvement score in the layout found by the algorithm. An example of one factor in the  improvement score is the reduction in the number of intersections in the layout, but there are other factors as well.

Since every algorithm run is completely independent of every other, and I have a multi-core machine, this seems like a good time to try out a little parallelization and/or concurrency in clojure.

Here I walk through two basic strategies for implementing the benchmarking system: using concurrency (where I look at each native clojure reference type), and one implementation that is parallel but without any shared data structures.

As a preliminary, since I don’t want to talk about graph layout algorithms right now, we will substitute a dummy function for the actual layout algorithm. This function crunches a few numbers then returns a random “score” result:

=> (defn run-algorithm []

     (do

       (reduce + (range (rand-int 1e4)))

       (rand-int 10)))

What we want our system to produce is a frequency map of improvement scores after running the algorithm on a number of random layouts.

Threadpool versus an atom

For the first implementation, we use a persistent hashmap wrapped in an atom as our frequency map. We then create a threadpool and a list of 10,000 clojure functions. Each function is a Runnable task that the threadpool will execute concurrently.

Inside each task, we run the dummy layout algorithm for a result, and then concurrently update the frequency map with the result. The update is multi-step: we first query the frequency map to determine if the result already exists. If so, we increment its value; else we assoc the result to the map:

=> (time

     (let [m (atom {})

           p (java.util.concurrent.Executors/newFixedThreadPool 8)

           t (for [_ (range 1e5)]

               (fn []

                 (let [r (run-algorithm)]

                   (swap! m (fn [a]

                              (assoc a r (inc (get a r 0))))))))]

       (do

         (.invokeAll p t)

         (reduce + (vals @m)))))

"Elapsed time: 6228.962969 msecs"

100000

This seems to work since it registers all 100000 expected updates although I am not 100% sure it’s thread-safe since I’m not completely clear on the swap! semantics. Either way it gives almost a 6x execution speed increase over the equivalent serial version listed below:

=> (time (reduce + (vals (frequencies (for [_ (range 1e5)] (run-algorithm))))))

"Elapsed time: 32401.273625 msecs"

100000

Next: agents!

Agents

The code is essentially exactly the same, we just substitute agent for atom, and send for swap!:

=> (time

     (let [m (agent {})

           p (java.util.concurrent.Executors/newFixedThreadPool 8)

           t (for [_ (range 1e5)]

               (fn []

                 (let [r (run-algorithm)]

                   (send m (fn [a]

                              (assoc a r (inc (get a r 0))))))))]

       (do

         (.invokeAll p t)

         (reduce + (vals @m)))))

"Elapsed time: 5925.71933 msecs"

100000

Seems to work and execution speed appears maybe slightly faster than the atom implementation.

Refs

Same basic scheme (with the addition of the dosync), same results:

=> (time

     (let [m (ref {})

           p (java.util.concurrent.Executors/newFixedThreadPool 8)

           t (for [_ (range 1e5)]

               (fn []

                 (let [r (run-algorithm)]

                   (dosync

                     (commute m

                       (fn [a]

                         (assoc a r (inc (get a r 0)))))))))]

       (do

         (.invokeAll p t)

         (reduce + (vals @m)))))

"Elapsed time: 6003.470274 msecs"

100000

No concurrency, just parallelism

The last implementation strategy I was curious about was eschewing concurrency altogether and using a fork-join style where we run the algorithm in parallel then compute the frequency map in a single thread. This might be faster since there is no contention on a shared data structure:

=> (time

     (let [p (java.util.concurrent.Executors/newFixedThreadPool 8)

           t (for [_ (range 1e5)] #(run-algorithm))

           r (frequencies (map #(.get %) (.invokeAll p t)))]

       (reduce + (vals r))))

"Elapsed time: 5874.390879 msecs"

100000

Interestingly, just as performant if not slightly more so than the concurrent implementations.