An introduction to Vertx 4.x Futures

In order to write reactive applications in Vertx 3.x we use what is described a callback model. This approach is familiar to JS developers, yet it is not very practical when it comes to Java. Therefore, in the fourth release, the Vertx team added futures, that are used to handle results of asynchronous operations. In some sense, they are similar to Project Reactor publishers, yet they are easier to use (that is purely my opinion). Futures allow to define subscribers that are notified upon completion results (both successful or failed) as well futures have API to build both sequential and concurrent composition pipelines.

In this post we will examine how to use futures in order to write Vertx 4.x applications in the “new” style. I recommend to check this post, when you review any of my articles, that target Vertx 4.x in order to get an understanding of how to use futures.

What is the future in Vertx and how to create one

In Vertx, futures define a result of an asynchronous computation. This result is postponed and therefore we can’t interact with it directly; instead, we specify handlers, that will be called once any result will be obtained. From the theoretical point of view, that reminds us the publisher pattern, that is implemented in the Project Reactor. In same way, we have reactive publishers (Mono and Flux), that can emit some output. I omitted the definition of the particular output, for a reason: that is because the result can be either successful or it can be failed, and the completion can emit any number of elements – from 0 to one (in case of the Mono) or many (in case of the Flux) entities. Again, we can’t interact directly with publishers, but we could subscribe to the output.

Futures work in a similar manner. The future represents a derived result of an computation and can be successfully completed or can be failed. Once completed successfully, the Future class can emit the result. The Futures API in Vertx is easier to use than its Webflux counterpart. Basically, you need to remember following handlers (subscribers):

  • onComplete = this handler is called on any result of the future’s completion
  • onSuccess = this handler is called once the future is completed successfully (gives an access to the result)
  • onFailure = this handler is called once the future is failed (gives an access to the error)

The usage of futures will certainly make your code cleaner, than with callbacks; also with subscribers you can avoid numerous succeeded checking – you can provide unified subscribers for onFailure handlers. Let now review how to create a Future instance in Vertx 4.x. We can summarize these approaches into following groups:

  • Create from Java CompletableFuture and CompletionStage
  • Create a new instance with a literal (hard coded) value (either successful or failed)
  • Create from the Vertx Promise (we will not cover this method in this post)

The first point in our list is a creation of Futures from standard Java asynchronous types. You can initialize a Future from any CompletionStage type (such as the CompletableFuture class). Take a look on the code snippet below:

@Test
void createFromJava(Vertx vertx, VertxTestContext context){
    CompletableFuture<BigDecimal> result = CompletableFuture
        .supplyAsync(() -> BigDecimal.valueOf(10000).multiply(BigDecimal.valueOf(3)));

    Future<BigDecimal> future = Future.fromCompletionStage(result);

    future.onComplete(n-> {
        Assertions.assertThat(n.result()).isEqualByComparingTo(BigDecimal.valueOf(30000));
        context.completeNow();
    });
}

This fromComletionStage method resolves the Java async type to the Vertx Future. That provides an interoperability with Java APIs and other frameworks/libraries that operate with these types.

Another approach is to supply a literal (hard coded) value during the initialization. Beware! This is suitable for demo purposes or when you have already received a result!. Otherwise, please use the previous way, if you need to do first some computations (especially blocking computations). We can initialize the new Future with a success value:

@Test
void createFromValue(Vertx vertx, VertxTestContext context){
    Future<String> successFuture = Future.succeededFuture("Hello world!");
    successFuture.onSuccess(r -> {
        context.verify(() ->{
            Assertions.assertThat(r).isEqualTo("Hello world!");
            context.completeNow();
        });
    });
}

Alternatively, we can initialize a failed Future as following:

@Test
void createFailed(Vertx vertx, VertxTestContext context){
    Future<Throwable> failedFuture = Future.failedFuture(new RuntimeException());
    failedFuture.onFailure(err -> {
        context.verify(() -> {
            Assertions.assertThat(err).isInstanceOf(RuntimeException.class);
            context.completeNow();
        });
    });
}

