If you don't know yet what a transducer is, or how they work, there are two amazing talks to get started, both by Rich Hickey: Transducers and Inside Transducers. I watched those talks a couple of times each, and I moderately use transducers in my projects; thus I have a decent understanding of transducers. Or so I thought.
Yesterday my coworker and I delved into reading the already implemented
transducers in clojure.core, and started pondering why some seq-operating
functions don't have a transducer counterpart. Multiple-collection variants?
Probably missing because they don't fit the single-stream semantics very well.
drop-last? Those two can actually be implemented as
transducers by using a queue. And then…
cycle? Can we do cycle?
As it turned out, my knowledge of transducers was limited. I could use them, and
I got the idea behind them, but until I implemented one I lacked the perception
of the underlying machinery. Today I will guide you step by step through how we
implemented a cycle transducer, so you might also get a better comprehension of
transducers.
Lingo and basics
What always confused me about transducers before were the names of the things, and how similar they sound. Let's try to figure out what meaning each name has.
- Reducing function (rf) should have three arities — 0, 1, and 2. The most
important is 2-arity that takes a result-so-far (a.k.a. accumulator) and the
current input value, and returns the new result updated with that value. What
can be an rf?
  - conj is an rf (takes a vector and an item, conjoins the item to the tail).
  - + is an rf (takes the sum-so-far and a number, adds the latter to the former).
- Transducer takes an rf, and returns a modified rf. The result should have
the same three arities, but now it does something slightly different. Imagine
+ that also increments each summand.
- Reducing context is a process that calls the rf step-by-step over some
sequential data. You can think of it as a generic fold operation. Examples
of reducing contexts are:
  - transduce (fold data into a single value),
  - into (fold data into a collection),
  - c.c.async/pipeline (fold data into a channel).
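To make the first two terms concrete, here is a small REPL sketch. It only uses clojure.core functions; the name inc-then-add is made up for this example and stands for the "+ that also increments each summand" mentioned above.

```clojure
;; + has all three arities of a reducing function.
(+)       ;; => 0   (0-arity: initial value)
(+ 5)     ;; => 5   (1-arity: completion)
(+ 5 2)   ;; => 7   (2-arity: step)

;; So does conj.
(conj)        ;; => []
(conj [1])    ;; => [1]
(conj [1] 2)  ;; => [1 2]

;; A transducer takes an rf and returns a modified rf.
;; inc-then-add is a hypothetical name, not part of clojure.core.
(def inc-then-add ((map inc) +))

(inc-then-add 10 1)              ;; => 12
(reduce inc-then-add 0 [1 2 3])  ;; => 9

;; The same thing, letting a reducing context do the wiring.
(transduce (map inc) + [1 2 3])  ;; => 9
```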
Let's look at an example and try to recognize all the new things in it.
(into [] (comp (map inc) (filter odd?)) (range 10)) ;; [1 3 5 7 9]
Who is who?
(range 10) is obviously data: it is a sequence, and into
knows how to work with sequences.
into is a reducing context — it will
consume data until it's exhausted, and will add the result of each step to the
empty vector we provided. Hidden behind the scenes, but still present, is
conj - our reducing function. It is used to add items to the vector. Now,
what is this part in the middle?
To make it less confusing, let's remember what
comp is. It composes multiple
functions together, returning a single function in which they are called on the
input parameter right-to-left outwards. Whew, that didn't help. How about this:
((comp str inc) 0) ;; "1"
(str (inc 0))      ;; "1"
Should make more sense now. What happens first inside the reducing context
(into) is that the transducer (the (comp ...) form we passed) is called on the
reducing function (conj). Step by step:
- conj: the initial rf.
- ((filter odd?) conj): modified rf (but still an rf!)
- ((map inc) ((filter odd?) conj)): even more modified rf (but still an rf!)
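The three steps above can be reproduced by hand at the REPL. This is only a sketch: rf-1, rf-2 and rf-3 are names invented here, and plain reduce stands in for the reducing context (it skips the final 1-arity call, which doesn't matter for map and filter).

```clojure
;; rf-1, rf-2, rf-3 are hypothetical names for the three stages.
(def rf-1 conj)                   ; the initial rf
(def rf-2 ((filter odd?) rf-1))   ; modified rf
(def rf-3 ((map inc) rf-2))       ; even more modified rf

;; rf-3 increments first, then lets only odd values through to conj.
(rf-3 [] 0)  ;; => [1]   (inc 0 = 1, odd, conjoined)
(rf-3 [] 1)  ;; => []    (inc 1 = 2, even, dropped)

;; Same result as the into example.
(reduce rf-3 [] (range 10))  ;; => [1 3 5 7 9]

;; comp builds the very same nesting for us.
(reduce ((comp (map inc) (filter odd?)) conj) [] (range 10))  ;; => [1 3 5 7 9]
```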
What kind of reducing function comes out? It's an rf that increments its input,
then calls conj on the incremented value only if that value is odd. Now, let's
spot all the transducers:
- (map inc) is a transducer.
- (filter odd?) is a transducer.
- (comp (map inc) (filter odd?)) is… wait for it… a transducer.
Finally, in all this mess, what do we call map? (map inc) returns a
transducer for incrementing values, so map is a maker of transducers (if this
were Java, we'd call it TransducerFactory).
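To see what such a "maker of transducers" looks like from the inside, here is a minimal hand-written version. my-map is a hypothetical name used only for illustration; the real clojure.core/map handles more cases (for instance, multiple inputs), so treat this as a simplified sketch rather than its actual implementation.

```clojure
;; my-map is a made-up, simplified stand-in for the 1-argument arity of map.
(defn my-map [f]
  ;; The factory closes over f and returns a transducer...
  (fn [rf]
    ;; ...which takes an rf and returns a modified rf with three arities.
    (fn
      ([] (rf))                                   ; init: fall through
      ([result] (rf result))                      ; completion: fall through
      ([result input] (rf result (f input))))))   ; step: transform, then pass on

(into [] (my-map inc) [1 2 3])                               ;; => [2 3 4]
(into [] (comp (my-map inc) (filter odd?)) (range 10))       ;; => [1 3 5 7 9]
```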
cycle takes a collection and returns an infinite lazy sequence
where the items from the original collection are repeated in, you know, cycles.
Just to refresh:
(take 10 (cycle [1 2 3])) ;; (1 2 3 1 2 3 1 2 3 1)
Good. We want to write a transducer version of cycle (call it xcycle). What should it do? Naively, it should wait until the underlying data is exhausted, and then start producing the same data over and over again. We don't know how to do it yet, but let's lay the foundation.
(defn xcycle-v1 [rf]
  (let [coll (volatile! [])]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
       (vswap! coll conj input)
       (rf result input)))))

(into [] xcycle-v1 [1 2 3]) ;; [1 2 3]
You may have a lot of questions. I'll try to answer all of them.
- volatile! and vswap! work like atom and swap!, but cheaper and thread-unsafe. We don't care about thread safety here because we don't expose the value.
- xcycle-v1 is already a transducer: it takes a reducing function and returns a modified reducing function. It doesn't have to be a transducer factory like map, because there's nothing to be partially wrapped.
- Our transducer behaves like an "identity" transducer: it doesn't seem to do anything. All it does right now is silently remember all the inputs that come through it and wait until they are over. But when will we be able to use that?
To answer the last question, we need to understand what exactly a reducing context does. Again, step-wise:
1. Take a transducer and call it on the initial rf, returning the modified rf (rrf).
2. Call rrf with zero arguments to get the initial value. E.g. (+) => 0. Transducers might modify this aspect of the reducing function, but they usually don't. Aha! That's what the 0-arity ([] (rf)) does! It falls through to the wrapped rf, and at the bottom, the initial rf (like +) will know how to produce the initial value. Neat.
3. While there are elements in the input collection, take them one by one and call (rrf result input), where result is the outcome of the previous call.
4. When the input collection is over, one last call (rrf result) is performed. This is a way of telling the transducers "We are done! If you have anything to add, do it now, or forever hold your peace!"
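The four steps above can be sketched as a tiny hand-rolled reducing context. my-transduce is a hypothetical name, and this is a simplified model of the steps as described here, not the source of clojure.core/transduce (which, as far as I can tell, gets its default initial value from the original rf rather than from rrf).

```clojure
;; my-transduce is a made-up, simplified reducing context for illustration.
(defn my-transduce [xform rf coll]
  (let [rrf    (xform rf)                ; step 1: modify the initial rf
        init   (rrf)                     ; step 2: 0-arity gives the initial value
        result (reduce rrf init coll)]   ; step 3: 2-arity for every input
    (rrf result)))                       ; step 4: one last 1-arity call

(my-transduce (map inc) + [1 2 3])                               ;; => 9
(my-transduce (comp (map inc) (filter odd?)) conj (range 10))    ;; => [1 3 5 7 9]
```

Note that reduce already understands early termination, so a stateful transducer like (take 2) also works in this sketch.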
So, it appears that step 4 is our chance to unleash our
cycle semantics. When
our 1-arity is called, we know that there will be no more inputs, which means we
can start from the beginning.
(defn xcycle-v2 [rf]
  (let [coll (volatile! [])
        add? (volatile! true)]
    (fn self
      ([] (rf))
      ([result]
       (vswap! add? (constantly false))
       (self (reduce self result @coll)))
      ([result input]
       (when @add? (vswap! coll conj input))
       (rf result input)))))

(into [] xcycle-v2 [1 2 3]) ;; StackOverflowError user/xcycle-v2/self--7991
Let's pretend for the moment that we don't see glaring non-terminating recursion and StackOverflowError, and try to understand what has changed.
- The resulting reducing function that we return got a name, self, so that it can call itself.
- We added another piece of state, add?. It tells if the items should be added to the coll, or if we are past the first cycle and have all of them already.
- Now in 1-arity, instead of sheepishly doing nothing, we do something interesting: we use reduce to go over the elements we accumulated during the "natural" phase, and call self on them again together with the currently accumulated result. Finally, when that is over, we call 1-arity self on the outcome, thus creating a never-ending recursion (or, you know, a cycle).
This process can be depicted as a dialog between the Reducing Context and the Cycle. RC is flying the airplane, and Cycle is a passenger:
RC: Alright, all systems go, we are on the course towards the end of the input
sequence. Calling rrf on the result and inputs as usual. This Cycle guy doesn't
seem to do anything - whatever. Reaching the end of the input sequence in 3…
RC: …And we are done. Dear transducers, this is the final result. Quickly do to it what you have to, then pack your shit and leave.
Cycle: Interesting… So, you say we are done? I'd rather stay…
RC: What the hell? Get out!
Cycle: I've been watching what you were doing. Shuffles into pilot's seat. This seems easy. Mumbles. Call two-arity on the inputs, then finish with one-arity call… Pulls and pushes levers haphazardly. The airplane is back in the air.
RC: We are all gonna die!
Now, in all seriousness, how do we prevent the error? We'd want something like this to work:
(into [] (comp xcycle-v2 (take 10)) [1 2 3]) ;; StackOverflowError clojure.core/deref (core.clj:2206)
Which means - cycle through the input collection, but only take the first 10
elements. But this doesn't work yet because we never terminate our recursion —
we treat the ending of the input data as a reason to start another cycle. What
we need is a way for
take to tell us "I had enough!".
Transducers support this functionality through a wrapper called reduced. By
returning a reduced result, transducers like take can tell the outer context
that they consumed just enough data, and no more work should be done. With this
in mind, we write our final version of xcycle:
(defn xcycle [rf]
  (let [coll (volatile! [])
        add? (volatile! true)
        enough? (volatile! false)]
    (fn self
      ([] (rf))
      ([result]
       (vswap! add? (constantly false))
       (if @enough?
         (rf result)
         (self (reduce self result @coll))))
      ([result input]
       (when @add? (vswap! coll conj input))
       (let [res (rf result input)]
         (when (reduced? res) (vswap! enough? (constantly true)))
         res)))))
A couple of changes here:
- In two-arity we now track what the underlying rf returned to us. If it's something that is reduced?, it means the underlying transducer said it had enough, and we should set the corresponding flag.
- In one-arity we now only go into recursion if enough? is not true yet. Otherwise, we finally land that airplane and stand up from the pilot's seat.
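If the reduced wrapper is new to you, it may help to poke at it in isolation, outside of any transducer. The sketch below uses only clojure.core; first-three is a hypothetical reducing function written for this example.

```clojure
;; reduced wraps a value; reduced? detects the wrapper; deref unwraps it.
(def r (reduced [1 2 3]))
(reduced? r)        ;; => true
(reduced? [1 2 3])  ;; => false
@r                  ;; => [1 2 3]

;; first-three is a made-up rf that says "I had enough!" after three items.
(defn first-three [acc x]
  (let [acc (conj acc x)]
    (if (= 3 (count acc))
      (reduced acc)   ; signal early termination
      acc)))

;; reduce stops as soon as it sees a reduced value and returns it unwrapped,
;; which is why this terminates even on an infinite sequence.
(reduce first-three [] (range))  ;; => [0 1 2]
```

This unwrapping is also why the one-arity of xcycle receives a plain result after the inner reduce returns: the only trace of the early stop is the enough? flag.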
The moment of truth…
(into [] (comp xcycle (take 10)) [1 2 3]) ;; [1 2 3 1 2 3 1 2 3 1]
It works! Our cycle transducer can even be composed several times, as long as there's someone to stop it underneath:
(into [] (comp xcycle (take 5) xcycle (take 20)) [1 2 3]) ;; [1 2 3 1 2 1 2 3 1 2 1 2 3 1 2 1 2 3 1 2]
Can you figure out what just happened?
Does anyone need a cycle transducer? Nope. But I immensely enjoyed implementing it by trial and error, and now I can appreciate the thought and effort the Clojure team has put into transducers. I also like the cycle transducer for its peculiar nature: when the outer context stops providing this transducer with inputs, this is the moment when it starts going; but it stops when the underlying context asks it to. Think of it: cycle disregards all authority, but succumbs to the will of its subordinates. Isn't it a manager we all dream of?
This post was initially planned to be short, but I figured that repeating the same stuff again never hurt anyone. It worked for me, and I hope some readers will also find this piece helpful. See you around.