You probably already know about Futures
A Future represents the pending result of an asynchronous computation. It offers a method, get, that returns the result of the computation when it's done.
The problem is that a call to get blocks until the computation is done. This is quite restrictive and can quickly make the asynchronous computation pointless.
Sure — you can keep coding all scenarios into the job you're sending to the executor, but why should you have to worry about all the plumbing around the logic you really care about?
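To make the blocking behaviour concrete, here is a minimal sketch using a plain Future. The slowTask method and its 200 ms delay are made up for illustration; the point is only that the caller sits inside get() for the whole duration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingFutureDemo {

    // Hypothetical slow task standing in for real work.
    static String slowTask() throws InterruptedException {
        Thread.sleep(200);
        return "done";
    }

    // Submits the task, then measures how long the caller is stuck inside get().
    static long millisBlockedInGet() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = BlockingFutureDemo::slowTask;
            Future<String> future = executor.submit(task);
            long start = System.nanoTime();
            future.get(); // the calling thread waits here until slowTask returns
            return (System.nanoTime() - start) / 1_000_000;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Blocked for roughly " + millisBlockedInGet() + " ms");
    }
}
```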
This is where CompletableFuture saves the day
Besides implementing the Future interface, CompletableFuture also implements the CompletionStage interface.
A CompletionStage is a promise. It promises that the computation will eventually be done.
The great thing about the CompletionStage is that it offers a vast selection of methods that let you attach callbacks that will be executed on completion.
This way we can build systems in a non-blocking fashion.
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
The simplest asynchronous computation
Let's start with the absolute basics: creating a simple asynchronous computation.
CompletableFuture.supplyAsync(this::sendMsg);
It's as easy as that.
supplyAsync takes a Supplier containing the code we want to execute asynchronously — in our case the sendMsg method.
If you've worked a bit with Futures in the past, you may wonder where the Executor went. If you want to, you can still define it as a second argument. However, if you leave it out it will be submitted to the ForkJoinPool.commonPool().
Methods that do not take an Executor as an argument but end with ...Async will use ForkJoinPool.commonPool() (the global, general-purpose pool introduced in JDK 8).
This applies to most methods in the CompletableFuture class. runAsync() is simple to understand: it takes a Runnable and therefore returns CompletableFuture<Void>, as a Runnable doesn't return anything. If you need to process something asynchronously and return a result, use Supplier<U>:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //...long running...
        return "42";
    }
}, executor);
Or with lambdas:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //...long running...
    return "42";
}, executor);
Or by delegating to a method:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
Creating and obtaining CompletableFuture
Factory methods:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
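The four factory methods can be tried out with a small sketch like the one below. The thread name "my-worker" and the class name are assumptions chosen for the demo; the thread factory is only there so that the effect of passing an explicit executor is visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FactoryMethodsDemo {

    // Runs a supplier on an explicit executor and reports which thread ran it.
    static String threadNameWithExecutor() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(task -> new Thread(task, "my-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // No executor given: the task is handed to the default asynchronous pool.
        CompletableFuture<String> onDefaultPool =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName());

        // runAsync takes a Runnable, so the resulting future carries no value.
        CompletableFuture<Void> sideEffect =
                CompletableFuture.runAsync(() -> System.out.println("running"));
        sideEffect.join();

        System.out.println("default pool thread: " + onDefaultPool.join());
        System.out.println("explicit executor thread: " + threadNameWithExecutor());
    }
}
```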
Attaching a callback
Our first asynchronous task is done. Let's add a callback to it!
The beauty of a callback is that we can say what should happen when an asynchronous computation is done without waiting around for the result.
In the first example, we simply sent a message asynchronously by executing sendMsg in its own thread.
Now let's add a callback where we notify about how the sending of the message went.
CompletableFuture.supplyAsync(this::sendMsg).thenAccept(this::notify);
thenAccept is one of many ways to add a callback. It takes a Consumer — in our case notify — which handles the result of the preceding computation when it's done.
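A runnable version of this idea might look like the following sketch. Here sendMsg is a hypothetical stand-in that just returns a status string, and the "notification" is simulated by storing the value in an AtomicReference. The join() call exists only so the demo can read the result before returning; production code would normally not wait.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ThenAcceptDemo {

    // Hypothetical stand-in for the article's sendMsg.
    static String sendMsg() {
        return "message sent";
    }

    // Sends asynchronously, records the outcome in a callback, and returns what was recorded.
    static String sendAndNotify() {
        AtomicReference<String> notified = new AtomicReference<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(ThenAcceptDemo::sendMsg)
                .thenAccept(notified::set); // the Consumer receives sendMsg's result
        done.join(); // wait only so the demo can inspect the recorded value
        return notified.get();
    }

    public static void main(String[] args) {
        System.out.println(sendAndNotify());
    }
}
```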
Chaining multiple callbacks
Transforming and acting on one CompletableFuture (thenApply):
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
Or chained in a single expression:
CompletableFuture<Double> f3 = f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
Running code on completion (thenAccept/thenRun)
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");
...Async variants are available as well for both methods, with implicit and explicit executor. I can't emphasize this enough: the thenAccept()/thenRun() methods do not block (even without an explicit executor). Treat them like an event listener/handler that you attach to a future and that will execute some time in the future. The "Continuing" message will appear immediately, even if the future is nowhere near completion.
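The ordering can be checked deterministically with a future that is completed by hand. This sketch assumes a manually created CompletableFuture and complete(), neither of which the text above uses; they are only a convenient way to control when the result arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NonBlockingCallbackDemo {

    // Records events in the order they happen: registration first, result later.
    static List<String> eventOrder() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Double> future = new CompletableFuture<>(); // not completed yet

        future.thenAccept(dbl -> events.add("Result: " + dbl)); // returns immediately
        events.add("Continuing");

        future.complete(42.0); // the callback runs now, on the completing thread
        return events;
    }

    public static void main(String[] args) {
        eventOrder().forEach(System.out::println);
    }
}
```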
Combining two CompletableFuture together
Combining (chaining) two futures (thenCompose()):
<U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn);
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f = docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture = docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc) //...
Transforming values of two futures (thenCombine())
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture = customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...
Notice that in Java 8 you can replace (cust, shop) -> findRoute(cust, shop) with a simple this::findRoute method reference:
customerFuture.thenCombine(shopFuture, this::findRoute);
thenAccept consumes the result and passes nothing on. To keep passing values along the chain, use thenApply instead.
thenApply takes a Function, which accepts a value and also returns one.
To see how this works, let's extend our previous example by first finding a receiver.
CompletableFuture.supplyAsync(this::findReceiver).thenApply(this::sendMsg).thenAccept(this::notify);
Now the asynchronous task will first find a receiver, then send a message to the receiver before it passes the result on to the last callback to notify.
Building asynchronous systems
When building bigger asynchronous systems, things work a bit differently.
You'll usually want to compose new pieces of code based on smaller pieces of code. Each of these pieces would typically be asynchronous, in our case returning CompletionStages.
Until now, sendMsg has been a normal blocking function. Let's now assume that we have a sendMsgAsync method that returns a CompletionStage.
If we kept using thenApply to compose the example above, we would end up with nested CompletionStages.
// Returns type CompletionStage<CompletionStage<String>>
CompletableFuture.supplyAsync(this::findReceiver).thenApply(this::sendMsgAsync);
We don't want that, so instead we can use thenCompose which allows us to give a Function that returns a CompletionStage. This will have a flattening effect like a flatMap.
// Returns type CompletionStage<String>
CompletableFuture.supplyAsync(this::findReceiver).thenCompose(this::sendMsgAsync);
This way we can keep composing new functions without losing the single-layered CompletionStage.
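The difference in result types can be seen in a small self-contained sketch. findReceiver and sendMsgAsync are hypothetical implementations here (they just build strings); only the shapes of the returned types matter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ThenComposeDemo {

    // Hypothetical stand-ins for the article's methods.
    static String findReceiver() {
        return "alice";
    }

    static CompletionStage<String> sendMsgAsync(String receiver) {
        return CompletableFuture.supplyAsync(() -> "sent to " + receiver);
    }

    // thenCompose flattens the stage returned by sendMsgAsync into the chain.
    static String sendFlattened() {
        CompletableFuture<String> flat = CompletableFuture
                .supplyAsync(ThenComposeDemo::findReceiver)
                .thenCompose(ThenComposeDemo::sendMsgAsync);
        return flat.join();
    }

    public static void main(String[] args) {
        // thenApply with the same function yields a stage wrapped in a future.
        CompletableFuture<CompletionStage<String>> nested = CompletableFuture
                .supplyAsync(ThenComposeDemo::findReceiver)
                .thenApply(ThenComposeDemo::sendMsgAsync);
        System.out.println("nested needs two unwraps: "
                + nested.join().toCompletableFuture().join());
        System.out.println("flattened: " + sendFlattened());
    }
}
```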
Callback as a separate task using the async suffix
Until now, our callbacks have been executed on the same thread as their predecessor (or on the thread that registers the callback, if the predecessor has already completed).
If you want to, you can submit the callback to the ForkJoinPool.commonPool() independently instead of using the same thread as the predecessor. This is done by using the async suffix version of the methods CompletionStage offers.
Let's say we want to send two messages in one go to the same receiver.
CompletableFuture<String> receiver = CompletableFuture.supplyAsync(this::findReceiver);
receiver.thenApply(this::sendMsg);
receiver.thenApply(this::sendOtherMsg);
In the example above, everything will be executed on the same thread. This results in the last message waiting for the first message to complete.
Now consider this code instead.
CompletableFuture<String> receiver = CompletableFuture.supplyAsync(this::findReceiver);
receiver.thenApplyAsync(this::sendMsg);
receiver.thenApplyAsync(this::sendMsg);
By using the async suffix, each message is submitted as a separate task to the ForkJoinPool.commonPool(). This results in both the sendMsg callbacks being executed when the preceding calculation is done.
The key point: the asynchronous version can be convenient when you have several callbacks dependent on the same computation.
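Which thread runs an ...Async callback is easiest to observe with an explicit executor, because the thread name can then be controlled. The sketch below assumes a two-thread pool whose threads are named "callback-worker"; the pool size and names are arbitrary choices for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSuffixDemo {

    // Attaches two independent async callbacks and reports the thread that ran the first.
    static String callbackThreadName() {
        ExecutorService callbacks =
                Executors.newFixedThreadPool(2, task -> new Thread(task, "callback-worker"));
        try {
            CompletableFuture<String> receiver = CompletableFuture.supplyAsync(() -> "alice");

            // Each callback is submitted to the executor as its own task.
            CompletableFuture<String> first =
                    receiver.thenApplyAsync(to -> Thread.currentThread().getName(), callbacks);
            CompletableFuture<String> second =
                    receiver.thenApplyAsync(to -> "other message to " + to, callbacks);

            second.join();
            return first.join();
        } finally {
            callbacks.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```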
What to do when it all goes wrong
As you know, bad things can happen. And if you've worked with Future before, you know how bad it could get.
Luckily CompletableFuture has a nice way of handling this, using exceptionally.
CompletableFuture.supplyAsync(this::failingMsg).exceptionally(ex -> new Result(Status.FAILED)).thenAccept(this::notify);
exceptionally gives us a chance to recover by taking an alternative function that will be executed if the preceding calculation fails with an exception.
This way succeeding callbacks can continue with the alternative result as input.
If you need more flexibility, check out whenComplete and handle for more ways of handling errors.
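As a rough, self-contained illustration of recovery with exceptionally, the sketch below uses a failingMsg that always throws and a plain string "FAILED" in place of the Result type from the snippet above. Both are simplifications made up for this example.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallyDemo {

    // Hypothetical task that always fails.
    static String failingMsg() {
        throw new IllegalStateException("network down");
    }

    // Recovers from the failure so that later callbacks still receive a value.
    static String sendWithFallback() {
        return CompletableFuture
                .supplyAsync(ExceptionallyDemo::failingMsg)
                .exceptionally(ex -> "FAILED")              // alternative result on failure
                .thenApply(status -> "notified: " + status) // continues with the fallback
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sendWithFallback());
    }
}
```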
Error handling of single CompletableFuture
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
    if (ok != null) {
        return Integer.parseInt(ok);
    } else {
        log.warn("Problem", ex);
        return -1;
    }
});
Or, using exceptionally():
CompletableFuture<String> safe = future.exceptionally(ex -> "We have a problem: " + ex.getMessage());
handle() is always called, with either the result or the exception argument being non-null. This is a one-stop catch-all strategy.
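Here is one way handle() could be exercised on both a successful and a failed future. This variant tests the exception argument rather than the result, which is a design choice of this sketch: it keeps working if a successful future legitimately produces null. The helper names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

public class HandleDemo {

    // Parses the future's value, or falls back to -1 if the future failed.
    static int parseOrDefault(CompletableFuture<String> future) {
        return future.handle((ok, ex) -> {
            if (ex != null) {
                return -1; // failure path: ex describes what went wrong
            }
            return Integer.parseInt(ok); // success path
        }).join();
    }

    static int onSuccess() {
        return parseOrDefault(CompletableFuture.completedFuture("42"));
    }

    static int onFailure() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return parseOrDefault(failed);
    }

    public static void main(String[] args) {
        System.out.println("success: " + onSuccess());
        System.out.println("failure: " + onFailure());
    }
}
```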
Callback depending on multiple computations
Sometimes it would be really helpful to be able to create a callback that is dependent on the result of two computations. This is where thenCombine comes in handy.
thenCombine allows us to register a BiFunction callback depending on the result of two CompletionStages.
To see how this is done, let’s in addition to finding a receiver also execute the heavy job of creating some content before sending a message.
CompletableFuture<String> to = CompletableFuture.supplyAsync(this::findReceiver);
CompletableFuture<String> text = CompletableFuture.supplyAsync(this::createContent);
to.thenCombine(text, this::sendMsg);
First, we've started two asynchronous jobs — finding a receiver and creating some content. Then we use thenCombine to say what we want to do with the result of these two computations by defining our BiFunction.
It's worth mentioning that thenCombine has a sibling called runAfterBoth. This version takes a Runnable and doesn't care about the actual values of the preceding computations, only that they're done.
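Both variants can be tried side by side in a sketch like this one. The three helper methods are hypothetical implementations that only build strings, standing in for the real lookup, content creation, and sending.

```java
import java.util.concurrent.CompletableFuture;

public class ThenCombineDemo {

    // Hypothetical stand-ins for the article's methods.
    static String findReceiver() {
        return "alice";
    }

    static String createContent() {
        return "hello";
    }

    static String sendMsg(String to, String text) {
        return text + " -> " + to;
    }

    // Starts both jobs, then combines their results once both are available.
    static String combine() {
        CompletableFuture<String> to = CompletableFuture.supplyAsync(ThenCombineDemo::findReceiver);
        CompletableFuture<String> text = CompletableFuture.supplyAsync(ThenCombineDemo::createContent);
        return to.thenCombine(text, ThenCombineDemo::sendMsg).join();
    }

    public static void main(String[] args) {
        System.out.println(combine());

        // runAfterBoth ignores the values and only reacts to both being done.
        CompletableFuture<String> a = CompletableFuture.supplyAsync(ThenCombineDemo::findReceiver);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(ThenCombineDemo::createContent);
        a.runAfterBoth(b, () -> System.out.println("both finished")).join();
    }
}
```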
Callback dependent on one or the other
OK, so we've now covered the scenario where you depend on two computations. Now, what about when you just need the result of one of them?
Let’s say you have two sources for finding a receiver. You’ll ask both, but will be happy with the first one returning a result.
CompletableFuture<String> firstSource = CompletableFuture.supplyAsync(this::findByFirstSource);
CompletableFuture<String> secondSource = CompletableFuture.supplyAsync(this::findBySecondSource);
firstSource.acceptEither(secondSource, this::sendMsg);
As you can see, it's solved easily by acceptEither, which takes the other pending computation and a Consumer that will be executed with the result of whichever completes first.
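To keep the outcome predictable, the sketch below pits an already completed future against one that never completes. Both sources are artificial and exist only for the demo; with real lookups, whichever source answers first would win.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class AcceptEitherDemo {

    // Returns the value delivered to the Consumer by whichever source finished first.
    static String firstResult() {
        CompletableFuture<String> slowSource = new CompletableFuture<>(); // never completes here
        CompletableFuture<String> fastSource = CompletableFuture.completedFuture("bob");

        AtomicReference<String> receiver = new AtomicReference<>();
        slowSource.acceptEither(fastSource, receiver::set).join();
        return receiver.get();
    }

    public static void main(String[] args) {
        System.out.println("first receiver found: " + firstResult());
    }
}
```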
Waiting for both CompletableFutures to complete
<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
Imagine that in the example above, instead of producing a new CompletableFuture<Route>, you simply want to send some event or refresh the GUI immediately. This can be easily achieved with thenAcceptBoth():
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
final Route route = findRoute(cust, shop);
//refresh GUI with route
});
I hope I'm wrong, but maybe some of you are asking yourselves: why can't I simply block on these two futures? Like here:
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());
You can, but get() blocks the calling thread until both results are available, whereas thenAcceptBoth() registers a callback and returns immediately.
Waiting for first CompletableFuture to complete
Another interesting part of the CompletableFuture API is the ability to wait for the first (as opposed to all) completed future. This can come in handy when you have two tasks yielding results of the same type and you only care about response time, not which task finished first. API methods (...Async variations are available as well):
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
System.out.println("Result: " + s);
});
Transforming first completed
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone = fast.applyToEither(predictable, Function.<String>identity());
Combining multiple CompletableFuture together
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf() takes an array of futures and returns a future that completes when all of the underlying futures are completed (barrier waiting for all). anyOf() on the other hand will wait only for the fastest of the underlying futures.
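Since allOf() returns CompletableFuture<Void>, the individual results have to be read from the original futures. One common way to do that is sketched below; the allResults helper is not part of the JDK, just an illustrative utility, and the three tasks returning 1, 2 and 3 are placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfDemo {

    // Hypothetical helper: turns a list of futures into a future of a list of results.
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // Once 'all' is done every join() below returns without waiting.
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<Integer> demo() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        return allResults(futures).join();
    }

    public static void main(String[] args) {
        System.out.println("all results: " + demo());

        // anyOf yields an Object because the futures may have different types.
        Object first = CompletableFuture
                .anyOf(CompletableFuture.completedFuture("a"), new CompletableFuture<String>())
                .join();
        System.out.println("first finished: " + first);
    }
}
```

The results come back in the order of the input list, not in completion order, because they are read from the original futures one by one.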