Introduction to Apache Kafka

Apache Kafka has become the de facto standard for building real-time data pipelines and streaming applications. Combined with Docker for containerization and Spring Boot for Java development, you get a powerful, scalable, and developer-friendly stack.

In this comprehensive guide, we'll build a production-ready Kafka application using Docker and Spring Boot, covering everything from basic setup to advanced patterns.

Why Kafka + Docker + Spring Boot?

Apache Kafka Benefits

  • High Throughput: Handle millions of messages per second
  • Scalability: Horizontal scaling with partitions
  • Durability: Persistent storage with replication
  • Real-time Processing: Low-latency message delivery

Docker Advantages

  • Consistent development environments
  • Easy Kafka cluster setup
  • Simplified deployment
  • Version management

Spring Boot Integration

  • Spring Kafka abstraction layer
  • Auto-configuration
  • Easy serialization/deserialization
  • Excellent error handling

Setting Up Kafka with Docker

Docker Compose Configuration

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - kafka-data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

volumes:
  zookeeper-data:
  zookeeper-logs:
  kafka-data:

Start your Kafka cluster:

docker-compose up -d
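
Once the containers are up, it's worth confirming the broker is reachable on the host listener before wiring up Spring Boot. A minimal sketch using Kafka's AdminClient (localhost:9092 matches the PLAINTEXT_HOST listener in the compose file):

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Fails with a timeout exception if the broker is not reachable
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Connected. Topics: " + topics);
        }
    }
}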

Spring Boot Kafka Producer

Dependencies (Maven)

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

Configuration

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        linger.ms: 10
        batch.size: 16384
    consumer:
      group-id: my-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "*"
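
The producer and consumer examples below pass UserEvent and OrderEvent payloads around. Neither class is part of Spring Kafka; here is a minimal sketch of one as a plain Java record (field names are illustrative, chosen to match the test later in this guide):

// Serialized to and from JSON by JsonSerializer/JsonDeserializer
public record UserEvent(String userId, String action) {
}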

Producer Implementation

@Service
@Slf4j
public class KafkaProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String key, Object message) {
        // Spring Kafka 3.x: send() returns a CompletableFuture
        // (earlier versions returned the now-removed ListenableFuture)
        CompletableFuture<SendResult<String, Object>> future =
            kafkaTemplate.send(topic, key, message);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Message sent successfully: topic={}, partition={}, offset={}",
                    topic,
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
            } else {
                log.error("Failed to send message to topic {}", topic, ex);
            }
        });
    }
}
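
A hypothetical REST endpoint showing the producer service in use (the /api/events path is illustrative; userId() is the record accessor from the sketch above):

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/events")
public class EventController {

    private final KafkaProducerService producerService;

    public EventController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping
    public ResponseEntity<Void> publish(@RequestBody UserEvent event) {
        // Key by user ID so all events for one user land on the same partition
        producerService.sendMessage("user-events", event.userId(), event);
        return ResponseEntity.accepted().build();
    }
}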

Spring Boot Kafka Consumer

@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics = "user-events", groupId = "my-consumer-group")
    public void consumeUserEvents(
        @Payload UserEvent event,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
        
        log.info("Received message: event={}, partition={}, offset={}", 
            event, partition, offset);
        
        // Process your message here
        processEvent(event);
    }

    @KafkaListener(
        topics = "order-events",
        containerFactory = "kafkaListenerContainerFactory",
        errorHandler = "kafkaErrorHandler"
    )
    public void consumeOrderEvents(@Payload OrderEvent event) {
        log.info("Processing order: {}", event);
        // Business logic here
    }

    private void processEvent(UserEvent event) {
        // Your business logic
    }
}
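
With the defaults above, Spring Kafka commits offsets for you after the listener returns. When processing must succeed before an offset is committed, a listener can acknowledge manually; this sketch assumes a container factory whose ack mode has been set to AckMode.MANUAL (not shown in the configuration above):

@KafkaListener(
    topics = "user-events",
    groupId = "manual-ack-group",
    containerFactory = "manualAckContainerFactory"  // hypothetical factory with AckMode.MANUAL
)
public void consumeWithManualAck(@Payload UserEvent event, Acknowledgment ack) {
    processEvent(event);
    // Commit the offset only after processing succeeded
    ack.acknowledge();
}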

Advanced Configuration

Custom Kafka Configuration

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> 
        kafkaListenerContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}
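
The compose file enables topic auto-creation, which is convenient in development but leaves partition counts and replication to broker defaults. For deliberate settings, Spring Boot's auto-configured KafkaAdmin creates any NewTopic bean on startup; a sketch that could sit in the same KafkaConfig class (the counts are illustrative for the single-broker dev cluster):

@Bean
public NewTopic userEventsTopic() {
    // org.springframework.kafka.config.TopicBuilder
    return TopicBuilder.name("user-events")
        .partitions(3)
        .replicas(1)  // one broker locally; raise for production
        .build();
}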

Error Handling and Retry

@Component
@Slf4j
public class KafkaErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        log.error("Error processing message: {}", message.getPayload(), exception);
        
        // Implement your retry logic or dead letter queue
        return null;
    }
}

// Configure retry with backoff; this bean belongs in KafkaConfig alongside the others
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> 
    retryKafkaListenerContainerFactory() {
    
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    
    // DefaultErrorHandler replaces the deprecated SeekToCurrentErrorHandler:
    // retry each failed record 3 times at 1s intervals, then publish it to a dead letter topic
    factory.setCommonErrorHandler(new DefaultErrorHandler(
        new DeadLetterPublishingRecoverer(kafkaTemplate()),
        new FixedBackOff(1000L, 3L)
    ));
    
    return factory;
}
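
By default, DeadLetterPublishingRecoverer publishes exhausted records to a topic named after the original with a ".DLT" suffix. A minimal sketch of a listener that drains it for inspection (the group ID is illustrative):

@Component
@Slf4j
public class DeadLetterConsumer {

    @KafkaListener(topics = "order-events.DLT", groupId = "dlt-inspector")
    public void consumeDeadLetters(OrderEvent event) {
        // Records arrive here after the retries above are exhausted
        log.error("Dead-lettered order event: {}", event);
    }
}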

Testing Kafka with Testcontainers

@SpringBootTest
@Testcontainers
class KafkaIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Autowired
    private KafkaProducerService producerService;

    @Test
    void testSendMessage() {
        UserEvent event = new UserEvent("user123", "login");
        producerService.sendMessage("user-events", "key1", event);
        
        // Add assertions; see the consumption-based sketch below
    }
}
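
The test above fires and forgets; to actually assert delivery, one option is to poll the topic with a plain consumer. A sketch for the same test class that reads the value back as a raw JSON string (the group ID is illustrative):

@Test
void testMessageIsConsumable() {
    producerService.sendMessage("user-events", "key1", new UserEvent("user123", "login"));

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "it-group");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(List.of("user-events"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        assertFalse(records.isEmpty(), "expected at least one record on user-events");
        assertTrue(records.iterator().next().value().contains("user123"));
    }
}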

Production Best Practices

1. Partitioning Strategy

Records with the same key always land on the same partition, so keys give you per-key ordering; a high-cardinality key (a user or order ID, for example) also distributes load evenly across partitions. A short example follows below.
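
For instance, keying order events by customer ID (customerId() is a hypothetical accessor on OrderEvent) keeps each customer's events in order while spreading customers across partitions:

public void publishOrder(OrderEvent order) {
    // Same key -> same partition -> per-customer ordering is preserved
    producerService.sendMessage("order-events", order.customerId(), order);
}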

2. Monitoring

  • Use Kafka UI or Prometheus/Grafana
  • Monitor consumer lag, throughput, and error rates (a lag-check sketch follows this list)
  • Set up alerts for critical metrics
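
One way to spot-check lag from code is Kafka's AdminClient: compare each partition's committed offset for the group with its latest offset. A sketch using the group ID and bootstrap address from the earlier configuration:

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the group has committed, per partition
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets("my-consumer-group")
                .partitionsToOffsetAndMetadata()
                .get();

            // Latest (end) offsets for the same partitions
            Map<TopicPartition, ListOffsetsResultInfo> ends = admin
                .listOffsets(committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())))
                .all()
                .get();

            committed.forEach((tp, meta) ->
                System.out.printf("%s lag=%d%n", tp, ends.get(tp).offset() - meta.offset()));
        }
    }
}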

3. Security

  • Enable SSL/TLS encryption
  • Implement SASL authentication
  • Use ACLs for authorization

4. Performance Tuning

  • Adjust batch.size and linger.ms for producers
  • Configure fetch.min.bytes for consumers
  • Set appropriate replication factors

Conclusion

You now have a solid foundation for building Kafka applications with Docker and Spring Boot. This stack provides the scalability and reliability needed for modern event-driven architectures. Start with the basics, monitor your metrics, and scale as needed.