Kafka’s design makes it seem like a simple message queue, but when you’re building a robust Node.js consumer with kafkajs, the devil is in the details of how you manage state and handle failures.

Let’s watch a consumer in action. Imagine we have a Kafka topic named user_events with messages like this:

{
  "userId": "user-123",
  "eventType": "login",
  "timestamp": "2023-10-27T10:00:00Z"
}

Here’s a basic Node.js consumer setup using kafkajs:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-consumer-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'user-event-processors' });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'user_events', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
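      console.log({
        // key and value are Buffers, but either can be null (e.g. keyless messages)
        key: message.key ? message.key.toString() : null,
        value: message.value ? message.value.toString() : null,
        headers: message.headers,
      });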
      // Simulate processing the message
      await new Promise(resolve => setTimeout(resolve, 50));
    },
  });
};

run().catch(e => console.error('[consumer/run] Error:', e));

This simple script connects, subscribes, and logs each message. But what happens when processing takes longer than expected, or when a message is malformed? This is where production patterns become critical.
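Malformed messages are easier to handle when decoding never throws inside the handler. Below is a minimal sketch of a hypothetical helper, decodeUserEvent. It assumes the message shape kafkajs passes to eachMessage (key and value as Buffers, either possibly null) and checks the fields of the example payload.

```javascript
// Hypothetical helper: decode and validate one user event without throwing.
// Assumes kafkajs-style messages: key and value are Buffers or null.
function decodeUserEvent(message) {
  const key = message.key ? message.key.toString() : null;
  if (!message.value) {
    return { ok: false, key, reason: 'empty value' };
  }

  let event;
  try {
    event = JSON.parse(message.value.toString());
  } catch (err) {
    return { ok: false, key, reason: `invalid JSON: ${err.message}` };
  }

  if (event === null || typeof event !== 'object' || Array.isArray(event)) {
    return { ok: false, key, reason: 'payload is not a JSON object' };
  }
  // The fields carried by the example payload.
  for (const field of ['userId', 'eventType', 'timestamp']) {
    if (typeof event[field] !== 'string') {
      return { ok: false, key, reason: `missing or non-string field: ${field}` };
    }
  }
  if (Number.isNaN(Date.parse(event.timestamp))) {
    return { ok: false, key, reason: 'unparseable timestamp' };
  }
  return { ok: true, key, event };
}

const sample = decodeUserEvent({
  key: Buffer.from('user-123'),
  value: Buffer.from('{"userId":"user-123","eventType":"login","timestamp":"2023-10-27T10:00:00Z"}'),
});
console.log(sample.ok, sample.event.eventType); // true login
```

A result with ok: false is a good candidate for logging or a dead letter queue rather than a retry, because a malformed payload will not parse on the next attempt either.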

The core problem a kafkajs consumer solves is processing a continuous stream of events from Kafka reliably. "Stream" means there is no final batch after which you are done; the consumer keeps fetching for as long as it runs. "Reliably" is the tricky part. Because offsets are committed only after messages are processed (which is what kafkajs does by default), you get at-least-once semantics: if the consumer fails after processing a message but before its offset is committed, the message is read again after a restart or rebalance. Your application logic therefore needs to be idempotent: processing a message multiple times must have the same effect as processing it once.
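To see why idempotency matters, here is a minimal simulation with no Kafka client involved: the same message is handled twice, as it can be after a crash between processing and committing. The names are hypothetical; loginCounts stands in for a non-idempotent write and lastEventType for an idempotent upsert.

```javascript
// Simulation only: no Kafka client involved. The same message is handled
// twice, as can happen under at-least-once delivery.
const message = { offset: '7', value: { userId: 'user-123', eventType: 'login' } };

// Not idempotent: every delivery increments the counter.
const loginCounts = new Map();
function countLogin(event) {
  loginCounts.set(event.userId, (loginCounts.get(event.userId) || 0) + 1);
}

// Idempotent: the write sets state derived only from the event itself,
// so repeating it leaves the same result (an upsert).
const lastEventType = new Map();
function upsertLastEventType(event) {
  lastEventType.set(event.userId, event.eventType);
}

// First delivery, then a redelivery after a simulated crash before the commit.
for (let delivery = 1; delivery <= 2; delivery++) {
  countLogin(message.value);
  upsertLastEventType(message.value);
}

console.log(loginCounts.get('user-123'));   // 2 (the duplicate was counted)
console.log(lastEventType.get('user-123')); // login (same as after one delivery)
```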

Let’s look at the internal levers you control:

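  • groupId: This is fundamental. All consumers with the same groupId divide the partitions of the subscribed topics among themselves. A single consumer instance gets all partitions; two instances split them; any instances beyond the number of partitions sit idle. Kafka rebalances the assignment automatically whenever members join or leave.
  • partitionAssigners: kafkajs takes a list of assigners in the consumer config and defaults to its built-in round-robin assigner (PartitionAssigners.roundRobin), which distributes partitions as evenly as possible. Unlike the Java client, kafkajs does not bundle a range assignor (which gives each consumer a contiguous range of partitions); other strategies require a custom assigner. For most cases, the default is fine.
  • fromBeginning: When subscribing, this flag decides where the group starts reading a partition for which it has no committed offset: the oldest available message (true) or only new messages (false, the default). Once the group has committed offsets, they take precedence. Starting from the beginning is useful for a fresh deployment or recovery, but be cautious about reprocessing massive historical data.
  • eachMessage vs. eachBatch: eachMessage is convenient for simple processing. eachBatch hands you the whole batch plus helpers such as resolveOffset, heartbeat and commitOffsetsIfNecessary, so you can mark progress and commit offsets for many messages at once, which is more efficient and gives finer control over processing state.
  • Offset committing: This is the heart of reliability. By default (autoCommit: true), kafkajs marks a message's offset as processed once eachMessage resolves and commits those offsets automatically (after each batch, or sooner if you set autoCommitInterval or autoCommitThreshold). For precise control, set autoCommit: false and commit explicitly once a message (or batch of messages) has been successfully processed. If your consumer crashes before committing, the group resumes from the last committed offset, so those messages are read again by whichever consumer takes over the partition (or by the same one on restart).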
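Partition distribution and the starting position can be illustrated with plain functions. This is a simplified sketch, not kafkajs's assigner source: the hypothetical assignRoundRobin sorts member IDs and deals partitions out in turn, which shows why consumers beyond the partition count receive nothing. The equally hypothetical startingOffset encodes the rule kafkajs applies to fromBeginning: the flag only matters when the group has no committed offset for the partition.

```javascript
// Simplified illustration of round-robin distribution within one group.
// NOT kafkajs's implementation: sort members, then deal partitions in turn.
function assignRoundRobin(memberIds, partitions) {
  const members = [...memberIds].sort();
  if (members.length === 0) return {};
  const assignment = Object.fromEntries(members.map((id) => [id, []]));
  partitions.forEach((partition, i) => {
    assignment[members[i % members.length]].push(partition);
  });
  return assignment;
}

// Where reading starts for one partition: a committed offset for the group
// wins; fromBeginning only chooses the fallback when there is none.
function startingOffset({ committedOffset, fromBeginning, earliestOffset, latestOffset }) {
  if (committedOffset !== null && committedOffset !== undefined) {
    return committedOffset;
  }
  return fromBeginning ? earliestOffset : latestOffset;
}

console.log(assignRoundRobin(['consumer-b', 'consumer-a'], [0, 1, 2, 3, 4]));
// { 'consumer-a': [ 0, 2, 4 ], 'consumer-b': [ 1, 3 ] }
```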

The most common production pattern for kafkajs consumers revolves around managing the lifecycle of message processing and offset commits. You want to ensure that an offset is only committed after the side effects of processing that message are complete and durable.

Consider a scenario where your consumer fetches a message, starts a database transaction, processes the message, commits the database transaction, and then commits the Kafka offset. If the consumer crashes after the database commit but before the Kafka commit, it will reprocess the message on restart. Because your processing logic is idempotent (e.g., a database INSERT ... ON CONFLICT ... DO UPDATE, or a check to see if the record already exists), the second processing attempt has no unintended side effects.
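The "check to see if the record already exists" option can be sketched with an in-memory stand-in for the database. Everything here is hypothetical: db, eventId and applyOnce are illustration names, and the sketch assumes userId, eventType and timestamp together identify an event uniquely. In a real system, the state change and the recorded event ID would be written in one database transaction.

```javascript
// Hypothetical in-memory stand-in for a database holding user state plus
// the IDs of events that have already been applied.
const db = { users: new Map(), appliedEventIds: new Set() };

// Assumption: these three fields together identify one event uniquely.
function eventId(event) {
  return `${event.userId}:${event.eventType}:${event.timestamp}`;
}

// Applies a non-idempotent change (a login counter) at most once per event.
function applyOnce(event) {
  const id = eventId(event);
  if (db.appliedEventIds.has(id)) {
    return false; // already applied: this is a redelivery, skip the side effect
  }
  // In a real database, both writes below would share one transaction.
  const user = db.users.get(event.userId) || { logins: 0 };
  if (event.eventType === 'login') user.logins += 1;
  db.users.set(event.userId, user);
  db.appliedEventIds.add(id);
  return true;
}

const event = { userId: 'user-123', eventType: 'login', timestamp: '2023-10-27T10:00:00Z' };
applyOnce(event); // first delivery: applied
applyOnce(event); // redelivery after a crash before the offset commit: skipped
console.log(db.users.get('user-123').logins); // 1
```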

const { Kafka, logLevel } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-idempotent-consumer',
  brokers: ['localhost:9092'],
  logLevel: logLevel.WARN // Reduce noise in production
});

const consumer = kafka.consumer({
  groupId: 'user-event-processors-idempotent',
  sessionTimeout: 45000, // kafkajs default is 30000 ms; raised here to tolerate slow processing
  heartbeatInterval: 2000 // kafkajs default is 3000 ms; keep it well below sessionTimeout
});

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'user_events', fromBeginning: true });

  await consumer.run({
    autoCommit: false, // Crucial: we will commit manually
    eachMessage: async ({ topic, partition, message }) => {
      const prefix = `[${topic}/${partition}/${message.offset}]`;
      try {
        const event = JSON.parse(message.value.toString());
        console.log(`${prefix} Processing: ${event.userId} - ${event.eventType}`);

        // --- Start of Idempotent Processing ---
        // Simulate database operation that is idempotent
        await processUserEvent(event); // e.g., upserting user state
        // --- End of Idempotent Processing ---

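        // If processUserEvent succeeds, commit the offset of the next message to read.
        // Offsets are int64 values delivered as strings; BigInt keeps "+ 1" exact.
        const nextOffset = (BigInt(message.offset) + 1n).toString();
        await consumer.commitOffsets([{ topic, partition, offset: nextOffset }]);
        console.log(`${prefix} Successfully processed and committed offset.`);
      } catch (error) {
        console.error(`${prefix} Failed to process message:`, error);
        // Depending on the error and retry strategy, you might:
        // 1. Log and discard (if it's a poison pill you can't recover from)
        // 2. Send to a Dead Letter Queue (DLQ)
        // 3. Implement a retry mechanism (but be careful of infinite loops)
        // For now, rethrow. Merely skipping the commit is not enough: kafkajs would
        // move on to the next message, and the next successful commit would skip
        // this one. Rethrowing makes kafkajs retry it per the consumer's retry settings.
        throw error;
      }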
    },
  });
};

async function processUserEvent(event) {
  // Simulate an async operation, like a database write.
  // This operation MUST be idempotent.
  await new Promise(resolve => setTimeout(resolve, 100));
  if (event.eventType === 'error_trigger') {
    throw new Error('Simulated processing error');
  }
  console.log(`  -> Idempotently handled: ${event.userId} - ${event.eventType}`);
}

run().catch(async (e) => {
  console.error('[consumer/run] Uncaught error:', e);
  try {
    await consumer.disconnect();
  } catch (disconnectError) {
    console.error('[consumer/disconnect] Error during disconnect:', disconnectError);
  }
  process.exit(1);
});

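In this pattern, autoCommit: false is paramount: we call consumer.commitOffsets only after processUserEvent has completed successfully. The offset we commit is message.offset + 1 because a committed offset names the next message to read; committing it records that everything up to and including message.offset is done. If processUserEvent throws, its offset is never committed. Withholding the commit is not enough on its own to get the message redelivered, though: the consumer's in-memory position still moves on to the next message, and the next successful commit covers the failed offset. The handler has to rethrow the error so that kafkajs retries the message instead of skipping past it.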
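The offset arithmetic can be isolated in a small helper. Kafka offsets are 64-bit integers and kafkajs delivers them as strings; parseInt is exact only up to Number.MAX_SAFE_INTEGER, while BigInt is exact across the whole range. nextOffset and commitEntryFor are hypothetical names; commitEntryFor only builds the { topic, partition, offset } entry shape that consumer.commitOffsets accepts.

```javascript
// Committed offset = offset of the next message to read, so add one.
// BigInt keeps this exact for any int64 offset string.
function nextOffset(offset) {
  return (BigInt(offset) + 1n).toString();
}

// Builds one entry for consumer.commitOffsets([...]) after processing `message`.
function commitEntryFor(topic, partition, message) {
  return { topic, partition, offset: nextOffset(message.offset) };
}

console.log(nextOffset('41'));                             // 42
console.log(nextOffset('9007199254740993'));               // 9007199254740994
console.log(String(parseInt('9007199254740993', 10) + 1)); // 9007199254740992 (precision lost)
```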

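The sessionTimeout and heartbeatInterval settings govern how Kafka judges consumer health. If the group coordinator receives no heartbeat from a consumer within sessionTimeout (because it is stuck or has crashed), it considers the consumer dead and triggers a rebalance, assigning its partitions to the remaining consumers in the group. kafkajs sends heartbeats from its consume loop between messages, so a single message that takes longer than sessionTimeout to process can trigger a rebalance. Choosing these values with your worst-case processing time in mind prevents premature rebalances for slow-but-healthy consumers.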
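These constraints can be written down as a hypothetical startup check. The one-third ratio follows the Apache Kafka consumer documentation for heartbeat.interval.ms. The per-message rule assumes kafkajs heartbeats only between messages, from its consume loop, rather than from a background timer. maxProcessingMs (your worst-case time for one message) is an assumed input, not a kafkajs option.

```javascript
// Hypothetical sanity check for consumer timing settings (all values in ms).
function checkConsumerTimings({ sessionTimeout, heartbeatInterval, maxProcessingMs }) {
  const warnings = [];
  // Kafka's guidance: heartbeat interval no higher than 1/3 of the session timeout.
  if (heartbeatInterval * 3 > sessionTimeout) {
    warnings.push('heartbeatInterval is more than a third of sessionTimeout');
  }
  // Heartbeats are sent between messages, so one slow message can exceed the session.
  if (maxProcessingMs >= sessionTimeout) {
    warnings.push('worst-case processing time reaches sessionTimeout');
  }
  return warnings;
}

console.log(checkConsumerTimings({ sessionTimeout: 45000, heartbeatInterval: 2000, maxProcessingMs: 100 }));
// []
```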

A subtle but powerful aspect is how kafkajs handles rebalances. When a consumer joins or leaves a group, a rebalance occurs, and while it is in progress consumers stop fetching messages. kafkajs has no direct counterpart to the Java client's partitions-revoked callback. The closest hooks are instrumentation events: consumer.events.REBALANCING, and consumer.events.GROUP_JOIN, whose payload includes the member's new assignment. consumer.events.CRASH and consumer.events.DISCONNECT only tell you that the consumer itself has failed or shut down. Because these events are notifications rather than a point where you can hold up the handover, the most dependable way to handle partition loss gracefully is to ensure your message processing is always idempotent.
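One way to react after a rebalance is to compare the old and new assignments and drop any local state kept for partitions this consumer no longer owns. Below is a minimal sketch with a hypothetical diffAssignment helper. It assumes assignments are plain objects mapping topic to an array of partition numbers (e.g. { user_events: [0, 1] }), which is how the GROUP_JOIN payload's memberAssignment is expected to look. The kafkajs wiring appears only as a comment.

```javascript
// Hypothetical helper: which topic/partition pairs were lost or gained.
// Assumed assignment shape: { [topic]: [partition, ...] }.
function diffAssignment(previous, next) {
  const toKeys = (assignment) =>
    new Set(
      Object.entries(assignment || {}).flatMap(([topic, partitions]) =>
        partitions.map((p) => `${topic}/${p}`)
      )
    );
  const before = toKeys(previous);
  const after = toKeys(next);
  return {
    revoked: [...before].filter((k) => !after.has(k)).sort(),
    added: [...after].filter((k) => !before.has(k)).sort(),
  };
}

// Possible wiring with kafkajs (not executed here):
// let current = {};
// consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
//   const { revoked } = diffAssignment(current, payload.memberAssignment);
//   // drop in-memory state kept for the revoked partitions
//   current = payload.memberAssignment;
// });

console.log(diffAssignment({ user_events: [0, 1, 2] }, { user_events: [1, 2, 3] }));
// { revoked: [ 'user_events/0' ], added: [ 'user_events/3' ] }
```

Since GROUP_JOIN fires after the new assignment is in place, this cleans up after the handover rather than before it, which is another reason the processing itself should stay idempotent.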

The next hurdle you’ll likely face is handling large volumes of messages efficiently and managing the "poison pill" scenario – messages that consistently fail processing and block progress.
