
🐇 Introduction

One day I received a ticket where I needed to write a Consumer that would accept batch messages.

Naturally, as a TDD guy, I wrote tests where I simulated an error while the consumer was processing batch messages and expected all failed messages to be redirected to the Dead Letter Queue.

However, to my surprise, not a single message reached the Dead Letter Queue, unlike with regular single-message error handling.

As it turned out, Spring uses RejectAndDontRequeueRecoverer by default, and this recoverer handles only one message at a time. We need to handle the whole list at once, but how?

After a lot of googling 🤪, deep in the Spring docs, I found the magic MessageBatchRecoverer 🌚 interface, which helps to manage failed batch messages.

😈 MessageBatchRecoverer

When a whole batch is processed in a single listener call, a batch retry recoverer handles the error once the retries are exhausted. If something goes wrong, consider throwing an exception that carries the index of the record within the batch where the error happened. This way, the recoverer can properly handle the remaining records without losing data.

The arguments of this method are (var1 and var2 are the names the decompiler shows):

  1. List<Message> var1: the list of messages that were being processed when the error occurred.
  2. Throwable var2: the error that brought us into this method.

package org.springframework.amqp.rabbit.retry;

import java.util.List;
import org.springframework.amqp.core.Message;

@FunctionalInterface
public interface MessageBatchRecoverer extends MessageRecoverer {

   void recover(List<Message> var1, Throwable var2);

}

For instance, if you receive a batch of 10 messages and 9 of them are processed normally, but an error occurs while processing the 10th, all 10 messages are passed to the MessageBatchRecoverer.
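To make the "index in the exception" idea more concrete, here is a minimal sketch in plain Kotlin. BatchItemFailedException, processBatch and itemsToRecover are hypothetical names invented for this illustration; they are not part of Spring AMQP. The sketch assumes the recoverer may receive the listener's exception wrapped in another one, so it walks the cause chain.

```kotlin
// Hypothetical exception type (not part of Spring AMQP): carries the index of the failed item.
class BatchItemFailedException(val index: Int, cause: Throwable) :
    RuntimeException("Batch item at index $index failed", cause)

// Runs the handler for every item and reports the position of the first failure.
fun <T> processBatch(items: List<T>, handler: (T) -> Unit) {
    items.forEachIndexed { index, item ->
        try {
            handler(item)
        } catch (e: Exception) {
            throw BatchItemFailedException(index, e)
        }
    }
}

// Walks the cause chain and returns the items from the failed index onwards.
// If no index is found, the whole batch is returned.
fun <T> itemsToRecover(items: List<T>, cause: Throwable): List<T> {
    val failure = generateSequence(cause) { it.cause }
        .filterIsInstance<BatchItemFailedException>()
        .firstOrNull()
    return if (failure == null) items else items.drop(failure.index)
}
```

Inside a MessageBatchRecoverer, something like itemsToRecover(messages, cause) could then limit the recovery to the failed message and the ones after it. Keep in mind that with a stateless retry the listener is called again with the same batch on every attempt, so the items before the failed index may have been processed more than once.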

🧬 Code

build.gradle.kts

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-amqp")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.boot:spring-boot-testcontainers")
    testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
    testImplementation("org.springframework.amqp:spring-rabbit-test")
    testImplementation("org.testcontainers:junit-jupiter")
    testImplementation("org.testcontainers:rabbitmq")
    testImplementation("org.mockito.kotlin:mockito-kotlin:3.2.0")
    testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

application.yaml

rabbitmq:
  exchange: message-batcher-exchange
  queue: message-batcher-queue

spring:
  application:
    name: message-batcher-recoverer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        batch-size: 10
        prefetch: 20
server:
  port: 8080

🚩 CustomMessageBatchRecoverer.kt 🚩

import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer
import org.springframework.stereotype.Component

@Component
class CustomMessageBatchRecoverer : MessageBatchRecoverer {

    private val rejectAndDontRequeueRecoverer = RejectAndDontRequeueRecoverer()

    override fun recover(messages: List<Message>, cause: Throwable) {
        println("Recovering batch of messages due to: ${cause.message}")
        messages.forEach { message ->
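            /*
             * Custom logic to handle each message, for example logging it.
             * Note: RejectAndDontRequeueRecoverer.recover() throws, so the loop ends at the
             * first message and the listener container then rejects the whole batch.
             */
            rejectAndDontRequeueRecoverer.recover(message, cause)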
        }
    }
}

RejectAndDontRequeueRecoverer is a MessageRecoverer implementation that tells the listener container to reject the message without requeuing it. It does this by throwing an exception that wraps AmqpRejectAndDontRequeueException, so the very first call ends the loop above and the container rejects the whole batch. This allows the failed messages to be routed to a Dead Letter Queue, if the broker is configured with one.

Of course, you can also implement your own custom logic to handle failed messages, including those in a batch.
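As one possible example of such custom logic, the sketch below republishes every failed message instead of rejecting the batch. It delegates to Spring AMQP's RepublishMessageRecoverer; the class name RepublishingBatchRecoverer and its constructor parameters are my own illustration, not something from the project above. Because this recoverer returns normally instead of throwing, I would expect the container to acknowledge the batch after the copies are published.

```kotlin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer

// Hypothetical alternative to CustomMessageBatchRecoverer (names are illustrative).
class RepublishingBatchRecoverer(
    amqpTemplate: AmqpTemplate,
    errorExchange: String,   // exchange that receives the failed messages
    errorRoutingKey: String, // routing key used when republishing
) : MessageBatchRecoverer {

    // Publishes a copy of the message, with exception details in the headers.
    private val delegate = RepublishMessageRecoverer(amqpTemplate, errorExchange, errorRoutingKey)

    override fun recover(messages: List<Message>, cause: Throwable) {
        // The delegate does not throw on success, so every message in the batch is handled.
        messages.forEach { message -> delegate.recover(message, cause) }
    }
}
```

With the queues from this article, passing an empty exchange name and the DLQ name as the routing key would send the copies to the same Dead Letter Queue through the default exchange.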

👑 RabbitMQConfig.kt 👑

import com.illia.ponomarov.medium.messagebatcherrecoverer.recoverer.CustomMessageBatchRecoverer
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class RabbitMQConfig(
    @Value("\${rabbitmq.exchange}") val exchangeName: String,
    @Value("\${rabbitmq.queue}") val queueName: String,
) {

    @Bean
    fun queue() = QueueBuilder.durable(queueName)
                              .quorum()
                              .deadLetterExchange("")
                              .deadLetterRoutingKey("$queueName.dlq")
                              .build()

    @Bean
    fun dlq() = QueueBuilder.durable("$queueName.dlq")
                            .quorum()
                            .build()

    @Bean
    fun exchange() = TopicExchange(exchangeName, true, false)

    @Bean
    fun binding(queue: Queue, exchange: TopicExchange) =
        BindingBuilder.bind(queue)
                      .to(exchange)
                      .with(queueName)

    @Bean("batchListenerContainerFactory")
    fun batchListenerContainerFactory(
        connectionFactory: ConnectionFactory,
        customMessageBatchRecover: CustomMessageBatchRecoverer,
    ): SimpleRabbitListenerContainerFactory =
        SimpleRabbitListenerContainerFactory().apply {
            val retryInterceptor = RetryInterceptorBuilder.stateless()
                .recoverer(customMessageBatchRecover)
                .build()

            setMessageConverter(SimpleMessageConverter())
            setConnectionFactory(connectionFactory)
            setConsumerBatchEnabled(true)
            setAdviceChain(retryInterceptor)
        }
}

We use RetryInterceptorBuilder 💖 to specify which recoverer is called when message processing in the Consumer still fails after the retries are exhausted.

In our case, we have created a custom ⚙️ CustomMessageBatchRecoverer component, which implements the MessageBatchRecoverer interface.

🚩 We also need to enable batch consumers.

We do this through the setter method:
setConsumerBatchEnabled(true)

🚩 Note: if we set it to true, Spring also sets batchListener to true.

public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
  this.consumerBatchEnabled = consumerBatchEnabled;
  if (consumerBatchEnabled) {
    this.setBatchListener(true);
  }
}
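The retry and batch settings of the container factory can also be set explicitly. As far as I can tell, the spring.rabbitmq.listener.simple.* properties are applied to the factory that Spring Boot auto-configures (or to one passed through SimpleRabbitListenerContainerFactoryConfigurer), so a factory built by hand may not pick up batch-size and prefetch on its own. The sketch below assumes Spring AMQP 3.x with Spring Retry; the function name explicitBatchFactory and all numeric values are illustrative.

```kotlin
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer

// Hypothetical helper; the values below are assumptions, tune them for your workload.
fun explicitBatchFactory(
    connectionFactory: ConnectionFactory,
    recoverer: MessageBatchRecoverer,
): SimpleRabbitListenerContainerFactory =
    SimpleRabbitListenerContainerFactory().apply {
        val retryInterceptor = RetryInterceptorBuilder.stateless()
            .maxAttempts(3)                  // the listener is called at most 3 times per batch
            .backOffOptions(500, 2.0, 5_000) // initial interval (ms), multiplier, max interval (ms)
            .recoverer(recoverer)            // called once the attempts are exhausted
            .build()

        setConnectionFactory(connectionFactory)
        setConsumerBatchEnabled(true)
        setBatchSize(10)      // up to 10 messages per listener call
        setPrefetchCount(20)  // unacknowledged messages the broker may send ahead
        setAdviceChain(retryInterceptor)
    }
```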

HelloWorldRequest.kt

import java.io.Serializable
import java.time.LocalDateTime
import java.util.UUID

data class HelloWorldRequest(
    val id: UUID = UUID.randomUUID(),
    val message: String,
    val date: LocalDateTime = LocalDateTime.now()
) : Serializable

The structure of the message we are going to process.

HelloWorldService.kt

import org.springframework.stereotype.Service

@Service
class HelloWorldService {
    fun sendHelloWorldRequest(request: HelloWorldRequest) {
        println("Sending HelloWorldRequest: $request")
    }
}

HelloWorldConsumer.kt

import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component


@Component
class HelloWorldConsumer(
    private val helloWorldService: HelloWorldService
) {

    @RabbitListener(
        queues = ["\${rabbitmq.queue}"],
        containerFactory = "batchListenerContainerFactory"
    )
    fun consume(messages: List<Message>) {
        messages.forEach { message ->
            println("Received message: $message")
            helloWorldService.sendHelloWorldRequest(
                HelloWorldRequest(
                    message = message.body.decodeToString()
                )
            )
        }
        println("Processed batch of ${messages.size} messages")
    }
}

When creating the batch consumer, the main thing is to specify the containerFactory that uses our custom CustomMessageBatchRecoverer ✨.

ConsumerTest.kt

import org.mockito.kotlin.any
import org.mockito.kotlin.doThrow
import org.mockito.kotlin.whenever
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.springframework.test.context.bean.override.mockito.MockitoBean
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.awaitility.Awaitility
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.test.Test

@SpringBootTest
@Testcontainers
class ConsumerTest {

    companion object {
        @Container
        private val rabbitMQContainer = RabbitMQContainer("rabbitmq:latest")

        @JvmStatic
        @DynamicPropertySource
        fun registerProperties(registry: DynamicPropertyRegistry) {
            registry.add("spring.rabbitmq.host") { rabbitMQContainer.host }
            registry.add("spring.rabbitmq.port") { rabbitMQContainer.amqpPort }
            registry.add("spring.rabbitmq.username") { rabbitMQContainer.adminUsername }
            registry.add("spring.rabbitmq.password") { rabbitMQContainer.adminPassword }
        }
    }


    @Value("\${rabbitmq.queue}")
    private lateinit var queueName: String

    @Autowired
    lateinit var rabbitmqTemplate: RabbitTemplate

    @Autowired
    lateinit var amqpAdmin: AmqpAdmin

    @MockitoBean
    lateinit var helloWorldService: HelloWorldService

    @Test
    fun testConsumer() {

        /*
         * Simulate a failure in the HelloWorldService
         */
        doThrow(RuntimeException("Test exception"))
            .whenever(
                helloWorldService
            ).sendHelloWorldRequest(
                any<HelloWorldRequest>()
            )

        /*
         * Send 20 messages to the queue
         */
        (0 until 20).forEach {
            rabbitmqTemplate.convertAndSend(
                queueName,
                HelloWorldRequest(
                    id = UUID.randomUUID(),
                    message = "Hello World $it"
                )
            )
        }

        /*
         * Wait for the messages to be processed and check that all 20 of them ended up in the DLQ
         */
        Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted {
            val dlqInfo = amqpAdmin.getQueueInfo("$queueName.dlq")
                ?: throw AssertionError("Dead Letter Queue $queueName.dlq was not found")
            assert(dlqInfo.messageCount == 20) {
                "Expected 20 messages in the DLQ, but found ${dlqInfo.messageCount}"
            }
        }
    }

}

In the testConsumer test method, we make every call to sendHelloWorldRequest in the HelloWorldService throw a RuntimeException, which causes the whole batch to be passed to our recoverer 🦾.

Naturally, we expect our Dead Letter Queue to contain all 20 messages.

Conclusion

In this article, I wanted to show you how to handle batch messages that fail during processing.

I have attached the materials I used to write the article.

GitHub

Link to GitHub project

Links

  1. https://docs.spring.io/spring-amqp/docs/current/api/org/springframework/amqp/rabbit/retry/RejectAndDontRequeueRecoverer.html

  2. https://docs.spring.io/spring-amqp/docs/2.3.1/api/index.html?org%2Fspringframework%2Famqp%2Frabbit%2Fretry%2FMessageBatchRecoverer.html

  3. https://docs.spring.io/spring-retry/docs/api/current/org/springframework/retry/interceptor/RetryInterceptorBuilder.html

  4. https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/config/RetryInterceptorBuilder.html