This is the first article in a series where I’ll show how to connect and configure systems using Brighter. In this article, we’ll focus on integrating Brighter with Apache Kafka.

Quick Introduction to Kafka

Kafka is a distributed streaming platform designed for high-throughput, real-time message processing. Key concepts include:

  • Streams: Process messages individually, with concurrency limited by the number of topic partitions.
  • Core Components:
    • Topic: A category/feed to which messages are published.
    • Partition: A shard of a topic that enables parallel processing.
    • Consumer Group: A set of consumers that collaboratively process messages from a topic.

To integrate Kafka with Brighter, you’ll need:

  • Topic Name: The target Kafka topic (e.g., greeting.topic).
  • Number of Partitions: Sets the upper limit on how many consumers can process the topic in parallel.
  • Consumer Group ID: Identifies the group of consumers that share the topic's messages.
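
To see why the partition count caps concurrency, here is a minimal sketch of how partitions could be shared out within one consumer group. It is a simplified round-robin illustration, not Kafka's actual assignor; the class name, method name, and consumer names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified illustration only: Kafka's real assignors are more elaborate,
// but the rule that each partition has a single owner per group is the same.
public static class PartitionAssignmentSketch
{
    // Distributes partitions 0..partitionCount-1 across the (distinct) consumers
    // of one group, round-robin. Each partition gets exactly one owner.
    public static Dictionary<string, List<int>> Assign(int partitionCount, IReadOnlyList<string> consumers)
    {
        var assignment = consumers.ToDictionary(c => c, _ => new List<int>());
        if (consumers.Count == 0)
        {
            return assignment; // Nobody to assign to.
        }

        for (var partition = 0; partition < partitionCount; partition++)
        {
            var owner = consumers[partition % consumers.Count];
            assignment[owner].Add(partition);
        }

        return assignment;
    }
}
```

With 2 partitions and 3 consumers, `Assign(2, new[] { "consumer-a", "consumer-b", "consumer-c" })` gives partition 0 to consumer-a, partition 1 to consumer-b, and nothing to consumer-c, which stays idle.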

Requirements

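  1. .NET 8 or later
  2. A .NET project with the Brighter NuGet packages for Kafka, including Paramore.Brighter.MessagingGateway.Kafka and Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection
  3. Docker or Podman: For the local Kafka setup.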

Local Kafka Setup with Docker/Podman

Use this docker-compose.yml to spin up Kafka, ZooKeeper, and a Kafka UI:

services:
  zookeeper:
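    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    # Pinned to a 7.x tag: this setup relies on ZooKeeper, which newer Kafka releases no longer support
    image: confluentinc/cp-kafka:7.5.0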
    depends_on:
      - zookeeper
    healthcheck:
      test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
      interval: 10s
      timeout: 10s
      retries: 5
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9997:9997"
    expose:
      - "29092"
      - "9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_MIN_INSYNC_REPLICAS: "1"

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8088:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      DYNAMIC_CONFIG_ENABLED: "true"

Steps:

  1. Run podman-compose -f docker-compose.yml up -d (or docker compose -f docker-compose.yml up -d with Docker).
  2. Access the Kafka UI at http://localhost:8088.
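
If you prefer to check the broker from .NET instead of the UI, a small console program can ask the cluster for its metadata. This is a sketch that assumes the Confluent.Kafka NuGet package is referenced and that the broker is reachable on localhost:9092, as in the compose file above.

```csharp
using System;
using Confluent.Kafka;

// Assumption: the broker from the compose file is listening on localhost:9092.
var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };

using var adminClient = new AdminClientBuilder(config).Build();

// Ask the cluster for its metadata; this throws if no broker answers within the timeout.
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));

foreach (var broker in metadata.Brokers)
{
    Console.WriteLine($"Broker {broker.BrokerId} at {broker.Host}:{broker.Port}");
}

foreach (var topic in metadata.Topics)
{
    Console.WriteLine($"Topic {topic.Topic} has {topic.Partitions.Count} partition(s)");
}
```

On a fresh cluster the topic list may be empty or contain only internal topics; greeting.topic appears once Brighter (or you) creates it.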

Note: This uses PLAINTEXT for simplicity. Use SSL/SASL_SSL in production.

Brighter Recap

Before continuing with the Kafka configuration, let's recap what we already know about Brighter.

Request (Command/Event)

Brighter uses the IRequest interface to mark objects for processing. Define your message by extending Command or Event:

public class Greeting() : Event(Guid.NewGuid())
{
    public string Name { get; set; } = string.Empty;
}

Message Mapper

Translates between Brighter requests and Kafka messages:

public class GreetingMapper : IAmAMessageMapper<Greeting>
{
    public Message MapToMessage(Greeting request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "greeting.topic"; // The target topic to publish to
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public Greeting MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<Greeting>(message.Body.Bytes)!;
    }
}
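
To make the body handling concrete, the sketch below performs the same JSON round trip outside Brighter. GreetingPayload is a hypothetical stand-in for Greeting so that the example runs without any Brighter package, and it serializes straight to UTF-8 bytes rather than going through a string as the mapper above does.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for the Greeting event, so the sketch runs without Brighter.
public sealed class GreetingPayload
{
    public Guid Id { get; set; } = Guid.NewGuid();
    public string Name { get; set; } = string.Empty;
}

public static class MapperRoundTripSketch
{
    // Roughly what MapToMessage does to the body: object -> UTF-8 JSON bytes.
    public static byte[] ToBody(GreetingPayload request) =>
        JsonSerializer.SerializeToUtf8Bytes(request);

    // Roughly what MapToRequest does: UTF-8 JSON bytes -> object.
    public static GreetingPayload FromBody(byte[] body) =>
        JsonSerializer.Deserialize<GreetingPayload>(body)!;
}
```

Calling `MapperRoundTripSketch.FromBody(MapperRoundTripSketch.ToBody(payload))` returns a new object with the same Id and Name, which is what a consumer relies on when it receives the message.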

Request Handler

Processes incoming messages:

public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting command)
    {
        logger.LogInformation("Hello {Name}", command.Name);
        return base.Handle(command);
    }
}

Configuring Kafka with Brighter

Kafka Connection

Define connection settings:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample", // Application Name
    BootStrapServers = ["localhost:9092"], // Broker address
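    SecurityProtocol = SecurityProtocol.Plaintext, // Matches the local setup; use SSL or SASL_SSL in production
    SaslMechanisms = SaslMechanism.Plain, // SASL settings only take effect with a SASL security protocol
    SaslUsername = "admin", // For SASL authentication
    SaslPassword = "admin-secret"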
};

Kafka Consumer

Configure subscriptions and channels by calling AddServiceActivator on the service collection:

.AddServiceActivator(opt =>
 {
     opt.Subscriptions =
     [
         new KafkaSubscription<Greeting>(
             new SubscriptionName("kafka.greeting.subscription"), // The subscription name; it's used internally only, so any value works
             new ChannelName("greeting.topic"), // The topic name
             new RoutingKey("greeting.topic"),  // The topic name
             groupId: "some-consumer-group", // The Kafka consumer group ID
             makeChannels: OnMissingChannel.Create, // Tells Brighter what to do when the topic does not exist
             numOfPartitions: 2, // The number of topic partitions; only used when Brighter creates the topic
             noOfPerformers: 2, // The number of consumers running in parallel; more than the partition count leaves the extras idle
             isAsync: false // Set to true if you want to use RequestHandlerAsync
           ),
     ];

     opt.ChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
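
If you set isAsync to true, the handler needs to derive from RequestHandlerAsync instead of RequestHandler. The sketch below is an async counterpart of GreetingHandler; it assumes the Greeting event from the recap and the Microsoft.Extensions.Logging package, and other pieces may also need async counterparts depending on your Brighter version.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Paramore.Brighter;

// Async counterpart of GreetingHandler; Greeting is the event defined in the recap.
public class GreetingHandlerAsync(ILogger<GreetingHandlerAsync> logger) : RequestHandlerAsync<Greeting>
{
    public override async Task<Greeting> HandleAsync(Greeting command, CancellationToken cancellationToken = default)
    {
        logger.LogInformation("Hello {Name}", command.Name);

        // Pass the request on to the next handler in the pipeline.
        return await base.HandleAsync(command, cancellationToken);
    }
}
```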

Kafka Producer

Set up the external bus for publishing:

.UseExternalBus(new KafkaProducerRegistryFactory(connection,
  [
      new KafkaPublication
      {
          MakeChannels = OnMissingChannel.Create,
          NumPartitions = 2, // The number of topic partitions; only used when Brighter creates the topic
          Topic = new RoutingKey("greeting.topic"), // The topic name; use the same value for header.Topic in the message mapper
      },
  ]).Create()
);
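
With the external bus configured, publishing comes down to posting the event through the command processor. The sketch below assumes IAmACommandProcessor is resolved from dependency injection; GreetingSender is a hypothetical helper class, and Greeting is the event from the recap.

```csharp
using Paramore.Brighter;

// Hypothetical helper; Greeting is the event defined in the recap.
public class GreetingSender(IAmACommandProcessor commandProcessor)
{
    public void Send(string name)
    {
        // Post hands the event to the external bus; GreetingMapper decides
        // the topic ("greeting.topic") and the message body.
        commandProcessor.Post(new Greeting { Name = name });
    }
}
```

Calling `Send("World")` should produce a message on greeting.topic that GreetingHandler picks up and logs as "Hello World".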

Conclusion

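Integrating Brighter with Kafka simplifies building scalable, message-driven systems. With a connection, a subscription, and a publication in place, the same request, mapper, and handler types you already use with Brighter can publish to and consume from Kafka topics.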
