Multi-threaded Parallel Codes in Clojure

Material in this post is taken from the following links:

  • http://clojure-doc.org/articles/language/concurrency_and_parallelism.html
  • http://www.braveclojure.com/concurrency/
  • http://blog.factual.com/author/aaron

  • Introduction and Terminology

    Before we get to the Clojure features related to concurrency, let's lay a foundation and briefly cover some terminology.

    Term: Definition This Guide Uses

    Concurrency: When multiple threads are making progress, whether it is via time-slicing or parallelism.
    Parallelism: A condition that arises when at least two threads are executing simultaneously, e.g., on multiple cores or CPUs.
    Shared State: When multiple threads of execution need to mutate (modify) one or more pieces of program state (e.g., variables, identities).
    Mutable Data Structures: Data structures that, when changed, are updated “in place”.
    Immutable Data Structures: Data structures that, when changed, produce new data structures (copies), possibly with optimizations such as internal structural sharing.
    Concurrency Hazards: Conditions that occur in concurrent programs and prevent a program from being correct (behaving the way its authors intended).
    Shared Mutable State: When shared state is made of mutable data structures. A ripe ground for concurrency hazards.

    There are many concurrency hazards; some of the most common and well known are:

    Concurrency Hazard: Brief Description

    Race Condition: A condition where the correctness of the outcome is dependent on timing or relative ordering of events.
    Data Race: A Data Race is where at least two threads concurrently access a data memory location, and at least one is a write access.
    Deadlock: When two or more threads are waiting on each other to finish or release a shared resource, thus waiting forever and not making any progress.
    Livelock: When two or more threads are technically performing computation but not doing any useful work (not making progress), for example, because they endlessly pass a piece of data to each other but never actually process it.
    Starvation: When a thread is not given regular access to a shared resource and cannot make progress.
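
    To make the idea of shared state concrete, here is a minimal sketch in which several threads update one counter without losing updates. It assumes Clojure's atom, swap! and future, none of which are covered in this post, and the function name count-with-atom is hypothetical.

```clojure
;; Hypothetical helper: n-threads futures each increment a shared counter
;; n-incs times. swap! applies inc atomically, so no update is lost.
(defn count-with-atom [n-threads n-incs]
  (let [counter (atom 0)
        workers (doall
                  (for [_ (range n-threads)]
                    (future (dotimes [_ n-incs] (swap! counter inc)))))]
    ;; deref each future to wait until every worker has finished
    (doseq [w workers] @w)
    @counter))

(count-with-atom 8 1000)
;; 8000
```

    With an unsynchronized mutable counter the same program could lose increments, which is the kind of race the table above describes.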

    Runtime Parallelism

    Clojure was designed to be a hosted language. Its primary target, the JVM, provides runtime parallelism support. JVM threads map 1:1 to kernel threads, which execute in parallel provided that enough cores are available for the OS scheduler to use.

    In Clojure, many concurrency features are built on top of JVM threads and thus benefit from runtime parallelism if the program is running on a multi-core machine.
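
    As a rough sketch of what this looks like from the REPL, the snippet below asks the JVM how many processors it sees and runs a few small tasks on JVM threads. The names n-cores and thread-names are hypothetical, and future is used here only as a convenient way to run code on another thread.

```clojure
;; Number of logical processors the JVM reports to this process.
(def n-cores (.availableProcessors (Runtime/getRuntime)))

;; Each future body runs on a JVM thread from a pool; collect the
;; distinct names of the threads that actually ran the four tasks.
(def thread-names
  (->> (range 4)
       (mapv (fn [_] (future (.getName (Thread/currentThread)))))
       (map deref)
       set))

(println "cores:" n-cores "threads used:" (count thread-names))
```

    The number of distinct threads can be anywhere from one to four, because a pool thread that has finished one task may be reused for the next.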

    Here we compare the performance of the Clojure “map” and “pmap” (parallel map) functions on multicore machines. (My local machine is dual core, and we also look at OSC machines that have up to 12 cores per node.)

    It’s very common to have a collection of things, and need to transform it into a collection of different things. Clojure’s map function applies a given function to each element in a collection and returns a new collection with the results:

    map – clojure.core

    (map f) (map f coll) (map f c1 c2) (map f c1 c2 c3) (map f c1 c2 c3 & colls)

    Returns a lazy sequence consisting of the result of applying f to the set of first items of each coll, followed by applying f to set of second items in each coll, until any one of the colls is exhausted. Any remaining items in other colls are ignored. Function f should accept number-of-colls arguments.

    (map inc [1 2 3 4 5])
    ;; (2 3 4 5 6)
    ;; map can be used with multiple collections.
    (map + [1 2 3] [4 5 6])
    ;; (5 7 9)
    ;; A simple example using map:
    ;; get a sequence of squares of some integers using an anonymous function...
    (map #(* % %) [0 1 2 3 4 5 6])
    ;; (0 1 4 9 16 25 36)

    Let’s break down this example, working from the inside out. Our second and last argument to map is the collection we want to transform — a handful of integers.

    The first argument to map is the function we want applied across our collection of integers. In this case that’s an anonymous function that takes a value and produces the square of that value. So we end up with a new collection: the squares of the items from the original collection.

    Imperative pseudo-code to illustrate what map is doing:

    // inputs are f and coll
    new_coll = new Collection
    for (each item i in coll) {  new_coll.add( f(i) ) }
    return new_coll
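
    The pseudo-code above can be written in Clojure roughly as follows. This is only a sketch for comparison: the name eager-map is hypothetical, it handles a single collection, and unlike the real map it is eager and returns a vector rather than a lazy sequence.

```clojure
;; Hypothetical eager, single-collection version of map that mirrors
;; the imperative pseudo-code: walk coll and add (f item) to new-coll.
(defn eager-map [f coll]
  (loop [remaining (seq coll)   ; nil once the collection is exhausted
         new-coll  []]
    (if remaining
      (recur (next remaining) (conj new-coll (f (first remaining))))
      new-coll)))

(eager-map inc [1 2 3 4 5])
;; [2 3 4 5 6]
```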

    Summing a Vector of Numbers Using Reduce

    In Clojure, to sum a list or vector of numbers we’d use the reduce function, and do this:

    (defn my-sum [nums] (reduce + nums))

    And actually, since the Clojure code to do the summing is so short, you probably wouldn’t even define it as a named function. Instead, when you need to do it, you’d just write:

    (reduce + nums)

    That’s so concise, it seems like cheating. In a way it is cheating. Lispy, functional languages like Clojure natively support this very common concept. This often means that common tasks can be done very concisely in Clojure, compared to an imperative language like Java.

    There are two versions of reduce; the simple version used above is like this:

    (reduce f coll)

    This simple version of reduce applies a function f to an accumulator value and each element of the collection coll. The function f is expected to take 2 arguments. reduce supplies these arguments on each iteration: it plugs in the accumulator (“the running total”) as the first argument, and it plugs in the next item in the iteration as the second argument. The accumulator starts with the first value of the collection, and the iteration starts on the next item.

    A more flexible version of reduce allows you to specify the initial accumulator value, val:

    (reduce f val coll)

    The accumulator value could be anything you design it to be, such as a number, another collection, or any arbitrary data structure.
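
    A short sketch of both forms may help. The function name evens-and-odds and the sample inputs are made up for illustration.

```clojure
;; Without val, the accumulator starts as the first element (1).
(reduce + [1 2 3 4])
;; 10

;; With val, the accumulator starts as 100.
(reduce + 100 [1 2 3 4])
;; 110

;; The accumulator can be a data structure: here a hash-map of two vectors.
(defn evens-and-odds [nums]
  (reduce (fn [acc n]
            (update acc (if (even? n) :even :odd) conj n))
          {:even [] :odd []}
          nums))

(evens-and-odds [1 2 3 4 5])
;; {:even [2 4], :odd [1 3 5]}
```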

    Another way to understand reduce is to consider this imperative pseudo code:

    iterator = coll.iterator()
    if (val is specified) {
        accum = val
    } else {
        accum = iterator.next()
    }
    while (iterator.hasNext()) {
        accum = f(accum, iterator.next())
    }
    return accum
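
    For comparison, the same logic can be sketched in Clojure itself. The name my-reduce is hypothetical, and this version does not reproduce every detail of the real reduce (for example, how an empty collection with no val is handled).

```clojure
;; Hypothetical re-implementation of reduce that follows the pseudo-code.
(defn my-reduce
  ([f coll]
   ;; no val given: the first element seeds the accumulator
   (my-reduce f (first coll) (rest coll)))
  ([f val coll]
   (loop [accum val
          items (seq coll)]     ; nil once nothing is left
     (if items
       (recur (f accum (first items)) (next items))
       accum))))

(my-reduce + [1 2 3 4])
;; 10
(my-reduce + 100 [1 2 3 4])
;; 110
```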

    In the Clojure summation example, our function f is simply the + function. (Note that the + function can naturally take two arguments.) We didn’t make use of the optional val argument to reduce. However, it’s worthwhile to note that the optional val adds important flexibility. It can be very handy to supply an explicit custom starting value, e.g. a pre-populated collection or an empty hash-map.

    A histogram example using reduce

    Here is a commonly used Clojure idiom for producing a dictionary of word counts.

    (reduce #(assoc %1 %2 (inc (get %1 %2 0))) {} words)
    ;; an example application to the downloaded text of the Declaration of Independence
    (require '[clojure.string :as str])
    (def doi-words (str/split (slurp "http://www.constitution.org/usdeclar.txt") #"\s+"))
    (reduce #(assoc %1 %2 (inc (get %1 %2 0))) {} doi-words)

    Let’s see how this use of reduce will build a word histogram. Define a function called counts that takes a collection of words and builds a hash-map where each key is a unique word, and each value is the count of how many times that word appears in the collection:

    (defn counts [words]
      (reduce
        (fn [map-so-far a-word]
          (assoc map-so-far a-word (inc (get map-so-far a-word 0))))
        {}
        words))

    Our call to reduce runs across words, and its initial accumulator is an empty hash-map (the {}). The core work of our function is done by our first argument to reduce, the following anonymous function…

    (fn [map-so-far a-word] (assoc map-so-far a-word (inc (get map-so-far a-word 0))))

    …which contains the logic to populate the hash-map. As reduce runs over a collection and passes in arguments to f, the first argument is the “running total” (in our case, map-so-far), and the second argument is the next item in the collection (in our case, a-word). assoc is the Clojure way to take a hash-map and assign a value to a key in that hash-map (like Java’s Map.put(), except that it returns a new map rather than changing the old one). So (assoc map-so-far a-word …) is saying, “in the current hash-map of counts, associate this word with X”. In our case, X is obtained by the call to inc.

    The inc call is what implements the word counting logic. Here, get is like Java’s Map.get, except you can supply an optional argument to indicate what to return should the requested key not exist. So our call to get says, “return the associated value of this word in the current hash-map, or 0 if the word isn’t in the hash-map”. Our inc call simply increments the value returned from get. Strung together, we get the exact logic we need to keep running word counts.
    Now, if we wanted to, we could shorten this up by using the % notation to avoid needing to name our arguments to the anonymous function. And there we have it:

    (defn counts [words] (reduce #(assoc %1 %2 (inc (get %1 %2 0))) {} words))
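
    A small usage sketch, with a made-up word list, shows the result. It also compares the output with clojure.core/frequencies, which builds the same kind of map and is mentioned here only as a cross-check.

```clojure
(defn counts [words]
  (reduce #(assoc %1 %2 (inc (get %1 %2 0))) {} words))

;; Sample input invented for illustration.
(def sample-words ["the" "cat" "and" "the" "hat"])

(counts sample-words)
;; {"the" 2, "cat" 1, "and" 1, "hat" 1}

;; frequencies from clojure.core gives an equal map.
(= (counts sample-words) (frequencies sample-words))
;; true
```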

    PMAP: Parallel Map
    Let us turn now to a parallel version of the map function.

    pmap  – clojure.core
    (pmap f coll) (pmap f coll & colls)

    Like map, except f is applied in parallel. Semi-lazy in that the parallel computation stays ahead of the consumption, but doesn’t realize the entire result unless required. Only useful for computationally intensive functions where the time of f dominates the coordination overhead.

    ;; PMAP
    (defn long-running-job [n]
      (Thread/sleep 3000)                   ; wait for 3 seconds
      (+ n 10))
    (time (doall (map long-running-job (range 20))))
    ;; Elapsed time: 60053.862705 msecs
    ;; (10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29)
    (time (doall (pmap long-running-job (range 20))))
    ;; Elapsed time: 3009.768988 msecs
    ;; (10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29)
    ;; This is a false test for speedup, since sleeping threads do no work and
    ;; an arbitrary number of them can apparently rest simultaneously.
    ;; So let us define a job that keeps the CPU busy, forced with dorun.
    ;; dorun, doall, and doseq are all used for forcing lazy sequences, presumably to get side effects.
    ;; dorun - don't hold whole seq in memory while forcing, return nil
    ;; doall - hold whole seq in memory while forcing (i.e., all of it) and return the seq
    ;; doseq - same as dorun, but gives you chance to do something with each element as it's forced; returns nil
    (defn long-running-job2 [n]
      (dorun  (for [x (range 1000) y (range 1000)] (* x y))))
    (time (doall (map long-running-job2 (range 20))))
    ;; Elapsed time: 6489.148202 msecs
    ;;(nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil)
    (time (doall (pmap long-running-job2 (range 20))))
    ;;Elapsed time: 3378.788034 msecs
    ;;(nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil nil)
    ;; The experiment above shows pmap gives approximately a 2x speedup.
    ;; String processing
    (def alphabet-length 26)
    ;; vector of chars, A-Z
    (def letters (mapv (comp str char (partial + 65)) (range alphabet-length)))
    (defn random-string
      "Returns a random string of the specified length."
      [length]
      (apply str (take length (repeatedly #(rand-nth letters)))))
    (defn random-string-list
      [list-length string-length]
      (doall (take list-length (repeatedly (partial random-string string-length)))))
    (def orc-names (random-string-list 40 70000))
    ;; Use `dorun` to realize the lazy seq returned by map without
    ;; printing the results in the REPL
    (time (dorun (map clojure.string/lower-case orc-names)))
    (time (dorun (pmap clojure.string/lower-case orc-names)))
    ;;Partitioning for grain size
    (def numbers [1 2 3 4 5 6 7 8 9 10])
    (partition-all 3 numbers)
    (pmap inc numbers)  ; grain size 1
    (pmap (fn [number-group] (doall (map inc number-group)))
          (partition-all 3 numbers))
    (apply concat
           (pmap (fn [number-group] (doall (map inc number-group)))
                 (partition-all 3 numbers)))
    (def orc-name-abbrevs (random-string-list 200000 300))
    (time (dorun (map clojure.string/lower-case orc-name-abbrevs)))
    ;; Same work, but each pmap task lower-cases a whole group of names
    (time
      (dorun
        (apply concat
               (pmap (fn [name-group] (doall (map clojure.string/lower-case name-group)))
                     (partition-all 100000 orc-name-abbrevs)))))
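
    The partitioning idiom used above can be wrapped in a small helper so the grain size becomes a parameter. This is a sketch, not part of clojure.core: the name partitioned-pmap is hypothetical and it supports a single collection only.

```clojure
;; Hypothetical helper: like (map f coll), but the work is split into
;; groups of grain-size elements and each group is one pmap task.
(defn partitioned-pmap [grain-size f coll]
  (apply concat
         (pmap (fn [group] (doall (map f group)))   ; realize each group inside its task
               (partition-all grain-size coll))))

(partitioned-pmap 3 inc [1 2 3 4 5 6 7 8 9 10])
;; (2 3 4 5 6 7 8 9 10 11)
```

    A larger grain size means fewer, bigger tasks and less coordination overhead; too large a grain leaves cores idle. The best value has to be found by timing on the target machine.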

    Experiments with Code Running on oakley.osc.edu

    $ ssh oakley.osc.edu -l ucn1386
    -bash-4.1$ qsub -I -l nodes=1:ppn=12 -l walltime=00:20:00
    qsub: waiting for job 4496542.oak-batch.osc.edu to start
    qsub: job 4496542.oak-batch.osc.edu ready
    -bash-4.1$ ../lein new wonderland
    -bash-4.1$ cd wonderland/
    -bash-4.1$ ../lein repl
    nREPL server started on port 58060 on host - nrepl://
    user=> (defn long-running-job2 [n]
              (dorun (for [x (range 1000) y (range 1000)] (* x y))))
    user=> (time (doall (map long-running-job2 (range 20))))
    "Elapsed time: 2939.689332 msecs"
    user=> (time (doall (pmap long-running-job2 (range 20))))
    "Elapsed time: 769.314443 msecs"
    user=> (def orc-names (random-string-list 40 700000))
    user=> (time (dorun (map clojure.string/lower-case orc-names)))
    "Elapsed time: 501.401414 msecs"
    user=> (time (dorun (pmap clojure.string/lower-case orc-names)))
    "Elapsed time: 80.57307 msecs"
    user=> (def orc-name-abbrevs (random-string-list 1200000 300))
    user=> (time (dorun (map clojure.string/lower-case orc-name-abbrevs)))
    "Elapsed time: 5349.481361 msecs"
    user=> (time (dorun (pmap clojure.string/lower-case orc-name-abbrevs)))
    "Elapsed time: 2831.096441 msecs"
    user=> (time (dorun (apply concat
                  (pmap (fn [name] (doall (map clojure.string/lower-case name)))
                                  (partition-all 100000 orc-name-abbrevs)))))
    "Elapsed time: 1064.08318 msecs"
    ;; Another experiment
    user=> (def orc-names (random-string-list 100000 2))
    user=> (time (reduce #(assoc %1 %2 (inc (get %1 %2 0))) {} orc-names))
    "Elapsed time: 181.191269 msecs"
    user=> (defn mysort [x] (into (sorted-map-by (fn [key1 key2](compare [(get x key2) key2] [(get x key1) key1]))) x))
    user=> (mysort (r/reduce #(assoc %1 %2 (inc (get %1 %2 0))) {} orc-names))
    {"IX" 192, "BA" 192, "XH" 183, "PH" 180, "NB" 180, "SE" 179, "KN" 178, "BL" 178, "ZJ" 177, "WJ" 175, "CP" 174, "PG" 173, "LD" 173, "EI" 173, "BD" 173, "AR" 173, "SW" 172, "JF" 172, "JE" 172, "GX" 172,
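
    The r/ prefix in the last call presumably refers to the clojure.core.reducers namespace; the transcript does not show the require, so that is an assumption. Under the same assumption, a parallel word count could be sketched with r/fold, which splits a vector into chunks, reduces each chunk, and merges the partial maps. The name parallel-counts is hypothetical.

```clojure
(require '[clojure.core.reducers :as r])

;; Hypothetical parallel version of the word-count reduce.
(defn parallel-counts [words]
  (r/fold
    ;; combine step: merge two partial count maps; with no args returns {}
    (r/monoid (partial merge-with +) hash-map)
    ;; reduce step: the same counting function used with reduce above
    (fn [m w] (assoc m w (inc (get m w 0))))
    ;; fold only runs in parallel on foldable collections such as vectors
    (vec words)))

(parallel-counts ["AB" "CD" "AB" "EF" "AB"])
;; {"AB" 3, "CD" 1, "EF" 1}
```

    Whether this beats the plain reduce depends on the input size and the number of cores, so it would need to be timed in the same way as the experiments above.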

