Node.jsBackendFull-Stack

WebSockets & Real-Time Communication in Node.js

TT
TopicTrick Team
WebSockets & Real-Time Communication in Node.js

WebSockets & Real-Time Communication in Node.js

HTTP is a conversation where the client always speaks first. WebSockets flip that model — once a connection is open, both the client and server can send messages at any time. This is what enables live features: task updates appearing without refresh, collaborative cursors moving across a document, notifications arriving the instant they are triggered.

This module covers the WebSocket protocol, the ws library for Node.js, authenticated connections, broadcasting patterns, and how to integrate WebSockets with an existing Express server.

This is Module 25 of the Node.js Full‑Stack Developer course.


Installing ws

bash
npm install ws

WebSocket Server Basics

js
// ws-server.js
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (socket, request) => {
  console.log('Client connected. Total:', wss.clients.size);

  // Receive a message from the client
  socket.on('message', (data) => {
    const message = data.toString();
    console.log('Received:', message);

    // Echo back
    socket.send(`Echo: ${message}`);
  });

  socket.on('close', (code, reason) => {
    console.log('Client disconnected:', code, reason.toString());
  });

  socket.on('error', (err) => {
    console.error('Socket error:', err.message);
  });

  // Send a welcome message immediately
  socket.send(JSON.stringify({ type: 'welcome', message: 'Connected to server' }));
});

console.log('WebSocket server running on ws://localhost:8080');
js
// Client (browser)
const ws = new WebSocket('ws://localhost:8080');

ws.onopen    = () => ws.send('Hello server!');
ws.onmessage = (event) => console.log('Server says:', event.data);
ws.onclose   = (event) => console.log('Connection closed:', event.code);
ws.onerror   = (err)   => console.error('WebSocket error:', err);

Integrating WebSockets with Express

Share the same HTTP server between Express and the WebSocket server:

js
// server.js
import http from 'http';
import { WebSocketServer } from 'ws';
import app from './app.js';

const server = http.createServer(app);

// Attach WebSocket server to the same HTTP server
const wss = new WebSocketServer({ server });

wss.on('connection', (socket, request) => {
  handleConnection(socket, request);
});

server.listen(3000, () => {
  console.log('HTTP + WebSocket server running on port 3000');
});

Now http://localhost:3000 serves Express and ws://localhost:3000 handles WebSockets — same port, same process.


Message Protocol

Real applications send structured messages. Define a simple protocol with a type field:

js
// lib/ws-protocol.js

// Server → Client message types
export const SERVER_EVENTS = {
  CONNECTED:        'connected',
  TASK_CREATED:     'task:created',
  TASK_UPDATED:     'task:updated',
  TASK_DELETED:     'task:deleted',
  MEMBER_JOINED:    'workspace:member_joined',
  NOTIFICATION:     'notification',
  ERROR:            'error',
};

// Client → Server message types
export const CLIENT_EVENTS = {
  JOIN_WORKSPACE:   'workspace:join',
  LEAVE_WORKSPACE:  'workspace:leave',
  TYPING:           'task:typing',
};

export function send(socket, type, payload = {}) {
  if (socket.readyState === socket.OPEN) {
    socket.send(JSON.stringify({ type, payload, ts: Date.now() }));
  }
}

export function parse(data) {
  try {
    return JSON.parse(data.toString());
  } catch {
    return null;
  }
}

Authenticated WebSocket Connections

Verify the JWT during the WebSocket handshake:

js
// ws/connection-handler.js
import { verifyAccessToken } from '../lib/tokens.js';
import { parse, send, SERVER_EVENTS } from '../lib/ws-protocol.js';

// Map of userId → Set of sockets (one user can have multiple tabs)
export const userSockets = new Map();

// Map of workspaceId → Set of userIds
export const workspaceSubscriptions = new Map();

