Modern Java in Action 2 - fork-join and spliterators

book notes: fork join and spliterators

The second chapter shows how parallelization works with streams: it explains what the fork/join pool is (the streams framework uses it underneath), which data structures are decomposable, and how to create your own spliterator so that a source stream splits more effectively for parallel processing.

Parallel streams

  • created by a call to parallel() in a chain of stream operations
  • the fork/join pool uses Runtime.getRuntime().availableProcessors() threads by default, but you can change this by setting the java.util.concurrent.ForkJoinPool.common.parallelism system property (it has to be set before the common pool is first used):

    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
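
As a minimal sketch of the first bullet (the class name ParallelFlagDemo and the word list are made up for illustration), calling parallel() anywhere in the chain switches the whole pipeline to parallel execution, and the parallelism of the common pool can be read back at runtime:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

class ParallelFlagDemo {
    // Hypothetical sample data, not from the book
    static final List<String> WORDS = Arrays.asList("fork", "join", "split", "iterate", "pool");

    // Counts words longer than four characters on a parallel stream
    static long countLongWords() {
        return WORDS.stream()
                    .parallel()                    // marks the whole pipeline as parallel
                    .filter(w -> w.length() > 4)
                    .count();
    }

    public static void main(String[] args) {
        // Parallelism of the common pool that parallel streams run on
        System.out.println("parallelism = " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("long words  = " + countLongWords());
    }
}
```

The printed parallelism depends on the machine and on the system property, so no fixed value is shown here.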
    

Hints

  • measure (check Java Microbenchmark Harness (JMH))
  • beware boxing/unboxing operations (use IntStream, LongStream or DoubleStream)
  • limit() and findFirst() are expensive on parallel streams because they depend on encounter order; when order doesn’t matter, findAny() (or calling unordered() first) performs better
  • don’t use inherently sequential functions: instead of this one, which is not easily parallelizable:

    Stream.iterate(1L, i -> i + 1).limit(N)

    you should rather use:

    LongStream.rangeClosed(1, N)

    which works on primitive long values (no boxing) and produces an easily splittable range, ideal for parallelization.
  • know which data structures are easily decomposable (ArrayList is - it can be split without traversing it, LinkedList isn’t)
  • performance also depends on the order of stream operations, which may change the stream’s characteristics (a SIZED stream can be split evenly; a filtered stream has an unknown number of elements)
  • consider the cost of the terminal operation’s merge step (if it is high, combining the partial results can take longer than the time gained by parallelism)
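
The iterate-versus-range hint can be sketched as follows. Both methods compute the same sum; the class and method names are illustrative, and no timings are shown because they should come from a JMH benchmark run on your own machine:

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

class SumStrategies {
    // Boxed and inherently sequential: each element depends on the previous one
    static long iterateSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     .parallel()
                     .reduce(0L, Long::sum);
    }

    // Primitive longs and a range that can be split into even chunks
    static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n)
                         .parallel()
                         .sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        System.out.println("iterate: " + iterateSum(n));
        System.out.println("range:   " + rangedSum(n));
    }
}
```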

Decomposability table

Source            Decomposability
ArrayList         😄 Excellent
IntStream.range   😄 Excellent
HashSet           😐 Good
TreeSet           😐 Good
LinkedList        😭 Bad
Stream.iterate    😭 Bad
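
One way to see the difference is to split a collection’s spliterator once and compare the sizes of the two parts. This is only a sketch: SplitDemo and splitOnce are hypothetical names, and the exact sizes reported for a LinkedList depend on the JDK implementation, so the output is not listed here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SplitDemo {
    // Splits the collection's spliterator once and returns {prefix size, remaining size}
    static long[] splitOnce(Collection<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();   // null if the source refuses to split
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(0, 8).boxed().collect(Collectors.toList());
        long[] array = splitOnce(new ArrayList<>(numbers));
        long[] linked = splitOnce(new LinkedList<>(numbers));
        System.out.println("ArrayList:  " + array[0] + " + " + array[1]);
        System.out.println("LinkedList: " + linked[0] + " + " + linked[1]);
    }
}
```

An array-backed list can hand out a balanced half simply by computing an index, while a linked list has to walk its nodes to carve off a chunk.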

Fork Join pool

  • ForkJoinPool is an implementation of ExecutorService
  • requires creation of a RecursiveTask<R> subclass that implements protected abstract R compute() (or a RecursiveAction subclass when no result is returned)
  • compute() typically follows this algorithm:
if (task is small enough or no longer divisible) {
    compute task sequentially
} else {
    split task in two subtasks
    call this method recursively possibly further splitting each subtask
    wait for the completion of all subtasks
    combine the results of each subtask
}
  • remember: join() blocks the caller until the subtask’s result is ready, so call it only after both subtasks have been started
  • inside a RecursiveTask, call compute() or fork() directly; don’t call invoke() on the pool (invoke() is meant for sequential code that starts the parallel computation)
  • choose the splitting criterion wisely (see the example in the Javadoc for the RecursiveAction class, where getSurplusQueuedTaskCount() is used as the criterion)
  • each subtask should take noticeably longer to run than it takes to create a new task
  • it is hard to debug (as with all multithreaded programs)
  • it is hard to measure, because the fork/join framework needs to be warmed up before timings are meaningful
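
The algorithm and the hints above can be sketched as a task that sums an array of longs. This is a minimal illustration under stated assumptions: the class name ForkJoinSum and the THRESHOLD value are hypothetical, and a sensible threshold has to be found by measuring.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

class ForkJoinSum extends RecursiveTask<Long> {
    // Hypothetical threshold: at or below this size the range is summed sequentially
    private static final int THRESHOLD = 10_000;

    private final long[] numbers;
    private final int start;
    private final int end;   // exclusive

    ForkJoinSum(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = start + length / 2;
        ForkJoinSum left = new ForkJoinSum(numbers, start, mid);
        ForkJoinSum right = new ForkJoinSum(numbers, mid, end);
        left.fork();                         // schedule the left half asynchronously
        long rightResult = right.compute();  // reuse the current thread for the right half
        long leftResult = left.join();       // block only after both halves have been started
        return leftResult + rightResult;
    }

    // Entry point for sequential code: invoke() is called once, outside any task
    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1_000_000).toArray();
        System.out.println(sum(numbers));
    }
}
```

Calling fork() on one half and compute() on the other keeps the current thread busy instead of leaving it idle while it waits for both subtasks.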

Work stealing

  • goal: ensure even distribution of work between threads
  • each thread in the pool keeps a doubly-linked queue of tasks to execute, takes them one by one from the head, and executes them
  • if there are no more tasks in the thread’s own queue, it randomly selects a thread with a non-empty queue and steals a task from the tail of that queue
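
The queue discipline described above can be pictured with a toy, single-threaded model. This is not how ForkJoinPool is implemented internally; the class name, the method names and the task labels are all made up to show only which end of the deque each party uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class WorkStealingModel {
    // Task names are placeholders; a real pool holds ForkJoinTask objects
    static Deque<String> busyQueue() {
        Deque<String> queue = new ArrayDeque<>();
        queue.addLast("task-1");
        queue.addLast("task-2");
        queue.addLast("task-3");
        return queue;
    }

    // The owning thread takes its next task from the head of its own queue
    static String takeOwn(Deque<String> own) {
        return own.pollFirst();
    }

    // An idle thread steals from the tail of another thread's queue
    static String steal(Deque<String> victim) {
        return victim.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> queue = busyQueue();
        System.out.println("owner runs " + takeOwn(queue));   // owner runs task-1
        System.out.println("thief runs " + steal(queue));     // thief runs task-3
        System.out.println("left over  " + queue);            // left over  [task-2]
    }
}
```

Because the owner and the thief work on opposite ends of the queue, they rarely compete for the same task.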

Important: You don’t have to use fork/join directly if your data structure is decomposable; you just use parallel streams. The mechanism that automatically traverses a source and splits it is the “spliterator”, an interface for which the standard collections provide a default implementation.

Spliterator

See the Javadoc of the java.util.Spliterator interface for the full contract.

Spliterators define how a parallel stream can split the data it traverses

  • in Spliterator<T>, T is the type of source element
  • boolean tryAdvance(Consumer<? super T> action) consumes elements sequentially, one by one; it returns true if an element was processed and false if none remained
  • Spliterator<T> trySplit() creates a new “partition” of elements (returned as a new spliterator, or null if the source can’t be split further), allowing this spliterator and the new one to be processed in parallel
  • long estimateSize() helps distribute elements evenly
  • int characteristics() returns encoded flags (may be used to control or optimize the usage of the spliterator):
    • ORDERED - elems have defined order which is enforced when traversing
    • DISTINCT - elems are pairwise different (in terms of equals)
    • SORTED - elems follow sort order
    • SIZED - source has a known size, so estimateSize() is precise
    • NONNULL - elems won’t be null
    • IMMUTABLE - source can’t be modified (no elems added, removed, modified during traversal)
    • CONCURRENT - source may be safely modified concurrently by other threads without any synchronization
    • SUBSIZED - this spliterator and all spliterators split from it are SIZED
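
The flags and tryAdvance() can be explored on the spliterators of standard collections. The sketch below uses hypothetical helper names (has, drain) and a tiny sample list; it only reads what the JDK collections report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

class CharacteristicsDemo {
    static final List<Integer> DATA = Arrays.asList(3, 1, 2);

    // True if the spliterator reports the given flag
    static boolean has(Spliterator<?> spliterator, int flag) {
        return spliterator.hasCharacteristics(flag);
    }

    // Drains a spliterator with tryAdvance and returns how many elements it delivered
    static int drain(Spliterator<Integer> spliterator) {
        int[] count = {0};
        while (spliterator.tryAdvance(element -> count[0]++)) {
            // each successful call consumed exactly one element
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println("ArrayList SIZED:  " + has(new ArrayList<>(DATA).spliterator(), Spliterator.SIZED));
        System.out.println("TreeSet SORTED:   " + has(new TreeSet<>(DATA).spliterator(), Spliterator.SORTED));
        System.out.println("HashSet DISTINCT: " + has(new HashSet<>(DATA).spliterator(), Spliterator.DISTINCT));
        System.out.println("HashSet ORDERED:  " + has(new HashSet<>(DATA).spliterator(), Spliterator.ORDERED));
        System.out.println("elements drained: " + drain(new ArrayList<>(DATA).spliterator()));
    }
}
```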

Example

This is taken from chapter 7

Stream of Characters

Stream of characters from string:

Stream<Character> stream = IntStream.range(0, SENTENCE.length())
                                    .mapToObj(SENTENCE::charAt);

Reduction

Word counter used for reduction of the stream:

private int countWords(Stream<Character> stream) {
    // identity: no words counted yet, and the "previous character" counts as a space
    WordCounter wordCounter = stream.reduce(
        new WordCounter(0, true),
        WordCounter::accumulate,
        WordCounter::combine);
    return wordCounter.getCounter();
}

Implementation

Here is the code for WordCounter:

class WordCounter {
    private final int counter;
    private final boolean lastSpace;
    public WordCounter(int counter, boolean lastSpace) {
        this.counter = counter;
        this.lastSpace = lastSpace;
    }
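    // Visits the characters one by one, as an iterative algorithm would
    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ?
                   this :
                   new WordCounter(counter, true);
        } else {
            // a non-space character right after a space starts a new word
            return lastSpace ?
                   new WordCounter(counter+1, false) :
                   this;
        }
    }
    // Merges two partial results by summing their counters
    public WordCounter combine(WordCounter wordCounter) {
        return new WordCounter(counter + wordCounter.counter,
                               wordCounter.lastSpace);  // only the sum matters here
    }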
    public int getCounter() {
        return counter;
    }
}
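
To see why the split position matters for this reduction, the sketch below counts the words of a text in two independently processed chunks and adds the partial counts, which is what the combiner does for a parallel stream. It restates the accumulate logic in a compact form so that it runs on its own; SplitPositionDemo, Counter and the sample text are hypothetical.

```java
class SplitPositionDemo {
    // Compact restatement of the WordCounter state: word count + "last char was a space"
    static final class Counter {
        final int words;
        final boolean lastSpace;
        Counter(int words, boolean lastSpace) { this.words = words; this.lastSpace = lastSpace; }
    }

    // Counts the words of one chunk on its own, as one parallel subtask would
    static Counter countChunk(String chunk) {
        Counter c = new Counter(0, true);
        for (char ch : chunk.toCharArray()) {
            if (Character.isWhitespace(ch)) {
                c = new Counter(c.words, true);
            } else if (c.lastSpace) {
                c = new Counter(c.words + 1, false);
            }
        }
        return c;
    }

    // Same combine rule as WordCounter.combine: just add the partial counts
    static int countInTwoChunks(String text, int splitPos) {
        return countChunk(text.substring(0, splitPos)).words
             + countChunk(text.substring(splitPos)).words;
    }

    public static void main(String[] args) {
        String text = "hello world";
        System.out.println(countInTwoChunks(text, 5));  // split on the space: 2
        System.out.println(countInTwoChunks(text, 8));  // split inside "world": 3
    }
}
```

A split in the middle of a word makes that word count twice, which is why a stream of characters split at arbitrary positions can report too many words when processed in parallel.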

Spliterator

This is the spliterator that splits the character stream only on spaces:

import java.util.Spliterator;
import java.util.function.Consumer;

class WordCounterSpliterator implements Spliterator<Character> {
    private final String string;
    private int currentChar = 0;
    public WordCounterSpliterator(String string) {
        this.string = string;
    }
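    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        if (currentChar >= string.length()) {
            return false;                             // nothing left to consume
        }
        action.accept(string.charAt(currentChar++));  // consume the current character
        return true;                                  // an element was processed
    }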
    @Override
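    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;
        if (currentSize < 10) {
            return null;   // small enough to be processed sequentially
        }
        // start in the middle of the remaining text and advance to the next space
        for (int splitPos = currentSize / 2 + currentChar;
                 splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                // the new spliterator covers the text from the current position up to the split
                Spliterator<Character> spliterator =
                   new WordCounterSpliterator(string.substring(currentChar,
                                                               splitPos));
                currentChar = splitPos;   // this spliterator continues from the split position
                return spliterator;
            }
        }
        return null;   // no space found, so don't split
    }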
    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }
    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}

Spliterator usage

It is used with the StreamSupport.stream(spliterator, isParallel) stream creation function:

Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
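
The same factory method works with any spliterator. As a self-contained sketch, the example below uses the standard spliterator of a list in place of WordCounterSpliterator (StreamSupportDemo and the sample values are hypothetical); the second argument decides whether the resulting stream is parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StreamSupportDemo {
    // Builds a stream from a spliterator; the flag selects parallel or sequential execution
    static int sum(List<Integer> values, boolean parallel) {
        Spliterator<Integer> spliterator = values.spliterator();
        Stream<Integer> stream = StreamSupport.stream(spliterator, parallel);
        return stream.mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println("sequential: " + sum(values, false));
        System.out.println("parallel:   " + sum(values, true));
    }
}
```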