Contents

Modern Java in Action 8 - concurrency and reactive programming

Enhanced Concurrency and Reactive programming

Task appropriate for the job

  • high parallelization : use fork-join pool and and parallel streams for highly CPU-bound calculations
  • high interactivity: high concurrency and a need to keep a lot of IO-bound tasks on single CPU: use Future (Completable Future) or Flow API

Problems and solutions

Threads problems

  • map 1:1 to OS threads (expensive to create and of limited number)
  • much higher number than hardware threads (cores? well, it seems a core is more complex)

CPUs like the Intel i7-6900K have multiple hardware threads per core, so the CPU can execute useful instructions even for short delays such as a cache miss.

  • but if waiting for IO (blocking state) is needed - OS threads are not utilized well

Solutions

Thread pool

  • allows to decouple task submission from thread creation
  • cheap to submit thousands of tasks (handled on first-come, first-served basis)

but:

  • a pool of K threads can only process maximum of K tasks concurrently
  • others wait in a queue
  • threads in a pool still will be blocked and not used for processing if are waiting for IO
  • one needs to remember to close (“shut-dow”) property the thread pool (s) before exiting the app

Other models

  • fork-join and streams have the property to nest exactly as the calling/returning of methods
    • whenever a task is started, the same method that started it waits for it to complete

Synchronous and asynchronous

The problem of adding the result of two blocking comutations:

1
2
int f(int x);
int g(int x);

can be solved in multiple ways:

  1. As a blocking sequential code:
1
2
3
int y = f(x);
int z = g(x);
System.out.println(y + z);
  1. As a computation in threads (note: complex thread management, awkward handling of results):
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class ThreadExample {

    public static void main(String[] args) throws InterruptedException {
        int x = 1337;
        Result result = new Result();

        Thread t1 = new Thread(() -> { result.left = f(x); } );
        Thread t2 = new Thread(() -> { result.right = g(x); });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(result.left + result.right);
    }

    private static class Result {
        private int left;
        private int right;
    }
}
  1. Using executor service and providing calculations as tasks; waiting in blocking .get() calls for results:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class ExecutorServiceExample {
    public static void main(String[] args)
        throws ExecutionException, InterruptedException {

        int x = 1337;

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Integer> y = executorService.submit(() -> f(x));
        Future<Integer> z = executorService.submit(() -> g(x));
        System.out.println(y.get() + z.get());

        executorService.shutdown();
    }
}
  1. Future style API (computation is non-blocking, returns Futures:
1
2
3
Future<Integer> y = f(x);
Future<Integer> z = g(x);
System.out.println(y.get() + z.get());
  1. Reactive style API - passing a callback:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class CallbackStyleExample {
    public static void main(String[] args) {

        int x = 1337;
        Result result = new Result();

        f(x, (int y) -> {
            result.left = y;
            System.out.println((result.left + result.right));
        } );

        g(x, (int z) -> {
            result.right = z;
            System.out.println((result.left + result.right));
        });

    }
}
  1. CompletableFuture (stay tuned, this is a tasty bit in next chapter)

Dealing with exceptions

  1. Functional, reactive way - provide callback for handling result and another one for handling exception:
1
2
void f(int x, Consumer<Integer> dealWithResult,
  Consumer<Throwable> dealWithException);
  1. With Flow’s Subscriber interface:
1
void f(int x, Subscriber<Integer> s);

where Subscriber can do:

1
2
3
void    onComplete()
void    onError(Throwable throwable)
void    onNext(T item)

Box-and-channel diagram

This diagram presents a computational challenge:

Without parallelism, one can simply do:

1
2
int t = p(x);
System.out.println( r(q1(t), q2(t)) );

With parallelism, things get complicated.If implemented like below (which is good for low parallelism), then in case we need high parallelism and the boxes themselves have inner box-and-channel computations, we might end up with a lot of blocking with get calls, potentially casuing a deadlock:

1
2
3
4
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
System.out.println( r(a1.get(),a2.get()));

Solution with CompletableFuture

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public class CFCombine {

    public static void main(String[] args) throws ExecutionException,
     InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        int x = 1337;

        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z);
        executorService.submit(() -> a.complete(f(x)));
        executorService.submit(() -> b.complete(g(x)));

        System.out.println(c.get());
        executorService.shutdown();

    }
}

The core line here says c = a.thenCombine(b, (y, z)-> y + z); which defines a required, optimal scenario: both a and b are CompletableFutures, they get executed asynchronously and once their results arrive, the callback is (asynchronously!) calculated as c.

Pub-sub model

Sometimes linear pipeline provided by Streams is not enough - we want several sinks or just an arbitrary shape of the pipeline. An expressive way of doing it is by using java.util.concurrent.Flow (since Java 9):