export function handleConnection(socket, request) {
  // Extract token from query string: ws://localhost:3000?token=eyJ...
  const url    = new URL(request.url, 'http://localhost');
  const token  = url.searchParams.get('token');

  if (!token) {
    send(socket, SERVER_EVENTS.ERROR, { message: 'Authentication required' });
    return socket.close(4001, 'Unauthenticated');
  }

  let user;
  try {
    user = verifyAccessToken(token);
  } catch {
    send(socket, SERVER_EVENTS.ERROR, { message: 'Invalid token' });
    return socket.close(4001, 'Invalid token');
  }

  // Attach user info to the socket
  socket.userId = user.sub;
  socket.role   = user.role;

  // Register in userSockets map
  if (!userSockets.has(user.sub)) {
    userSockets.set(user.sub, new Set());
  }
  userSockets.get(user.sub).add(socket);

  send(socket, SERVER_EVENTS.CONNECTED, { userId: user.sub });

  socket.on('message', (data) => handleMessage(socket, parse(data)));

  socket.on('close', () => {
    // Clean up
    const sockets = userSockets.get(socket.userId);
    sockets?.delete(socket);
    if (sockets?.size === 0) userSockets.delete(socket.userId);

    // Remove from all workspace subscriptions
    for (const [wsId, users] of workspaceSubscriptions.entries()) {
      users.delete(socket.userId);
      if (users.size === 0) workspaceSubscriptions.delete(wsId);
    }
  });
}

function handleMessage(socket, message) {
  if (!message) return;

  switch (message.type) {
    case 'workspace:join':
      joinWorkspace(socket, message.payload.workspaceId);
      break;
    case 'workspace:leave':
      leaveWorkspace(socket, message.payload.workspaceId);
      break;
    default:
      send(socket, SERVER_EVENTS.ERROR, { message: `Unknown event: ${message.type}` });
  }
}

function joinWorkspace(socket, workspaceId) {
  if (!workspaceSubscriptions.has(workspaceId)) {
    workspaceSubscriptions.set(workspaceId, new Set());
  }
  workspaceSubscriptions.get(workspaceId).add(socket.userId);
}

function leaveWorkspace(socket, workspaceId) {
  workspaceSubscriptions.get(workspaceId)?.delete(socket.userId);
}

Broadcasting to Workspace Members

js
// lib/ws-broadcast.js
import { userSockets, workspaceSubscriptions } from '../ws/connection-handler.js';
import { send } from './ws-protocol.js';

/**
 * Send a message to all connected clients subscribed to a workspace.
 * @param {string} workspaceId
 * @param {string} type - Event type
 * @param {object} payload
 * @param {string} [excludeUserId] - Don't send to this user (the one who triggered it)
 */
export function broadcastToWorkspace(workspaceId, type, payload, excludeUserId) {
  const subscribers = workspaceSubscriptions.get(workspaceId);
  if (!subscribers) return;

  for (const userId of subscribers) {
    if (userId === excludeUserId) continue;

    const sockets = userSockets.get(userId);
    if (!sockets) continue;

    for (const socket of sockets) {
      send(socket, type, payload);
    }
  }
}

/**
 * Send a message to all sockets of a specific user (all their tabs).
 */
export function sendToUser(userId, type, payload) {
  const sockets = userSockets.get(userId);
  if (!sockets) return;
  for (const socket of sockets) {
    send(socket, type, payload);
  }
}

Triggering Broadcasts from the API

When a task is created via the REST API, broadcast the event to all workspace subscribers:

js
// features/tasks/tasks.service.js
import { broadcastToWorkspace } from '../../lib/ws-broadcast.js';
import { SERVER_EVENTS } from '../../lib/ws-protocol.js';

export async function createTask(data, createdByUserId) {
  const task = await Task.create(data);
  const populated = await task.populate('createdBy assignee', 'name avatar');

  // Broadcast to all other workspace members in real time
  broadcastToWorkspace(
    data.workspaceId.toString(),
    SERVER_EVENTS.TASK_CREATED,
    populated.toObject(),
    createdByUserId  // exclude the creator — they already see it
  );

  return populated;
}

