Message Queues: RabbitMQ, Kafka, and Async

Synchronous HTTP calls couple services together—if the downstream service is slow or unavailable, the upstream service blocks or fails. Message queues decouple producers from consumers: the producer publishes a message and moves on; consumers process it asynchronously at their own pace. RabbitMQ and Kafka are the dominant solutions, but they have fundamentally different architectures suited to different use cases.


Core Concepts

text
Without message queue (tight coupling):
OrderService → HTTP → InventoryService → HTTP → NotificationService
           ↑ if any link is slow, everything backs up

With message queue (loose coupling):
OrderService → publish → [order-created queue] → InventoryService
                                               ↘ NotificationService
                                               ↘ AnalyticsService
           ↑ OrderService doesn't know or care about consumers

Concept             Meaning
Producer            Service that sends messages
Consumer            Service that receives and processes messages
Queue               Buffer that holds messages until consumed
Topic               Named channel for related messages (Kafka term)
Exchange            Routes messages to queues (RabbitMQ term)
Consumer group      Set of consumers that share work (each message goes to one member)
Acknowledgment      Consumer signals it has processed a message
Dead letter queue   Where unprocessable messages are sent after retries are exhausted

RabbitMQ: Traditional Message Broker

RabbitMQ is a general-purpose message broker built on AMQP. Producers send to exchanges; exchanges route to queues based on routing rules. Consumers subscribe to queues.

Exchange Types

text
Direct exchange: route by exact routing key match
  Producer → exchange (key: "order.created") → queue "orders"

Fanout exchange: broadcast to all bound queues
  Producer → exchange → queue "inventory"
                      → queue "notifications"
                      → queue "analytics"

Topic exchange: route by pattern matching
  Producer → exchange (key: "order.us.created") 
           → queue bound to "order.*.created"   ✓ matches
           → queue bound to "order.us.*"         ✓ matches
           → queue bound to "payment.#"          ✗ no match

Headers exchange: route by message header attributes
  Producer → exchange (header: region=EU, type=premium)
           → queue matching {region: EU}          ✓ matches
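
The `*`/`#` wildcard semantics above can be sketched as a small matcher. This is illustrative only — RabbitMQ performs this routing internally — but it makes the rules concrete: `*` matches exactly one dot-separated word, `#` matches zero or more:

```typescript
// Illustrative topic-pattern matcher mirroring RabbitMQ's routing rules:
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (pi: number, ki: number): boolean => {
    if (pi === p.length) return ki === k.length; // pattern consumed: key must be too
    if (p[pi] === '#') {
      // "#" may absorb zero or more words — try every possible split
      for (let skip = ki; skip <= k.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;           // key exhausted, pattern is not
    return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}
```

So `topicMatches('order.*.created', 'order.us.created')` is true, while `topicMatches('payment.#', 'order.us.created')` is false, matching the diagram above.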

RabbitMQ with Node.js (amqplib)

typescript
import amqp, { Channel, Connection } from 'amqplib';

class RabbitMQClient {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  async connect(url = 'amqp://localhost'): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  // Publish a message
  async publish(exchange: string, routingKey: string, message: object): Promise<void> {
    if (!this.channel) throw new Error('Not connected');
    
    await this.channel.assertExchange(exchange, 'topic', { durable: true });
    
    this.channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,        // survive broker restart
        contentType: 'application/json',
        timestamp: Date.now(),
      }
    );
  }

  // Consume messages with manual acknowledgment
  async consume(
    queue: string,
    exchange: string,
    routingKey: string,
    handler: (msg: object) => Promise<void>
  ): Promise<void> {
    if (!this.channel) throw new Error('Not connected');
    
    await this.channel.assertExchange(exchange, 'topic', { durable: true });
    await this.channel.assertQueue(queue, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': `${exchange}.dlx`,  // dead letter routing
        'x-message-ttl': 60_000,                       // 60s TTL
      },
    });
    await this.channel.bindQueue(queue, exchange, routingKey);
    
    // Only receive 1 message at a time (prevents memory overload)
    this.channel.prefetch(1);
    
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;
      
      try {
        const content = JSON.parse(msg.content.toString());
        await handler(content);
        this.channel!.ack(msg);          // success: remove from queue
      } catch (error) {
        console.error('Processing failed:', error);
        // nack with requeue=false → sends to dead letter queue
        this.channel!.nack(msg, false, false);
      }
    });
  }

  async close(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}

// Usage
const rabbit = new RabbitMQClient();
await rabbit.connect();

// Producer
await rabbit.publish('orders', 'order.us.created', {
  orderId: 'ord-123',
  userId: 'usr-456',
  amount: 99.99,
});

// Consumer
await rabbit.consume('inventory-service', 'orders', 'order.*.created', async (msg) => {
  const order = msg as { orderId: string; amount: number };
  await updateInventory(order.orderId);
});

Dead Letter Queue Pattern

typescript
// Set up DLX (Dead Letter Exchange) for failed messages
async function setupWithDeadLetter(channel: Channel, queueName: string): Promise<void> {
  const dlxName = `${queueName}.dlx`;
  const dlqName = `${queueName}.dead`;

  // Dead letter exchange and queue
  await channel.assertExchange(dlxName, 'direct', { durable: true });
  await channel.assertQueue(dlqName, { durable: true });
  await channel.bindQueue(dlqName, dlxName, queueName);

  // Main queue routes failures to DLX
  await channel.assertQueue(queueName, {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': dlxName,
      'x-dead-letter-routing-key': queueName,
      // Note: classic queues have no built-in retry-limit argument; track
      // attempts via the x-death header, or use a quorum queue with
      // 'x-delivery-limit' to cap deliveries before dead-lettering.
    },
  });
}

Apache Kafka: Distributed Log

Kafka is fundamentally different from RabbitMQ. It is a distributed commit log, not a traditional message broker. Messages are stored durably for a configurable retention period (default 7 days) and consumers read from any offset—old messages are not deleted after consumption.

text
RabbitMQ mental model:
  Messages are tasks to be processed and removed

Kafka mental model:
  Messages are an ordered, immutable log of events
  Multiple independent consumers can replay from any point

Kafka Architecture

text
Producers → Topics → Consumer Groups

Topic: "order-events" (3 partitions)
  Partition 0: [event1, event2, event5, event8]
  Partition 1: [event3, event6, event9]
  Partition 2: [event4, event7, event10]

Consumer Group A (InventoryService) — 3 consumers:
  Consumer A0 reads Partition 0
  Consumer A1 reads Partition 1
  Consumer A2 reads Partition 2

Consumer Group B (AnalyticsService) — 1 consumer:
  Consumer B0 reads ALL partitions
  (reads at its own pace, independent of Group A)

Key properties:

  • Each partition is ordered. Across partitions, ordering is not guaranteed.
  • Within a consumer group, each partition is consumed by exactly one consumer.
  • Messages with the same key always go to the same partition (ordering guarantee per entity).
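
The third property can be illustrated with a toy partitioner. This is not Kafka's real algorithm (the Java client hashes keys with murmur2); it is only a stand-in hash showing why the same key always lands on the same partition:

```typescript
// Toy stand-in for Kafka's default partitioner (real clients use murmur2):
// a deterministic hash of the key, modulo the partition count.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) | 0; // 32-bit rolling hash
  }
  return Math.abs(hash) % numPartitions;
}

// Same key → same partition on every call, so all events for one
// user are appended to a single ordered partition.
const a = partitionFor('usr-456', 3);
const b = partitionFor('usr-456', 3);
// a === b, always
```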

Kafka with KafkaJS

typescript
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka:9092'],
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});

// Producer
class OrderEventProducer {
  private producer: Producer;

  constructor() {
    this.producer = kafka.producer({
      idempotent: true,  // idempotent producer: broker deduplicates retries (exactly-once write per partition)
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publishOrderCreated(order: {
    orderId: string;
    userId: string;
    amount: number;
  }): Promise<void> {
    await this.producer.send({
      topic: 'order-events',
      messages: [
        {
          key: order.userId,      // same user → same partition → ordered per user
          value: JSON.stringify({ type: 'ORDER_CREATED', ...order }),
          headers: { 'event-version': '1' },
        },
      ],
    });
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }
}

// Consumer
class InventoryConsumer {
  private consumer: Consumer;

  constructor() {
    this.consumer = kafka.consumer({
      groupId: 'inventory-service',
      sessionTimeout: 30_000,
      heartbeatInterval: 3_000,
    });
  }

  async start(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'order-events', fromBeginning: false });

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
        const event = JSON.parse(message.value!.toString());
        
        if (event.type === 'ORDER_CREATED') {
          await processOrder(event);
        }
        // KafkaJS resolves this message's offset when the handler returns and
        // commits it automatically; if the handler throws, the message is retried.
      },
    });
  }
}

Kafka Offset Management

typescript
// Manual offset commit for precise control
await consumer.run({
  autoCommit: false,
  eachBatch: async ({ batch, commitOffsetsIfNecessary, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      await processMessage(message);
      
      // Mark this offset as processed (doesn't commit yet)
      resolveOffset(message.offset);
      
      // Commit every 100 messages
      if (Number(message.offset) % 100 === 0) {
        await commitOffsetsIfNecessary();
      }
      
      // Prevent consumer timeout during long processing
      await heartbeat();
    }
    await commitOffsetsIfNecessary();
  },
});

// Seek to a specific offset (replay events)
await consumer.seek({
  topic: 'order-events',
  partition: 0,
  offset: '0', // replay from beginning
});

RabbitMQ vs Kafka: Decision Guide

Criterion            RabbitMQ                                     Kafka
Message retention    Deleted after consumption                    Retained (configurable, default 7 days)
Consumer model       Competing consumers (one gets the message)   Multiple independent consumer groups
Ordering             Per queue                                    Per partition
Throughput           20k–50k msg/s per node                       500k–1M+ msg/s per node
Latency              < 1 ms                                       5–15 ms (tunable)
Complexity           Low–Medium                                   Medium–High
Replayability        No (without plugins)                         Yes (by resetting offset)
Routing logic        Rich (exchanges, bindings)                   Limited (key-based partitioning)
Cluster management   Simpler                                      Requires ZooKeeper/KRaft

Use RabbitMQ when:

  • You need complex routing rules (topic/header-based)
  • Messages should be processed once and deleted
  • You want request/reply patterns
  • Your team is smaller and operational simplicity matters
  • Throughput < 50k messages/second

Use Kafka when:

  • You need event sourcing or event replay
  • Multiple independent services consume the same events
  • You need an audit log of everything that happened
  • Throughput > 100k messages/second
  • You're building stream processing pipelines (Kafka Streams, Flink)

Backpressure and Rate Limiting

Without backpressure, a slow consumer that receives messages faster than it can process will accumulate an unbounded backlog and eventually run out of memory.

typescript
// RabbitMQ: prefetch limits in-flight messages
channel.prefetch(10); // consumer receives at most 10 unacknowledged messages

// Kafka: pause partition consumption when overwhelmed
await consumer.run({
  eachMessage: async ({ topic, partition, message, pause }) => {
    try {
      await processMessage(message);
    } catch (error) {
      if (isOverloaded()) {
        // Pause this partition for 5 seconds
        const resume = pause();
        setTimeout(resume, 5000);
      }
      throw error;
    }
  },
});

Idempotent Consumers

Network failures can cause message redelivery. Design consumers to handle duplicate messages safely:

typescript
import { Pool } from 'pg';

// Track processed message IDs to deduplicate
class IdempotentOrderProcessor {
  constructor(
    private db: Pool,
    private processedIds: Set<string> = new Set()
  ) {}

  async process(message: { messageId: string; orderId: string }): Promise<void> {
    // Check if already processed (in-memory for speed, then DB for durability)
    if (this.processedIds.has(message.messageId)) return;

    const already = await this.db.query(
      'SELECT 1 FROM processed_messages WHERE id = $1',
      [message.messageId]
    );
    if (already.rows.length > 0) return;

    // Process in a transaction with deduplication record
    await this.db.query('BEGIN');
    try {
      await processOrder(message.orderId);
      await this.db.query(
        'INSERT INTO processed_messages (id, processed_at) VALUES ($1, NOW())',
        [message.messageId]
      );
      await this.db.query('COMMIT');
      this.processedIds.add(message.messageId);
    } catch (e) {
      await this.db.query('ROLLBACK');
      throw e;
    }
  }
}

Frequently Asked Questions

Q: Can I use a database table as a message queue instead of RabbitMQ or Kafka?

Yes, and this is often the right choice for low-volume use cases. The "transactional outbox" pattern stores events in a database table within the same transaction as your business data. A polling worker reads unprocessed rows, publishes them, and marks them as sent. Because the event row commits atomically with the business change, you never lose or invent events; the poller itself delivers at-least-once (it may crash between publishing and marking), so consumers should still be idempotent. The downside is polling overhead and limited throughput (typically < 1,000 events/second). For anything higher, switch to Kafka or RabbitMQ.
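
A minimal sketch of the outbox pattern follows. The `DbClient` interface stands in for a real driver such as `pg`, and the table and column names (`orders`, `outbox`) are illustrative assumptions:

```typescript
// Minimal DB interface — a stand-in for a real driver like pg.
interface DbClient {
  query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>;
}

// Write the business row and the event row in ONE transaction.
async function createOrderWithOutbox(
  db: DbClient,
  order: { id: string; amount: number }
): Promise<void> {
  await db.query('BEGIN');
  try {
    await db.query('INSERT INTO orders (id, amount) VALUES ($1, $2)', [order.id, order.amount]);
    await db.query(
      'INSERT INTO outbox (topic, payload, published) VALUES ($1, $2, false)',
      ['order-events', JSON.stringify({ type: 'ORDER_CREATED', ...order })]
    );
    await db.query('COMMIT');
  } catch (e) {
    await db.query('ROLLBACK');
    throw e;
  }
}

// Polling worker: publish unpublished rows, then mark them sent.
// At-least-once: a crash between publish and UPDATE causes a resend.
async function pollOutbox(
  db: DbClient,
  publish: (topic: string, payload: string) => Promise<void>
): Promise<number> {
  const { rows } = await db.query(
    'SELECT id, topic, payload FROM outbox WHERE published = false ORDER BY id LIMIT 100'
  );
  for (const row of rows) {
    await publish(row.topic, row.payload);
    await db.query('UPDATE outbox SET published = true WHERE id = $1', [row.id]);
  }
  return rows.length;
}
```

A single polling worker is assumed here; with multiple workers you would add row locking (e.g. `FOR UPDATE SKIP LOCKED` in Postgres) inside a transaction.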

Q: How do I handle ordering when I have multiple consumers in Kafka?

Kafka guarantees ordering within a partition, not across partitions. To guarantee that all events for a specific entity (a user, an order) are processed in order: use the entity ID as the message key. Kafka routes messages with the same key to the same partition, and each partition is consumed sequentially by one consumer in a group. If you have 10 partitions and 3 consumers, each consumer handles multiple partitions but each partition is only ever handled by one consumer.

Q: What happens to messages in RabbitMQ if the broker crashes?

If messages are published with persistent: true and queues are declared durable: true, messages survive broker restarts. Without persistence, messages in memory are lost on crash. For high-availability, RabbitMQ supports mirrored queues (classic) or quorum queues (recommended for new deployments)—quorum queues replicate messages to a majority of cluster nodes before acknowledging the publish. This prevents message loss even if a minority of nodes fail simultaneously.
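
A quorum queue is opted into with a queue argument at declaration time. A minimal sketch of the options — `'x-queue-type'` and `'x-delivery-limit'` are real RabbitMQ arguments; the queue name and limit value are illustrative:

```typescript
// amqplib-style assertQueue options for a quorum queue.
const quorumQueueOptions = {
  durable: true, // quorum queues are always durable
  arguments: {
    'x-queue-type': 'quorum',  // replicate via Raft across a majority of nodes
    'x-delivery-limit': 5,     // dead-letter after 5 failed delivery attempts
  },
};

// usage (channel obtained from amqplib, as in the examples above):
//   await channel.assertQueue('orders.critical', quorumQueueOptions);
```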

Q: Should I use request/reply (RPC over messaging) patterns?

Sparingly. You can implement synchronous-style request/reply over a message queue using correlation IDs and reply queues, but it combines the worst of both worlds: the latency overhead of async messaging with the tight coupling of synchronous calls. If you need a synchronous answer within the same request, use HTTP/gRPC directly. Use messaging for true one-way fire-and-forget operations, event fan-out, or workload offloading where the caller doesn't need to wait for a result.
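
If you do build it, the correlation-ID bookkeeping looks roughly like this. The broker transport is abstracted into a `send` callback (an assumption for brevity), and the reply-queue name shown is RabbitMQ's direct reply-to pseudo-queue:

```typescript
import { randomUUID } from 'crypto';

// Sketch of the client side of RPC-over-messaging: each request carries a
// correlationId and a replyTo queue; replies are matched back to the
// pending Promise by correlationId.
class RpcClient {
  private pending = new Map<string, (reply: string) => void>();

  constructor(
    private send: (msg: { correlationId: string; replyTo: string; body: string }) => void
  ) {}

  call(body: string, timeoutMs = 5000): Promise<string> {
    const correlationId = randomUUID();
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error('RPC timeout'));
      }, timeoutMs);
      this.pending.set(correlationId, (reply) => {
        clearTimeout(timer);
        resolve(reply);
      });
      this.send({ correlationId, replyTo: 'amq.rabbitmq.reply-to', body });
    });
  }

  // Invoked when a message arrives on the reply queue.
  onReply(correlationId: string, body: string): void {
    const resolver = this.pending.get(correlationId);
    if (resolver) {
      this.pending.delete(correlationId);
      resolver(body);
    }
    // else: a late reply for a timed-out call — drop it
  }
}
```

Note how much machinery this adds compared to a plain HTTP call, which is exactly the argument for using request/reply over messaging sparingly.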