If you're using Redpanda (or Kafka) and hitting mysterious consumer errors like:
Message consumption error: BrokerTransportFailure
You might be dealing with a message size limit issue! I ran into this recently, and increasing the fetch.message.max.bytes limit solved it instantly.
The Problem
My Rust consumer, built with rdkafka, was failing intermittently with BrokerTransportFailure. Debugging went back and forth without revealing much: the broker connection, the topic, and network reachability all checked out. After experimenting with different configurations, I realized the problem was a message size constraint, since this particular topic carried quite large messages.
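When debugging something like this, it can help to check which specific error code rdkafka reports in the consumer loop's Err arm. A minimal sketch, assuming a recent rdkafka where KafkaError::MessageConsumption wraps an RDKafkaErrorCode (the helper name is mine, not from the original code):

use rdkafka::error::{KafkaError, RDKafkaErrorCode};

// Hypothetical helper: true when the error is the local "broker transport
// failure" code shown in the log line at the top of this post.
fn is_broker_transport_failure(err: &KafkaError) -> bool {
    matches!(
        err,
        KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure)
    )
}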
Kafka/Redpanda has default limits on how much data can be fetched per request. If a message is too big, the broker refuses to send it, causing consumer failures.
The Fix
To resolve this, I had to increase the message size limit on the consumer. That requires no change to the cluster or broker, only to the client code.
The key setting here is fetch.message.max.bytes, which allows the consumer to fetch larger messages.
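Depending on the librdkafka version and how large the messages really are, two related client-side limits may also need raising: fetch.max.bytes, which caps an entire fetch request, and receive.message.max.bytes, which caps the response size the client will accept. A config-only sketch with illustrative values (the broker address and group id are placeholders, not the ones from my actual setup):

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

// Sketch of the size-related consumer settings; values are illustrative.
fn build_large_fetch_consumer() -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group")
        .set("fetch.message.max.bytes", "262144000") // per-partition fetch cap (250 MB)
        .set("fetch.max.bytes", "262144000") // cap on a whole fetch request
        .set("receive.message.max.bytes", "263144000") // response cap; keep it above fetch.max.bytes
        .create()
        .expect("Consumer creation failed")
}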
use std::env;
use std::time::Duration;

use anyhow::Result; // assuming anyhow for the Result alias used below
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::{BorrowedMessage, Headers, Message};

pub struct RPMessageProcessor {
    consumer: StreamConsumer,
}

impl RPMessageProcessor {
    pub fn new(group_id: &str, topic_name: &str) -> Self {
        let broker = env::var("REDPANDA_BROKERS").unwrap_or_else(|_| "localhost:9092".to_string());

        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", broker)
            .set("group.id", group_id)
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .set("fetch.message.max.bytes", "262144000") // Increased to 250 MB
            .create()
            .expect("Consumer creation failed");
        tracing::debug!("Streaming Consumer Created");

        consumer
            .subscribe(&[topic_name])
            .expect("Subscription failed");
        tracing::debug!("Subscribed to {}", topic_name);

        Self { consumer }
    }

    pub async fn run(&self, message_type: &str) {
        loop {
            match self.consumer.recv().await {
                Ok(msg) => {
                    // Only process messages whose "message-type" header matches.
                    let should_process = msg
                        .headers()
                        .and_then(|headers| {
                            headers.iter().find(|header| {
                                header.key == "message-type"
                                    && header
                                        .value
                                        .map(|v| v == message_type.as_bytes())
                                        .unwrap_or(false)
                            })
                        })
                        .is_some();

                    if should_process {
                        if let Err(err) = Self::process_message(&msg).await {
                            tracing::error!("Error processing message: {}", err);
                        }
                    }

                    // Commit the offset even for skipped messages so they are not re-read.
                    if let Err(e) = self.consumer.commit_message(&msg, CommitMode::Sync) {
                        tracing::error!("Failed to commit offset: {}", e);
                    }
                }
                Err(err) => tracing::error!("Kafka error: {}", err),
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    async fn process_message(msg: &BorrowedMessage<'_>) -> Result<()> {
        if let Some(payload) = msg.payload_view::<str>().transpose()? {
            tracing::info!("Processing message: {}", payload);
        } else {
            tracing::error!("Received empty or invalid message");
        }
        Ok(())
    }
}
How is it invoked?
RPMessageProcessor::new("CRE-RP-CG", &CONFIG.edx.input_topic)
    .run("CRE_HEADER_VALUE")
    .await;
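For completeness, here is a minimal, hypothetical entry point showing where that call can live. "my-input-topic" stands in for CONFIG.edx.input_topic, and tracing_subscriber is assumed for log output; neither is from the original code:

#[tokio::main]
async fn main() {
    // Hypothetical wiring: set up logging, then hand control to the consumer loop.
    tracing_subscriber::fmt::init();

    RPMessageProcessor::new("CRE-RP-CG", "my-input-topic")
        .run("CRE_HEADER_VALUE")
        .await;
}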
Lesson Learned: Size DOES Matter!
If you're running into Kafka or Redpanda consumer errors, check your message size limits! The broker, producer, and consumer all need to agree on them to avoid failures.
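On the producer side, the librdkafka counterpart is message.max.bytes. A hedged sketch, assuming rdkafka's FutureProducer; the broker address and the 250 MB value are illustrative, not taken from my actual setup:

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

// Illustrative producer config: message.max.bytes caps the size of a produced
// message/request. Keep it aligned with the consumer's fetch limit and with
// the topic's max.message.bytes on the broker so no side rejects what the
// others accept.
fn build_large_message_producer(broker: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", broker)
        .set("message.max.bytes", "262144000") // 250 MB, illustrative
        .create()
        .expect("Producer creation failed")
}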