🐇 Introduction
One day I received a ticket that required a Consumer capable of accepting messages in batches.
Naturally, as a TDD guy, I wrote tests that simulated an error while the consumer was processing a batch and expected all failed messages to be redirected to the Dead Letter Queue.
However, to my surprise, not a single message reached the Dead Letter Queue, unlike in the usual single-message error handling.
As it turned out, Spring uses RejectAndDontRequeueRecoverer by default, and this object handles only one message at a time. We need to handle the whole list at once, but how?
After a lot of googling 🤪, deep in the Spring docs, I found the magic MessageBatchRecoverer 🌚 interface, which helps manage failed batch messages.
😈 MessageBatchRecoverer
MessageBatchRecoverer is a retry recoverer for batch listeners: once the retries for a batch are exhausted, it receives the whole batch. If something goes wrong, consider throwing an exception that carries the index of the record where the error happened. This way, the recoverer can properly handle the remaining records without losing data.
The interface declares a single method, recover, with these arguments:
- List var1: the list of messages that were being processed when the error occurred.
- Throwable var2: the error that brought us into this method.
package org.springframework.amqp.rabbit.retry;

import java.util.List;
import org.springframework.amqp.core.Message;

@FunctionalInterface
public interface MessageBatchRecoverer extends MessageRecoverer {
    void recover(List<Message> var1, Throwable var2);
}
For instance, if you received 10 messages and 9 of them were processed normally, but an error occurred while processing the 10th message, all 10 messages will be passed to the MessageBatchRecoverer.
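The advice about carrying the failed index can be sketched in plain Kotlin. This is a minimal illustration, not Spring AMQP API: BatchItemFailedException, processBatch and splitAtFailure are hypothetical names, and the generic type T stands in for Message. It assumes the listener wraps the first failure together with its index, and that the recoverer may receive that exception wrapped inside another one, so it walks the cause chain.

```kotlin
// Hypothetical exception that remembers which batch position failed.
class BatchItemFailedException(val index: Int, cause: Throwable) :
    RuntimeException("Batch item $index failed", cause)

// Listener side: process items in order and report the first failing index.
fun <T> processBatch(items: List<T>, handler: (T) -> Unit) {
    items.forEachIndexed { index, item ->
        try {
            handler(item)
        } catch (e: Exception) {
            throw BatchItemFailedException(index, e)
        }
    }
}

// Recoverer side: split the batch into (already processed, failed or not yet processed).
// Falls back to treating the whole batch as failed when no index is found in the cause chain.
fun <T> splitAtFailure(items: List<T>, cause: Throwable): Pair<List<T>, List<T>> {
    val failedIndex = generateSequence(cause) { it.cause }
        .filterIsInstance<BatchItemFailedException>()
        .firstOrNull()
        ?.index
        ?.coerceIn(0, items.size)
        ?: 0
    return items.take(failedIndex) to items.drop(failedIndex)
}
```

What to do with each half (for example, only logging the first and dead-lettering the second) depends on the delivery guarantees you need.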
🧬 Code
build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-amqp")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.boot:spring-boot-testcontainers")
    testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
    testImplementation("org.springframework.amqp:spring-rabbit-test")
    testImplementation("org.testcontainers:junit-jupiter")
    testImplementation("org.testcontainers:rabbitmq")
    testImplementation("org.mockito.kotlin:mockito-kotlin:3.2.0")
    testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}
application.yaml
rabbitmq:
  exchange: message-batcher-exchange
  queue: message-batcher-queue
spring:
  application:
    name: message-batcher-recoverer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        batch-size: 10
        prefetch: 20
server:
  port: 8080
🚩CustomMessageBatchRecoverer.kt 🚩
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer
import org.springframework.stereotype.Component

@Component
class CustomMessageBatchRecoverer : MessageBatchRecoverer {

    private val rejectAndDontRequeueRecoverer = RejectAndDontRequeueRecoverer()

    override fun recover(messages: List<Message>, cause: Throwable) {
        println("Recovering batch of messages due to: ${cause.message}")
        messages.forEach { message ->
            /*
             * Custom logic to handle each message
             * For example, you can log the message, send it to a dead-letter queue, etc.
             */
            rejectAndDontRequeueRecoverer.recover(message, cause)
        }
    }
}
RejectAndDontRequeueRecoverer is a MessageRecoverer implementation that tells the listener container to reject the message without requeuing it. This allows failed messages to be routed to a Dead Letter Queue, provided the queue is configured with a dead-letter exchange.
Of course, you can also implement your own custom logic to handle failed messages, including those that arrive in a batch.
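One detail to keep in mind: RejectAndDontRequeueRecoverer.recover signals the rejection by throwing an exception, so a loop like the one above stops at its first iteration. If you want custom logic to run for every message, one option is to handle all messages first and reject the batch once at the end. The sketch below is only an illustration: LoggingBatchRecoverer is a hypothetical name, and it assumes the listener container treats an AmqpRejectAndDontRequeueException thrown from the recoverer as "reject without requeue" for the whole batch.

```kotlin
import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer

// Hypothetical recoverer: handle every message first, then reject the batch once.
class LoggingBatchRecoverer : MessageBatchRecoverer {

    override fun recover(messages: List<Message>, cause: Throwable) {
        messages.forEachIndexed { index, message ->
            // Per-message logic goes here; this sketch only logs the body size.
            println("Recovering message $index (${message.body.size} bytes): ${cause.message}")
        }
        // Assumption: throwing this tells the container not to requeue the batch,
        // so the broker can dead-letter it when a dead-letter exchange is configured.
        throw AmqpRejectAndDontRequeueException("Batch of ${messages.size} messages failed", cause)
    }
}
```

It can be plugged into the retry interceptor the same way as any other MessageBatchRecoverer.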
👑 RabbitMQConfig.kt 👑
import com.illia.ponomarov.medium.messagebatcherrecoverer.recoverer.CustomMessageBatchRecoverer
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class RabbitMQConfig(
    @Value("\${rabbitmq.exchange}") val exchangeName: String,
    @Value("\${rabbitmq.queue}") val queueName: String,
) {

    // Main queue: rejected messages are dead-lettered via the default exchange to "<queue>.dlq"
    @Bean
    fun queue() = QueueBuilder.durable(queueName)
        .quorum()
        .deadLetterExchange("")
        .deadLetterRoutingKey("$queueName.dlq")
        .build()

    // Dead Letter Queue that collects the rejected messages
    @Bean
    fun dlq() = QueueBuilder.durable("$queueName.dlq")
        .quorum()
        .build()

    @Bean
    fun exchange() = TopicExchange(exchangeName, true, false)

    @Bean
    fun binding(queue: Queue, exchange: TopicExchange) =
        BindingBuilder.bind(queue)
            .to(exchange)
            .with(queueName)

    @Bean("batchListenerContainerFactory")
    fun batchListenerContainerFactory(
        connectionFactory: ConnectionFactory,
        customMessageBatchRecover: CustomMessageBatchRecoverer,
    ): SimpleRabbitListenerContainerFactory =
        SimpleRabbitListenerContainerFactory().apply {
            // Stateless retry; once the attempts are exhausted, the batch goes to our recoverer
            val retryInterceptor = RetryInterceptorBuilder.stateless()
                .recoverer(customMessageBatchRecover)
                .build()
            setMessageConverter(SimpleMessageConverter())
            setConnectionFactory(connectionFactory)
            setConsumerBatchEnabled(true)
            setAdviceChain(retryInterceptor)
        }
}
We use RetryInterceptorBuilder 💖 to specify which recoverer should run when an error occurs during message processing in the Consumer and the retries are exhausted.
In our case, we have created a custom ⚙️ CustomMessageBatchRecoverer component, which implements the MessageBatchRecoverer interface.
🚩 We also need to enable batch consumers.
We do this through the setter
setConsumerBatchEnabled(true)
🚩 Note: if we set it to true, Spring also sets batchListener to true.
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
    this.consumerBatchEnabled = consumerBatchEnabled;
    if (consumerBatchEnabled) {
        this.setBatchListener(true);
    }
}
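One thing worth double-checking: as far as I know, a container factory created by hand does not pick up the spring.rabbitmq.listener.simple.* values from application.yaml, because Spring Boot applies them only to the factory it auto-configures. A minimal sketch that sets the batch size and prefetch explicitly, assuming the same values as in application.yaml (the class name and the bean name here are hypothetical):

```kotlin
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class ExplicitBatchConfig {

    @Bean("explicitBatchListenerContainerFactory")
    fun explicitBatchListenerContainerFactory(
        connectionFactory: ConnectionFactory,
        recoverer: MessageBatchRecoverer,
    ): SimpleRabbitListenerContainerFactory =
        SimpleRabbitListenerContainerFactory().apply {
            setConnectionFactory(connectionFactory)
            setConsumerBatchEnabled(true)
            // Assumed values, mirroring batch-size and prefetch from application.yaml
            setBatchSize(10)
            setPrefetchCount(20)
            setAdviceChain(RetryInterceptorBuilder.stateless().recoverer(recoverer).build())
        }
}
```

With the values set on the factory itself, the batch size no longer depends on which factory Spring Boot happens to configure from the properties.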
HelloWorldRequest.kt
import java.io.Serializable
import java.time.LocalDateTime
import java.util.UUID

data class HelloWorldRequest(
    val id: UUID = UUID.randomUUID(),
    val message: String,
    val date: LocalDateTime = LocalDateTime.now()
) : Serializable
The structure of the message we are going to process.
HelloWorldService.kt
import org.springframework.stereotype.Service

@Service
class HelloWorldService {

    fun sendHelloWorldRequest(request: HelloWorldRequest) {
        println("Sending HelloWorldRequest: $request")
    }
}
HelloWorldConsumer.kt
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class HelloWorldConsumer(
    private val helloWorldService: HelloWorldService
) {

    @RabbitListener(
        queues = ["\${rabbitmq.queue}"],
        containerFactory = "batchListenerContainerFactory"
    )
    fun consume(messages: List<Message>) {
        // The whole batch arrives as a single list of raw AMQP messages
        messages.forEach { message ->
            println("Received message: $message")
            helloWorldService.sendHelloWorldRequest(
                HelloWorldRequest(
                    message = message.body.decodeToString()
                )
            )
        }
        println("Processed batch: $messages")
    }
}
When creating the batch consumer, the main thing is to specify the containerFactory that uses our CustomMessageBatchRecoverer ✨.
ConsumerTest.kt
import org.mockito.kotlin.any
import org.mockito.kotlin.doThrow
import org.mockito.kotlin.whenever
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.springframework.test.context.bean.override.mockito.MockitoBean
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.shaded.org.awaitility.Awaitility
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.test.Test

@SpringBootTest
@Testcontainers
class ConsumerTest {

    companion object {
        @Container
        private val rabbitMQContainer = RabbitMQContainer("rabbitmq:latest")

        @JvmStatic
        @DynamicPropertySource
        fun registerProperties(registry: DynamicPropertyRegistry) {
            registry.add("spring.rabbitmq.host") { rabbitMQContainer.host }
            registry.add("spring.rabbitmq.port") { rabbitMQContainer.amqpPort }
            registry.add("spring.rabbitmq.username") { rabbitMQContainer.adminUsername }
            registry.add("spring.rabbitmq.password") { rabbitMQContainer.adminPassword }
        }
    }

    @Value("\${rabbitmq.queue}")
    private lateinit var queueName: String

    @Autowired
    lateinit var rabbitmqTemplate: RabbitTemplate

    @Autowired
    lateinit var amqpAdmin: AmqpAdmin

    @MockitoBean
    lateinit var helloWorldService: HelloWorldService

    @Test
    fun testConsumer() {
        // Simulate a failure in the HelloWorldService
        doThrow(RuntimeException("Test exception"))
            .whenever(helloWorldService)
            .sendHelloWorldRequest(any<HelloWorldRequest>())

        // Send 20 messages to the queue
        (0 until 20).forEach {
            rabbitmqTemplate.convertAndSend(
                queueName,
                HelloWorldRequest(
                    id = UUID.randomUUID(),
                    message = "Hello World $it"
                )
            )
        }

        // Wait for the messages to be processed and check that all 20 ended up in the DLQ.
        // A missing queue (null info) also fails the assertion instead of passing silently.
        Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted {
            val dlqInfo = amqpAdmin.getQueueInfo("$queueName.dlq")
            assert(dlqInfo?.messageCount == 20) {
                "Expected 20 messages in the DLQ, but found ${dlqInfo?.messageCount}"
            }
        }
    }
}
In the testConsumer test method, we stub HelloWorldService so that every call to sendHelloWorldRequest throws a RuntimeException, which causes our batch to be redirected to our recoverer 🦾.
Naturally, we expect our Dead Letter Queue to contain all 20 messages.
Conclusion
In this article, I wanted to show how to handle batch messages that fail during processing.
I have attached the materials I used to write the article.
GitHub
Link to the GitHub project