Modern Java in Action 8 - concurrency and reactive programming
Enhanced Concurrency and Reactive programming

Task appropriate for the job
- high parallelization : use fork-join pool and and parallel streams for highly CPU-bound calculations
- high interactivity: high concurrency and a need to keep a lot of IO-bound tasks on single CPU: use Future (Completable Future) or Flow API
Problems and solutions
Threads problems
- map 1:1 to OS threads (expensive to create and of limited number)
- much higher number than hardware threads (cores? well, it seems a core is more complex)
CPUs like the Intel i7-6900K have multiple hardware threads per core, so the CPU can execute useful instructions even for short delays such as a cache miss.
- but if waiting for IO (blocking state) is needed - OS threads are not utilized well
Solutions
Thread pool
- allows to decouple task submission from thread creation
- cheap to submit thousands of tasks (handled on first-come, first-served basis)
but:
- a pool of K threads can only process maximum of K tasks concurrently
- others wait in a queue
- threads in a pool still will be blocked and not used for processing if are waiting for IO
- one needs to remember to close (“shut-dow”) property the thread pool (s) before exiting the app
Other models
- fork-join and streams have the property to nest exactly as the calling/returning of methods
- whenever a task is started, the same method that started it waits for it to complete
Synchronous and asynchronous
The problem of adding the result of two blocking comutations:
|
|
can be solved in multiple ways:
- As a blocking sequential code:
|
|
- As a computation in threads (note: complex thread management, awkward handling of results):
|
|
- Using executor service and providing calculations as tasks; waiting in blocking
.get()
calls for results:
|
|
- Future style API (computation is non-blocking, returns
Future
s:
|
|
- Reactive style API - passing a callback:
|
|
- CompletableFuture (stay tuned, this is a tasty bit in next chapter)
Dealing with exceptions
- Functional, reactive way - provide callback for handling result and another one for handling exception:
|
|
- With
Flow
’s Subscriber interface:
|
|
where Subscriber can do:
|
|
Box-and-channel diagram
This diagram presents a computational challenge:
Without parallelism, one can simply do:
|
|
With parallelism, things get complicated.If implemented like below (which is good for low parallelism), then in case we need high parallelism and the boxes themselves have inner box-and-channel computations,
we might end up with a lot of blocking with get
calls, potentially casuing a deadlock:
|
|
Solution with CompletableFuture
|
|
The core line here says c = a.thenCombine(b, (y, z)-> y + z);
which defines a required, optimal scenario:
both a
and b
are CompletableFutures, they get executed asynchronously and once their results arrive, the callback is (asynchronously!) calculated as c
.
Pub-sub model
Sometimes linear pipeline provided by Streams is not enough - we want several sinks or just an arbitrary shape of the pipeline. An expressive way of doing it is by using java.util.concurrent.Flow (since Java 9):
This model consists of:
- a publisher to which a subscriber can subscribe
- the connection (a subscription)
- messages (or events) transmitted using the connection
A bit more about CompletableFutures
Standard execution pattern with Future:
|
|
is not concise at all and is not enough for following scenarios:
- combining two asynchronous computations (both if independent and when the second depends on the result of the first)
- waiting for the completion of all tasks performed by a set of Futures
- waiting for the completion of only the quickest task in a set of Futures
- programmatically completing a Future
- reacting to a Future completion (be notified when the completion happens, perform an action with the result of the Future) - not not wait in get
This can be achieved with CompletableFutures’ API.
This is javadoc documentation for: CompletableFuture.
It is really worth skimming-through.
Synchronous and asynchronous APIS
- synchronous: a function call waits until a result is ready (in the same or in another thread) - traditional API
- asynchronous: a function call returns immediately and the caller may do other things; the result is executed in other thread and processed further (e.g. in a callback method) - common style of programming in IO-intensive applications
Note: in the realm of Operating Systems, we often say “blocking” and “non-blocking” operation to talk about specific implementation of IO operation.
Error propagation
|
|
- methods
complete
andcompleteExceptionally
allow to handle result and exceptions (ifget
has no timeout, exception in other thread would just kill the computation, making tha caller wait onget
forever)
CompletableFuture factory methods
The same as above can be achieved with CompletableFuture.supplyAsync
. This and othe rfactory methods have an overloaded version that takes an executor as last parameter.
|
|
Example with executor service (note: we make threads daemons, so that they will be killed when jvm exits; non-daemon would still be alive, waining for never-to-happen events or conditions, keeping - unnecessarily -an OS thread)
|
|
If shop.getPrice
gives a string which can be parsed to Quote instance, and if you can calcucale a discount based on Quote data,
then you could do the following. Note that parsing Quote is not sheduled asynchronously, but when Quote is parsed, discount calculation
is supplied to an executor for asynchronous calculation possiby by other thread in the pool (supplyAsync
; if supply
is used, then the same thread would pick up the discount calculation after quote is ready).
|
|
Combine two independent futures
If a computation should be performed on the result of two independent tasks, then:
|
|
Here, two tasks are supplied for asynchronous execution to the pool:
- getting the price (
shop.getPrice(product)
) - getting rate tor exchange service (
exchangeService.getRate(Money.EUR, Money.USD)
) Then, the results are awaited for and combined - when ready - together in.thenCombine
combinator.
Timeout
We could use simple timout for the whole operation and/or provide a default value that would complete the futrure if the timeout occurs:
|
|
Ten wpis jest częścią serii modern-java-in-action.
- 2022-05-07 - Modern Java in Action 8 - concurrency and reactive programming
- 2022-02-07 - Modern Java in Action 7 - notes about the module system
- 2022-30-06 - Modern Java in Action 6 - Time and Date
- 2022-20-06 - Modern Java in Action 5 - Optional
- 2022-17-06 - Modern Java in Action 4 - refactoring and testing
- 2022-15-06 - Modern Java in Action 3 - collection API
- 2022-14-06 - Modern Java in Action 2 - fork-join and spliterators
- 2022-03-06 - Modern Java in Action 1 - Java 8 refresher