Modern Java in Action 2 - fork-join and spliterators
book notes: fork join and spliterators
The second part of these notes shows how parallelization works with streams: it explains what the fork/join pool is (the streams framework uses it underneath), which data structures are decomposable, and how to create your own spliterator that splits a source stream more effectively for parallel processing.
Parallel streams
- created by a call to parallel() in a chain of stream operations
- the fork/join pool derives its number of threads from Runtime.getRuntime().availableProcessors() by default (the common pool creates one worker fewer, because the calling thread also takes part), but you can change it by setting the java.util.concurrent.ForkJoinPool.common.parallelism system property:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
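A minimal sketch of the two points above: it prints the parallelism of the common pool and runs the same sum sequentially and in parallel. The class name ParallelSumDemo and its method names are assumptions made for illustration. The system property only has an effect if it is set before the common pool is first used, so the sketch reads the value and does not change it.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

// Hypothetical demo class, not part of the book's code.
class ParallelSumDemo {
    // Sums 1..n on the calling thread only.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same pipeline; parallel() lets the common fork/join pool split the range.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        // Parallelism of the common pool that parallel streams run on.
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // Both variants must agree on the result.
        System.out.println(sequentialSum(1_000_000) == parallelSum(1_000_000));
    }
}
```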
Hints
- measure (check Java Microbenchmark Harness (JMH))
- beware boxing/unboxing operations (use IntStream, LongStream or DoubleStream)
- limit() and findFirst() are expensive on parallel streams because they depend on encounter order; when order does not matter, prefer findAny() or call unordered() first
- don’t use inherently sequential functions: instead of this one, which is not easily parallelisable:
Stream.iterate(1L, i -> i + 1).limit(N)
you should rather use:
LongStream.rangeClosed(1, N)
which works on primitive long values (no boxing) and produces an easily splittable range, ideal for parallelization.
- know which data structures are easily decomposable (ArrayList is - it can be split without traversing it, LinkedList isn’t)
- performance also depends on the order of stream operations, which may change stream characteristics (a SIZED stream can be split evenly, a filtered stream has an unknown number of elements)
- note the cost of the terminal operation (if combining partial results is expensive, it can outweigh the time gained by parallel processing)
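Two of the hints above can be checked directly. The sketch below, with hypothetical class and method names, compares the boxed Stream.iterate pipeline with the primitive range and asks a stream's spliterator whether it still reports the SIZED characteristic before and after a filter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.LongStream;
import java.util.stream.Stream;

// Hypothetical demo class illustrating the hints.
class StreamHintsDemo {
    // Boxed and inherently sequential: each element depends on the previous one.
    static long iterateSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }

    // Primitive range with a known size: splits cleanly for parallel execution.
    static long rangeSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    // True while the stream still reports an exact element count.
    static boolean isSized(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Stream straight from the list: the size is known.
        System.out.println(isSized(numbers.stream()));
        // After filter() the number of surviving elements is unknown.
        System.out.println(isSized(numbers.stream().filter(x -> x % 2 == 0)));
        System.out.println(iterateSum(1_000) == rangeSum(1_000));
    }
}
```

For timing comparisons between the two sums, a harness such as JMH is still the right tool; this sketch only checks that the results agree.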
Decomposability table
class | decomposability |
---|---|
ArrayList | 😄 Excellent |
IntStream.range | 😄 Excellent |
HashSet | 😐 Good |
TreeSet | 😐 Good |
LinkedList | 😭 Bad |
Stream.iterate | 😭 Bad |
Fork Join pool
- ForkJoinPool is an implementation of ExecutorService
- requires creation of a RecursiveTask<R> subclass that implements protected abstract R compute();
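- to use it, follow the divide-and-conquer algorithm: if the task is small enough (or can no longer be divided), compute it sequentially; otherwise split it into two subtasks, call the same method recursively on each, wait for both to complete and combine their results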
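- remember: join() blocks the caller until the subtask’s result is ready, so call it only after both subtasks have been started
- inside a RecursiveTask call compute() or fork() directly; don’t call invoke() on the pool from within a task (only sequential code should use invoke() to start the parallel computation)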
- choose wisely the criterion for deciding whether a task should be split further (see the example in the javadoc for the RecursiveAction class, where getSurplusQueuedTaskCount() is used as the criterion)
- subtasks should take longer than the creation of a new task
- it is hard to debug (as with all multithreaded programs)
- it is hard to benchmark, because the fork/join framework (like any other Java code) has to be warmed up before the JIT compiler optimizes it
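The rules above can be put together in a small RecursiveTask. This is a sketch in the spirit of the book's array-summing example, not a copy of it: the class name SumTask, the helper sum() and the THRESHOLD value are assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task that sums a slice of an array with fork/join.
class SumTask extends RecursiveTask<Long> {
    // Below this many elements the slice is summed sequentially (arbitrary value).
    private static final int THRESHOLD = 10_000;

    private final long[] numbers;
    private final int start;
    private final int end;

    SumTask(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private SumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            // Small enough: compute sequentially.
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = start + length / 2;
        SumTask left = new SumTask(numbers, start, mid);
        SumTask right = new SumTask(numbers, mid, end);
        left.fork();                         // schedule the left half asynchronously
        long rightResult = right.compute();  // reuse the current thread for the right half
        long leftResult = left.join();       // block only after both halves have been started
        return leftResult + rightResult;
    }

    // invoke() is called from ordinary sequential code, never from inside compute().
    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers));
    }
}
```

Calling compute() on one half instead of forking both avoids creating a task whose only job would be to keep the current thread waiting.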
Work stealing
- goal: ensure even distribution of work between threads
- each thread in the pool keeps a doubly-linked queue of tasks to execute; it takes them one by one from the head and executes them
- if there are no more tasks in a thread’s own queue, it randomly selects a thread with a non-empty queue and steals a task from the tail of that queue
Important: you don’t have to use fork/join directly if your data structure is decomposable; you just use parallel streams. The automatic mechanism for traversing a source and splitting it is the “spliterator”, an interface for which the standard collections provide a default implementation.
Spliterator
This is the documentation of Spliterator interface.
Spliterators define how a parallel stream can split the data it traverses
- in Spliterator<T>, T is the type of the source elements
- boolean tryAdvance(Consumer<? super T> action) is used to sequentially consume elements one by one; it returns true if an element was consumed and false when none remain
- Spliterator<T> trySplit() is used to create a new “partition” of elements (returned as a new spliterator), allowing this spliterator and the new one to be processed in parallel; it returns null when the source can’t be split any further
- long estimateSize() helps with even distribution of elements
- int characteristics() returns encoded flags (may be used to control or optimize the usage of the spliterator):
  - ORDERED - elems have a defined order which is enforced when traversing
  - DISTINCT - elems are pairwise different (in terms of equals)
  - SORTED - elems follow a sort order
  - SIZED - source has a known size, so estimateSize() is precise
  - NONNULL - elems won’t be null
  - IMMUTABLE - source can’t be modified (no elems added, removed or modified during traversal)
  - CONCURRENT - source may be safely modified concurrently by other threads without any synchronization
  - SUBSIZED - this spliterator and the ones it creates by splitting are SIZED
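The methods listed above can be tried on a spliterator that already exists. The sketch below uses the one provided by ArrayList; the class SpliteratorBasicsDemo and its helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Hypothetical demo of the Spliterator methods on an ArrayList.
class SpliteratorBasicsDemo {
    // Splits the list's spliterator once and returns the sizes of both parts.
    static long[] splitSizes(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if the source cannot be split
        long prefixSize = prefix == null ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    // Consumes elements one at a time until tryAdvance reports that none remain.
    static List<Integer> drain(Spliterator<Integer> spliterator) {
        List<Integer> out = new ArrayList<>();
        while (spliterator.tryAdvance(out::add)) {
            // the action has already stored the element
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(Arrays.toString(splitSizes(list)));
        System.out.println(drain(list.spliterator()));
        // Characteristics are bit flags, so several can be tested at once.
        System.out.println(list.spliterator()
                .hasCharacteristics(Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED));
    }
}
```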
Example
This is taken from chapter 7
Stream of Characters
Stream of characters from string:
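A minimal sketch of one way to build such a stream: iterate over the string's indices and map each index to its character. The class name CharacterStreamDemo and the sample text are assumptions for illustration.

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical helper that turns a String into a Stream<Character>.
class CharacterStreamDemo {
    static Stream<Character> characters(String text) {
        // Each index is mapped to the (boxed) character at that position.
        return IntStream.range(0, text.length()).mapToObj(text::charAt);
    }

    public static void main(String[] args) {
        // Prints every character of the sample text, spaces included.
        characters("fork join").forEach(System.out::print);
        System.out.println();
    }
}
```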
Reduction
Word counter used for reduction of the stream:
Implementation
Here is the code for WordCounter:
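What follows is a reconstruction along the lines of the book's chapter 7 example, not a verbatim copy: an immutable WordCounter that tracks the number of words seen so far and whether the last character was whitespace, together with a countWords helper (an assumed name) that runs the three-argument reduce.

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Immutable accumulator: word count so far plus "was the last char a space?".
class WordCounter {
    private final int counter;
    private final boolean lastSpace;

    WordCounter(int counter, boolean lastSpace) {
        this.counter = counter;
        this.lastSpace = lastSpace;
    }

    // Called for every character; a word starts at a non-space that follows a space.
    WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(counter, true);
        }
        return lastSpace ? new WordCounter(counter + 1, false) : this;
    }

    // Merges two partial results; only needed when the stream runs in parallel.
    WordCounter combine(WordCounter other) {
        return new WordCounter(counter + other.counter, other.lastSpace);
    }

    int getCounter() {
        return counter;
    }

    // Reduces a stream of characters to a word count.
    static int countWords(Stream<Character> stream) {
        WordCounter result = stream.reduce(
                new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
        return result.getCounter();
    }

    public static void main(String[] args) {
        String text = "the quick  brown fox";
        Stream<Character> chars = IntStream.range(0, text.length()).mapToObj(text::charAt);
        System.out.println(countWords(chars));
    }
}
```

On a sequential stream this counts correctly. On a plain parallel stream the string may be cut in the middle of a word, so one word can be counted twice, which is the problem the custom spliterator addresses.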
Spliterator
This is the spliterator that splits the character stream only on spaces:
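A sketch of such a spliterator, reconstructed along the lines of the book's example and not a verbatim copy. The minimum size of 10 characters below which it refuses to split is an arbitrary assumption, and tryAdvance here follows the interface contract (true only when an element was consumed).

```java
import java.util.Spliterator;
import java.util.function.Consumer;

// Spliterator over a String's characters that only splits at whitespace.
class WordCounterSpliterator implements Spliterator<Character> {
    private final String string;
    private int currentChar = 0;

    WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        if (currentChar >= string.length()) {
            return false; // nothing left to consume
        }
        action.accept(string.charAt(currentChar++));
        return true;
    }

    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;
        if (currentSize < 10) {
            return null; // small enough to be processed sequentially
        }
        // Start in the middle and move forward to the next whitespace.
        for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                Spliterator<Character> prefix =
                        new WordCounterSpliterator(string.substring(currentChar, splitPos));
                currentChar = splitPos; // this spliterator keeps the rest
                return prefix;
            }
        }
        return null; // no whitespace found, so no safe split point
    }

    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
    }
}
```

Because a split can only happen at a whitespace character, no word is ever divided between two partitions.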
Spliterator usage
It is used with the StreamSupport.stream(spliterator, isParallel) stream creation function:
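StreamSupport.stream accepts any Spliterator, so the call itself can be tried with a stock one. In the sketch below a List's own spliterator stands in for the word-splitting spliterator, which would be passed in exactly the same way; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo of building a stream on top of a spliterator.
class StreamSupportDemo {
    // The boolean flag selects sequential (false) or parallel (true) execution.
    static <T> Stream<T> streamOf(Spliterator<T> spliterator, boolean parallel) {
        return StreamSupport.stream(spliterator, parallel);
    }

    public static void main(String[] args) {
        List<Character> chars = Arrays.asList('a', ' ', 'b');
        Stream<Character> stream = streamOf(chars.spliterator(), true);
        // The stream reports whether it will run in parallel.
        System.out.println(stream.isParallel());
        System.out.println(stream.count());
    }
}
```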
This post is part of the modern-java-in-action series.
- 2022-05-07 - Modern Java in Action 8 - concurrency and reactive programming
- 2022-02-07 - Modern Java in Action 7 - notes about the module system
- 2022-30-06 - Modern Java in Action 6 - Time and Date
- 2022-20-06 - Modern Java in Action 5 - Optional
- 2022-17-06 - Modern Java in Action 4 - refactoring and testing
- 2022-15-06 - Modern Java in Action 3 - collection API
- 2022-14-06 - Modern Java in Action 2 - fork-join and spliterators
- 2022-03-06 - Modern Java in Action 1 - Java 8 refresher