I hope, that this subsection gave you a basic introduction to the Future creation. However, in a real life, you will mostly operate with instances created by Vertx or other libraries, so it makes sense to have a broader overview of how to manipulate futures, including mapping, recovery and composition.

Mapping (Sequential composition)

The map operation is a common thing in functional programming. Simply speaking, the mapper pattern provides a way to convert a one type to another one. Mapping is known also as a sequential composition, due to the fact, that it awaits for the result of the future and then apply the map function on it (e.g. it composes results sequentially). There are two types of mapping in Vertx (and in general):

  • Map = implemented by the map() method
  • Flat map = implemented by the compose() method

Both do same task (conversion), but the flat map returns not a value, but a new Future. This is very useful, when the mapping operation requires a separate asynchronous operation itself. Have a look on the basic usage of the map() function:

@Test
void mapFuture(Vertx vertx, VertxTestContext context){
    Future<String> future = Future.succeededFuture("Hello world");
    future
            .map(s -> s.toUpperCase())
            .onSuccess(result -> context.verify(() -> {
                Assertions.assertThat(result).isUpperCase();
                context.completeNow();
    }));
}

In this example, we take a result and transform it to an uppercase string. In the onSuccess handler is supplied the changed variable. If you need to complete more complex (and potentially blocking) job, you need to use a flat map version and to return another Future instead of a pure value:

@Test
void composeMapFuture(Vertx vertx, VertxTestContext context){
    Future<String> future = Future.succeededFuture("Hello world!");
    future
            .compose(s -> Future.succeededFuture(s.toUpperCase()))
            .onComplete(r -> {
                if (r.succeeded()) {
                    Assertions.assertThat(r.result()).isUpperCase();
                    context.completeNow();
                } else {
                    context.failNow(r.cause());
                }
            });
}

Well, while in this example we use the uppercase mapper again, the idea is different. The compose() method returns a future, not a modified value, so it is a flat map operation, which allows to sequentially chain modifications on the result of the original future.

Note, that compose() and flatMap() methods in Vertx do same thing.

Recover the future

The previous section deals with the mapping operation of the success result. However, what will happen, if the future will fail? Based on our knowledge, we can say, that in this case, the execution will go to a onFailure handler. However, what if we do not want to do it, and instead we want to provide an alternative value for the failed scenario? You can understand this similar to default values (the value that is provided as a substitute in a case of an error). This is not clearly mapping operation, because we do not modify an error object (yet, we have an access to it, and it can help to determine a reason of an error).

This operation is called recovery. Take a look on the following implementation below:

@Test
void recoverFuture(Vertx vertx, VertxTestContext context){
    Future<String> future = Future.failedFuture(new RuntimeException());
    future
            .recover(err -> Future.succeededFuture("Hello!"))
            .onSuccess(r -> {
                context.verify(() -> {
                    Assertions.assertThat(r).isEqualTo("Hello!");
                    context.completeNow();
                });
            });
}

Note, that the initial future is failed. However, after the recover operation, it is successful and therefore the onSuccess handler is triggered.

Concurrent composition patterns

In previous section we saw mapping and recovery – both are considered to be sequential composition patterns. That means, that the future waits for the completion of the previous one before if will execute. However, it is often needed to call several futures in the same time (in parallel). That kind of patterns is called concurrent composition patterns. Such patterns expose the CompositeFuture instance, which is a wrapper for a collection of futures (list). It permits to coordinate several futures upon their completion.

Any

The any function create a CompositeFuture, that is considered to be successed, if any single future is successfully completed. It does not matter, if others fail, unless they all fail. In the later case, the aggregated result considered failed as well.

@Test
void anyComposition(Vertx vertx, VertxTestContext context){
    Future<BigDecimal> future1 = Future.fromCompletionStage(CompletableFuture.supplyAsync(() -> BigDecimal.valueOf(10000)));
    Future<BigDecimal> future2 = Future.failedFuture(new RuntimeException());
    Future<CompositeFuture> composed = CompositeFuture.any(future1, future2);
    composed.onSuccess(r -> context.completeNow());
}

