Implementing "cycle" transducer in Clojure

This post was originally written in a notebook. Check out Gorilla REPL if you haven't done it yet!

If you don't know yet what a transducer is, or how they work, there are two amazing talks to get started, both by Rich Hickey: Transducers and Inside Transducers. I watched those talks a couple of times each, and I moderately use transducers in my projects; thus I have a decent understanding of transducers. Or so I thought.

Yesterday my coworker and I delved into reading the already implemented transducers in clojure.core, and started pondering why some seq-operating functions don't have a transducer counterpart. Multiple-collection map? Probably missing because it doesn't fit the single-stream semantic very well. take-last and drop-last? Those two can actually be implemented as transducers by using a queue. And then… cycle? Can we do cycle?

As it turned out, my knowledge of transducers was limited. I could use them, and I got the idea behind them, but until I implemented one I lacked the perception of the underlying machinery. Today I will guide you step by step through how we implemented cycle transducer, so you might also get a better comprehension of this topic.

Lingo and basics

What always confused me about transducers before were the names of the things, and how similar they sound. Let's try to figure out what meaning each name has.

  • Reducing function (rf) should have three arities — 0, 1, and 2. The most important is 2-arity that takes a result-so-far (a.k.a. accumulator) and the current input value, and returns the new result updated with that value. What can be an rf? conj is an rf (takes a vector and an item, conjoins item to the tail). + is an rf (takes sum-so-far and a number, adds the latter to the former).
  • Transducer takes an rf, and returns a modified rf. The result should have the same three arities, but now it does something slightly different. Imagine a modified + that also increments each summand.
  • Reducing context is a process that calls the rf step-by-step over some sequential data. You can think of it as of a generic fold operation. Examples of reducing contexts are: transduce (fold data into a single value), into (fold data into a collection), c.c.async/pipeline (fold data into a channel).

Let's look at an example and try to recognize all the new things in it.

(into []
      (comp (map inc)
            (filter odd?))
      (range 10))

;; [1 3 5 7 9]

Who is who? (range 10) is obviously data — it is a sequence, and into knows how to work with sequences. into is a reducing context — it will consume data until it's exhausted, and will add the result of each step to the empty vector we provided. Hidden behind the scenes, but still present, is conj - our reducing function. It is used to add items to the vector. Now, what is this part in the middle?

To make it less confusing, let's remember what comp is. It composes multiple functions together, returning a single function in which they are called on the input parameter right-to-left outwards. Whew, that didn't help. How about this:

((comp str inc) 0) ;; "1"

(str (inc 0)) ;; "1"

Should make more sense now. What happens first inside the reducing context (inside into) is transducer ((comp ...) form we passed) is called on the reducing function (conj). Step by step:

  1. conj — the initial rf.
  2. ((filter odd?) conj) — modified rf (but still rf!)
  3. ((map inc) ((filter odd?) conj)) — even more modified rf (but still rf!)

What kind of reducing function comes out? It's an rf that increments its input, then only if the input is odd, calls conj on it. Now, let's spot all the transducers:

  1. (map inc) is a transducer.
  2. (filter odd?) is a transducer.
  3. (comp (map inc) (filter odd?)) is… wait for it… a transducer.

Finally, in all this mess, what do we call map? (map inc) returns a transducer for incrementing values, so map is a maker of transducers (if this were Java, we'd call it TransducerFactory).

Cycle transducer

The regular cycle takes a collection and returns an infinite lazy sequence where the items from the original collection are repeated in, you know, cycles. Just to refresh:

(take 10 (cycle [1 2 3]))

;; (1 2 3 1 2 3 1 2 3 1)

Good. We want to write a transducer version of cycle (call it xcycle). What should it do? Naively, it should wait until the underlying data is exhausted, and then start producing the same data over and over again. We don't know how to do it yet, but let's lay the foundation.

(defn xcycle-v1 [rf]
  (let [coll (volatile! [])]
      ([] (rf))
      ([result] (rf result))
      ([result input]
       (vswap! coll conj input)
       (rf result input)))))

(into [] xcycle-v1 [1 2 3])

;; [1 2 3]

You may have a lot of questions. I'll try to answer all of them.

  1. volatile! and vswap! are like atom and swap!, but cheaper and thread-unsafe. We don't care about thread safety here because we don't expose the value.
  2. xcycle-v1 is already a transducer — it takes a reducing function, and returns a modified reducing function. It doesn't have to be a transducer factory like map — there's nothing to be partially wrapped.
  3. Our transducer behaves like an "identity" transducer — it doesn't seem to do anything. All it does right now is it silently remembers all the inputs that come through it and waits until they are over. But when will be able to use that?