This model consists of:

  • a publisher to which a subscriber can subscribe
  • the connection (a subscription)
  • messages (or events) transmitted using the connection

A bit more about CompletableFutures

Standard execution pattern with Future:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
ExecutorService executor = Executors.newCachedThreadPool();        
Future<Double> future = executor.submit(new Callable<Double>() {   
        public Double call() {
            return doSomeLongComputation();                        
        }});
doSomethingElse();                                                 
try {
    Double result = future.get(1, TimeUnit.SECONDS);               
} catch (ExecutionException ee) {
    // the computation threw an exception
} catch (InterruptedException ie) {
    // the current thread was interrupted while waiting
} catch (TimeoutException te) {
    // the timeout expired before the Future completion
}

is not concise at all and is not enough for following scenarios:

  • combining two asynchronous computations (both if independent and when the second depends on the result of the first)
  • waiting for the completion of all tasks performed by a set of Futures
  • waiting for the completion of only the quickest task in a set of Futures
  • programmatically completing a Future
  • reacting to a Future completion (be notified when the completion happens, perform an action with the result of the Future) - not not wait in get

This can be achieved with CompletableFutures’ API.

CompletableFuture javadoc

This is javadoc documentation for: CompletableFuture.

It is really worth skimming-through.

Synchronous and asynchronous APIS

  • synchronous: a function call waits until a result is ready (in the same or in another thread) - traditional API
  • asynchronous: a function call returns immediately and the caller may do other things; the result is executed in other thread and processed further (e.g. in a callback method) - common style of programming in IO-intensive applications

Note: in the realm of Operating Systems, we often say “blocking” and “non-blocking” operation to talk about specific implementation of IO operation.

Error propagation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread( () -> {
                try {
                    double price = calculatePrice(product);
                    futurePrice.complete(price);                   
                } catch (Exception ex) {
                    futurePrice.completeExceptionally(ex);         
                }
    }).start();
    return futurePrice;
}
  • methods complete and completeExceptionally allow to handle result and exceptions (if get has no timeout, exception in other thread would just kill the computation, making tha caller wait on get forever)

CompletableFuture factory methods

The same as above can be achieved with CompletableFuture.supplyAsync. This and othe rfactory methods have an overloaded version that takes an executor as last parameter.

1
2
3
public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

Example with executor service (note: we make threads daemons, so that they will be killed when jvm exits; non-daemon would still be alive, waining for never-to-happen events or conditions, keeping - unnecessarily -an OS thread)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private final Executor executor =
        Executors.newFixedThreadPool(Math.min(shops.size(), 100),   
                                     (Runnable r) -> {
                Thread t = new Thread(r);
                t.setDaemon(true);                                  
                return t;
            }
);

CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
  shop.getPrice(product), executor);

If shop.getPrice gives a string which can be parsed to Quote instance, and if you can calcucale a discount based on Quote data, then you could do the following. Note that parsing Quote is not sheduled asynchronously, but when Quote is parsed, discount calculation is supplied to an executor for asynchronous calculation possiby by other thread in the pool (supplyAsync; if supply is used, then the same thread would pick up the discount calculation after quote is ready).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures =
        shops.stream()
             .map(shop -> CompletableFuture.supplyAsync(                   
                                   () -> shop.getPrice(product), executor))
             .map(future -> future.thenApply(Quote::parse))                
             .map(future -> future.thenCompose(quote ->                    
                         CompletableFuture.supplyAsync(
                           () -> Discount.applyDiscount(quote), executor)))
                .collect(toList());
    return priceFutures.stream()
            .map(CompletableFuture::join)                                  
            .collect(toList());
}

Combine two independent futures

If a computation should be performed on the result of two independent tasks, then:

1
2
3
4
5
6
7
Future<Double> futurePriceInUSD =
        CompletableFuture.supplyAsync(() -> shop.getPrice(product))       
        .thenCombine(
            CompletableFuture.supplyAsync(
                () ->  exchangeService.getRate(Money.EUR, Money.USD)),    
            (price, rate) -> price * rate                                 
        );

Here, two tasks are supplied for asynchronous execution to the pool:

  • getting the price (shop.getPrice(product))
  • getting rate tor exchange service (exchangeService.getRate(Money.EUR, Money.USD)) Then, the results are awaited for and combined - when ready - together in .thenCombine combinator.

Timeout

We could use simple timout for the whole operation and/or provide a default value that would complete the futrure if the timeout occurs:

1
2
3
4
5
6
7
8
Future<Double> futurePriceInUSD =
        CompletableFuture.supplyAsync(() -> shop.getPrice(product))
        .thenCombine(
            CompletableFuture.supplyAsync(
                () ->  exchangeService.getRate(Money.EUR, Money.USD))
              .completeOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),      
            (price, rate) -> price * rate
        ).orTimeout(3, TimeUnit.SECONDS);