Contents

Define a reusable and lazy pipeline

The problem

I have the idea of a pipeline: it is defined once, it can be reused, and when called it advances its state but does not evaluate eagerly, because it could be “fed” from an infinite source of data.

For example:

  • all integers that are multiples of 13 and have more ones than zeros in their binary representation
  • indices of bytes read from /dev/random that are ASCII characters
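
The first of these sources can be sketched as a lazy generator; I'll use Python here as a neutral notation (the helper name ones_gt_zeros is my own invention):

```python
from itertools import count, islice

def ones_gt_zeros(n: int) -> bool:
    """True if n's binary representation contains more ones than zeros."""
    bits = bin(n)[2:]  # strip the '0b' prefix
    return bits.count("1") > bits.count("0")

# An infinite, lazy source: positive multiples of 13 whose binary
# representation has more ones than zeros. Nothing is evaluated here.
multiples = (n for n in count(13, 13) if ones_gt_zeros(n))

# Evaluation happens only when we pull elements out.
print(list(islice(multiples, 5)))  # -> [13, 26, 39, 78, 91]
```

The generator expression is the whole pipeline: defining it costs nothing, and islice pulls exactly as many elements as requested.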

Perhaps there are two separate issues I’m thinking about here:

  1. Does the pipeline create intermediate collections while processing items? For example, in Scala every intermediate operation on a strict collection (each map and filter) creates a collection.
  2. When exactly is the pipeline execution triggered?

In Scala

Note: see this insightful comment on SO: https://stackoverflow.com/questions/31664918/does-scala-has-intermediate-terminal-ops-as-java8-has

By default, intermediate operations are evaluated eagerly, unless you use .iterator.

def report[T](v: T): T = {
  println(s"Reporting $v");
  v
}

val l = List(1, 2, 5, 10, 34, 78, 126)

Now you can either trigger the traversal or create an iterator which will evaluate/traverse elements one by one:

scala> l.map(report).filter(_ < 50)
Reporting 1
Reporting 2
Reporting 5
Reporting 10
Reporting 34
Reporting 78
Reporting 126
val res14: List[Int] = List(1, 2, 5, 10, 34)

scala> l.iterator.map(report).filter(_ < 50)
Reporting 1
val res15: Iterator[Int] = non-empty iterator

Note how the first value gets through the map even though we haven’t explicitly triggered anything (the REPL calls hasNext in order to print “non-empty iterator”, which pulls one element through the pipeline). However, the first element is not lost: we will get it eventually when we call next or use other terminal operations that cause traversal and evaluation of the pipeline operations:

scala> val it = l.iterator.map(report).filter(_ < 50)
Reporting 1
val it: Iterator[Int] = non-empty iterator

scala> it.next
val res23: Int = 1

scala> for { i <- it } println(s"This is element $i")
Reporting 2
This is element 2
Reporting 5
This is element 5
Reporting 10
This is element 10
Reporting 34
This is element 34
Reporting 78
Reporting 126

The iterator approach allows us to build a pipeline and evaluate elements when needed. However, we are not able to reuse the pipeline: once the iterator is exhausted, we’re done. We need to create a new iterator, repeating the pipeline code. It seems the pipeline needs to have a collection attached to work on. Or, in other words, there is no such thing as a standalone pipeline here, I guess.
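
The workaround is easy to state in any language: put the wiring into a function, so that each call builds a fresh iterator chain. A quick Python sketch of the idea, mirroring the report example from above:

```python
def report(v):
    print(f"Reporting {v}")
    return v

def pipeline(source):
    # Each call wires a brand-new chain of lazy iterators over `source`.
    return filter(lambda v: v < 50, map(report, source))

l = [1, 2, 5, 10, 34, 78, 126]
print(list(pipeline(l)))  # -> [1, 2, 5, 10, 34]
print(list(pipeline(l)))  # the definition is reusable; the iterators are not
```

The pipeline definition is reused; what cannot be reused is any particular iterator it produced.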

Reusability

According to the SO answer, if we want to reuse the pipeline, we can use a view. I’m not sure we would really reuse anything. Let’s check. First, I will change the effectful function report so that it doubles the value when called:

def reportDouble(v: Int): Int = { println(s"Reporting $v -> ${v*2}"); 2 * v }

Then, I will try to create a pipeline and reuse it by filtering values twice.

scala> val xl = l.map(reportDouble)
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252
val xl: List[Int] = List(2, 4, 10, 20, 68, 156, 252)

scala> xl.filter(_ < 5).foreach(println)
2
4

scala> xl.filter(_ > 15).foreach(println)
20
68
156
252

For a list, map evaluates eagerly: it traverses the list and runs the mapping once, (implicitly) creating a new list (List is a Functor); all subsequent filters/maps are done on the resulting list.

scala> xl.getClass
val res26: Class[? <: List[Int]] = class scala.collection.immutable.$colon$colon

If I create a view instead, the map still appears to be traversed eagerly, although that is most likely just the REPL forcing the view in order to render its elements. But then, and here’s the difference, I get a SeqView which re-runs the pipeline each time it is triggered with a terminal operation (foreach):

scala> val xv = l.view.map(reportDouble)
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252
val xv: scala.collection.SeqView[Int] = SeqView(2, 4, 10, 20, 68, 156, 252)

scala> xv.filter(_ < 5).foreach(println)
Reporting 1 -> 2
2
Reporting 2 -> 4
4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252

scala> xv.filter(_ > 15).foreach(println)
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
20
Reporting 34 -> 68
68
Reporting 78 -> 156
156
Reporting 126 -> 252
252

So xv is not a collection of evaluated elements: it re-evaluates them on every traversal. Still, I should have created the view and piped it through the operations without binding the intermediate collection to a val, so to speak:

scala> l
val res31: List[Int] = List(1, 2, 5, 10, 34, 78, 126)

scala> l.map(reportDouble).filter(_ > 3).take(3).foreach(println)
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252
4
10
20

scala> l.view.map(reportDouble).filter(_ > 3).take(3).foreach(println)
Reporting 1 -> 2
Reporting 2 -> 4
4
Reporting 5 -> 10
10
Reporting 10 -> 20
20

Here the map did not force the creation of an intermediate collection: the filter and the limit imposed by take caused a minimal number of operations.

LazyList

Docs: LazyList

LazyLists can be infinite, and their values are memoized. What does that mean? The pipeline is evaluated on demand, and elements that were previously accessed will not be evaluated again.
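
To make “memoized” concrete before diving into the Scala transcript, here is a minimal Python sketch of that contract (the class name MemoLazy is my own invention): elements are computed on demand, each at most once:

```python
class MemoLazy:
    """Compute elements of an iterator on demand and cache them forever."""

    def __init__(self, source):
        self._it = iter(source)
        self._cache = []

    def __getitem__(self, i):
        while len(self._cache) <= i:
            self._cache.append(next(self._it))  # each element evaluated at most once
        return self._cache[i]

calls = []
def report_double(v):
    calls.append(v)
    return v * 2

lazy = MemoLazy(map(report_double, [1, 2, 5, 10]))
lazy[1]       # forces evaluation of elements 0 and 1
lazy[1]       # served from the cache: no re-evaluation
print(calls)  # -> [1, 2]
```

Accessing index 1 twice evaluates the underlying map only up to that index, and only the first time.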

If we create a lazy list from list l and map reportDouble over it, the mapping appears to initiate a full traversal (again, likely because the REPL forces the LazyList in order to render its elements):

scala> val lali = LazyList.from(l)
val lali: LazyList[Int] = LazyList(1, 2, 5, 10, 34, 78, 126)
scala> val lali = lali.map(reportDouble)
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252
val lali: LazyList[Int] = LazyList(2, 4, 10, 20, 68, 156, 252)

but then, as those values have been calculated and memoized, filtering does not trigger re-calculation:

scala> lali.filter(_ > 15).foreach(println)
20
68
156
252

scala> lali.filter(_ < 5).foreach(println)
2
4

So it seems there is no difference from mapping over a plain List… wait, perhaps I should place map before filter and then take just a few items from the resulting collection. What happens? Ah, here is the difference: elements are only calculated when required. Let’s compare: the lazy list calculates only the elements needed to produce 3 results:

scala> lali.map(reportDouble).filter(_ < 20).take(3).foreach(println)
Reporting 1 -> 2
2
Reporting 2 -> 4
4
Reporting 5 -> 10
10

scala> lali.map(reportDouble).filter(_ > 3).take(3).foreach(println)
Reporting 1 -> 2
Reporting 2 -> 4
4
Reporting 5 -> 10
10
Reporting 10 -> 20
20

but a List always calculates all of them, even though we require only three elements:

scala> l.map(reportDouble).filter(_ < 20).take(3).foreach(println)
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252
2
4
10

scala> l.map(reportDouble).filter(_ > 3).take(3).foreach(println)
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252
4
10
20

Notion of terminal operation

I still don’t have a good mental model of Scala collections: perhaps there is no notion of a “terminal operation” at all? It seems that on a strict collection, map behaves like one: it implicitly creates a target container of the proper type (monoid and semigroup properties?). Whether evaluation is deferred depends on the collection type (view, iterator, LazyList), not on where map sits in a bigger pipeline.

And I should not have high expectations for LazyList in this regard unless, again, I keep the map inside the lazy pipeline:

LazyList.from(l).map(reportDouble).filter(_ > 3).take(3).foreach(println)
Reporting 1 -> 2
Reporting 2 -> 4
4
Reporting 5 -> 10
10
Reporting 10 -> 20
20

Python

In Python, the map and filter built-ins return lazy iterator objects which by themselves do not trigger anything.

In [1]: li = [1, 2, 5, 10, 34, 78, 126]

In [2]: def reportDouble(v):
   ...:     print(f"Reporting {v} -> {v*2}")
   ...:     return v * 2
   ...:

In [3]: map(reportDouble, li)
Out[3]: <map at 0x7b77bd1bf5e0>

In [4]: filter(lambda v: v < 15, map(reportDouble, li))
Out[4]: <filter at 0x7b77bc4fd9c0>

One needs to explicitly consume such a map or filter object, for example by creating a list out of it. Note that filter does not force the inner map: the whole chain stays lazy until we force evaluation of the filter result:

In [27]: fm = filter(lambda v: v < 15, map(reportDouble, li))

In [28]: [i for i in fm]
Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Reporting 10 -> 20
Reporting 34 -> 68
Reporting 78 -> 156
Reporting 126 -> 252
Out[28]: [2, 4, 10]

Clojure

Perhaps what I want is the transducer concept I encountered in Clojure. It seems to be a definition or composition of operations which does not require a collection and can exist by itself. The documentation states that operations such as map, filter or take have a transducer-producing arity (for others, see also the cheatsheet).

What the docs say is:

Most sequence functions included in Clojure have an arity that produces a transducer. This arity omits the input collection; the inputs will be supplied by the process applying the transducer. Note: this reduced arity is not currying or partial application.

Deriving from the documentation example, I can create an xf transducer which is a composition of other transducers (comp composes right-to-left, but for transducers the resulting data flow runs left-to-right: first filtering, then mapping, and finally taking two elements). The final reducing operation here is +; the transduced sequence before the addition can be retrieved using the (into [] ...) form:

user> (def xf (comp (filter odd?) (map inc) (take 2))) 
#'user/xf
user> (transduce xf + [3,4,5,6,7,8,9,10,11,12])
10
user> (into [] xf  [3,4,5,6,7,8,9,10,11,12])
[4 6]

What happens when I define xf so that it prints each element very early, before the filter? Would it then print everything? It certainly will not print anything at the time the transducer is defined. I expect it to fire for all elements of the vector, however. I also expect that if I place the print after take, it will print the two filtered and inced values, 4 and 6:

Let’s check. Define a: it takes a value, prints it and returns it.

user> (defn  a [a] (println "* " a " *") a)
#'user/a
user> (map a [1 2 50])
*  1  *
*  2  *
*  50  *
(1 2 50)

Transducer xf is now composed with (map a) as the first element of the pipeline. Wow, see what happened: it did not call a on each element of the sequence; it executed exactly 3 times, the minimal number of times needed to retrieve the two elements required by the last step of the pipeline:

user> (def xf (comp (map a) (filter odd?) (map inc) (take 2)))
#'user/xf
user> (transduce xf + [3,4,5,6,7,8,9,10,11,12])
*  3  *
*  4  *
*  5  *
10
user> 

Interesting.
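
The transducer idea is not Clojure-specific: a transducer is just a function from a reducing function to a reducing function. A minimal Python sketch, with my own names tmap/tfilter, and with the stateful take and early termination omitted for brevity:

```python
from functools import reduce

def tmap(f):
    # Transducer: wrap a reducing function so inputs are mapped through f first.
    return lambda rf: lambda acc, x: rf(acc, f(x))

def tfilter(pred):
    # Transducer: wrap a reducing function so inputs failing pred are skipped.
    return lambda rf: lambda acc, x: rf(acc, x) if pred(x) else acc

def compose(*fns):
    # Like Clojure's comp: composes right-to-left, yet the data flow is left-to-right.
    return reduce(lambda f, g: lambda x: f(g(x)), fns)

xf = compose(tfilter(lambda n: n % 2 == 1), tmap(lambda n: n + 1))
add = lambda acc, x: acc + x

# "(transduce xf + coll)": keep the odds, increment them, sum them up.
print(reduce(xf(add), [3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 0))  # -> 40
```

Note how xf never touches a collection until it is handed to reduce, which is exactly the property I was after.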

In Rust

This behavior is similar to what we get in Rust, where there are iterators everywhere :) For example, the program below constructs an infinite sequence (wait! it is an iterator!) and traverses it:

let result: Vec<String> = (0u64..)
    .filter(|n| n % 2 == 0)
    .map(|n| n * n)
    .take_while(|&n| n < 1_000_000)
    .filter(|n| n.to_string().starts_with('4'))
    .map(|n| format!("val:{n}"))
    .collect();
// The entire chain runs element-by-element — no intermediate Vec

So the example with a simple collection looks as follows:

fn report_double(v: i32) -> i32 {
    let doubled = v * 2;
    println!("Reporting {} -> {}", v, doubled);
    doubled
}
fn simple_pipeline() -> Vec<i32> {
    let v = vec![1, 2, 5, 10, 34, 78, 126];
    v.iter()
        .map(|n| report_double(*n))
        .filter(|v| *v < 20)
        .take(3)
        .collect()
}
fn main() {
    println!("Example pipeline: {:#?}", simple_pipeline())
}

which behaves nicely: it does not go through the whole collection and gives exactly what I expected, three elements:

Reporting 1 -> 2
Reporting 2 -> 4
Reporting 5 -> 10
Example pipeline: [
    2,
    4,
    10,
]

The pipeline can be made more generic by making the collection’s element type generic, or by creating a function that takes an Iterator<Item = i32>, which allows us to use the pipeline on both a Vec and a HashSet (note the non-deterministic output):

fn pipeline<T: Iterator<Item = i32>>(it: T) -> Vec<i32> {
    it.map(|n| report_double(n))
        .filter(|v| *v < 20)
        .take(3)
        .collect()
}
fn main() {
    println!("Print pipeline:");
    println!("{:#?}", pipeline(vec![3, 4, 3, 100, 200].into_iter()));
    println!(
        "{:#?}",
        pipeline(HashSet::from([3, 4, 3, 100, 200]).into_iter())
    );
}

or by also returning an iterator. In this case I need to force the collection (note the call to .collect::<Vec<i32>>()) to be able to print the results, as the returned iterator itself does not implement Debug:

fn pipeline<T: Iterator<Item = i32>>(it: T) -> impl Iterator<Item = i32> {
    it.map(|n| report_double(n)).filter(|v| *v < 20).take(3)
}
fn main() {
    println!("Print pipeline:");
    println!(
        "{:#?}",
        pipeline(vec![1, 1, 3, 4, 3, 100, 200].into_iter()).collect::<Vec<i32>>()
    );
    println!(
        "{:#?}",
        pipeline(HashSet::from([1, 1, 3, 4, 3, 100, 200]).into_iter()).collect::<Vec<i32>>()
    );
}

which gives the following output (obviously, the duplicates disappeared during HashSet construction, and the set’s output changes on each execution of the program):

Print pipeline:
Reporting 1 -> 2
Reporting 1 -> 2
Reporting 3 -> 6
[
    2,
    2,
    6,
]
Reporting 4 -> 8
Reporting 200 -> 400
Reporting 100 -> 200
Reporting 1 -> 2
Reporting 3 -> 6
[
    8,
    2,
    6,
]

Conclusions

  • for Scala, use views or iterators; remember that map on a strict collection creates a target collection
  • learn LazyList use cases; it is non-trivial
  • Rust behaves intuitively
  • Python seems a bit old-school with its map/filter and the need to wrap results in list(...) or [i for i in ...]

Personal note

I just remembered that I like Clojure, a lot. I also like Rust (I ❤️ 🦀) because of my earlier exposure to Scala, that’s for sure. Scala was the first modern language where I could do proper matching on ADTs (I remember data classes in Kotlin but don’t recall them vividly) and where I didn’t have to sit inside classes to do anything: I could just use top-level functions. Perhaps in Scala that was not possible before 3.0? Not sure.

Next

Just keep learning.