Imagine the following setup: service A serves a stream of data items by exposing the latest batch of these items as an HTTP endpoint. Application B polls service A every now and then, fetches the recent batch and puts the unwrapped items onto a core.async channel. The consumer(s) on the other side of the channel may come and go. The nature of the data is also such that it is droppable (not every item has to be processed) and becomes stale over time.
When implementing this task, I had two conflicting requirements for the channel:
- Backpressure is preferable. If nobody is reading from the channel (or reading slowly), it doesn't make sense to continuously poll A and fetch new data as fast as we can. This sounds like a job for a blocking channel backed by a fixed-size buffer. When the buffer is full, we stop getting new data from A and wait for consumers to start depleting the buffer.
- Newer data is better than older data. If nobody was consuming the channel for some time, and then a consumer returns, we don't want it to sift through all the stale values in the buffer until the new data starts arriving from A again.
To meet both criteria at once, I devised a special kind of channel called a leaky channel. The main idea is that it is a blocking channel with a finite buffer that steadily leaks values from the head at a specified rate. The rate lets us tune the trade-off between how often needless requests are sent to A and how old the data in the buffer can become.
Without further ado, here's the initial implementation:
    (require '[clojure.core.async :as a])

    (defn leaky-chan [buffer-size leakage-ratio-per-sec]
      (let [c (a/chan (a/buffer buffer-size))
            leakage-n (int (* buffer-size leakage-ratio-per-sec))]
        (a/go-loop [closed? false]
          (when-not closed?
            (a/<! (a/timeout 1000))
            (recur (loop [i leakage-n]
                     (when (pos? i)
                       (a/alt!
                         c ([v] (if v
                                  (recur (dec i)) ; Value dropped, go drop another.
                                  true))          ; nil returned instantly = channel closed
                         :default false           ; take blocked = channel empty, continue.
                         :priority true))))))
        c))
The code is quite straightforward. We create a channel with a fixed-size buffer. The second argument to the function is the ratio (0.0–1.0) of buffer size that has to be discarded each second. From that, we calculate the actual number of items to be dropped. In the end, we return the channel. The tricky part is the goroutine that in a loop waits for a second and then tries to drop N items from the channel. Let's examine it more thoroughly.
The central piece of logic is in the alt! call. We try to take a value from the channel without blocking, which is ensured by the :default clause and the :priority true option. If alt! goes through the first path and the value is non-nil, it means that we dropped a value. If the value is nil, it means that the channel is closed, and we prematurely stop the inner loop and return true to the outer recur, thus stopping the whole goroutine. This way, the leaking goroutine won't run forever but will end when the channel closes. If the :default path is chosen, it means that the take would have blocked, ergo the channel is already empty, so we try again in the next second.
It might seem that the scary alt! construct can be replaced by poll! here. poll! is a non-blocking take that immediately returns the value, or nil if it couldn't get one. The problem is, poll! doesn't distinguish between a closed channel and an empty channel. If we used poll!, we wouldn't be able to control the leaking goroutine by closing the channel.
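To make the difference between poll! and alt! concrete, here is a minimal sketch that can be pasted into a REPL. The helper try-take and the channel names are hypothetical, made up for this illustration; it uses alt!! (the blocking variant) only because the calls happen outside of a go block. It also assumes that the keywords :empty and :closed are never put onto the channel as real values.

```clojure
(require '[clojure.core.async :as a])

(defn try-take
  "Hypothetical helper (not part of core.async): a non-blocking take that
  returns the value, :closed for a closed and drained channel, or :empty
  when nothing is available right now."
  [ch]
  (a/alt!!
    ch ([v] (if (nil? v) :closed v))
    :default :empty
    :priority true))

(def open-ch (a/chan 1))
(def closed-ch (doto (a/chan 1) a/close!))

(a/poll! open-ch)     ; nil, nothing is buffered
(a/poll! closed-ch)   ; nil as well, indistinguishable from the case above

(try-take open-ch)    ; :empty
(try-take closed-ch)  ; :closed

(a/>!! open-ch 42)
(try-take open-ch)    ; 42
```

The extra :closed signal is exactly what the leaking goroutine relies on to terminate.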
Alright, let's try our new channel out:
    (def c (leaky-chan 100 0.01))
    ;; Make a channel with buffer 100 which discards 1%
    ;; of the buffer size per second. 100 * 0.01 = 1
    ;; value dropped per second.

    (dotimes [i 100] (a/>!! c i))
    ;; Put 100 values onto the channel. Values begin to drip.

    (repeatedly 10 #(a/<!! c))
    ;; Take 10 values from the channel.
    ;; => (2 3 4 5 6 7 8 9 10 11)
    ;; Apparently, the first two values have leaked out.

    (repeatedly 10 #(a/<!! c))
    ;; Take another 10 values.
    ;; => (17 18 19 20 21 22 23 24 25 26)
    ;; While we were messing around, even more values slipped away.

    (dotimes [i 100] (a/>!! c i))
    ;; Put another 100 values onto the channel. This
    ;; call will block until the whole initial batch
    ;; leaks out. Backpressure works.
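The arithmetic from the comment above (100 * 0.01 = 1) can be checked in isolation. The function leakage-n below is a hypothetical stand-alone copy of the let binding inside leaky-chan, extracted only for illustration. It also shows one caveat that follows from the int truncation: for small buffers or small ratios the result is 0.

```clojure
(defn leakage-n
  "Hypothetical helper mirroring the leakage-n binding in leaky-chan:
  the number of items dropped per one-second tick."
  [buffer-size leakage-ratio-per-sec]
  (int (* buffer-size leakage-ratio-per-sec)))

(leakage-n 100 0.01) ; 1
(leakage-n 100 0.25) ; 25
(leakage-n 10 0.01)  ; 0, because int truncates 0.1 toward zero
```

With a result of 0, the inner loop in leaky-chan never attempts a take, so nothing leaks and the goroutine never notices that the channel was closed. If that matters for your parameters, pick a buffer size and ratio whose product is at least 1.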
Two improvements could be made to this implementation (but are outside the scope of this post). First, instead of dropping possibly large chunks of data each second, we could adjust the delay between leaks based on the leak ratio, so that values drip out one at a time. The second improvement would be to dynamically control the leak ratio based on the activity of the consumers. For example, we could internally tag each value with a timestamp, and then define leakage as the maximum time that a value may stay in the buffer before it's dropped.
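For the curious, the first improvement could look roughly like the sketch below. This is not something I have run in production; the names leak-interval-ms and smooth-leaky-chan are made up for this illustration, and the code assumes a positive leakage ratio. Instead of dropping a batch every second, it drops a single value every 1000 / (buffer-size * ratio) milliseconds.

```clojure
(require '[clojure.core.async :as a])

(defn leak-interval-ms
  "Hypothetical helper: milliseconds between single-item leaks.
  Assumes leakage-ratio-per-sec is positive; never returns less than 1."
  [buffer-size leakage-ratio-per-sec]
  (max 1 (Math/round (double (/ 1000.0 (* buffer-size leakage-ratio-per-sec))))))

(defn smooth-leaky-chan
  "Hypothetical variant of leaky-chan that leaks one value per interval."
  [buffer-size leakage-ratio-per-sec]
  (let [c (a/chan (a/buffer buffer-size))
        interval (leak-interval-ms buffer-size leakage-ratio-per-sec)]
    (a/go-loop []
      (a/<! (a/timeout interval))
      ;; The alt! yields true only when the channel is closed and drained;
      ;; a dropped value or an empty channel both yield false, so we loop.
      (when-not (a/alt!
                  c ([v] (nil? v))
                  :default false
                  :priority true)
        (recur)))
    c))

(leak-interval-ms 100 0.01) ; 1000, one value per second as before
(leak-interval-ms 100 0.5)  ; 20, one value every 20 ms instead of 50 at once
```

The average leak rate stays the same as in the original version, but the consumer never sees a sudden gap of many values at once.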
This is it. I'm sure that my case is very contrived, so such leaky channels are very unlikely to be usable anywhere else. However, I enjoyed solving this task and took it as an opportunity to appreciate core.async even more. Cheers!