Introduction to Apache Kafka
Apache Kafka has become the de facto standard for building real-time data pipelines and streaming applications. Combined with Docker for containerization and Spring Boot for Java development, you get a powerful, scalable, and developer-friendly stack.
In this comprehensive guide, we'll build a production-ready Kafka application using Docker and Spring Boot, covering everything from basic setup to advanced patterns.
Why Kafka + Docker + Spring Boot?
Apache Kafka Benefits
- High Throughput: Handle millions of messages per second
- Scalability: Horizontal scaling with partitions
- Durability: Persistent storage with replication
- Real-time Processing: Low-latency message delivery
Docker Advantages
- Consistent development environments
- Easy Kafka cluster setup
- Simplified deployment
- Version management
Spring Boot Integration
- Spring Kafka abstraction layer
- Auto-configuration
- Easy serialization/deserialization
- Excellent error handling
Setting Up Kafka with Docker
Docker Compose Configuration
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - kafka-data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-data:
Start your Kafka cluster:
docker-compose up -d
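Once the containers are up, verify the broker responds (the kafka-topics CLI ships inside the Confluent image):

    docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --list

The Kafka UI defined in the compose file is then reachable at http://localhost:8080.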
Spring Boot Kafka Producer
Dependencies (Maven)
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
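The Testcontainers-based integration test later in this guide additionally needs test-scoped dependencies along these lines (versions omitted; add the Testcontainers BOM or explicit versions as your build requires):

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>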
Configuration
# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        linger.ms: 10
        batch.size: 16384
    consumer:
      group-id: my-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "*"
Producer Implementation
Note that since Spring Kafka 3.0 (the version Spring Boot 3 pulls in), KafkaTemplate.send() returns a CompletableFuture; the older ListenableFuture callback API was removed.

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topic, String key, Object message) {
        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(topic, key, message);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Message sent successfully: topic={}, partition={}, offset={}",
                        topic,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                log.error("Failed to send message to topic {}", topic, ex);
            }
        });
    }
}
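To exercise the producer end to end, a minimal REST controller like the following works (the /api/events path is an arbitrary choice, and UserEvent is sketched after the consumer section below):

    @RestController
    @RequestMapping("/api/events")
    @RequiredArgsConstructor
    public class EventController {

        private final KafkaProducerService producerService;

        @PostMapping
        public ResponseEntity<Void> publish(@RequestBody UserEvent event) {
            // Keying by user ID keeps each user's events on one partition, in order
            producerService.sendMessage("user-events", event.getUserId(), event);
            return ResponseEntity.accepted().build();
        }
    }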
Spring Boot Kafka Consumer
@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics = "user-events", groupId = "my-consumer-group")
    public void consumeUserEvents(
            @Payload UserEvent event,
            // Renamed from RECEIVED_PARTITION_ID in Spring Kafka 3.0
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("Received message: event={}, partition={}, offset={}",
                event, partition, offset);
        // Process your message here
        processEvent(event);
    }

    @KafkaListener(
            topics = "order-events",
            containerFactory = "kafkaListenerContainerFactory",
            errorHandler = "kafkaErrorHandler"
    )
    public void consumeOrderEvents(@Payload OrderEvent event) {
        log.info("Processing order: {}", event);
        // Business logic here
    }

    private void processEvent(UserEvent event) {
        // Your business logic
    }
}
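Neither UserEvent nor OrderEvent is provided by Spring or Kafka; they are your own payload classes. A minimal shape that matches the JSON (de)serializers configured earlier might look like this (one class per file; the no-args constructor is required for Jackson deserialization, and the OrderEvent fields are illustrative):

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class UserEvent {
        private String userId;
        private String action;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class OrderEvent {
        private String orderId;
        private BigDecimal amount;
    }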
Advanced Configuration
Custom Kafka Configuration
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}
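Auto-created topics default to a single partition, so for anything beyond experiments it pays to declare topics explicitly. Spring Boot's auto-configured KafkaAdmin creates any NewTopic beans at startup; the counts below are illustrative:

    @Bean
    public NewTopic userEventsTopic() {
        // 3 partitions for consumer parallelism; replication factor 1 matches the single-broker dev setup
        return TopicBuilder.name("user-events")
                .partitions(3)
                .replicas(1)
                .build();
    }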
Error Handling and Retry
@Component
@Slf4j
public class KafkaErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        log.error("Error processing message: {}", message.getPayload(), exception);
        // Implement your retry logic or dead letter queue
        return null;
    }
}
// Configure retry with backoff. Spring Kafka 3.x replaced SeekToCurrentErrorHandler
// (and factory.setErrorHandler) with DefaultErrorHandler and setCommonErrorHandler.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
        retryKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate()),
            new FixedBackOff(1000L, 3L)  // retry 3 times, 1 s apart, then recover to the DLT
    ));
    return factory;
}
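By default, DeadLetterPublishingRecoverer sends the failed record to a topic named after the original with a .DLT suffix (order-events.DLT for the listener above) and to the same partition number, so the dead-letter topic should have at least as many partitions as the source topic.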
Testing Kafka with Testcontainers
@SpringBootTest
@Testcontainers
class KafkaIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired
    private KafkaProducerService producerService;

    @Test
    void testSendMessage() {
        UserEvent event = new UserEvent("user123", "login");
        producerService.sendMessage("user-events", "key1", event);
        // Add assertions
    }
}
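To assert that the message actually landed on the topic, one option is a helper in the test class built directly on the kafka-clients consumer API (the group ID, timeout, and string deserialization are arbitrary choices for this sketch; the value arrives as the raw JSON string):

    private ConsumerRecord<String, String> pollSingleRecord(String topic) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of(topic));
            // One long poll is normally enough right after the send in this test
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            return records.iterator().next();
        }
    }

In testSendMessage(), asserting with assertThat(pollSingleRecord("user-events").value()).contains("user123") then verifies the payload made the round trip.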
Production Best Practices
1. Partitioning Strategy
Choose message keys deliberately: records with the same key always land on the same partition, which gives you per-key ordering, while a high-cardinality key keeps load spread evenly across partitions.
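For example, using the user ID as the key pins all of that user's events to one partition (topic, ID, and event variables are illustrative):

    // Same key, same partition: user123's events are consumed in the order they were sent
    kafkaTemplate.send("user-events", "user123", loginEvent);
    kafkaTemplate.send("user-events", "user123", logoutEvent);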
2. Monitoring
- Use Kafka UI or Prometheus/Grafana
- Monitor lag, throughput, and error rates
- Set up alerts for critical metrics
3. Security
- Enable SSL/TLS encryption
- Implement SASL authentication
- Use ACLs for authorization
4. Performance Tuning
- Adjust batch.size and linger.ms for producers
- Configure fetch.min.bytes for consumers
- Set appropriate replication factors
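As a starting point, the producer and consumer knobs above map onto application.yml like this (the values are illustrative, not recommendations; tune against your own workload):

    spring:
      kafka:
        producer:
          properties:
            batch.size: 32768       # bigger batches raise throughput at the cost of latency
            linger.ms: 20           # wait up to 20 ms to fill a batch before sending
        consumer:
          properties:
            fetch.min.bytes: 1024   # broker waits for at least 1 KB of data...
            fetch.max.wait.ms: 500  # ...or this long, whichever comes first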
Conclusion
You now have a solid foundation for building Kafka applications with Docker and Spring Boot. This stack provides the scalability and reliability needed for modern event-driven architectures. Start with the basics, monitor your metrics, and scale as needed.