
Parallel Reducers in Clojure



Available since 1.0
(reduce f coll)
(reduce f val coll)
f should be a function of 2 arguments. If val is not supplied,
returns the result of applying f to the first 2 items in coll, then
applying f to that result and the 3rd item, etc. If coll contains no
items, f must accept no arguments as well, and reduce returns the
result of calling f with no arguments. If coll has only 1 item, it
is returned and f is not called. If val is supplied, returns the
result of applying f to val and the first item in coll, then
applying f to that result and the 2nd item, etc. If coll contains no
items, returns val and f is not called.

(reduce + [1 2 3 4 5]) ;;=> 15
(reduce + []) ;;=> 0
(reduce + 1 []) ;;=> 1
(reduce + 1 [2 3]) ;;=> 6

;; converting a vector to a set
(reduce conj #{} [:a :b :c])
;;=> #{:a :c :b}
;; Create a word frequency map out of a large string s.
;; `s` is a long string containing a lot of words 🙂
(reduce #(assoc %1 %2 (inc (%1 %2 0)))
        {}
        (re-seq #"\w+" s))

; (This can also be done using the `frequencies` function.)


Reducers provide an alternative approach to using sequences to manipulate standard Clojure collections. Sequence functions are typically applied lazily, in order, and in a single thread, and they create intermediate results. However, many sequence functions (like map and filter) could conceptually be applied in parallel, yielding code that gets faster automatically as machines get more cores. For more details on the rationale for reducers, see the original blog posts.

A reducer is the combination of a reducible collection (a collection that knows how to reduce itself) with a reducing function (the “recipe” for what needs to be done during the reduction). The standard sequence operations are replaced with new versions that do not perform the operation but merely transform the reducing function. Execution of the operations is deferred until the final reduction is performed. This removes the intermediate results and lazy evaluation seen with sequences.
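The deferred execution described above can be observed directly. The following is a minimal sketch, not part of the original post: `calls` and `counting-inc` are hypothetical names introduced only to record when the mapping function actually runs.

```clojure
(require '[clojure.core.reducers :as r])

;; Hypothetical counter, used only to observe when the mapping function runs.
(def calls (atom 0))

(defn counting-inc [x]
  (swap! calls inc)
  (inc x))

;; Building the reducer transforms the reducing function; nothing is computed yet.
(def incremented (r/map counting-inc [1 2 3 4]))
(def calls-before-reduce @calls)

;; The work happens only when the reducer is finally reduced.
(def total (reduce + 0 incremented))
(def calls-after-reduce @calls)
```

Here `calls-before-reduce` is 0, while after the reduction `total` is 14 and `counting-inc` has run once per element, with no intermediate sequence built in between.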

Additionally, some collections (persistent vectors and maps) are foldable. The fold operation on a reducer executes the reduction in parallel by:

  1. Partitioning the reducible collection at a specified granularity (default = 512 elements)
  2. Applying reduce to each partition
  3. Recursively combining each partition using Java’s fork/join framework.

If a collection does not support folding, it will fall back to non-parallel reduce instead.
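As a small illustration of the fallback behaviour, the sketch below folds a persistent vector (foldable) and a lazy sequence (not foldable) with the same function. The names `v`, `s`, `vector-sum`, and `seq-sum` are chosen here for illustration only.

```clojure
(require '[clojure.core.reducers :as r])

(def v (vec (range 10000)))          ; persistent vector: foldable
(def s (map identity (range 10000))) ; lazy seq: not foldable

;; Eligible for partitioning and parallel reduction via fork/join.
(def vector-sum (r/fold + v))

;; Falls back to an ordinary sequential reduce.
(def seq-sum (r/fold + s))
```

Both calls return the same sum; only the execution strategy differs.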

reduce and fold

The clojure.core.reducers namespace (aliased here as r) provides an alternate r/reduce function.

(r/reduce f coll)
(r/reduce f init coll)

The reducers version differs in that:
  • Map colls are reduced with reduce-kv
  • When init is not provided, f is invoked with no arguments to produce an identity value
    • Note: f may be invoked multiple times to provide the identity value
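A short sketch of both differences, assuming the `r` alias used above. The map contents and the names `total-stock` and `sum-no-init` are made up for illustration.

```clojure
(require '[clojure.core.reducers :as r])

;; With a map, the reducing function receives the accumulator, key, and value,
;; just as it would with reduce-kv.
(def total-stock
  (r/reduce (fn [acc _k v] (+ acc v)) 0 {:apples 3 :pears 4 :plums 5}))

;; Without init, (+) is called with no arguments to supply the identity value 0.
(def sum-no-init (r/reduce + [1 2 3 4 5]))
```

Note that a function passed without init must therefore support a zero-argument call, which `+` does.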

In general most users will not call r/reduce directly and instead should prefer r/fold, which implements parallel reduce and combine. However, it may be useful to execute an eager reduce with fewer intermediate results.

(r/fold reducef coll)
(r/fold combinef reducef coll)
(r/fold n combinef reducef coll)

r/fold takes a reducible collection and partitions it into groups of approximately n (default 512) elements. Each group is reduced using the reducef function. The reducef function will be called with no arguments to produce an identity value in each partition. The results of those reductions are then reduced with the combinef (defaults to reducef) function. When called with no arguments, (combinef) must produce its identity element – this will be called multiple times. Operations may be performed in parallel. Results will preserve order.
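To make the roles of n, combinef, and reducef concrete, here is a hedged sketch of a word count written with r/fold. `count-words` is a hypothetical helper, and the partition size of 4 is chosen only to force several partitions on a tiny input; `r/monoid` is used to build a combinef whose zero-argument call returns an empty map.

```clojure
(require '[clojure.core.reducers :as r])

(defn count-words
  "Hypothetical helper: counts occurrences of each word in a vector."
  [words]
  (r/fold 4                                          ; n: small, only to force several partitions
          (r/monoid (partial merge-with +) hash-map) ; combinef: (combinef) => {}
          (fn [counts w]                             ; reducef: runs within each partition
            (assoc counts w (inc (get counts w 0))))
          words))

(def words ["a" "b" "a" "c" "b" "a" "d" "a" "c" "b"])
(def counts (count-words words))
```

Each partition starts from the empty map produced by `(combinef)`, and the per-partition maps are then merged with `merge-with +`, so the result matches `(frequencies words)`.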

The following functions (analogous to the sequence versions) create reducers from a reducible or foldable collection: r/map, r/mapcat, r/filter, r/remove, r/flatten, r/take-while, r/take, and r/drop. None of these functions actually transforms the source collection. To produce an accumulated result, you must use r/reduce or r/fold. To produce an output collection, use clojure.core/into to choose the collection type or the provided r/foldcat to produce a collection that is reducible, foldable, seqable, and counted.
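The sketch below shows one way these pieces might fit together. The names `source`, `pipeline`, and the result vars are illustrative, not from the original post.

```clojure
(require '[clojure.core.reducers :as r])

(def source [1 2 3 4 5 6])

;; Composing reducers only builds a recipe; source itself is left unchanged.
(def pipeline (r/filter even? (r/map inc source)))

;; into chooses the output collection type.
(def as-vector (into [] pipeline))
(def as-set (into #{} pipeline))

;; r/foldcat produces a reducible collection, copied into a vector here for display.
(def folded (into [] (r/foldcat pipeline)))

;; Reducers can be stacked further before the final reduction.
(def first-two (into [] (r/take 2 pipeline)))
```

The same `pipeline` value is reused for every result, since nothing is computed until `into` or `r/foldcat` performs the reduction.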

(require '[clojure.core.reducers :as r])

(def numbers (vec (range 10000000)))
(time (reduce + numbers))
"Elapsed time: 1285.164011 msecs"
(time (r/fold + numbers))
"Elapsed time: 97.693202 msecs"

(time (reduce + (filter odd? (map inc numbers))))
"Elapsed time: 1630.899458 msecs"
(time (r/fold + (r/filter odd? (r/map inc numbers))))
"Elapsed time: 186.142905 msecs"

;; A speedup of roughly 13x for the plain sum and roughly 9x for the map/filter pipeline

Simple Examples:


