Database Sharding and Replication: Scaling Strategies for High-Traffic Systems

A single PostgreSQL instance running on the best hardware money can buy tops out around 100,000 reads/second and 10,000 writes/second under real workloads. Beyond that threshold, you need a fundamentally different architecture. Replication and sharding are the two primary tools—and understanding when to use each (and how to combine them) separates systems that scale from systems that fall over at 10× traffic.
The Scaling Spectrum
Before choosing a strategy, diagnose where the bottleneck actually is:
- Read-heavy (>80% reads) → replication (add read replicas)
- Write-heavy (>40% writes) → sharding (split writes across nodes)
- Both heavy → shard, then replicate each shard
- Dataset larger than a single disk → sharding is required regardless

| Strategy | Addresses | Complexity | Consistency |
|---|---|---|---|
| Vertical scaling | Temporary relief | Low | Strong |
| Read replication | Read throughput | Medium | Eventual |
| Multi-primary | Write availability | High | Eventual |
| Range sharding | Write throughput | High | Strong per-shard |
| Hash sharding | Uniform write distribution | High | Strong per-shard |
| Consistent hashing | Dynamic cluster resizing | Very high | Strong per-shard |
Read Replication: The First Step
Replication copies every write from a primary node to one or more replicas. Replicas are read-only. Most applications are 80–95% reads, so routing reads to replicas provides dramatic relief without splitting data.
Primary/Replica Routing in TypeScript
```typescript
import { Pool } from 'pg';

const primary = new Pool({
  host: process.env.DB_PRIMARY_HOST,
  database: 'appdb',
  max: 20,
});

const replicas = [
  new Pool({ host: process.env.DB_REPLICA_1_HOST, database: 'appdb', max: 20 }),
  new Pool({ host: process.env.DB_REPLICA_2_HOST, database: 'appdb', max: 20 }),
];

let replicaIndex = 0;

function getReadPool(): Pool {
  // Round-robin across replicas
  const pool = replicas[replicaIndex % replicas.length];
  replicaIndex++;
  return pool;
}

export const db = {
  query: (sql: string, params?: unknown[]) => primary.query(sql, params),
  queryRead: (sql: string, params?: unknown[]) => getReadPool().query(sql, params),
};

// Usage
const users = await db.queryRead('SELECT * FROM users WHERE active = true');
await db.query('INSERT INTO users (email) VALUES ($1)', [email]);
```

Replication Lag: The Hidden Problem
Async replication means replicas can be milliseconds to seconds behind the primary. This creates read-your-own-writes inconsistency:
1. User writes profile update → primary confirms
2. User immediately reads profile → replica returns stale data
3. User sees their own update vanish → rage-quit

Solutions:
```typescript
// 1. Always read your own writes from the primary
async function getUserProfile(userId: string, justUpdated = false): Promise<User> {
  const pool = justUpdated ? primary : getReadPool();
  const result = await pool.query('SELECT * FROM users WHERE id = $1', [userId]);
  return result.rows[0];
}

// 2. Sticky sessions: send the same user to the same replica
function getReplicaForUser(userId: string): Pool {
  const index = hashUserId(userId) % replicas.length;
  return replicas[index];
}

// 3. Read after write with a lag check: capture the primary's WAL position
// after the write (SELECT pg_current_wal_lsn()), then poll a replica until
// it has replayed past that position before reading from it
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function waitForReplication(expectedLsn: string): Promise<void> {
  const maxWait = 2000; // ms
  const start = Date.now();
  while (Date.now() - start < maxWait) {
    const result = await getReadPool().query(
      'SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), $1) >= 0 AS caught_up',
      [expectedLsn]
    );
    if (result.rows[0].caught_up) return;
    await sleep(50);
  }
  // Timed out — fall back to a primary read
}
```

Horizontal Sharding Strategies
Sharding partitions data across multiple independent database instances. Each shard is a complete, self-sufficient database containing a subset of the total data.
Without sharding:

```
┌─────────────────────────────┐
│ Single DB (1B rows)         │
│ Writes: 50k/s → bottleneck  │
└─────────────────────────────┘
```

With 4 shards:

```
┌──────────────┐  ┌──────────────┐
│   Shard 0    │  │   Shard 1    │
│  250M rows   │  │  250M rows   │
│   12.5k/s    │  │   12.5k/s    │
└──────────────┘  └──────────────┘
┌──────────────┐  ┌──────────────┐
│   Shard 2    │  │   Shard 3    │
│  250M rows   │  │  250M rows   │
│   12.5k/s    │  │   12.5k/s    │
└──────────────┘  └──────────────┘
```

Range Sharding
Rows are distributed by value ranges of the shard key.
```
User IDs 1–25M    → Shard 0
User IDs 25M–50M  → Shard 1
User IDs 50M–75M  → Shard 2
User IDs 75M–100M → Shard 3
```

Pros: Range queries are efficient (all January orders on one shard). Easy to reason about.
Cons: Hot spots when recent data is always on the latest shard. New user signups all hit shard 3.
```typescript
class RangeShardRouter {
  private ranges = [
    { min: 0, max: 25_000_000, shard: 0 },
    { min: 25_000_001, max: 50_000_000, shard: 1 },
    { min: 50_000_001, max: 75_000_000, shard: 2 },
    { min: 75_000_001, max: 100_000_000, shard: 3 },
  ];

  getShard(userId: number): number {
    const range = this.ranges.find(r => userId >= r.min && userId <= r.max);
    if (!range) throw new Error(`No shard for userId ${userId}`);
    return range.shard;
  }
}
```

Hash Sharding
The shard key is hashed and the result modulo the shard count determines placement.
```typescript
class HashShardRouter {
  constructor(private shardCount: number) {}

  getShard(shardKey: string): number {
    const hash = this.fnv1a(shardKey);
    return hash % this.shardCount;
  }

  // FNV-1a over UTF-16 code units; Math.imul keeps the 32-bit multiplication
  // exact (a plain * overflows past 2^53 and silently loses low bits)
  private fnv1a(str: string): number {
    let hash = 2166136261;
    for (const char of str) {
      hash ^= char.charCodeAt(0);
      hash = Math.imul(hash, 16777619) >>> 0; // unsigned 32-bit
    }
    return hash >>> 0;
  }
}

// Usage
const router = new HashShardRouter(4);
const shard = router.getShard(userId); // deterministic, uniform
```

Pros: Uniform distribution eliminates hot spots.
Cons: Range queries require scanning all shards. Resharding is expensive: with modulo placement, doubling the shard count relocates ~50% of keys, and going from N to N+1 shards relocates most of them (see the sketch below).
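A quick empirical check of that cost, reusing the HashShardRouter above (the key format and sample size are arbitrary):

```typescript
// Count how many keys land on a different shard after a resize
function movedFraction(from: number, to: number, samples = 100_000): number {
  const before = new HashShardRouter(from);
  const after = new HashShardRouter(to);
  let moved = 0;
  for (let i = 0; i < samples; i++) {
    const key = `user:${i}`;
    if (before.getShard(key) !== after.getShard(key)) moved++;
  }
  return moved / samples;
}

console.log(movedFraction(4, 8)); // ≈ 0.50 — doubling moves half the keys
console.log(movedFraction(4, 5)); // ≈ 0.80 — N → N+1 moves most of them
```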
Consistent Hashing
Consistent hashing places both nodes and keys on a virtual ring. Each key belongs to the nearest node clockwise. When a node is added or removed, only keys on the adjacent arc migrate.
"user:abc" (hash: 120)
↓
──────────â—──────────
/ Node A \
/ (tokens: 0,90) \
â— â—
Node D Node B
(tokens: 270,360) (tokens: 180,270)
\ /
\ Node C /
──────────â—──────────
(tokens: 90,180)
"user:abc" hash 120 → falls between 90 and 180 → routes to Node Cclass ConsistentHashRouter {
  private ring = new Map<number, string>(); // position → nodeId
  private sortedPositions: number[] = [];
  private virtualNodes = 150; // virtual nodes per physical node

  addNode(nodeId: string): void {
    for (let i = 0; i < this.virtualNodes; i++) {
      const pos = this.hash(`${nodeId}:vnode:${i}`);
      this.ring.set(pos, nodeId);
    }
    this.sortedPositions = [...this.ring.keys()].sort((a, b) => a - b);
  }

  removeNode(nodeId: string): void {
    for (let i = 0; i < this.virtualNodes; i++) {
      const pos = this.hash(`${nodeId}:vnode:${i}`);
      this.ring.delete(pos);
    }
    this.sortedPositions = [...this.ring.keys()].sort((a, b) => a - b);
  }

  getNode(key: string): string {
    if (this.sortedPositions.length === 0) throw new Error('Ring has no nodes');
    const keyHash = this.hash(key);
    // Find the first position >= keyHash, wrapping around to the start
    const pos = this.sortedPositions.find(p => p >= keyHash)
      ?? this.sortedPositions[0];
    return this.ring.get(pos)!;
  }

  private hash(input: string): number {
    // Toy djb2-style hash — use a real hash function (crypto.createHash) in production
    let h = 5381;
    for (const c of input) h = ((h << 5) + h) ^ c.charCodeAt(0);
    return Math.abs(h) % 360_000;
  }
}
```

With 150 virtual nodes per physical node, adding a new node causes only ~1/N of keys to migrate instead of ~50%.
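A quick way to see that payoff, reusing the router above (node names and sample size are arbitrary):

```typescript
// Measure how many keys move when a fifth node joins a four-node ring
const ring = new ConsistentHashRouter();
['node-a', 'node-b', 'node-c', 'node-d'].forEach(n => ring.addNode(n));

const keys = Array.from({ length: 100_000 }, (_, i) => `user:${i}`);
const before = new Map<string, string>();
for (const k of keys) before.set(k, ring.getNode(k));

ring.addNode('node-e');
const moved = keys.filter(k => ring.getNode(k) !== before.get(k)).length;
console.log(moved / keys.length); // ≈ 0.2 — only the arcs claimed by node-e migrate
```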
Shard Key Selection
The shard key is the single most important architectural decision. A bad shard key causes hot spots that defeat the purpose of sharding.
| Shard Key Candidate | Distribution | Range Queries | Joins | Verdict |
|---|---|---|---|---|
| `user_id` (hash) | Excellent | Scatter-gather | Cross-shard | Good default |
| `tenant_id` (hash) | Good (per tenant) | Per-tenant OK | Same-tenant fine | Excellent for SaaS |
| `created_at` (range) | Hot writes | Time-range efficient | Cross-shard | Avoid for writes |
| `email` (hash) | Excellent | None | Cross-shard | OK, immutable |
| `country` (range) | Skewed | Geographic OK | Cross-shard | Risky |
| Random UUID | Perfect | Scatter-gather | Cross-shard | Poor for reads |
Rules for shard key selection:
- High cardinality — thousands of distinct values minimum
- Immutable — changing a shard key requires migrating a row
- Even distribution — avoid keys that cluster (status, boolean flags)
- Co-locate related data — put all rows a single request needs on the same shard
```typescript
// Bad: a tenant's orders split across shards by order_id —
// every tenant query must scatter to all shards
router.getShard(orderId); // ❌

// Good: a tenant's orders co-located by tenant_id —
// all of the tenant's data lives on one shard
router.getShard(tenantId); // ✅
```

Cross-Shard Queries
The hardest part of sharding is queries that span multiple shards.
```typescript
class ShardedOrderService {
  constructor(
    private pools: Pool[],
    private router: HashShardRouter
  ) {}

  // Simple: single-shard lookup by shard key
  async getOrder(orderId: string, userId: string): Promise<Order> {
    const shard = this.router.getShard(userId);
    const result = await this.pools[shard].query(
      'SELECT * FROM orders WHERE id = $1 AND user_id = $2',
      [orderId, userId]
    );
    return result.rows[0];
  }

  // Hard: aggregate across all shards (scatter-gather)
  async getRevenueLastHour(): Promise<number> {
    const queries = this.pools.map(pool =>
      pool.query(
        "SELECT SUM(amount) AS revenue FROM orders WHERE created_at > NOW() - INTERVAL '1 hour'"
      )
    );
    const results = await Promise.all(queries);
    return results.reduce((total, r) => total + Number(r.rows[0].revenue ?? 0), 0);
  }

  // Avoid: JOIN across shards — fetch per shard, then join in the application
  async getUsersWithOrders(userIds: string[]): Promise<UserWithOrders[]> {
    // Group userIds by shard
    const shardBuckets = new Map<number, string[]>();
    for (const uid of userIds) {
      const shard = this.router.getShard(uid);
      if (!shardBuckets.has(shard)) shardBuckets.set(shard, []);
      shardBuckets.get(shard)!.push(uid);
    }
    // Parallel queries, one per shard that owns any of the ids
    const shardResults = await Promise.all(
      [...shardBuckets.entries()].map(([shard, ids]) =>
        this.pools[shard].query(
          'SELECT u.*, json_agg(o.*) AS orders FROM users u LEFT JOIN orders o ON o.user_id = u.id WHERE u.id = ANY($1) GROUP BY u.id',
          [ids]
        )
      )
    );
    return shardResults.flatMap(r => r.rows);
  }
}
```

Resharding: Moving Data Without Downtime
When you outgrow your current shard count, you need to split shards. This is painful—plan for 2× headroom from the start.
Strategy: Double-write during migration
Phase 1: Write to old shards + new shards simultaneously
Phase 2: Backfill historical data to new shards
Phase 3: Verify new shards are complete
Phase 4: Switch reads to new shards
Phase 5: Stop writes to old shards
Phase 6: Decommission old shards

```typescript
class MigrationShardRouter {
  private phase: 'old' | 'double-write' | 'new' = 'old';

  constructor(
    private oldRouter: HashShardRouter,
    private newRouter: HashShardRouter,
    private oldPools: Pool[],
    private newPools: Pool[]
  ) {}

  getShard(key: string): number {
    if (this.phase === 'old') return this.oldRouter.getShard(key);
    if (this.phase === 'new') return this.newRouter.getShard(key);
    return this.oldRouter.getShard(key); // reads still hit old shards during double-write
  }

  async write(key: string, sql: string, params: unknown[]): Promise<void> {
    if (this.phase === 'double-write') {
      // Write to both topologies; don't fail the request on new-shard errors
      await Promise.allSettled([
        this.oldPools[this.oldRouter.getShard(key)].query(sql, params),
        this.newPools[this.newRouter.getShard(key)].query(sql, params),
      ]);
    } else {
      const shard = this.getShard(key);
      const pool = this.phase === 'old' ? this.oldPools[shard] : this.newPools[shard];
      await pool.query(sql, params);
    }
  }
}
```
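Phase 2's backfill can run alongside the double-writes. A minimal sketch, assuming an id-ordered `users` table with an `email` column (the table shape, batch size, and upsert columns are illustrative):

```typescript
// Copy historical rows to the new topology in keyset-paginated batches.
// Re-copying a row is harmless because each write is an idempotent upsert.
async function backfillUsers(
  oldPool: Pool,
  newRouter: HashShardRouter,
  newPools: Pool[]
): Promise<void> {
  let lastId = '';
  for (;;) {
    const batch = await oldPool.query(
      'SELECT id, email FROM users WHERE id > $1 ORDER BY id LIMIT 1000',
      [lastId]
    );
    if (batch.rows.length === 0) break;
    for (const row of batch.rows) {
      const shard = newRouter.getShard(row.id);
      await newPools[shard].query(
        `INSERT INTO users (id, email) VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email`,
        [row.id, row.email]
      );
    }
    lastId = batch.rows[batch.rows.length - 1].id;
  }
}
```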
Combining Replication and Sharding
Production systems at scale use both: shard writes across nodes, then replicate each shard for read throughput and high availability.
```
           Application
                │
      ┌─────────┼─────────┐
      ▼         ▼         ▼
   Shard 0   Shard 1   Shard 2
   Primary   Primary   Primary
      │         │         │
   Replica   Replica   Replica
   Replica   Replica   Replica
```

Each primary handles writes for its key range. Each replica provides read throughput and serves as a failover target.
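One way the routing layer might tie the two together — a sketch assuming one primary pool and a list of replica pools per shard (the `ShardNode` shape is illustrative):

```typescript
interface ShardNode {
  primary: Pool;
  replicas: Pool[];
}

class ShardedReplicatedDb {
  private rr = 0; // round-robin cursor for replica reads

  constructor(private shards: ShardNode[], private router: HashShardRouter) {}

  // Writes always go to the owning shard's primary
  write(key: string, sql: string, params?: unknown[]) {
    return this.shards[this.router.getShard(key)].primary.query(sql, params);
  }

  // Reads round-robin across the owning shard's replicas
  read(key: string, sql: string, params?: unknown[]) {
    const node = this.shards[this.router.getShard(key)];
    const pool = node.replicas[this.rr++ % node.replicas.length];
    return pool.query(sql, params);
  }
}
```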
Monitoring Shard Health
```typescript
interface ShardMetrics {
  shardId: number;
  replicationLagMs: number;
  queryLatencyP99Ms: number;
  connectionPoolUtilization: number; // 0–1
  diskUsagePercent: number;
}

async function collectShardMetrics(pools: Pool[]): Promise<ShardMetrics[]> {
  return Promise.all(
    pools.map(async (pool, shardId) => {
      const [lag, disk] = await Promise.all([
        // NULL on a primary, so the fallback below reports zero lag
        pool.query(`
          SELECT EXTRACT(EPOCH FROM (NOW() - pg_last_xact_replay_timestamp())) * 1000 AS lag_ms
        `),
        pool.query(`
          SELECT pg_database_size(current_database()) / 1073741824.0 AS size_gb
        `),
      ]);
      return {
        shardId,
        replicationLagMs: Number(lag.rows[0].lag_ms ?? 0),
        queryLatencyP99Ms: 0, // pulled from APM
        connectionPoolUtilization: pool.totalCount / (pool.options.max ?? 10),
        diskUsagePercent: (Number(disk.rows[0].size_gb) / 500) * 100, // assuming 500GB max per shard
      };
    })
  );
}
```

Alert thresholds worth setting (a small evaluator over these metrics follows the table):
| Metric | Warning | Critical |
|---|---|---|
| Replication lag | > 1 second | > 10 seconds |
| Disk usage | > 70% | > 85% |
| Connection pool | > 80% | > 95% |
| Cross-shard query rate | > 5% of queries | > 15% |
| Shard skew (max/min rows) | > 1.5× | > 3× |
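A minimal evaluator for the first three thresholds, over the ShardMetrics shape above (the last two rows come from query logs and per-shard row counts rather than this struct):

```typescript
type Severity = 'ok' | 'warning' | 'critical';

function classify(value: number, warn: number, crit: number): Severity {
  return value > crit ? 'critical' : value > warn ? 'warning' : 'ok';
}

function evaluateShard(m: ShardMetrics): Record<string, Severity> {
  return {
    replicationLag: classify(m.replicationLagMs, 1_000, 10_000),
    diskUsage: classify(m.diskUsagePercent, 70, 85),
    connectionPool: classify(m.connectionPoolUtilization, 0.8, 0.95),
  };
}
```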
Frequently Asked Questions
Q: When should I shard vs. use a distributed database like CockroachDB or PlanetScale?
Managed distributed databases handle sharding transparently but add latency overhead (20–50ms per query in some configurations) and significant cost. Self-managed sharding gives you full control but requires operational expertise. Start with read replicas and a single primary. Consider managed distributed databases when you need >100k writes/second or multi-region active-active. Implement custom sharding only when managed solutions are cost-prohibitive or when you need extreme control over data placement.
Q: Can I shard an existing production database without downtime?
Yes, with the double-write migration pattern. You run both old and new shards simultaneously for days or weeks, backfill historical data in background batches, verify checksums match, then flip reads to the new topology. The critical constraint is that your application must tolerate writing to two places at once. Budget 4–8 weeks of engineering time for a production sharding migration on a system with >100GB of data.
Q: How many shards should I start with?
Start with a power of 2 (4, 8, 16) to make future doubling clean. Estimate your 3-year data growth and size shards for that target, not today's data. A common mistake is starting with too few shards and resharding 18 months later under production pressure. At 100GB today with 50% YoY growth, sizing for 16 shards at 150GB each gives comfortable headroom to 2.4TB before another reshard.
Q: What happens to transactions that span multiple shards?
Single-shard transactions work identically to a non-sharded database—full ACID guarantees. Cross-shard transactions require distributed transactions (2PC/XA), which are slow and complex, or application-level compensation (saga pattern). The best solution is to design your data model and shard key so that business operations never need cross-shard transactions. If a feature consistently requires cross-shard writes, that is a signal that your shard key is wrong.
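For the rare unavoidable cross-shard write, the compensation idea looks roughly like this — a sketch, not a full saga framework (the `accounts` table, column names, and the pools/router parameters are illustrative):

```typescript
// Saga-style transfer across two shards: apply the debit, then the credit;
// if the credit fails, issue a compensating write instead of relying on 2PC
async function transferCredits(
  pools: Pool[],
  router: HashShardRouter,
  fromUserId: string,
  toUserId: string,
  amount: number
): Promise<void> {
  const fromPool = pools[router.getShard(fromUserId)];
  const toPool = pools[router.getShard(toUserId)];

  await fromPool.query(
    'UPDATE accounts SET balance = balance - $1 WHERE user_id = $2',
    [amount, fromUserId]
  );
  try {
    await toPool.query(
      'UPDATE accounts SET balance = balance + $1 WHERE user_id = $2',
      [amount, toUserId]
    );
  } catch (err) {
    // Compensate: undo the debit on the first shard
    await fromPool.query(
      'UPDATE accounts SET balance = balance + $1 WHERE user_id = $2',
      [amount, fromUserId]
    );
    throw err;
  }
}
```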
