Advanced Vertx EventBus – Part 1: Invalid and Dead Messages

Message-based systems allow to provide a greater level of interoperability as well a greater level of fault tolerance in general. Due to the fact, that messages are exchanged in an asynchronous manner, components are not depended on each other; similarly, using a separate broker component (such as EventBus) can decrease coupling in general. Another aspect that arises here, is surely, messages are sent and received in a reactive way, but what if they are lost or expired? With an intermediate layer, such as the mentioned Vertx EventBus, we do not relate components with each other, like for instance with different Observer pattern implementations. What will happen if the destination do not exist at all?

All these questions are relevant, especially when you design big and complex applications. Hopefully, there are already solutions, that can fit your needs. In this post we will talk about two messaging patterns – dead messages and invalid messages. While they can seem same, it is not correct. The first option, that a message can not be processed due to an absent destination. In the second case, a message will be delivered to its recipient, but such component can not process it correctly. I recommend readers to check my previous post on Vertx 4 EventBus API as an introduction to the topic and to review my book Principles of Vertx, that can serve as an architectural guide.

Dead messages pattern

We will start from simpler paradigm (I think, that it is simpler). The dead message pattern (it is also referred as dead letter channel) describes such implementation, when the sent message can not be received due to an absent recipient. In reality, we don’t care about reasons: the component could be not yet created, an address is invalid etc. The important thing is that message is not arrived to its destination. The implementation of this design pattern seems to be appropriate for one-to-one communication, such as point-to-point messaging, then for broadcasting. Let take a helicopter view on this technique:

We have here two possible outcomes. A valid destination means, that the message will be delivered to its receiver. However, an invalid destination means, that message will not be delivered and we need to care about it. Such messages can be collected in a special queue or just can be logged and dropped. For demonstration purposes, we will do the second option.

Now, let go to practice. When it comes to Vertx EventBus, we have three available protocols to communicate – broadcasting, point-to-point and a special case of the second one – request-reply. In order to implement the mentioned paradigm we need to use the last one, because it gives us a reply handler – an important tool to assert messaging results. You can use it in two ways – either by using Futures API or by using a callback (it depends on your coding style and will not affect the code). In order to maintain general compatibility with Vertx 3.x, I will stick to a callback-based style in this post.

In the code snippet below we send a message to a non-existing destination and we use a reply handler in order to find delivery outcomes:

class PublisherVerticle extends AbstractVerticle {

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        EventBus eventBus = vertx.eventBus();

        JsonObject message = new JsonObject();
        message.put("Message", "Hello world!");

        eventBus.request("dead-consumer", message, reply -> {
            if (reply.failed()) {
                Throwable error = reply.cause();
                if (error instanceof ReplyException) {

                    ReplyException exception = (ReplyException) error;
                    System.out.println(exception.failureCode());
                    System.out.println(exception.failureType());

                    ReplyFailure failure = exception.failureType();
                    if (failure == ReplyFailure.NO_HANDLERS) {
                        System.out.println("This is the dead letter");
                    }
                }
            }
            startPromise.complete();
        });
    }

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();

        PublisherVerticle publisher = new PublisherVerticle();
        vertx.deployVerticle(publisher, result -> {
            vertx.undeploy(result.result(), result2 -> vertx.close());
        });
    }
}

We call the cause() method in order to get an actual type of the exception. In cases of failed delivery, Vertx throws an ReplyException, that we can use in order to verify a particular issue that was occurred. There are two methods for that:

  • failureCode() returns an integer value, that represents the failure code; for Vertx reply errors it is -1, but you can set your own fail code in order to determine different error types.
  • failureType() returns a ReplyFailure variable. This is basically an enum, that contains several useful error types: NO_HANDLERS (that means no receivers found), TIMEOUT (timeout is expired), RECIPIENT_FAILURE (we will this later in the next section) and a general ERROR (in this case, you should not retry a delivery).

In this case, as we implement a dead message pattern, we expect to obtain the NO_HANDLERS failure result. Now we can either log the message or to collect within a dead message queue. In next sections we will observe other reasons.

Message expiration as a special case of the dead messages pattern

Before we will finish with this topic, I would like to give an outlook on a message expiration. From the architectural point of view it is considered as a special case of the dead messages pattern: the message which is expired will not be delivered to its destination, no matter if it does exist and is dropped to a dead message queue. In Vertx we need to specify a DeliveryOptions configuration object with a required timeout. If a reply is not received within that time, the reply handler will be called with a failure. By default, the sent timeout is 30 seconds. Take a look on this example:

DeliveryOptions deliveryOptions = new DeliveryOptions();
deliveryOptions.setSendTimeout(1000);

Please note, that the timeout is specified in milliseconds. This configuration should be passed to any sending method (request, publish, send). Due to the fact, that we stick with a request-reply messaging, we will use the request() method. The expected failure type in the case of expired messages is TIMEOUT, which means that no reply was received before the timeout time.

Invalid messages pattern

The previous case describes the situation where a message can not be delivered at all to the recipient and is dropped. However, there is another possibility – the recipient does exist, but it fails to proceed the message. This situation is different, because we need to first verify a message on a consumer in order to be able to say it is invalid and then top reply to the origin and to return a result. In a nutshell, the invalid message pipeline (invalid message channel) looks like below:

For an implementation we will use same request-reply messaging, however we need to have a consumer component too, because it will drop the message and will reply with a corresponding outcome. In this case we define a failure in the ConsumerVerticle using the Message.fail method. This method accepts two arguments:

  • Error code
  • Error message

Both can be used in order to differentiate error types. Take a look on the following implementation of the ConsumerVerticle.java:

class ConsumerVerticle extends AbstractVerticle {

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer("my-consumer", reply -> reply.fail(400, "Invalid message"));
        startPromise.complete();
    }

}

The expected failure type is RECIPIENT_FAILURE, that represents a rejected message (invalid message in the Vertx wording). But, additionally, you can use failureCode and getMessage functions to access your custom code and message. This is how it can look like for the PublisherVerticle verticle:

eventBus.request("my-consumer", message, reply -> {
    Throwable error = reply.cause();
    if (error instanceof ReplyException) {
        ReplyException ex = (ReplyException) error;
        ReplyFailure failure = ex.failureType();
        if (failure == ReplyFailure.RECIPIENT_FAILURE) {
            System.out.println(ex.getMessage());
            System.out.println(ex.failureCode());
        }
    }
    startPromise.complete();
    });
}

Source code

This post is a part of series of different topics regarding development with Eclipse Vertx. The common repository, that contains sources for these posts is available here.

Summary

The usage of dead messages and invalid messages patterns can be a great addition to your systems’ architecture. In this post we discovered general principles behind and provided simple implementations, however if you will add actual queues to these patterns, you can collect and handle messages, that can be delivered. The one of possible situations is to cache such messages and to retry delivery, when the recipient will become available. We also did an overview of a message expiration as a special case of the dead messages pattern. I hope, that this post will help you when designing and developing reactive systems with Vertx. In case you have questions or suggestions or just feedback, please don’t hesitate to contact me.