export async function updateTask(id, workspaceId, data, updatedByUserId) {
  const task = await Task.findOneAndUpdate({ _id: id, workspaceId }, data, { new: true })
    .populate('assignee', 'name avatar');

  broadcastToWorkspace(
    workspaceId.toString(),
    SERVER_EVENTS.TASK_UPDATED,
    task.toObject(),
    updatedByUserId
  );

  return task;
}

Client-Side WebSocket Hook (React)

js
// client/src/hooks/useWebSocket.js
import { useEffect, useRef, useCallback } from 'react';
import { getAccessToken } from '../lib/api';

export function useWebSocket(handlers) {
  const wsRef       = useRef(null);
  const handlersRef = useRef(handlers);
  handlersRef.current = handlers;

  const connect = useCallback(() => {
    const token = getAccessToken();
    if (!token) return;

    const ws = new WebSocket(`ws://localhost:3000?token=${token}`);
    wsRef.current = ws;

    ws.onopen = () => console.log('WebSocket connected');

    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      const handler = handlersRef.current[message.type];
      if (handler) handler(message.payload);
    };

    ws.onclose = (event) => {
      if (event.code !== 1000) {
        // Reconnect after 3 seconds (unless intentional close)
        setTimeout(connect, 3000);
      }
    };

    ws.onerror = (err) => console.error('WebSocket error:', err);
  }, []);

  useEffect(() => {
    connect();
    return () => wsRef.current?.close(1000, 'Component unmounted');
  }, [connect]);

  const send = useCallback((type, payload) => {
    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify({ type, payload }));
    }
  }, []);

  return { send };
}
jsx
// client/src/pages/WorkspacePage.jsx
import { useWebSocket } from '../hooks/useWebSocket';

export function WorkspacePage({ workspaceId }) {
  const [tasks, setTasks] = useState([]);

  const { send } = useWebSocket({
    'task:created': (task) => setTasks(prev => [task, ...prev]),
    'task:updated': (task) => setTasks(prev => prev.map(t => t._id === task._id ? task : t)),
    'task:deleted': ({ id }) => setTasks(prev => prev.filter(t => t._id !== id)),
  });

  useEffect(() => {
    // Subscribe to this workspace's real-time events
    send('workspace:join', { workspaceId });
    return () => send('workspace:leave', { workspaceId });
  }, [workspaceId, send]);

  // ...render tasks
}

Ping / Pong — Keeping Connections Alive

Browsers and proxies close idle WebSocket connections. Send periodic pings:

js
// ws/heartbeat.js
import { WebSocketServer } from 'ws';

export function setupHeartbeat(wss, intervalMs = 30_000) {
  const interval = setInterval(() => {
    wss.clients.forEach((socket) => {
      if (!socket.isAlive) return socket.terminate();
      socket.isAlive = false;
      socket.ping();
    });
  }, intervalMs);

  wss.on('connection', (socket) => {
    socket.isAlive = true;
    socket.on('pong', () => { socket.isAlive = true; });
  });

  wss.on('close', () => clearInterval(interval));
}
js
// server.js
import { setupHeartbeat } from './ws/heartbeat.js';
setupHeartbeat(wss);

Node.js Full‑Stack Course — Module 25 of 32

Your application now pushes real-time updates to all connected clients. Continue to Module 26 to build a complete chat application with Socket.IO.


    Summary

    WebSockets turn your API from a request-response system into a live, bidirectional communication channel:

    • Attach a WebSocketServer to the same http.createServer(app) instance as Express — one port, both protocols
    • Authenticate connections during the handshake by reading the JWT from the query string
    • Use a userSockets map (userId → Set of sockets) to track connections; support multiple tabs per user
    • Use a workspaceSubscriptions map (workspaceId → Set of userIds) to manage room-like subscriptions
    • Broadcast from service layer methods — after a DB write, call broadcastToWorkspace() to push the change
    • Use ping/pong heartbeats to detect and clean up dead connections
    • On the client, use a custom React hook that reconnects automatically on unexpected close

    Continue to Module 26: Chat App with Socket.IO →