WebSockets & Real-Time Communication in Node.js

WebSockets & Real-Time Communication in Node.js
HTTP is a conversation where the client always speaks first. WebSockets flip that model — once a connection is open, both the client and server can send messages at any time. This is what enables live features: task updates appearing without refresh, collaborative cursors moving across a document, notifications arriving the instant they are triggered.
This module covers the WebSocket protocol, the ws library for Node.js, authenticated connections, broadcasting patterns, and how to integrate WebSockets with an existing Express server.
This is Module 25 of the Node.js Full‑Stack Developer course.
Installing ws
npm install ws

WebSocket Server Basics
// ws-server.js
import { WebSocketServer } from 'ws';
// Standalone WebSocket server listening on its own port.
const wss = new WebSocketServer({ port: 8080 });

/**
 * Wire up one freshly connected client: logging, echo, and a welcome message.
 * @param {object} socket - The connection handed over by the 'connection' event.
 * @param {object} request - The request that opened the connection (unused here).
 */
function onConnection(socket, request) {
  console.log('Client connected. Total:', wss.clients.size);

  // Echo every incoming message back to its sender, prefixed with "Echo: ".
  socket.on('message', (data) => {
    const text = data.toString();
    console.log('Received:', text);
    socket.send(`Echo: ${text}`);
  });

  socket.on('close', (code, reason) => {
    console.log('Client disconnected:', code, reason.toString());
  });

  socket.on('error', (err) => {
    console.error('Socket error:', err.message);
  });

  // Greet the client right after the listeners are in place.
  const welcome = { type: 'welcome', message: 'Connected to server' };
  socket.send(JSON.stringify(welcome));
}

wss.on('connection', onConnection);
console.log('WebSocket server running on ws://localhost:8080');

// Client (browser)
const ws = new WebSocket('ws://localhost:8080');
ws.onopen = () => ws.send('Hello server!');
ws.onmessage = (event) => console.log('Server says:', event.data);
ws.onclose = (event) => console.log('Connection closed:', event.code);
ws.onerror = (err) => console.error('WebSocket error:', err);Integrating WebSockets with Express
Share the same HTTP server between Express and the WebSocket server:
// server.js
import http from 'http';
import { WebSocketServer } from 'ws';
import app from './app.js';
const server = http.createServer(app);
// Attach WebSocket server to the same HTTP server
const wss = new WebSocketServer({ server });
wss.on('connection', (socket, request) => {
handleConnection(socket, request);
});
server.listen(3000, () => {
console.log('HTTP + WebSocket server running on port 3000');
});Now http://localhost:3000 serves Express and ws://localhost:3000 handles WebSockets — same port, same process.
Message Protocol
Real applications send structured messages. Define a simple protocol with a type field:
// lib/ws-protocol.js
// Server → Client message types.
// Frozen so that no importing module can accidentally rename or add an event
// at runtime; every importer sees the same protocol table.
export const SERVER_EVENTS = Object.freeze({
  CONNECTED: 'connected',
  TASK_CREATED: 'task:created',
  TASK_UPDATED: 'task:updated',
  TASK_DELETED: 'task:deleted',
  MEMBER_JOINED: 'workspace:member_joined',
  NOTIFICATION: 'notification',
  ERROR: 'error',
});

// Client → Server message types (frozen for the same reason).
export const CLIENT_EVENTS = Object.freeze({
  JOIN_WORKSPACE: 'workspace:join',
  LEAVE_WORKSPACE: 'workspace:leave',
  TYPING: 'task:typing',
});
/**
 * Serialize a protocol message and write it to a socket, if the socket is open.
 *
 * The message is a JSON object with `type`, `payload` and a `ts` timestamp
 * taken from `Date.now()`. Nothing is sent, and nothing is thrown, when the
 * socket's `readyState` differs from its own `OPEN` constant.
 *
 * @param {object} socket - Connection exposing `readyState`, `OPEN` and `send()`.
 * @param {string} type - Event type placed in the envelope.
 * @param {object} [payload={}] - Event data placed in the envelope.
 * @returns {void}
 */
export function send(socket, type, payload = {}) {
  const isOpen = socket.readyState === socket.OPEN;
  if (!isOpen) return;

  const envelope = { type, payload, ts: Date.now() };
  socket.send(JSON.stringify(envelope));
}
/**
 * Decode raw incoming data into a protocol message.
 *
 * @param {*} data - Raw message data; it is converted with `toString()`
 *   before being handed to `JSON.parse`.
 * @returns {*} The parsed JSON value, or `null` when the text is not valid JSON.
 */
export function parse(data) {
try {
return JSON.parse(data.toString());
} catch {
// Malformed input is reported as null rather than thrown, so one bad
// message cannot raise an exception in the caller.
return null;
}
}Authenticated WebSocket Connections
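Verify the JWT as soon as the connection opens. The token arrives in the query string of the handshake URL, and a socket with a missing or invalid token is closed immediately: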
// ws/connection-handler.js
import { verifyAccessToken } from '../lib/tokens.js';
import { parse, send, SERVER_EVENTS } from '../lib/ws-protocol.js';
// Map of userId → Set of sockets (one user can have multiple tabs)
export const userSockets = new Map();
// Map of workspaceId → Set of userIds
export const workspaceSubscriptions = new Map();

/**
 * Authenticate a newly opened WebSocket and register it for message delivery.
 *
 * The access token is read from the `token` query parameter of the request
 * URL. A missing or rejected token gets an `error` event and the socket is
 * closed with code 4001. On success the socket is tagged with `userId` and
 * `role`, added to `userSockets`, and sent a `connected` event.
 *
 * NOTE(review): a token in the URL may end up in proxy or access logs —
 * confirm that is acceptable for this deployment.
 *
 * @param {object} socket - The accepted WebSocket connection.
 * @param {object} request - The HTTP request that opened it; only `url` is read.
 * @returns {void}
 */
export function handleConnection(socket, request) {
  // Register this first: an EventEmitter that emits 'error' with no listener
  // throws, which would take the whole process down on a socket-level error.
  socket.on('error', (err) => {
    console.error('WebSocket error:', err.message);
  });

  // Extract token from query string: ws://localhost:3000?token=eyJ...
  // request.url is only a path, so a dummy base is supplied to make it parseable.
  const url = new URL(request.url, 'http://localhost');
  const token = url.searchParams.get('token');
  if (!token) {
    send(socket, SERVER_EVENTS.ERROR, { message: 'Authentication required' });
    return socket.close(4001, 'Unauthenticated');
  }

  let user;
  try {
    user = verifyAccessToken(token);
  } catch {
    send(socket, SERVER_EVENTS.ERROR, { message: 'Invalid token' });
    return socket.close(4001, 'Invalid token');
  }

  // Attach user info to the socket
  socket.userId = user.sub;
  socket.role = user.role;

  // Register in userSockets map
  if (!userSockets.has(user.sub)) {
    userSockets.set(user.sub, new Set());
  }
  userSockets.get(user.sub).add(socket);

  send(socket, SERVER_EVENTS.CONNECTED, { userId: user.sub });

  socket.on('message', (data) => {
    // An exception thrown inside an event listener is otherwise uncaught, so a
    // single bad message could crash the server for every connected client.
    try {
      handleMessage(socket, parse(data));
    } catch (err) {
      console.error('Failed to handle WebSocket message:', err);
      send(socket, SERVER_EVENTS.ERROR, { message: 'Invalid message' });
    }
  });

  socket.on('close', () => {
    const sockets = userSockets.get(socket.userId);
    sockets?.delete(socket);

    // Subscriptions are tracked per user, not per socket. While the user still
    // has another tab connected, dropping them here would silently stop
    // updates for that tab.
    if (sockets && sockets.size > 0) return;

    userSockets.delete(socket.userId);

    // Last connection gone: remove the user from all workspace subscriptions.
    for (const [wsId, users] of workspaceSubscriptions) {
      users.delete(socket.userId);
      if (users.size === 0) workspaceSubscriptions.delete(wsId);
    }
  });
}
/**
 * Dispatch one parsed client message to the matching workspace action.
 *
 * Falsy messages (e.g. unparseable input) are ignored. Join and leave
 * requests without a `payload.workspaceId` are answered with an `error`
 * event instead of throwing, and so is any unknown `type`.
 *
 * NOTE(review): nothing here checks that the user may access the requested
 * workspace — confirm whether a membership check belongs before joinWorkspace.
 *
 * @param {object} socket - The authenticated connection the message came from.
 * @param {object|null} message - Parsed message of shape `{ type, payload }`.
 * @returns {void}
 */
function handleMessage(socket, message) {
  if (!message) return;

  // The payload comes from the client and may be absent entirely.
  const workspaceId = message.payload?.workspaceId;
  const isWorkspaceEvent =
    message.type === 'workspace:join' || message.type === 'workspace:leave';

  if (isWorkspaceEvent && (workspaceId == null || workspaceId === '')) {
    send(socket, SERVER_EVENTS.ERROR, { message: `Missing workspaceId for ${message.type}` });
    return;
  }

  switch (message.type) {
    case 'workspace:join':
      joinWorkspace(socket, workspaceId);
      break;
    case 'workspace:leave':
      leaveWorkspace(socket, workspaceId);
      break;
    default:
      send(socket, SERVER_EVENTS.ERROR, { message: `Unknown event: ${message.type}` });
  }
}
/**
 * Subscribe the socket's user to a workspace, creating the subscriber set on
 * first use.
 *
 * @param {object} socket - Connection whose `userId` is recorded.
 * @param {*} workspaceId - Key into `workspaceSubscriptions`.
 * @returns {void}
 */
function joinWorkspace(socket, workspaceId) {
  let members = workspaceSubscriptions.get(workspaceId);
  if (members === undefined) {
    members = new Set();
    workspaceSubscriptions.set(workspaceId, members);
  }
  members.add(socket.userId);
}
function leaveWorkspace(socket, workspaceId) {
workspaceSubscriptions.get(workspaceId)?.delete(socket.userId);
}Broadcasting to Workspace Members
// lib/ws-broadcast.js
import { userSockets, workspaceSubscriptions } from '../ws/connection-handler.js';
import { send } from './ws-protocol.js';
/**
 * Deliver one event to every open connection of every user subscribed to a
 * workspace. Workspaces without subscribers and users without registered
 * sockets are skipped.
 *
 * NOTE(review): the exclusion uses strict equality, so `excludeUserId` must be
 * the same type as the ids stored in the subscription set — verify at call sites.
 *
 * @param {string} workspaceId - Workspace whose subscribers receive the event.
 * @param {string} type - Event type.
 * @param {object} payload - Event data.
 * @param {string} [excludeUserId] - User to skip (typically whoever triggered the event).
 * @returns {void}
 */
export function broadcastToWorkspace(workspaceId, type, payload, excludeUserId) {
  const recipients = workspaceSubscriptions.get(workspaceId) ?? [];

  for (const recipientId of recipients) {
    if (recipientId === excludeUserId) continue;

    const connections = userSockets.get(recipientId) ?? [];
    for (const connection of connections) {
      send(connection, type, payload);
    }
  }
}
/**
* Send a message to all sockets of a specific user (all their tabs).
*/
export function sendToUser(userId, type, payload) {
const sockets = userSockets.get(userId);
if (!sockets) return;
for (const socket of sockets) {
send(socket, type, payload);
}
}Triggering Broadcasts from the API
When a task is created via the REST API, broadcast the event to all workspace subscribers:
// features/tasks/tasks.service.js
import { broadcastToWorkspace } from '../../lib/ws-broadcast.js';
import { SERVER_EVENTS } from '../../lib/ws-protocol.js';
/**
 * Persist a new task, then announce it to the other members of its workspace.
 *
 * @param {object} data - Task fields; `data.workspaceId` selects the workspace to notify.
 * @param {*} createdByUserId - Passed as the user to exclude from the broadcast.
 * @returns {Promise<object>} The created task with `createdBy` and `assignee` populated.
 */
export async function createTask(data, createdByUserId) {
  const created = await Task.create(data);
  const populated = await created.populate('createdBy assignee', 'name avatar');

  // Subscriptions are looked up by the string form of the workspace id.
  const workspaceKey = data.workspaceId.toString();
  const eventPayload = populated.toObject();

  // The creator is excluded: they already have the task from the API response.
  broadcastToWorkspace(workspaceKey, SERVER_EVENTS.TASK_CREATED, eventPayload, createdByUserId);

  return populated;
}
export async function updateTask(id, workspaceId, data, updatedByUserId) {
const task = await Task.findOneAndUpdate({ _id: id, workspaceId }, data, { new: true })
.populate('assignee', 'name avatar');
broadcastToWorkspace(
workspaceId.toString(),
SERVER_EVENTS.TASK_UPDATED,
task.toObject(),
updatedByUserId
);
return task;
}Client-Side WebSocket Hook (React)
// client/src/hooks/useWebSocket.js
import { useEffect, useRef, useCallback } from 'react';
import { getAccessToken } from '../lib/api';
/**
 * React hook that keeps one authenticated WebSocket open for the lifetime of
 * the component and dispatches incoming messages by their `type`.
 *
 * - No connection is attempted while `getAccessToken()` returns nothing.
 * - Any close with a code other than 1000 schedules a reconnect after 3 seconds.
 * - Unmounting closes the socket with code 1000 and cancels a pending reconnect.
 *
 * @param {Object<string, function>} handlers - Map of event type → callback
 *   receiving the message payload. The latest map is used on every message,
 *   so callbacks may close over current props and state.
 * @returns {{ send: function(string, object): void }} `send` writes a
 *   `{ type, payload }` message; it is a no-op unless the socket is open.
 */
export function useWebSocket(handlers) {
  const wsRef = useRef(null);
  const reconnectTimerRef = useRef(null);
  const unmountedRef = useRef(false);

  // Keep handlers in a ref so `connect` never needs to be re-created.
  const handlersRef = useRef(handlers);
  handlersRef.current = handlers;

  const connect = useCallback(() => {
    // A reconnect timer may fire after unmount; never open a socket then.
    if (unmountedRef.current) return;

    const token = getAccessToken();
    if (!token) return;

    // Encode the token so it cannot alter the URL's query string.
    const ws = new WebSocket(`ws://localhost:3000?token=${encodeURIComponent(token)}`);
    wsRef.current = ws;

    ws.onopen = () => console.log('WebSocket connected');

    ws.onmessage = (event) => {
      let message;
      try {
        message = JSON.parse(event.data);
      } catch (err) {
        // A non-JSON message must not throw out of the event handler.
        console.error('Ignoring malformed WebSocket message:', err);
        return;
      }
      const handler = handlersRef.current?.[message?.type];
      if (handler) handler(message.payload);
    };

    ws.onclose = (event) => {
      // 1000 is an intentional close. The unmount check also covers a socket
      // closed while still connecting, which may report a different code.
      if (unmountedRef.current || event.code === 1000) return;

      // Reconnect after 3 seconds; keep the id so unmount can cancel it.
      reconnectTimerRef.current = setTimeout(connect, 3000);
    };

    ws.onerror = (err) => console.error('WebSocket error:', err);
  }, []);

  useEffect(() => {
    // Reset on (re)mount so an effect that is torn down and re-run still connects.
    unmountedRef.current = false;
    connect();

    return () => {
      unmountedRef.current = true;
      clearTimeout(reconnectTimerRef.current);
      wsRef.current?.close(1000, 'Component unmounted');
    };
  }, [connect]);

  const send = useCallback((type, payload) => {
    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify({ type, payload }));
    }
  }, []);

  return { send };
}

// client/src/pages/WorkspacePage.jsx
import { useWebSocket } from '../hooks/useWebSocket';
export function WorkspacePage({ workspaceId }) {
const [tasks, setTasks] = useState([]);
const { send } = useWebSocket({
'task:created': (task) => setTasks(prev => [task, ...prev]),
'task:updated': (task) => setTasks(prev => prev.map(t => t._id === task._id ? task : t)),
'task:deleted': ({ id }) => setTasks(prev => prev.filter(t => t._id !== id)),
});
useEffect(() => {
// Subscribe to this workspace's real-time events
send('workspace:join', { workspaceId });
return () => send('workspace:leave', { workspaceId });
}, [workspaceId, send]);
// ...render tasks
}Ping / Pong — Keeping Connections Alive
Browsers and proxies close idle WebSocket connections. Send periodic pings:
// ws/heartbeat.js
import { WebSocketServer } from 'ws';
/**
 * Start a periodic liveness sweep over every client of a WebSocket server.
 *
 * Each new connection is marked alive, and marked alive again on every
 * 'pong'. On each tick a client that is not marked alive is terminated;
 * every other client is marked not-alive and pinged, so it has one full
 * interval to answer. The timer stops when the server emits 'close'.
 *
 * @param {object} wss - Server exposing `clients` and `on()`.
 * @param {number} [intervalMs=30000] - Time between sweeps, in milliseconds.
 * @returns {void}
 */
export function setupHeartbeat(wss, intervalMs = 30_000) {
  const sweep = () => {
    for (const client of wss.clients) {
      if (!client.isAlive) {
        // No pong since the previous sweep: treat the connection as dead.
        client.terminate();
        continue;
      }
      client.isAlive = false;
      client.ping();
    }
  };
  const timer = setInterval(sweep, intervalMs);

  wss.on('connection', (client) => {
    client.isAlive = true;
    client.on('pong', () => {
      client.isAlive = true;
    });
  });

  wss.on('close', () => {
    clearInterval(timer);
  });
}

// server.js
import { setupHeartbeat } from './ws/heartbeat.js';
setupHeartbeat(wss);

Node.js Full‑Stack Course — Module 25 of 32
Your application now pushes real-time updates to all connected clients. Continue to Module 26 to build a complete chat application with Socket.IO.
Summary
WebSockets turn your API from a request-response system into a live, bidirectional communication channel:
- Attach a `WebSocketServer` to the same `http.createServer(app)` instance as Express — one port, both protocols
- Authenticate each connection as soon as it opens by reading the JWT from the query string of the handshake URL
- Use a `userSockets` map (userId → Set of sockets) to track connections; support multiple tabs per user
- Use a `workspaceSubscriptions` map (workspaceId → Set of userIds) to manage room-like subscriptions
- Broadcast from service layer methods — after a DB write, call `broadcastToWorkspace()` to push the change
- Use `ping`/`pong` heartbeats to detect and clean up dead connections
- On the client, use a custom React hook that reconnects automatically on unexpected close
Continue to Module 26: Chat App with Socket.IO →