To answer the last question, we need to understand what exactly a reducing context does. Again, step-wise:

  1. Take a transducer and call it on the initial rf, returning the modified rf (rrf).
  2. Call rrf with zero arguments to get the initial value. E.g. (+) => 0. Transducers might modify this aspect of the reducing function, but they usually don't. Aha! That's what the 0-arity ([] (rf)) does! It falls through to the wrapped rf, and on the bottom, the initial rf (like +) will know how to produce the initial value. Neat.
  3. While there are elements in the input collection, take them one by one, and call (rrf result input), where result is the outcome of the previous call.
  4. When the input collection is over, one last call (rrf result) is performed. This is a way of telling the transducers "We are done! If you have anything to add, do it now, or forever hold your peace!"

So, it appears that step 4 is our chance to unleash our cycle semantics. When our 1-arity is called, we know that there will be no more inputs, which means we can start from the beginning.

(defn xcycle-v2 [rf]
  (let [coll (volatile! [])
        add? (volatile! true)]
    (fn self
      ([] (rf))
       (vswap! add? (constantly false))
       (self (reduce self result @coll)))
      ([result input]
       (when @add?
         (vswap! coll conj input))
       (rf result input)))))

(into [] xcycle-v2 [1 2 3])

;; StackOverflowError user/xcycle-v2/self--7991

Let's pretend for the moment that we don't see glaring non-terminating recursion and StackOverflowError, and try to understand what has changed.

  1. The resulting reducing function that we return got a name self.
  2. We added another piece of state add?. It tells if the items should be added to the coll, or if we are past the first cycle and have all of them already.
  3. Now in 1-arity, instead of sheepishly doing nothing, we do something interesting — we use reduce to go over the elements we accumulated during the "natural" phase, and call self on them again together with the currently accumulated result. Finally, when that is over, we call 1-arity self on it, thus creating a never-ending recursion (or, you know, a cycle).

This process can be depicted as a dialog between the Reducing Context and the Cycle. RC is flying the airplane, and Cycle is a passenger:

RC: Alright, all systems go, we are on the course towards the end of the input sequence. Calling rrf on the result and inputs as usual. This Cycle guy doesn't seem to do anything - whatever. Reaching the end of the input sequence in 3… 2… 1…
RC: …And we are done. Dear transducers, this is the final result. Quickly do to it what you have to, then pack your shit and leave.
Cycle: Interesting… So, you say we are done? I'd rather stay…
RC: What the hell? Get out!
Cycle: I've been watching what you were doing. Shuffles into pilot's seat. This seems easy. Mumbles. Call two-arity on the inputs, then finish with one-arity call… Pulls and pushes levers haphazardly. The airplane is back in the air.
RC: We are all gonna die!

Now, in all seriousness, how do we prevent the error? We'd want something like this to work:

(into [] (comp xcycle-v2 (take 10)) [1 2 3])

;; StackOverflowError clojure.core/deref (core.clj:2206)

Which means - cycle through the input collection, but only take the first 10 elements. But this doesn't work yet because we never terminate our recursion — we treat the ending of the input data as a reason to start another cycle. What we need is a way for take to tell us "I had enough!".

Transducers support this functionality through a wrapper called (reduced). By returning a reduced result transducers like take can tell the outer context that they consumed just enough data, and no more work should be done. With this in mind, we write our final version of xcycle:

(defn xcycle [rf]
  (let [coll (volatile! [])
        add? (volatile! true)
        enough? (volatile! false)]
    (fn self
      ([] (rf))
       (vswap! add? (constantly false))
       (if @enough?
         (rf result)
         (self (reduce self result @coll))))
      ([result input]
       (when @add?
         (vswap! coll conj input))
       (let [res (rf result input)]
         (when (reduced? res)
           (vswap! enough? (constantly true)))

A couple of changes here:

  1. In two-arity we now track what the underlying rf returned us. If it's something that is reduced?, it means the underlying transducer said it had enough, and we should set the corresponding flag.
  2. In one-arity we know only go into recursion if enough? is not true yet. Otherwise, we finally land that airplane and stand up from the pilot's seat.

The moment of truth…

(into [] (comp xcycle (take 10)) [1 2 3])

;; [1 2 3 1 2 3 1 2 3 1]

It works! Our cycle transducer can even be composed several times, as long as there's someone to stop it underneath:

(into [] (comp xcycle
               (take 5)
               (take 20))
      [1 2 3])

;; [1 2 3 1 2 1 2 3 1 2 1 2 3 1 2 1 2 3 1 2]

Can you figure out what just happened?


Does anyone need a cycle transducer? Nope. But I immensely enjoyed implementing it by trial and error, and now I can appreciate the thought and effort Clojure team has put into transducers. I also like the cycle transducer for its peculiar nature: when the outer context stops providing this transducer with inputs, this is the moment when it starts going; but it stops when the underlying context asks it to. Think of it: cycle disregards all authority, but succumbs to the will of its subordinates. Isn't it a manager we all dream of?

This post was initially planned to be short, but I figured that repeating the same stuff again never hurt anyone. It worked for me, I hope some readers will also find this piece helpful. See you around.