
This article will teach you how to run ActiveMQ on Kubernetes and integrate it with your app through Spring Boot. We will deploy a clustered ActiveMQ broker using a dedicated operator. Then we are going to build and run two Spring Boot apps. The first of them runs in multiple instances and receives messages from a queue, while the second one sends messages to that queue. In order to test the ActiveMQ cluster, we will use Kind. The consumer app connects to the cluster using several different modes, which we will discuss in detail.

You can find a lot of articles about other message brokers like RabbitMQ or Kafka on my blog. If you would like to read about RabbitMQ on Kubernetes, please refer to that article. In order to find out more about Kafka and Spring Boot integration, you can read the article about Kafka Streams and Spring Cloud Stream available here. Previously I didn't write much about ActiveMQ, but it is also a very popular message broker. For example, it supports the latest version of the AMQP protocol, while RabbitMQ is based on its own extension of AMQP 0.9.1.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then go to the messaging directory. There you will find three Spring Boot apps: simple-producer, simple-consumer and simple-counter. After that, you should just follow my instructions. Let's begin.

Integrate Spring Boot with ActiveMQ

Let's begin with the integration between our Spring Boot apps and the ActiveMQ Artemis broker. In fact, ActiveMQ Artemis is the base of a commercial product provided by Red Hat called AMQ Broker. Red Hat actively develops a Spring Boot starter for ActiveMQ and an operator for running it on Kubernetes. In order to access the starter, you need to include the Red Hat Maven repository in your pom.xml file:

<repository>
  <id>red-hat-ga</id>
  <url>https://maven.repository.redhat.com/ga</url>
</repository>

After that, you can include a starter in your Maven pom.xml:

<dependency>
  <groupId>org.amqphub.spring</groupId>
  <artifactId>amqp-10-jms-spring-boot-starter</artifactId>
  <version>2.5.6</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Then, we just need to enable JMS for our app with the @EnableJms annotation:

@SpringBootApplication
@EnableJms
public class SimpleConsumer {

   public static void main(String[] args) {
      SpringApplication.run(SimpleConsumer.class, args);
   }

}

Our application is very simple. It just receives and prints incoming messages. The method for receiving messages should be annotated with @JmsListener. The destination field contains the name of the target queue.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
      .getLogger(Listener.class);

   @JmsListener(destination = "test-1")
   public void processMsg(SimpleMessage message) {
      LOG.info("============= Received: " + message);
   }

}

Here’s the class that represents our message:

public class SimpleMessage implements Serializable {

   private Long id;
   private String source;
   private String content;

   public SimpleMessage() {
   }

   public SimpleMessage(Long id, String source, String content) {
      this.id = id;
      this.source = source;
      this.content = content;
   }

   // ... GETTERS AND SETTERS

   @Override
   public String toString() {
      return "SimpleMessage{" +
              "id=" + id +
              ", source='" + source + '\'' +
              ", content='" + content + '\'' +
              '}';
   }
}

Finally, we need to set the connection configuration. With the AMQP Spring Boot starter, it is very simple. We just need to set the amqphub.amqp10jms.remoteUrl property. For now, we will rely on an environment variable set at the level of the Kubernetes Deployment.

amqphub.amqp10jms.remoteUrl = ${ARTEMIS_URL}
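
For example, when connecting to a single broker running locally on the default AMQP port, the value of that variable could simply be:

amqphub.amqp10jms.remoteUrl = amqp://localhost:5672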

The producer application is pretty similar. Instead of the annotation for receiving messages, we use the Spring JmsTemplate to produce and send messages to the target queue. The method for sending messages is exposed as the HTTP POST /producer/send endpoint.

@RestController
@RequestMapping("/producer")
public class ProducerController {

   private static long id = 1;
   private final JmsTemplate jmsTemplate;
   @Value("${DESTINATION}")
   private String destination;

   public ProducerController(JmsTemplate jmsTemplate) {
      this.jmsTemplate = jmsTemplate;
   }

   @PostMapping("/send")
   public SimpleMessage send(@RequestBody SimpleMessage message) {
      if (message.getId() == null) {
          message.setId(id++);
      }
      jmsTemplate.convertAndSend(destination, message);
      return message;
   }
}

Create a Kind cluster with Nginx Ingress

Our example apps are ready. Before deploying them, we need to prepare the local Kubernetes cluster. We will deploy there an ActiveMQ cluster consisting of three brokers. Therefore, our Kubernetes cluster will also consist of three worker nodes. There are also three instances of the consumer app running on Kubernetes, which connect to the ActiveMQ brokers over the AMQP protocol, and a single instance of the producer app that sends messages on demand. Here's the diagram of our architecture.

[Image: architecture of the ActiveMQ cluster and Spring Boot apps on Kubernetes]

In order to run a multi-node Kubernetes cluster locally, we will use Kind. We will test not only communication over the AMQP protocol but also expose the ActiveMQ management console over HTTP. Because ActiveMQ uses headless Services for exposing the web console, we have to create and configure an Ingress on Kind to access it. Let's begin.

In the first step, we are going to create a Kind cluster. It consists of a control plane and three workers. The configuration has to be prepared correctly to run the Nginx Ingress Controller. We should add the ingress-ready label to a single worker node and expose ports 80 and 443. Here’s the final version of a Kind config file:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
    kubeadmConfigPatches:
    - |
      kind: JoinConfiguration
      nodeRegistration:
        kubeletExtraArgs:
          node-labels: "ingress-ready=true"      
    extraPortMappings:
    - containerPort: 80
      hostPort: 80
      protocol: TCP
    - containerPort: 443
      hostPort: 443
      protocol: TCP  
  - role: worker
  - role: worker

Now, let’s create a Kind cluster by executing the following command:

$ kind create cluster --config kind-config.yaml

If your cluster has been created successfully, you should see output similar to this:

[Image: output of the kind create cluster command]

After that, let’s install the Nginx Ingress Controller. It is just a single command:

$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml

Let’s verify the installation:

$ kubectl get pod -n ingress-nginx
NAME                                        READY   STATUS      RESTARTS  AGE
ingress-nginx-admission-create-wbbzh        0/1     Completed   0         1m
ingress-nginx-admission-patch-ws2mv         0/1     Completed   0         1m
ingress-nginx-controller-86b6d5756c-rkbmz   1/1     Running     0         1m
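
You can also wait until the Ingress controller becomes ready with the command suggested in the Kind documentation:

$ kubectl wait --namespace ingress-nginx \
  --for=condition=ready pod \
  --selector=app.kubernetes.io/component=controller \
  --timeout=90s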

Install ActiveMQ Artemis on Kubernetes

Finally, we may proceed to the ActiveMQ Artemis installation. Firstly, let’s install the required CRDs. You may find all the YAML manifests inside the operator repository on GitHub.

$ git clone https://github.com/artemiscloud/activemq-artemis-operator.git
$ cd activemq-artemis-operator

The manifests with CRDs are located in the deploy/crds directory:

$ kubectl create -f ./deploy/crds

After that, we can install the operator:

$ kubectl create -f ./deploy/service_account.yaml
$ kubectl create -f ./deploy/role.yaml
$ kubectl create -f ./deploy/role_binding.yaml
$ kubectl create -f ./deploy/election_role.yaml
$ kubectl create -f ./deploy/election_role_binding.yaml
$ kubectl create -f ./deploy/operator_config.yaml
$ kubectl create -f ./deploy/operator.yaml
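
Before proceeding, let's make sure the operator is up and running. Assuming the default name used in the repository manifests, we can check its Deployment:

$ kubectl get deployment activemq-artemis-operator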

In order to create a cluster, we have to create the ActiveMQArtemis object. It defines the number of brokers being a part of the cluster (1). We should also define an acceptor to expose the AMQP port on every single broker pod (2). Of course, we will also expose the management console (3).

apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemis
metadata:
  name: ex-aao
spec:
  deploymentPlan:
    size: 3 # (1)
    image: placeholder
    messageMigration: true
    resources:
      limits:
        cpu: "500m"
        memory: "1024Mi"
      requests:
        cpu: "250m"
        memory: "512Mi"
  acceptors: # (2)
    - name: amqp
      protocols: amqp
      port: 5672
      connectionsAllowed: 5
  console: # (3)
    expose: true
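
Let's save the manifest above, e.g. as broker.yaml (the filename is arbitrary), and apply it:

$ kubectl apply -f broker.yaml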

Once the ActiveMQArtemis object is created, the operator starts the deployment process. It creates the StatefulSet object:

$ kubectl get statefulset
NAME        READY   AGE
ex-aao-ss   3/3     1m

It starts all three broker pods sequentially:

$ kubectl get pod -l application=ex-aao-app
NAME          READY   STATUS    RESTARTS    AGE
ex-aao-ss-0   1/1     Running   0           5m
ex-aao-ss-1   1/1     Running   0           3m
ex-aao-ss-2   1/1     Running   0           1m

Let's display the list of Services created by the operator. There is a single Service per broker for exposing the AMQP port (ex-aao-amqp-*) and the web console (ex-aao-wconsj-*):

[Image: the list of Services created by the operator]

The operator automatically creates an Ingress object for each web console Service. We will modify them by setting different hosts: the one.activemq.com domain for the first broker, two.activemq.com for the second broker, etc.
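
We can set the hosts by editing each Ingress manually, or e.g. with kubectl patch. Here's a sketch for the first broker, assuming the generated Ingress contains a single rule:

$ kubectl patch ingress ex-aao-wconsj-0-svc-ing --type=json \
  -p '[{"op": "add", "path": "/spec/rules/0/host", "value": "one.activemq.com"}]'

After the modification, the list of Ingress objects looks like this: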

$ kubectl get ing    
NAME                      CLASS    HOSTS                ADDRESS     PORTS   AGE
ex-aao-wconsj-0-svc-ing   <none>   one.activemq.com     localhost   80      1h
ex-aao-wconsj-1-svc-ing   <none>   two.activemq.com     localhost   80      1h
ex-aao-wconsj-2-svc-ing   <none>   three.activemq.com   localhost   80      1h

After modifying the Ingresses, we have to add the following line to /etc/hosts:

127.0.0.1    one.activemq.com two.activemq.com three.activemq.com

Now, we can access the management console. For example, the console of the third broker is available at the following URL: http://three.activemq.com/console.

[Image: the ActiveMQ Artemis management console]

Once the brokers are ready, we may define a test queue with the ActiveMQArtemisAddress object. The name of our queue is test-1.

apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemisAddress
metadata:
  name: address-1
spec:
  addressName: address-1
  queueName: test-1
  routingType: anycast
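
As before, we apply the manifest with kubectl and can then verify that the address resource exists (assuming the manifest is saved as address.yaml):

$ kubectl apply -f address.yaml
$ kubectl get activemqartemisaddresses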

Run the Spring Boot app on Kubernetes and connect to ActiveMQ

Now, let's deploy the consumer app. In the Deployment manifest, we have to set the ActiveMQ cluster connection URL. But wait… how do we connect to it? There are three brokers exposed using three separate Kubernetes Services. Fortunately, the AMQP Spring Boot starter supports such a scenario. We may set the addresses of all three brokers inside the failover section. Let's try it and see what happens.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-consumer
  template:
    metadata:
      labels:
        app: simple-consumer
    spec:
      containers:
      - name: simple-consumer
        image: piomin/simple-consumer
        env:
          - name: ARTEMIS_URL
            value: failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)
        resources:
          limits:
            memory: 256Mi
            cpu: 500m
          requests:
            memory: 128Mi
            cpu: 250m

The application is prepared to be deployed with Skaffold. If you run the skaffold dev command, you will deploy the app and see the logs of all three consumer instances. What's the result? All the instances connect to the first URL from the list, as shown below.

[Image: consumer logs showing all instances connected to the first broker URL]

Fortunately, there is a failover parameter that helps distribute client connections more evenly across multiple remote peers. With the failover.randomize option, URIs are randomly shuffled before attempting to connect to one of them. Let’s replace the ARTEMIS_URL env in the Deployment manifest with the following line:

failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)?failover.randomize=true

The distribution between broker instances looks slightly better. Of course, the result is random, so you may get different results.

[Image: consumer logs after enabling the failover.randomize option]
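
The starter is based on the Apache Qpid JMS client, so more failover options are available than just randomization. For instance, here's a sketch that additionally limits the number of reconnection attempts (failover.maxReconnectAttempts is one of the options documented by Qpid JMS):

failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)?failover.randomize=true&failover.maxReconnectAttempts=10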

Another way to distribute the connections is through a dedicated Kubernetes Service. We don't have to rely on the Services created automatically by the operator. Instead, we can create our own Service that load balances between all available broker pods.

kind: Service
apiVersion: v1
metadata:
  name: ex-aao-amqp-lb
spec:
  ports:
    - name: amqp
      protocol: TCP
      port: 5672
  type: ClusterIP
  selector:
    application: ex-aao-app
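
Before switching the apps to the new Service, we can verify that it selects all three broker pods by listing its endpoints:

$ kubectl get endpoints ex-aao-amqp-lb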

Now, we can drop the failover section on the client side and fully rely on the Kubernetes mechanisms.

spec:
  containers:
  - name: simple-consumer
    image: piomin/simple-consumer
    env:
      - name: ARTEMIS_URL
        value: amqp://ex-aao-amqp-lb:5672

This time we won't see anything interesting in the application logs, because all the instances connect to the same URL. Instead, we can verify the distribution between the broker instances using, e.g., the management web console. Here's the list of consumers on the first instance of ActiveMQ:

[Image: the list of consumers on the first broker instance]

Below, you can see exactly the same result for the second instance. All the consumer app instances have been distributed equally between the available brokers inside the cluster.

[Image: the list of consumers on the second broker instance]

Now, we are going to deploy the producer app. We use the same Kubernetes Service to connect to the ActiveMQ cluster.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: simple-producer
  template:
    metadata:
      labels:
        app: simple-producer
    spec:
      containers:
        - name: simple-producer
          image: piomin/simple-producer
          env:
            - name: ARTEMIS_URL
              value: amqp://ex-aao-amqp-lb:5672
            - name: DESTINATION
              value: test-1
          ports:
            - containerPort: 8080

Because we have to call the HTTP endpoint, let's create a Service for the producer app:

apiVersion: v1
kind: Service
metadata:
  name: simple-producer
spec:
  type: ClusterIP
  selector:
    app: simple-producer
  ports:
  - port: 8080

Let’s deploy the producer app using Skaffold with port-forwarding enabled:

$ skaffold dev --port-forward

Here’s a list of our Deployments:

[Image: the list of Deployments]

In order to send a test message, just execute the following command:

$ curl http://localhost:8080/producer/send \
  -d "{\"source\":\"test\",\"content\":\"Hello\"}" \
  -H "Content-Type:application/json"

Advanced configuration

If you need more advanced traffic distribution between the brokers inside the cluster, you can achieve it in several ways. For example, we can dynamically override the configuration property at runtime. Here's a very simple example. After starting, the application connects to an external service over HTTP, which returns the next broker instance number.

@Configuration
public class AmqpConfig {

    @PostConstruct
    public void init() {
        // ask the counter app for the next broker instance number
        RestTemplate t = new RestTemplateBuilder().build();
        int x = t.getForObject("http://simple-counter:8080/counter", Integer.class);
        // override the connection URL before the JMS connection factory is created
        System.setProperty("amqphub.amqp10jms.remoteUrl",
                "amqp://ex-aao-amqp-" + x + "-svc:5672");
    }

}

Here's the implementation of the counter app. It just increments the counter and returns it modulo the number of broker instances. Of course, we could create a more advanced implementation, e.g. one that connects each app pod to the broker instance running on the same Kubernetes node.

@SpringBootApplication
@RestController
@RequestMapping("/counter")
public class CounterApp {

   private static int c = 0;

   public static void main(String[] args) {
      SpringApplication.run(CounterApp.class, args);
   }

   @Value("${DIVIDER:0}")
   int divider;

   @GetMapping
   public Integer count() {
      if (divider > 0)
         return c++ % divider;
      else
         return c++;
   }
}
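
For our scenario, the DIVIDER environment variable should be set to the number of brokers in the cluster. Here's a sketch of the relevant fragment of a hypothetical Deployment manifest for the counter app (the image name is my assumption, following the naming convention of the other apps):

spec:
  containers:
    - name: simple-counter
      # assumption: the image follows the simple-* naming convention
      image: piomin/simple-counter
      env:
        - name: DIVIDER
          # the number of brokers in the ActiveMQ cluster
          value: "3"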

Final Thoughts

ActiveMQ is an interesting alternative to RabbitMQ as a message broker. In this article, you learned how to run, manage, and integrate ActiveMQ with Spring Boot on Kubernetes. It can be managed declaratively on Kubernetes thanks to the ActiveMQ Artemis Operator. You can also easily integrate it with Spring Boot using a dedicated starter, which provides various configuration options and is actively developed by Red Hat and the community.

Reference https://piotrminkowski.com/2022/07/26/activemq-artemis-with-spring-boot-on-kubernetes/