Database Sharding and Replication: Scaling Strategies for High-Traffic Systems

A single PostgreSQL instance running on the best hardware money can buy tops out around 100,000 reads/second and 10,000 writes/second under real workloads. Beyond that threshold, you need a fundamentally different architecture. Replication and sharding are the two primary tools—and understanding when to use each (and how to combine them) separates systems that scale from systems that fall over at 10× traffic.

The Scaling Spectrum

Before choosing a strategy, diagnose where the bottleneck actually is:

text
Read-heavy (>80% reads)   → Replication (add read replicas)
Write-heavy (>40% writes) → Sharding (split writes across nodes)
Both heavy                → Shard + replicate each shard
Dataset > single disk     → Sharding is required regardless

| Strategy | Addresses | Complexity | Consistency |
|---|---|---|---|
| Vertical scaling | Temporary relief | Low | Strong |
| Read replication | Read throughput | Medium | Eventual |
| Multi-primary | Write availability | High | Eventual |
| Range sharding | Write throughput | High | Strong per-shard |
| Hash sharding | Uniform write distribution | High | Strong per-shard |
| Consistent hashing | Dynamic cluster resizing | Very high | Strong per-shard |
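
As a rough sketch, the decision table can be encoded directly; the ceilings below reuse the single-node numbers from the intro, and the `Workload` shape and strategy names are illustrative, not a standard API.

typescript
// Illustrative sketch of the decision table above.
// Ceilings reuse the intro's single-node numbers; tune to your hardware.
interface Workload {
  readsPerSec: number;
  writesPerSec: number;
  datasetFitsOnOneNode: boolean;
}

type Strategy = 'single-node' | 'read-replication' | 'sharding' | 'shard-and-replicate';

const SINGLE_NODE_READS = 100_000;
const SINGLE_NODE_WRITES = 10_000;

function chooseStrategy(w: Workload): Strategy {
  if (!w.datasetFitsOnOneNode) return 'sharding'; // required regardless
  const readBound = w.readsPerSec > SINGLE_NODE_READS;
  const writeBound = w.writesPerSec > SINGLE_NODE_WRITES;
  if (readBound && writeBound) return 'shard-and-replicate';
  if (writeBound) return 'sharding';
  if (readBound) return 'read-replication';
  return 'single-node'; // vertical scaling still has headroom
}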

Read Replication: The First Step

Replication copies every write from a primary node to one or more replicas. Replicas are read-only. Most applications are 80–95% reads, so routing reads to replicas provides dramatic relief without splitting data.

Primary/Replica Routing in TypeScript

typescript
import { Pool } from 'pg';

const primary = new Pool({
  host: process.env.DB_PRIMARY_HOST,
  database: 'appdb',
  max: 20,
});

const replicas = [
  new Pool({ host: process.env.DB_REPLICA_1_HOST, database: 'appdb', max: 20 }),
  new Pool({ host: process.env.DB_REPLICA_2_HOST, database: 'appdb', max: 20 }),
];

let replicaIndex = 0;

function getReadPool(): Pool {
  // Round-robin across replicas
  const pool = replicas[replicaIndex % replicas.length];
  replicaIndex++;
  return pool;
}

export const db = {
  query: (sql: string, params?: unknown[]) => primary.query(sql, params),
  queryRead: (sql: string, params?: unknown[]) => getReadPool().query(sql, params),
};

// Usage
const users = await db.queryRead('SELECT * FROM users WHERE active = true');
await db.query('INSERT INTO users (email) VALUES ($1)', [email]);

Replication Lag: The Hidden Problem

Async replication means replicas can be milliseconds to seconds behind the primary. This creates read-your-own-writes inconsistency:

text
User writes profile update → primary confirms
User immediately reads profile → replica returns stale data
User sees their own update vanish → rage-quit

Solutions:

typescript
// 1. Always read your own writes from primary
async function getUserProfile(userId: string, justUpdated = false): Promise<User> {
  const pool = justUpdated ? primary : getReadPool();
  const result = await pool.query('SELECT * FROM users WHERE id = $1', [userId]);
  return result.rows[0];
}

// 2. Sticky sessions: send the same user to the same replica
function getReplicaForUser(userId: string): Pool {
  // hashUserId can be any stable string hash (e.g. the FNV-1a shown later)
  const index = hashUserId(userId) % replicas.length;
  return replicas[index];
}

// 3. Read-after-write with a catch-up check: poll a replica until it has
//    applied the write, then read from it; otherwise fall back to primary.
//    Assumes a monotonically increasing `version` column bumped on each write.
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function waitForReplication(userId: string, expectedVersion: number): Promise<boolean> {
  const maxWait = 2000; // ms
  const start = Date.now();

  while (Date.now() - start < maxWait) {
    const result = await getReadPool().query(
      'SELECT version FROM users WHERE id = $1',
      [userId]
    );
    if (result.rows[0] && result.rows[0].version >= expectedVersion) return true;
    await sleep(50);
  }
  return false; // replica never caught up; caller should read from primary
}

Horizontal Sharding Strategies

Sharding partitions data across multiple independent database instances. Each shard is a complete, self-sufficient database containing a subset of the total data.

text
Without sharding:
┌─────────────────────────────┐
│  Single DB (1B rows)        │
│  Writes: 50k/s → bottleneck │
└─────────────────────────────┘

With 4 shards:
┌──────────────┐  ┌──────────────┐
│  Shard 0     │  │  Shard 1     │
│  250M rows   │  │  250M rows   │
│  12.5k/s     │  │  12.5k/s     │
└──────────────┘  └──────────────┘
┌──────────────┐  ┌──────────────┐
│  Shard 2     │  │  Shard 3     │
│  250M rows   │  │  250M rows   │
│  12.5k/s     │  │  12.5k/s     │
└──────────────┘  └──────────────┘

Range Sharding

Rows are distributed by value ranges of the shard key.

text
User IDs 1–25M        → Shard 0
User IDs 25M–50M      → Shard 1
User IDs 50M–75M      → Shard 2
User IDs 75M–100M     → Shard 3

Pros: Range queries are efficient (all January orders on one shard). Easy to reason about.
Cons: Hot spots when recent data is always on the latest shard. New user signups all hit shard 3.

typescript
class RangeShardRouter {
  private ranges = [
    { min: 0,          max: 25_000_000,  shard: 0 },
    { min: 25_000_001, max: 50_000_000,  shard: 1 },
    { min: 50_000_001, max: 75_000_000,  shard: 2 },
    { min: 75_000_001, max: 100_000_000, shard: 3 },
  ];

  getShard(userId: number): number {
    const range = this.ranges.find(r => userId >= r.min && userId <= r.max);
    if (!range) throw new Error(`No shard for userId ${userId}`);
    return range.shard;
  }
}

Hash Sharding

The shard key is hashed and the result modulo the shard count determines placement.

typescript
class HashShardRouter {
  constructor(private shardCount: number) {}

  getShard(shardKey: string): number {
    const hash = this.fnv1a(shardKey);
    return hash % this.shardCount;
  }

  private fnv1a(str: string): number {
    let hash = 2166136261;
    for (const char of str) {
      hash ^= char.charCodeAt(0);
      hash = (hash * 16777619) >>> 0; // unsigned 32-bit
    }
    return hash;
  }
}

// Usage
const router = new HashShardRouter(4);
const shard = router.getShard(userId); // deterministic, uniform

Pros: Uniform distribution eliminates hot spots.
Cons: Range queries require scanning all shards. Resharding is expensive with modulo placement: going from N to N+1 shards remaps roughly N/(N+1) of all keys (80% for 4→5), and even a clean doubling still moves 50% of the data.
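
That cost is easy to see empirically. The sketch below reuses the HashShardRouter defined above, samples synthetic keys, and counts how many change shards when the divisor changes; the key format is arbitrary.

typescript
// Sketch: fraction of keys that move when the shard count changes.
// Reuses the HashShardRouter defined above; keys are synthetic samples.
function remapFraction(oldShards: number, newShards: number, samples = 100_000): number {
  const oldRouter = new HashShardRouter(oldShards);
  const newRouter = new HashShardRouter(newShards);
  let moved = 0;
  for (let i = 0; i < samples; i++) {
    const key = `user:${i}`;
    if (oldRouter.getShard(key) !== newRouter.getShard(key)) moved++;
  }
  return moved / samples;
}

console.log(remapFraction(4, 5)); // ≈ 0.80, one extra shard moves ~80% of keys
console.log(remapFraction(4, 8)); // ≈ 0.50, doubling still moves ~50%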

Consistent Hashing

Consistent hashing places both nodes and keys on a virtual ring. Each key belongs to the nearest node clockwise. When a node is added or removed, only keys on the adjacent arc migrate.

text
          "user:abc" (hash: 120)
               ↓
    ──────────●────────── 
  /    Node A           \
 /    (tokens: 0,90)     \
●                         ●
Node D                   Node B
(tokens: 270,360)        (tokens: 180,270)
 \                       /
  \   Node C            /
   ──────────●──────────
         (tokens: 90,180)

"user:abc" hash 120 → falls between 90 and 180 → routes to Node C
typescript
class ConsistentHashRouter {
  private ring = new Map<number, string>(); // position → nodeId
  private sortedPositions: number[] = [];
  private virtualNodes = 150; // virtual nodes per physical node

  addNode(nodeId: string): void {
    for (let i = 0; i < this.virtualNodes; i++) {
      const pos = this.hash(`${nodeId}:vnode:${i}`);
      this.ring.set(pos, nodeId);
    }
    this.sortedPositions = [...this.ring.keys()].sort((a, b) => a - b);
  }

  removeNode(nodeId: string): void {
    for (let i = 0; i < this.virtualNodes; i++) {
      const pos = this.hash(`${nodeId}:vnode:${i}`);
      this.ring.delete(pos);
    }
    this.sortedPositions = [...this.ring.keys()].sort((a, b) => a - b);
  }

  getNode(key: string): string {
    const keyHash = this.hash(key);
    // Find the first position >= keyHash (wrap around)
    const pos = this.sortedPositions.find(p => p >= keyHash)
      ?? this.sortedPositions[0];
    return this.ring.get(pos)!;
  }

  private hash(input: string): number {
    // Use a real hash function (crypto.createHash) in production
    let h = 5381;
    for (const c of input) h = ((h << 5) + h) ^ c.charCodeAt(0);
    return Math.abs(h) % 360_000;
  }
}

With 150 virtual nodes per physical node, keys spread evenly across the ring, and adding a node migrates only ~1/N of the keys (those on the arcs the new node takes over) instead of the near-total remapping that plain modulo hashing forces.
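
The claim is easy to verify with the router above, as sketched here: record every key's placement, add a fifth node, and count how many keys moved.

typescript
// Sketch: measure the migration fraction of the ConsistentHashRouter above.
const ring = new ConsistentHashRouter();
['node-a', 'node-b', 'node-c', 'node-d'].forEach(n => ring.addNode(n));

const keys = Array.from({ length: 100_000 }, (_, i) => `user:${i}`);
const before = new Map(keys.map(k => [k, ring.getNode(k)] as [string, string]));

ring.addNode('node-e'); // grow the cluster from 4 to 5 nodes

const moved = keys.filter(k => ring.getNode(k) !== before.get(k)).length;
console.log(moved / keys.length); // ≈ 0.20, i.e. roughly 1/5 of the keys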


Shard Key Selection

The shard key is the single most important architectural decision. A bad shard key causes hot spots that defeat the purpose of sharding.

| Shard Key Candidate | Distribution | Range Queries | Joins | Verdict |
|---|---|---|---|---|
| user_id (hash) | Excellent | Scatter-gather | Cross-shard | Good default |
| tenant_id (hash) | Good (per tenant) | Per-tenant OK | Same-tenant fine | Excellent for SaaS |
| created_at (range) | Hot writes | Time-range efficient | Cross-shard | Avoid for writes |
| email (hash) | Excellent | None | Cross-shard | OK, immutable |
| country (range) | Skewed | Geographic OK | Cross-shard | Risky |
| random UUID | Perfect | Scatter-gather | Cross-shard | Poor for reads |

Rules for shard key selection:

  1. High cardinality — thousands of distinct values minimum
  2. Immutable — changing a shard key value means physically moving the row to another shard
  3. Even distribution — avoid keys that cluster (status, boolean flags)
  4. Co-locate related data — put all rows a single request needs on the same shard
typescript
// Bad: tenant's orders split across shards by order_id
// A tenant query must scatter to all shards
router.getShard(orderId); // ❌

// Good: tenant's orders co-located by tenant_id
// All of tenant's data on one shard
router.getShard(tenantId); // ✅

Cross-Shard Queries

The hardest part of sharding is queries that span multiple shards.

typescript
class ShardedOrderService {
  constructor(
    private pools: Pool[],
    private router: HashShardRouter
  ) {}

  // Simple: single shard lookup by shard key
  async getOrder(orderId: string, userId: string): Promise<Order> {
    const shard = this.router.getShard(userId);
    const result = await this.pools[shard].query(
      'SELECT * FROM orders WHERE id = $1 AND user_id = $2',
      [orderId, userId]
    );
    return result.rows[0];
  }

  // Hard: aggregate across all shards (scatter-gather)
  async getRevenueLastHour(): Promise<number> {
    const queries = this.pools.map(pool =>
      pool.query(
        "SELECT SUM(amount) as revenue FROM orders WHERE created_at > NOW() - INTERVAL '1 hour'"
      )
    );
    const results = await Promise.all(queries);
    return results.reduce((total, r) => total + Number(r.rows[0].revenue), 0);
  }

  // Avoid: JOIN across shards — fetch then join in application
  async getUsersWithOrders(userIds: string[]): Promise<UserWithOrders[]> {
    // Group userIds by shard
    const shardBuckets = new Map<number, string[]>();
    for (const uid of userIds) {
      const shard = this.router.getShard(uid);
      if (!shardBuckets.has(shard)) shardBuckets.set(shard, []);
      shardBuckets.get(shard)!.push(uid);
    }

    // Parallel queries per shard
    const shardResults = await Promise.all(
      [...shardBuckets.entries()].map(([shard, ids]) =>
        this.pools[shard].query(
          'SELECT u.*, json_agg(o.*) as orders FROM users u LEFT JOIN orders o ON o.user_id = u.id WHERE u.id = ANY($1) GROUP BY u.id',
          [ids]
        )
      )
    );

    return shardResults.flatMap(r => r.rows);
  }
}

Resharding: Moving Data Without Downtime

When you outgrow your current shard count, you need to split shards. This is painful—plan for 2× headroom from the start.

text
Strategy: Double-write during migration

Phase 1: Write to old shards + new shards simultaneously
Phase 2: Backfill historical data to new shards
Phase 3: Verify new shards are complete
Phase 4: Switch reads to new shards
Phase 5: Stop writes to old shards
Phase 6: Decommission old shards
typescript
class MigrationShardRouter {
  private phase: 'old' | 'double-write' | 'new' = 'old';

  // Routers and pools for both topologies are injected up front
  constructor(
    private oldRouter: HashShardRouter,
    private newRouter: HashShardRouter,
    private oldPools: Pool[],
    private newPools: Pool[],
  ) {}

  getShard(key: string): number {
    if (this.phase === 'old') return this.oldRouter.getShard(key);
    if (this.phase === 'new') return this.newRouter.getShard(key);
    return this.oldRouter.getShard(key); // reads still from old during double-write
  }

  async write(key: string, sql: string, params: unknown[]): Promise<void> {
    if (this.phase === 'double-write') {
      // Old shards remain the source of truth: propagate old-shard failures,
      // but tolerate new-shard errors (backfill and verification repair them)
      await this.oldPools[this.oldRouter.getShard(key)].query(sql, params);
      await this.newPools[this.newRouter.getShard(key)]
        .query(sql, params)
        .catch(err => console.error('double-write to new shard failed', err));
    } else {
      const shard = this.getShard(key);
      const pool = this.phase === 'old' ? this.oldPools[shard] : this.newPools[shard];
      await pool.query(sql, params);
    }
  }
}

Combining Replication and Sharding

Production systems at scale use both: shard writes across nodes, replicate each shard for read throughput and high availability.

text
                    Application
                        │
              ┌─────────┼─────────┐
              ▼         ▼         ▼
          Shard 0    Shard 1    Shard 2
          Primary    Primary    Primary
            │           │          │
         Replica     Replica    Replica
         Replica     Replica    Replica

Each primary handles writes for its key range. Each replica provides read throughput and serves as a failover target.
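
Putting the two layers together, here is a minimal sketch of a data-access layer that sends writes to the owning shard's primary and round-robins reads across that shard's replicas. It reuses the pg Pool and the HashShardRouter from earlier sections; the topology shape is an illustrative assumption.

typescript
// Sketch: per-shard primary/replica topology combining the pieces above.
interface ShardTopology {
  primary: Pool;
  replicas: Pool[];
  next: number; // round-robin cursor for reads
}

class ShardedReplicatedDb {
  constructor(
    private shards: ShardTopology[],
    private router: HashShardRouter, // from the hash-sharding section
  ) {}

  // Writes always go to the owning shard's primary
  write(shardKey: string, sql: string, params?: unknown[]) {
    return this.shards[this.router.getShard(shardKey)].primary.query(sql, params);
  }

  // Reads round-robin across the owning shard's replicas
  read(shardKey: string, sql: string, params?: unknown[]) {
    const shard = this.shards[this.router.getShard(shardKey)];
    const replica = shard.replicas[shard.next++ % shard.replicas.length];
    return replica.query(sql, params);
  }
}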


Monitoring Shard Health

typescript
interface ShardMetrics {
  shardId: number;
  replicationLagMs: number;
  queryLatencyP99Ms: number;
  connectionPoolUtilization: number; // 0-1
  diskUsagePercent: number;
}

async function collectShardMetrics(pools: Pool[]): Promise<ShardMetrics[]> {
  return Promise.all(
    pools.map(async (pool, shardId) => {
      const [lag, disk] = await Promise.all([
        pool.query(`
          SELECT EXTRACT(EPOCH FROM (NOW() - pg_last_xact_replay_timestamp())) * 1000 AS lag_ms
        `),
        pool.query(`
          SELECT pg_database_size(current_database()) / 1073741824.0 AS size_gb
        `),
      ]);
      return {
        shardId,
        replicationLagMs: Number(lag.rows[0].lag_ms ?? 0),
        queryLatencyP99Ms: 0, // pulled from APM
        connectionPoolUtilization: pool.totalCount / pool.options.max!,
        diskUsagePercent: (Number(disk.rows[0].size_gb) / 500) * 100, // 500GB budget per shard
      };
    })
  );
}

Alert thresholds worth setting:

| Metric | Warning | Critical |
|---|---|---|
| Replication lag | > 1 second | > 10 seconds |
| Disk usage | > 70% | > 85% |
| Connection pool | > 80% | > 95% |
| Cross-shard query rate | > 5% of queries | > 15% |
| Shard skew (max/min rows) | > 1.5× | > 3× |
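
A minimal sketch of evaluating those thresholds against the ShardMetrics collected above; the threshold values mirror the table, and the function names are illustrative (wire the results into whatever alerting sink you use).

typescript
// Sketch: classify collected ShardMetrics against the thresholds above.
type Severity = 'ok' | 'warning' | 'critical';

function classify(value: number, warn: number, crit: number): Severity {
  if (value > crit) return 'critical';
  if (value > warn) return 'warning';
  return 'ok';
}

function checkShard(m: ShardMetrics): Record<string, Severity> {
  return {
    replicationLag: classify(m.replicationLagMs, 1_000, 10_000),
    diskUsage: classify(m.diskUsagePercent, 70, 85),
    connectionPool: classify(m.connectionPoolUtilization, 0.8, 0.95),
  };
}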

Frequently Asked Questions

Q: When should I shard vs. use a distributed database like CockroachDB or PlanetScale?

Managed distributed databases handle sharding transparently but add latency overhead (20–50ms per query in some configurations) and significant cost. Self-managed sharding gives you full control but requires operational expertise. Start with read replicas and a single primary. Consider managed distributed databases when you need >100k writes/second or multi-region active-active. Implement custom sharding only when managed solutions are cost-prohibitive or when you need extreme control over data placement.

Q: Can I shard an existing production database without downtime?

Yes, with the double-write migration pattern. You run both old and new shards simultaneously for days or weeks, backfill historical data in background batches, verify checksums match, then flip reads to the new topology. The critical constraint is that your application must tolerate writing to two places at once. Budget 4–8 weeks of engineering time for a production sharding migration on a system with >100GB of data.

Q: How many shards should I start with?

Start with a power of 2 (4, 8, 16) to make future doubling clean. Estimate your 3-year data growth and size shards for that target, not today's data. A common mistake is starting with too few shards and resharding 18 months later under production pressure. At 100GB today with 50% YoY growth, sizing for 16 shards at 150GB each gives comfortable headroom to 2.4TB before another reshard.
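
The back-of-envelope math is easy to script, as sketched here with the numbers from the answer above (100GB today, 50% YoY growth, 16 shards of 150GB each).

typescript
// Sketch: years of headroom given current size, growth rate, and capacity.
// Solves current * (1 + g)^t = capacity  →  t = ln(capacity / current) / ln(1 + g)
function yearsOfHeadroom(currentGb: number, yoyGrowth: number, capacityGb: number): number {
  return Math.log(capacityGb / currentGb) / Math.log(1 + yoyGrowth);
}

console.log(yearsOfHeadroom(100, 0.5, 16 * 150).toFixed(1)); // ≈ 7.8 years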

Q: What happens to transactions that span multiple shards?

Single-shard transactions work identically to a non-sharded database—full ACID guarantees. Cross-shard transactions require distributed transactions (2PC/XA), which are slow and complex, or application-level compensation (saga pattern). The best solution is to design your data model and shard key so that business operations never need cross-shard transactions. If a feature consistently requires cross-shard writes, that is a signal that your shard key is wrong.
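
For the rare operation that genuinely must touch two shards, a compensation-based (saga-style) approach looks roughly like the sketch below. The credits-transfer example, the accounts table, and the column names are hypothetical; the point is the shape: two single-shard writes, with an explicit undo if the second one fails.

typescript
// Hypothetical saga-style sketch: two single-shard writes plus a
// compensating action, instead of a distributed (2PC) transaction.
async function transferCredits(
  pools: Pool[],              // one pool per shard, as elsewhere in this article
  router: HashShardRouter,
  fromUserId: string,
  toUserId: string,
  amount: number,
): Promise<void> {
  const fromPool = pools[router.getShard(fromUserId)];
  const toPool = pools[router.getShard(toUserId)];

  // Step 1: debit on the source user's shard (single-shard ACID)
  await fromPool.query(
    'UPDATE accounts SET credits = credits - $1 WHERE user_id = $2',
    [amount, fromUserId]
  );

  try {
    // Step 2: credit on the destination user's shard
    await toPool.query(
      'UPDATE accounts SET credits = credits + $1 WHERE user_id = $2',
      [amount, toUserId]
    );
  } catch (err) {
    // Compensate: undo the debit so both shards converge to a consistent state
    await fromPool.query(
      'UPDATE accounts SET credits = credits + $1 WHERE user_id = $2',
      [amount, fromUserId]
    );
    throw err;
  }
}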