Introduction
Suppose our application organizes asynchronous domain logic inside DomainEventListener
’s like the following code snippet.
|
|
This post shows how to integrate these listeners with Spring AMQP by taking advantage of Spring’s infrastructure. We also briefly discuss several important topics on using messaging middleware.
Up and running
|
|
The web management UI is available at http://localhost:15672. The default username and
password are both guest
.
Messaging with RabbitMQ is a very basic Hello World example for Spring Boot integration.
For a more production-ready setting, it’s mandatory to refer to the Spring AMQP reference documentation.
AMQP
AMQP (Advanced Message Queuing Protocol) is an application layer protocol for messaging middleware. RabbitMQ primarily supports the AMQP 0-9-1 model. It mainly revolves around the following three AMQP entities:
- exchange
- queue
- binding
Exchange, queue, and binding
Messages are published to an exchange and consumed from a queue. A queue can subscribe to exchanges with bindings. When a message arrives at an exchange, it will be routed to different queues based on:
- the routing key in the message, and
- the type of the addressed exchange.
Routing
The four basic types of exchange are:
- Direct exchange
- Fanout exchange
- Topic exchange
- Headers exchange
The routing mechanism further decouples the publisher and subscriber, making it easy to adapt to different workflows. It may look overwhelming at first sight, so we are sticking with the simplest Direct Exchange. In this case, the message will be routed to subscribing queues with the same routing key.
To learn more about RabbitMQ and AMQP, I would recommend RabbitMQ in Depth. Its clear explanation and illustration make it easier to grasp these concepts.
Declarative configuration with Spring AMQP
With Spring AMQP, we can configure the required AMQP entities in the message broker by registering them as beans in the application context. At runtime, Spring AMQP will issue requests to the broker and create them.
I like this declarative approach. It feels like Kubernetes without nasty YAML configurations.
|
|
In this code snippet, we first declare an application-wide domainEventExchange
. Then, we loop over
the DomainEventListener
’s in the application context and create the necessary queues and bindings.
Note that:
- We can declare multiple AMQP entities in a
Declarables
. They don’t need to be the same type. durable
means the created entity will survive broker restart.
Listener container
So far, we have declared desired entities in the broker, but haven’t yet wired up our DomainEventListener
’s to consume
messages from it and perform business logic. To do so, we need to wrap our event handlers
inside a MessageListenerContainer
.
A MessageListenerContainer
represents an active or hot component. It handles the connection to the message broker.
When the connection is broken, SimpleMessageListenerConainer
will try to restart the listener. As a lifecycle
component, it provides methods for starting and stopping.
To programmatically register our domain listeners, we can implement RabbitListenerConfigurer
and use
the RabbitListenerEndpointRegistrar
like this:
|
|
Spring AMQP will take care of creating the listener containers at runtime. Note that we passed an instance
of RabbitListenerContainerFactory
to the registrar. We will see that we can configure common properties of the
containers through the container factory.
Check out more details at:
Messages are ephemeral
It’s important to realize that in the AMQP model, the message broker just acts as a postman between sender and receiver. It holds on to the messages only temporarily. After successful delivery to all consumers, the message is usually removed from the broker.
In contrast, Redis Stream and Kafka are more like databases that store the messages until explicitly deleted.
Some repercussions:
- It’s easy to lose messages if something is misconfigured. For example, a message published to an exchange with no bound queues will be dropped. Conceptually, it seems only queues in AMQP have memory.
- No first-class support for doing CRUD on messages. For example, the web UI does not have a list screen to page through the messages. It’s possible to retrieve a few ones in the queue detail page, but it has a warning that says “Getting messages from a queue is a destructive action.” Although not necessarily always “destructive”, the action will probably cause side effects on the message.
- Spring AMQP shares this kind of mindset. For example, it is important to configure a
MessageRecoverer
in aRetryInterceptor
. By default, Spring AMQP will drop the message after retrying for configured times and issue a warning. Retry is discussed in the following section.
Resilience
Spring AMQP Reference: Resilience: Recovering from Errors and Broker Failures
Acknowledgment
Message consumers can fail at any time (due to business exceptions, dropped connections, application crash, etc). To prevent losing messages in this way, message brokers use acknowledgments: a message is removed from the broker only after the client explicitly acknowledges that it has processed it.
In Laravel, when a job is taken from the queue, it’s placed at
myqueue:reserved
key at the same time. These two steps form an atomic operation by using Lua scripting. This can also be seen as a form of acknowledgment.
Dead letter exchange
RabbitMQ Dead Letter Documentation
Suppose there is a mal-formed message. In our Spring application, if our message handlers throw an exception due to this message, by default, the message will be re-queued and delivered again, resulting in an infinite loop. A viable solution would be sending the bad message to other places for inspection after a few retries.
RabbitMQ can handle this situation with dead letter exchanges. When our consumer negatively acknowledges a message, the queue will “dead-letter” the message (annotate with some information about the failure) and route it to the configured dead letter exchange. This configuration can be applied globally in the broker with a policy or specified when creating a queue.
|
|
You can also apply a policy in the RabbitMQ web UI under the “admin” tag.
Note that the dead letter exchange is just a regular exchange. If you specify a non-existent exchange, RabbitMQ does not create it automatically. This is a type of misconfiguration that can cause lost messages.
AMQP entity declaration in Spring for configuring dead letter exchanges:
|
|
For each listener, we create:
- a corresponding dead letter queue, and
- its binding to the application-wide
deadLetterExchange()
.
By properly configuring listening queues and bindings, we ensure dead letters won’t get lost.
Then, update our queues()
code to configure the queues’ dead letter exchange.
|
|
We also need to set up our listener containers to instruct RabbitMQ not to requeue rejected messages. If a dead letter exchange is configured on the queue, the message will be routed to it. Otherwise, the message will be dropped.
Laravel’s failed jobs table is the equivalent concept.
Retry
It may be helpful to retry a few times before routing a failed message to the dead letter exchange. Spring provides some retry helpers, and we can configure them in the container factory like this:
|
|
Reference
https://blog.yujinyan.me/posts/notes-rabbitmq-spring-boot/