You can assert, that despite the future2 is failed, the composite result is considered to be successfully completed. However, if both futures are failed, the total result fails too:

@Test
void anyCompositionFail(Vertx vertx, VertxTestContext context){
    Future<BigDecimal> future1 = Future.failedFuture(new RuntimeException());
    Future<BigDecimal> future2 = Future.failedFuture(new RuntimeException());
    Future<CompositeFuture> composed = CompositeFuture.any(future1, future2);
    composed
            .onSuccess(r -> context.failNow("That should not be!"))
            .onFailure(r -> context.completeNow());
}

All

The all function produces the CompositeFuture, that is considered to be successful, if all futures are completed successfully. If any given future will fail, the aggregated result is failed too, despite of other futures.

@Test
void allCompositionSuccess(Vertx vertx, VertxTestContext context){
    Future<BigDecimal> future1 = Future.fromCompletionStage(CompletableFuture.supplyAsync(() -> BigDecimal.valueOf(10000)));
    Future<BigDecimal> future2 = Future.succeededFuture(new BigDecimal(20000));
    Future<CompositeFuture> composed = CompositeFuture.all(future1, future2);
    composed.onSuccess(r -> {
        BigDecimal value1 = r.resultAt(0);
        BigDecimal value2 = r.resultAt(1);
        context.verify(() -> {
            Assertions.assertThat(value1).isEqualByComparingTo(new BigDecimal((10000)));
            Assertions.assertThat(value2).isEqualByComparingTo(new BigDecimal(20000));
            context.completeNow();
        });
    });
}

Note, that because the CompositeFuture is actually the list inside, it means that we access a future with the resultAt() method based on 0 index, like with list elements.

This code snippet demonstrates the failed result of the all operation:

@Test
void allCompositionFailed(Vertx vertx, VertxTestContext context){
    Future<BigDecimal> future1 = Future.fromCompletionStage(CompletableFuture.supplyAsync(() -> BigDecimal.valueOf(10000)));
    Future<BigDecimal> future2 = Future.failedFuture(new RuntimeException());
    Future<CompositeFuture> composed = CompositeFuture.all(future1, future2);
    composed
            .onSuccess(r -> context.failNow("That should not be!"))
            .onFailure(err -> context.completeNow());
}

Join

The final operation that is covered in this post is the join method. In a nutshell, it produces the same result as the all operation: the result composite future is succeeded when all futures are succeeded and is failed when any future is failed. However, there is a difference:

  • The all composition fails immediately as soon as a single future in a list fails
  • The join composition continues to run other futures, despite the one is failed, until the full completion

Take a look on the code snippet below, which demonstrates the usage of the join method:

@Test
void joinFutures(Vertx vertx, VertxTestContext context){
    Future<BigDecimal> future1 = Future.fromCompletionStage(CompletableFuture.supplyAsync(() -> BigDecimal.valueOf(10000)));
    Future<BigDecimal> future2 = Future.succeededFuture(new BigDecimal(20000));
    Future<CompositeFuture> composed = CompositeFuture.join(future1, future2);
    composed.onSuccess(r -> {
        BigDecimal value1 = r.resultAt(0);
        BigDecimal value2 = r.resultAt(1);
        context.verify(() -> {
            BigDecimal sum = value1.add(value2);
            Assertions.assertThat(sum).isEqualByComparingTo(new BigDecimal("30000"));
            context.completeNow();
        });
    });
}

Source code

All source code snippets for this post (as well for my other Vertx posts) are available in this github repository. Feel free to explore them!

Conclusion

The Vertx 3.x programming model uses only callbacks to write applications. In the Vertx 4.x version was added another paradigm – futures, that define Vertx way of implementing reactive publishers. The Futures API allows to interoperate asynchronous code from native Java types and build both sequential and concurrent composition pipelines. If you have any questions, don’t hesitate to contact me.