# Message Queues: RabbitMQ, Kafka, and Async

Synchronous HTTP calls couple services together—if the downstream service is slow or unavailable, the upstream service blocks or fails. Message queues decouple producers from consumers: the producer publishes a message and moves on; consumers process it asynchronously at their own pace. RabbitMQ and Kafka are the dominant solutions, but they have fundamentally different architectures suited to different use cases.
## Core Concepts
Without a message queue (tight coupling):

```
OrderService → HTTP → InventoryService → HTTP → NotificationService
                ↑ if any link is slow, everything backs up
```

With a message queue (loose coupling):

```
OrderService → publish → [order-created queue] → InventoryService
                                               ↘ NotificationService
                                               ↘ AnalyticsService
         ↑ OrderService doesn't know or care about consumers
```

| Concept | Meaning |
|---|---|
| Producer | Service that sends messages |
| Consumer | Service that receives and processes messages |
| Queue | Buffer that holds messages until consumed |
| Topic | Named channel for related messages (Kafka term) |
| Exchange | Routes messages to queues (RabbitMQ term) |
| Consumer group | Set of consumers that share work (each message goes to one member) |
| Acknowledgment | Consumer signals it has processed a message |
| Dead letter queue | Where unprocessable messages are sent after retries exhausted |
## RabbitMQ: Traditional Message Broker
RabbitMQ is a general-purpose message broker built on AMQP. Producers send to exchanges; exchanges route to queues based on routing rules. Consumers subscribe to queues.
### Exchange Types
Direct exchange: route by exact routing-key match

```
Producer → exchange (key: "order.created") → queue "orders"
```

Fanout exchange: broadcast to all bound queues

```
Producer → exchange → queue "inventory"
                    → queue "notifications"
                    → queue "analytics"
```

Topic exchange: route by pattern matching

```
Producer → exchange (key: "order.us.created")
  → queue bound to "order.*.created"   ✓ matches
  → queue bound to "order.us.*"        ✓ matches
  → queue bound to "payment.#"         ✗ no match
```

Headers exchange: route by message header attributes

```
Producer → exchange (header: region=EU, type=premium)
  → queue matching {region: EU}        ✓ matches
```

### RabbitMQ with Node.js (amqplib)
```typescript
import amqp, { Channel, Connection } from 'amqplib';

class RabbitMQClient {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  async connect(url = 'amqp://localhost'): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  // Publish a message
  async publish(exchange: string, routingKey: string, message: object): Promise<void> {
    if (!this.channel) throw new Error('Not connected');
    await this.channel.assertExchange(exchange, 'topic', { durable: true });
    this.channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true, // survive broker restart
        contentType: 'application/json',
        timestamp: Date.now(),
      }
    );
  }

  // Consume messages with manual acknowledgment
  async consume(
    queue: string,
    exchange: string,
    routingKey: string,
    handler: (msg: object) => Promise<void>
  ): Promise<void> {
    if (!this.channel) throw new Error('Not connected');
    await this.channel.assertExchange(exchange, 'topic', { durable: true });
    await this.channel.assertQueue(queue, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': `${exchange}.dlx`, // dead letter routing
        'x-message-ttl': 60_000, // 60s TTL
      },
    });
    await this.channel.bindQueue(queue, exchange, routingKey);

    // Only receive 1 unacknowledged message at a time (prevents memory overload)
    await this.channel.prefetch(1);

    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;
      try {
        const content = JSON.parse(msg.content.toString());
        await handler(content);
        this.channel!.ack(msg); // success: remove from queue
      } catch (error) {
        console.error('Processing failed:', error);
        // nack with requeue=false → sends to dead letter queue
        this.channel!.nack(msg, false, false);
      }
    });
  }

  async close(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}

// Usage
const rabbit = new RabbitMQClient();
await rabbit.connect();

// Producer
await rabbit.publish('orders', 'order.us.created', {
  orderId: 'ord-123',
  userId: 'usr-456',
  amount: 99.99,
});

// Consumer
await rabbit.consume('inventory-service', 'orders', 'order.*.created', async (msg) => {
  const order = msg as { orderId: string; amount: number };
  await updateInventory(order.orderId);
});
```

### Dead Letter Queue Pattern
```typescript
// Set up a DLX (Dead Letter Exchange) for failed messages
async function setupWithDeadLetter(channel: Channel, queueName: string): Promise<void> {
  const dlxName = `${queueName}.dlx`;
  const dlqName = `${queueName}.dead`;

  // Dead letter exchange and queue
  await channel.assertExchange(dlxName, 'direct', { durable: true });
  await channel.assertQueue(dlqName, { durable: true });
  await channel.bindQueue(dlqName, dlxName, queueName);

  // Main queue routes failures to the DLX. Note: RabbitMQ has no built-in
  // retry-count queue argument; to cap retries, inspect the x-death header
  // the broker adds to dead-lettered messages.
  await channel.assertQueue(queueName, {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': dlxName,
      'x-dead-letter-routing-key': queueName,
    },
  });
}
```

## Apache Kafka: Distributed Log
Kafka is fundamentally different from RabbitMQ. It is a distributed commit log, not a traditional message broker. Messages are stored durably for a configurable retention period (default 7 days) and consumers read from any offset—old messages are not deleted after consumption.
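Retention is configured per topic. As a sketch (broker address and values are illustrative; the `order-events` topic name anticipates the examples below), the kafkajs admin client can set `retention.ms` at topic creation:

```typescript
import { Kafka } from 'kafkajs';

// Sketch: create a topic with 30-day retention (values are illustrative).
const admin = new Kafka({ clientId: 'admin', brokers: ['kafka:9092'] }).admin();
await admin.connect();
await admin.createTopics({
  topics: [
    {
      topic: 'order-events',
      numPartitions: 3,
      configEntries: [
        { name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) },
      ],
    },
  ],
});
await admin.disconnect();
```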
RabbitMQ mental model: messages are tasks to be processed and then removed.

Kafka mental model: messages are an ordered, immutable log of events; multiple independent consumers can replay the log from any point.

### Kafka Architecture
```
Producers → Topics → Consumer Groups

Topic: "order-events" (3 partitions)
  Partition 0: [event1, event2, event5, event8]
  Partition 1: [event3, event6, event9]
  Partition 2: [event4, event7, event10]

Consumer Group A (InventoryService) — 3 consumers:
  Consumer A0 reads Partition 0
  Consumer A1 reads Partition 1
  Consumer A2 reads Partition 2

Consumer Group B (AnalyticsService) — 1 consumer:
  Consumer B0 reads ALL partitions
  (reads at its own pace, independent of Group A)
```

Key properties:
- Each partition is ordered. Across partitions, ordering is not guaranteed.
- Within a consumer group, each partition is consumed by exactly one consumer.
- Messages with the same key always go to the same partition (an ordering guarantee per entity; see the sketch below).
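The key→partition mapping is what makes that last property work: the producer hashes the message key and takes it modulo the partition count. A toy sketch (illustrative only; the real clients use murmur2 hashing, but the stability property is the same):

```typescript
// Toy partitioner: illustrative only. Real Kafka clients use murmur2,
// but the key → partition mapping is stable in the same way.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    hash = (Math.imul(hash, 31) + ch.charCodeAt(0)) | 0;
  }
  return Math.abs(hash) % numPartitions;
}

// Every event keyed by this user ID lands on the same partition, so the
// user's events are consumed in order relative to each other.
partitionFor('usr-456', 3); // same result every time for this key
```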
### Kafka with KafkaJS
```typescript
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka:9092'],
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});

// Producer
class OrderEventProducer {
  private producer: Producer;

  constructor() {
    this.producer = kafka.producer({
      idempotent: true, // broker deduplicates producer retries (not end-to-end exactly-once)
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publishOrderCreated(order: {
    orderId: string;
    userId: string;
    amount: number;
  }): Promise<void> {
    await this.producer.send({
      topic: 'order-events',
      messages: [
        {
          key: order.userId, // same user → same partition → ordered per user
          value: JSON.stringify({ type: 'ORDER_CREATED', ...order }),
          headers: { 'event-version': '1' },
        },
      ],
    });
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }
}

// Consumer
class InventoryConsumer {
  private consumer: Consumer;

  constructor() {
    this.consumer = kafka.consumer({
      groupId: 'inventory-service',
      sessionTimeout: 30_000,
      heartbeatInterval: 3_000,
    });
  }

  async start(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topics: ['order-events'], fromBeginning: false });
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
        const event = JSON.parse(message.value!.toString());
        if (event.type === 'ORDER_CREATED') {
          await processOrder(event);
        }
        // Offsets are committed automatically once this handler resolves;
        // if it throws, KafkaJS retries the message (no manual ack needed)
      },
    });
  }
}
```

### Kafka Offset Management
```typescript
// Manual offset commit for precise control
await consumer.run({
  autoCommit: false,
  eachBatch: async ({ batch, commitOffsetsIfNecessary, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      await processMessage(message);

      // Mark this offset as processed (doesn't commit yet)
      resolveOffset(message.offset);

      // Commit roughly every 100 messages
      if (Number(message.offset) % 100 === 0) {
        await commitOffsetsIfNecessary();
      }

      // Prevent the broker from evicting this consumer from the group
      // during long processing
      await heartbeat();
    }
    await commitOffsetsIfNecessary();
  },
});

// Seek to a specific offset to replay events (call after run() has started)
await consumer.seek({
  topic: 'order-events',
  partition: 0,
  offset: '0', // replay from the beginning
});
```

## RabbitMQ vs Kafka: Decision Guide
| Criterion | RabbitMQ | Kafka |
|---|---|---|
| Message retention | Deleted after consumption | Retained (configurable, default 7 days) |
| Consumer model | Competing consumers (one gets the message) | Multiple independent consumer groups |
| Ordering | Per queue | Per partition |
| Throughput | 20k–50k msg/s per node | 500k–1M+ msg/s per node |
| Latency | < 1ms | 5–15ms (tunable) |
| Complexity | Low–Medium | Medium–High |
| Replayability | No (without plugins) | Yes (by resetting offset) |
| Routing logic | Rich (exchanges, bindings) | Limited (key-based partitioning) |
| Cluster management | Simpler | Requires ZooKeeper/KRaft |
Use RabbitMQ when:
- You need complex routing rules (topic/header-based)
- Messages should be processed once and deleted
- You want request/reply patterns
- Your team is smaller and operational simplicity matters
- Throughput < 50k messages/second
Use Kafka when:
- You need event sourcing or event replay
- Multiple independent services consume the same events
- You need an audit log of everything that happened
- Throughput > 100k messages/second
- You're building stream processing pipelines (Kafka Streams, Flink)
## Backpressure and Rate Limiting
Without backpressure, a consumer that receives messages faster than it can process them accumulates an unbounded backlog and eventually runs out of memory.
```typescript
// RabbitMQ: prefetch limits in-flight messages
channel.prefetch(10); // consumer holds at most 10 unacknowledged messages

// Kafka: pause partition consumption when overwhelmed (kafkajs v2+)
await consumer.run({
  eachMessage: async ({ topic, partition, message, pause }) => {
    try {
      await processMessage(message);
    } catch (error) {
      if (isOverloaded()) {
        // Pause this topic-partition; pause() returns a resume function
        const resume = pause();
        setTimeout(resume, 5000); // resume after 5 seconds
      }
      throw error; // rethrow so the message is redelivered
    }
  },
});
```

## Idempotent Consumers
Network failures can cause message redelivery. Design consumers to handle duplicate messages safely:
```typescript
import { Pool } from 'pg';

// Track processed message IDs to deduplicate
class IdempotentOrderProcessor {
  constructor(
    private db: Pool,
    private processedIds: Set<string> = new Set()
  ) {}

  async process(message: { messageId: string; orderId: string }): Promise<void> {
    // Check if already processed (in-memory for speed, then DB for durability)
    if (this.processedIds.has(message.messageId)) return;
    const already = await this.db.query(
      'SELECT 1 FROM processed_messages WHERE id = $1',
      [message.messageId]
    );
    if (already.rows.length > 0) return;

    // Process in a transaction with a deduplication record. Use a dedicated
    // client: BEGIN/COMMIT issued on the pool may run on different connections.
    const client = await this.db.connect();
    try {
      await client.query('BEGIN');
      await processOrder(message.orderId);
      await client.query(
        'INSERT INTO processed_messages (id, processed_at) VALUES ($1, NOW())',
        [message.messageId]
      );
      await client.query('COMMIT');
      this.processedIds.add(message.messageId);
    } catch (e) {
      await client.query('ROLLBACK');
      throw e;
    } finally {
      client.release();
    }
  }
}
```

## Frequently Asked Questions
Q: Can I use a database table as a message queue instead of RabbitMQ or Kafka?
Yes, and this is often the right choice for low-volume use cases. The "transactional outbox" pattern stores events in a database table within the same transaction as your business data. A polling worker reads unprocessed rows, publishes them, and marks them as sent. Because the event row commits atomically with the business write, you never lose an event or emit one for a rolled-back transaction; delivery is at-least-once (the worker can crash after publishing but before marking a row), so consumers should deduplicate. The downside is polling overhead and limited throughput (typically < 1,000 events/second). For anything higher, switch to Kafka or RabbitMQ.
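A minimal sketch of the polling worker, assuming a hypothetical `outbox` table with `id`, `topic`, `payload`, and `sent_at` columns (the `Pool` and `Producer` types come from pg and kafkajs):

```typescript
import { Pool } from 'pg';
import { Producer } from 'kafkajs';

// Sketch of an outbox poller; the schema is illustrative, not prescribed.
async function pollOutbox(db: Pool, producer: Producer): Promise<void> {
  const client = await db.connect();
  try {
    await client.query('BEGIN');
    // SKIP LOCKED lets multiple pollers run without grabbing the same rows
    const { rows } = await client.query(
      `SELECT id, topic, payload FROM outbox
       WHERE sent_at IS NULL
       ORDER BY id LIMIT 100
       FOR UPDATE SKIP LOCKED`
    );
    for (const row of rows) {
      await producer.send({ topic: row.topic, messages: [{ value: row.payload }] });
    }
    if (rows.length > 0) {
      await client.query('UPDATE outbox SET sent_at = NOW() WHERE id = ANY($1)', [
        rows.map((r) => r.id),
      ]);
    }
    await client.query('COMMIT'); // a crash before this line → republish (at-least-once)
  } catch (e) {
    await client.query('ROLLBACK');
    throw e;
  } finally {
    client.release();
  }
}
```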
Q: How do I handle ordering when I have multiple consumers in Kafka?
Kafka guarantees ordering within a partition, not across partitions. To guarantee that all events for a specific entity (a user, an order) are processed in order: use the entity ID as the message key. Kafka routes messages with the same key to the same partition, and each partition is consumed sequentially by one consumer in a group. If you have 10 partitions and 3 consumers, each consumer handles multiple partitions but each partition is only ever handled by one consumer.
Q: What happens to messages in RabbitMQ if the broker crashes?
If messages are published with persistent: true and queues are declared durable: true, messages survive broker restarts. Without persistence, messages in memory are lost on crash. For high-availability, RabbitMQ supports mirrored queues (classic) or quorum queues (recommended for new deployments)—quorum queues replicate messages to a majority of cluster nodes before acknowledging the publish. This prevents message loss even if a minority of nodes fail simultaneously.
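Declaring a quorum queue from amqplib is one queue argument (a short sketch; quorum queues require RabbitMQ 3.8+, and the queue name is illustrative):

```typescript
// Sketch: declare a replicated quorum queue (RabbitMQ 3.8+).
await channel.assertQueue('orders', {
  durable: true, // quorum queues are always durable
  arguments: { 'x-queue-type': 'quorum' },
});
```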
Q: Should I use request/reply (RPC over messaging) patterns?
Sparingly. You can implement synchronous-style request/reply over a message queue using correlation IDs and reply queues, but it combines the worst of both worlds: the latency overhead of async messaging with the tight coupling of synchronous calls. If you need a synchronous answer within the same request, use HTTP/gRPC directly. Use messaging for true one-way fire-and-forget operations, event fan-out, or workload offloading where the caller doesn't need to wait for a result.
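If you do need it, the classic AMQP request/reply mechanics are an exclusive reply queue plus a correlation ID. A sketch with amqplib (the `rpc-requests` queue name and the helper itself are illustrative):

```typescript
import { Channel } from 'amqplib';
import { randomUUID } from 'node:crypto';

// Sketch of AMQP request/reply: exclusive reply queue + correlation ID.
async function rpcRequest(channel: Channel, payload: object): Promise<object> {
  // Server-named, exclusive queue that receives only our replies
  const { queue: replyQueue } = await channel.assertQueue('', { exclusive: true });
  const correlationId = randomUUID();

  return new Promise((resolve) => {
    channel.consume(
      replyQueue,
      (msg) => {
        // Match the reply to this request via the correlation ID
        if (msg?.properties.correlationId === correlationId) {
          resolve(JSON.parse(msg.content.toString()));
        }
      },
      { noAck: true }
    );
    channel.sendToQueue('rpc-requests', Buffer.from(JSON.stringify(payload)), {
      correlationId,
      replyTo: replyQueue,
    });
  });
}
```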
