Java Project: Designing an In-Memory Key-Value Store

Building an in-memory key-value store is one of the best Java projects for learning concurrency, data structures, and systems design. It exercises ConcurrentHashMap, ReadWriteLock, TTL-based expiry, LRU eviction, and file I/O for persistence. The result is a simplified Redis: a small project that still has to deal with production-level concerns such as thread safety, memory limits, and durability.


Requirements and Design

text
Core operations:
  GET key            → value or null
  SET key value      → OK
  SET key value EX s → OK (with TTL in seconds)
  DEL key            → 1 (deleted) or 0 (not found)
  EXISTS key         → 1 or 0
  KEYS pattern       → list of matching keys
  TTL key            → remaining seconds (-1 = no expiry, -2 = not found)

Advanced:
  INCR key           → increment integer value atomically
  APPEND key value   → append to string, return new length
  MGET key1 key2...  → batch get
  MSET k1 v1 k2 v2  → batch set

Memory management:
  maxMemoryMb config → reject SET when over limit (or evict with LRU)
  TTL background expiry → lazy expiry on GET + active background sweep

Persistence:
  SAVE              → write snapshot to disk
  BGSAVE            → async snapshot
  Load from snapshot on startup

Core Data Model

java
// ValueEntry.java
package com.kvstore;

import java.time.Instant;

public class ValueEntry {
    private final String value;
    private final long expiresAt;   // epoch millis, -1 = no expiry
    private volatile long lastAccessedAt;

    public ValueEntry(String value, long ttlSeconds) {
        this.value = value;
        this.expiresAt = ttlSeconds > 0
            ? Instant.now().toEpochMilli() + (ttlSeconds * 1000)
            : -1;
        this.lastAccessedAt = Instant.now().toEpochMilli();
    }

    public String getValue() {
        lastAccessedAt = Instant.now().toEpochMilli();
        return value;
    }

    public boolean isExpired() {
        return expiresAt != -1 && Instant.now().toEpochMilli() > expiresAt;
    }

    public long getTtlSeconds() {
        if (expiresAt == -1) return -1;
        long remaining = (expiresAt - Instant.now().toEpochMilli()) / 1000;
        return Math.max(0, remaining);
    }

    public long getLastAccessedAt() {
        return lastAccessedAt;
    }
}

Thread-Safe Store Implementation

java
// KeyValueStore.java
package com.kvstore;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class KeyValueStore {
    private static final Logger log = LoggerFactory.getLogger(KeyValueStore.class);

    private final ConcurrentHashMap<String, ValueEntry> store = new ConcurrentHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final ScheduledExecutorService expirySweeper;
    private final AtomicLong memoryEstimateBytes = new AtomicLong();
    private final long maxMemoryBytes;

    public KeyValueStore(long maxMemoryMb) {
        this.maxMemoryBytes = maxMemoryMb * 1024 * 1024;
        this.expirySweeper = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "expiry-sweeper");
            t.setDaemon(true);
            return t;
        });
        // Active expiry sweep every 100ms
        expirySweeper.scheduleAtFixedRate(this::sweepExpired, 100, 100, TimeUnit.MILLISECONDS);
    }

    // SET
    public void set(String key, String value, long ttlSeconds) {
        lock.writeLock().lock();
        try {
            if (memoryEstimateBytes.get() > maxMemoryBytes) {
                evictLRU();
            }
            ValueEntry entry = new ValueEntry(value, ttlSeconds);
            ValueEntry old = store.put(key, entry);
            // Update memory estimate (rough: 2 bytes/char + overhead)
            if (old != null) {
                memoryEstimateBytes.addAndGet(
                    -(old.getValue().length() * 2L + key.length() * 2L + 64)
                );
            }
            memoryEstimateBytes.addAndGet(value.length() * 2L + key.length() * 2L + 64);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void set(String key, String value) {
        set(key, value, -1);
    }

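    // GET (lazy expiry)
    public Optional<String> get(String key) {
        lock.readLock().lock();
        try {
            ValueEntry entry = store.get(key);
            if (entry == null) return Optional.empty();
            if (entry.isExpired()) {
                // ReentrantReadWriteLock cannot upgrade read -> write, so release the
                // read lock first; other threads may run in the gap before the write lock
                lock.readLock().unlock();
                lock.writeLock().lock();
                try {
                    // Re-check under the write lock: the key may have been removed or
                    // overwritten with a fresh value in that gap
                    entry = store.get(key);
                    if (entry == null) return Optional.empty();
                    if (!entry.isExpired()) return Optional.of(entry.getValue());
                    store.remove(key);
                    memoryEstimateBytes.addAndGet(
                        -(entry.getValue().length() * 2L + key.length() * 2L + 64));
                    return Optional.empty();
                } finally {
                    // Downgrade (write -> read is allowed) so the outer finally
                    // releases a read lock this thread actually holds
                    lock.readLock().lock();
                    lock.writeLock().unlock();
                }
            }
            return Optional.of(entry.getValue());
        } finally {
            lock.readLock().unlock();
        }
    }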

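    // DEL
    public int del(String... keys) {
        lock.writeLock().lock();
        try {
            int count = 0;
            for (String key : keys) {
                ValueEntry old = store.remove(key);
                if (old == null) continue;
                // Keep the memory estimate in step with removals
                memoryEstimateBytes.addAndGet(
                    -(old.getValue().length() * 2L + key.length() * 2L + 64));
                // An expired entry is logically gone already, so it does not count as deleted
                if (!old.isExpired()) count++;
            }
            return count;
        } finally {
            lock.writeLock().unlock();
        }
    }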

    // EXISTS
    public boolean exists(String key) {
        return get(key).isPresent();
    }

    // TTL
    public long ttl(String key) {
        lock.readLock().lock();
        try {
            ValueEntry entry = store.get(key);
            if (entry == null) return -2; // key not found
            if (entry.isExpired()) return -2;
            return entry.getTtlSeconds();
        } finally {
            lock.readLock().unlock();
        }
    }

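    // INCR: atomic increment under the write lock.
    // Note: the new entry is stored without a TTL, so INCR clears any expiry on the key.
    public long incr(String key) {
        lock.writeLock().lock();
        try {
            ValueEntry entry = store.get(key);
            long current = entry != null && !entry.isExpired()
                ? Long.parseLong(entry.getValue())
                : 0L;
            // addExact throws ArithmeticException instead of silently wrapping on overflow
            long newValue = Math.addExact(current, 1);
            String newText = String.valueOf(newValue);
            ValueEntry old = store.put(key, new ValueEntry(newText, -1));
            // Adjust the memory estimate by the size difference between old and new entry
            long delta = newText.length() * 2L + key.length() * 2L + 64;
            if (old != null) {
                delta -= old.getValue().length() * 2L + key.length() * 2L + 64;
            }
            memoryEstimateBytes.addAndGet(delta);
            return newValue;
        } catch (NumberFormatException e) {
            throw new IllegalStateException("Value is not an integer", e);
        } finally {
            lock.writeLock().unlock();
        }
    }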

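    // KEYS with glob pattern (* and ?)
    public List<String> keys(String pattern) {
        lock.readLock().lock();
        try {
            // Translate the glob character by character; everything except * and ?
            // is quoted so regex metacharacters such as ( [ + $ match literally
            StringBuilder regex = new StringBuilder();
            for (char c : pattern.toCharArray()) {
                switch (c) {
                    case '*' -> regex.append(".*");
                    case '?' -> regex.append('.');
                    default -> regex.append(Pattern.quote(String.valueOf(c)));
                }
            }
            // matches() already anchors at both ends, so no ^...$ is needed
            Pattern compiled = Pattern.compile(regex.toString());
            return store.entrySet().stream()
                .filter(e -> !e.getValue().isExpired())
                .filter(e -> compiled.matcher(e.getKey()).matches())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        } finally {
            lock.readLock().unlock();
        }
    }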

    // MGET
    public List<Optional<String>> mget(String... keys) {
        return Arrays.stream(keys).map(this::get).toList();
    }

    // MSET
    public void mset(Map<String, String> entries) {
        entries.forEach(this::set);
    }

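    // Active expiry sweep: a full scan under the write lock, so removals cannot
    // interleave with SET/INCR and the memory estimate stays in step
    private void sweepExpired() {
        lock.writeLock().lock();
        try {
            Iterator<Map.Entry<String, ValueEntry>> it = store.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, ValueEntry> e = it.next();
                if (e.getValue().isExpired()) {
                    it.remove();
                    memoryEstimateBytes.addAndGet(
                        -(e.getValue().getValue().length() * 2L + e.getKey().length() * 2L + 64));
                }
            }
        } finally {
            lock.writeLock().unlock();
        }
    }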

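    // LRU eviction: remove the least recently accessed 10% of keys.
    // Called from set() with the write lock held; sorting makes each eviction O(n log n).
    private void evictLRU() {
        int evictCount = Math.max(1, store.size() / 10);
        List<String> victims = store.entrySet().stream()
            .sorted(Comparator.comparingLong(e -> e.getValue().getLastAccessedAt()))
            .limit(evictCount)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        int removed = 0;
        for (String key : victims) {
            ValueEntry old = store.remove(key);
            if (old != null) {
                // Without this the estimate never drops and every later SET evicts again
                memoryEstimateBytes.addAndGet(
                    -(old.getValue().length() * 2L + key.length() * 2L + 64));
                removed++;
            }
        }
        log.info("LRU eviction: removed {} keys", removed);
    }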

    public int size() {
        return store.size();
    }

    public void shutdown() {
        expirySweeper.shutdown();
    }
}
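
The eviction in KeyValueStore sorts every entry by last-access time. A common alternative is to let LinkedHashMap maintain access order itself. The following is a minimal sketch of that technique, not part of the store above; the class name LruCache and the maxEntries parameter are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: a size-bounded map that evicts the least recently used entry.
class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    LruCache(int maxEntries) {
        // accessOrder = true: iteration order runs from least to most recently accessed
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // LinkedHashMap calls this after each insertion; returning true removes the eldest entry
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

LinkedHashMap is not thread-safe, and in access-order mode even get() reorders the internal list. Dropping this into a concurrent store would therefore require exclusive locking for reads as well as writes, which is the trade-off against the timestamp approach used above.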

Persistence: Snapshot and Restore

java
// SnapshotManager.java
package com.kvstore;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.*;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SnapshotManager {
    private static final Logger log = LoggerFactory.getLogger(SnapshotManager.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    
    private final Path snapshotPath;
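    private final ExecutorService bgExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "snapshot-writer");
        // A non-daemon worker would keep the JVM alive after main exits; the
        // tmp-file-then-rename in save() keeps an interrupted write from corrupting the snapshot
        t.setDaemon(true);
        return t;
    });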

    public SnapshotManager(String path) {
        this.snapshotPath = Path.of(path);
    }

    // Synchronous snapshot (SAVE command)
    public void save(Map<String, SnapshotEntry> entries) throws IOException {
        Path tmp = snapshotPath.resolveSibling(snapshotPath.getFileName() + ".tmp");
        mapper.writeValue(tmp.toFile(), entries);
        Files.move(tmp, snapshotPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        log.info("Snapshot saved: {} entries to {}", entries.size(), snapshotPath);
    }

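    // Async snapshot (BGSAVE command). The caller should pass a copy of the data
    // that no other thread mutates while the write is in progress.
    public CompletableFuture<Void> bgSave(Map<String, SnapshotEntry> snapshot) {
        return CompletableFuture.runAsync(() -> {
            try {
                save(snapshot);
            } catch (IOException e) {
                log.error("Background save failed", e);
                // Rethrow so the returned future completes exceptionally instead of
                // reporting success to the caller
                throw new UncheckedIOException(e);
            }
        }, bgExecutor);
    }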

    // Load on startup
    @SuppressWarnings("unchecked")
    public Map<String, SnapshotEntry> load() throws IOException {
        if (!Files.exists(snapshotPath)) return Map.of();
        return mapper.readValue(snapshotPath.toFile(),
            mapper.getTypeFactory().constructMapType(Map.class, String.class, SnapshotEntry.class));
    }

    public record SnapshotEntry(String value, long expiresAt) {}
}
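
SnapshotManager.load() returns absolute expiry timestamps, while KeyValueStore.set() takes a relative TTL in seconds. One way to bridge the two on startup is sketched below. It assumes the caller then invokes store.set(key, value, ttl) for each result; the class SnapshotRestoreSketch and the method remainingTtls are hypothetical names, and the record is redeclared only so the sketch compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

class SnapshotRestoreSketch {
    // Mirrors SnapshotManager.SnapshotEntry (expiresAt in epoch millis, -1 = no expiry)
    record SnapshotEntry(String value, long expiresAt) {}

    // Maps each live key to its remaining TTL in seconds (-1 = no expiry).
    // Entries that expired while the process was down are skipped.
    static Map<String, Long> remainingTtls(Map<String, SnapshotEntry> snapshot, long nowMillis) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, SnapshotEntry> e : snapshot.entrySet()) {
            long expiresAt = e.getValue().expiresAt();
            if (expiresAt == -1) {
                result.put(e.getKey(), -1L);
            } else if (expiresAt > nowMillis) {
                // Round up: ValueEntry treats a TTL of 0 as "no expiry", so a key with
                // less than one second left must be restored with a TTL of 1, not 0
                long remainingMillis = expiresAt - nowMillis;
                result.put(e.getKey(), (remainingMillis + 999) / 1000);
            }
        }
        return result;
    }
}
```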

Command Protocol (RESP-inspired)

java
// CommandProcessor.java
package com.kvstore;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CommandProcessor {
    private final KeyValueStore store;

    public CommandProcessor(KeyValueStore store) {
        this.store = store;
    }

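    public String process(String commandLine) {
        // "".split("\\s+") yields [""], never an empty array, so test for blank input directly
        if (commandLine == null || commandLine.isBlank()) return "-ERR empty command\r\n";
        String[] parts = commandLine.trim().split("\\s+");

        // Locale.ROOT keeps the case conversion independent of the JVM's default locale
        String cmd = parts[0].toUpperCase(java.util.Locale.ROOT);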
        
        try {
            return switch (cmd) {
                case "GET"    -> handleGet(parts);
                case "SET"    -> handleSet(parts);
                case "DEL"    -> handleDel(parts);
                case "EXISTS" -> handleExists(parts);
                case "TTL"    -> handleTtl(parts);
                case "INCR"   -> handleIncr(parts);
                case "KEYS"   -> handleKeys(parts);
                case "MGET"   -> handleMget(parts);
                case "DBSIZE" -> ":" + store.size() + "\r\n";
                case "PING"   -> "+PONG\r\n";
                case "QUIT"   -> "+OK\r\n";
                default -> "-ERR unknown command '" + cmd + "'\r\n";
            };
        } catch (Exception e) {
            return "-ERR " + e.getMessage() + "\r\n";
        }
    }

    private String handleGet(String[] parts) {
        if (parts.length != 2) return "-ERR wrong number of arguments\r\n";
        Optional<String> value = store.get(parts[1]);
        return value.map(v -> "$" + v.length() + "\r\n" + v + "\r\n")
                    .orElse("$-1\r\n");  // nil bulk string
    }

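    private String handleSet(String[] parts) {
        // Accepted forms: SET key value | SET key value EX seconds.
        // Tokens are split on whitespace, so values containing spaces are not supported.
        if (parts.length != 3 && parts.length != 5) return "-ERR wrong number of arguments\r\n";
        String key = parts[1];
        String value = parts[2];
        long ttl = -1;

        if (parts.length == 5) {
            if (!parts[3].equalsIgnoreCase("EX")) return "-ERR syntax error\r\n";
            ttl = Long.parseLong(parts[4]); // NumberFormatException becomes -ERR in process()
            // ValueEntry treats ttl <= 0 as "no expiry", so reject it explicitly
            if (ttl <= 0) return "-ERR invalid expire time\r\n";
        }

        store.set(key, value, ttl);
        return "+OK\r\n";
    }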

    private String handleDel(String[] parts) {
        if (parts.length < 2) return "-ERR wrong number of arguments\r\n";
        String[] keys = Arrays.copyOfRange(parts, 1, parts.length);
        int count = store.del(keys);
        return ":" + count + "\r\n";
    }

    private String handleExists(String[] parts) {
        if (parts.length != 2) return "-ERR wrong number of arguments\r\n";
        return ":" + (store.exists(parts[1]) ? 1 : 0) + "\r\n";
    }

    private String handleTtl(String[] parts) {
        if (parts.length != 2) return "-ERR wrong number of arguments\r\n";
        return ":" + store.ttl(parts[1]) + "\r\n";
    }

    private String handleIncr(String[] parts) {
        if (parts.length != 2) return "-ERR wrong number of arguments\r\n";
        return ":" + store.incr(parts[1]) + "\r\n";
    }

    private String handleKeys(String[] parts) {
        String pattern = parts.length > 1 ? parts[1] : "*";
        List<String> keys = store.keys(pattern);
        StringBuilder sb = new StringBuilder("*" + keys.size() + "\r\n");
        for (String key : keys) {
            sb.append("$").append(key.length()).append("\r\n").append(key).append("\r\n");
        }
        return sb.toString();
    }

    private String handleMget(String[] parts) {
        if (parts.length < 2) return "-ERR wrong number of arguments\r\n";
        String[] keys = Arrays.copyOfRange(parts, 1, parts.length);
        List<Optional<String>> values = store.mget(keys);
        StringBuilder sb = new StringBuilder("*" + values.size() + "\r\n");
        for (Optional<String> v : values) {
            if (v.isPresent()) {
                sb.append("$").append(v.get().length()).append("\r\n").append(v.get()).append("\r\n");
            } else {
                sb.append("$-1\r\n");
            }
        }
        return sb.toString();
    }
}
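
The handlers above build each reply by hand and use String.length() for the bulk-string prefix. In actual RESP the prefix counts bytes, so the two differ for non-ASCII values. A small set of encoding helpers, sketched here under the assumption of UTF-8 on the wire, would centralize that; the class name Resp and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical reply encoders for the RESP-style replies used by CommandProcessor.
class Resp {
    static String simple(String s)  { return "+" + s + "\r\n"; }
    static String error(String msg) { return "-ERR " + msg + "\r\n"; }
    static String integer(long n)   { return ":" + n + "\r\n"; }

    // Bulk string: "$<byte length>\r\n<data>\r\n"; null encodes as the nil bulk string
    static String bulk(String s) {
        if (s == null) return "$-1\r\n";
        int byteLength = s.getBytes(StandardCharsets.UTF_8).length;
        return "$" + byteLength + "\r\n" + s + "\r\n";
    }

    // Array of bulk strings: "*<count>\r\n" followed by each encoded element
    static String array(List<String> items) {
        StringBuilder sb = new StringBuilder("*").append(items.size()).append("\r\n");
        for (String item : items) sb.append(bulk(item));
        return sb.toString();
    }
}
```

With helpers like these, handleGet would reduce to something like Resp.bulk(store.get(parts[1]).orElse(null)).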

Running and Testing

java
// Basic usage test
public class Main {
    public static void main(String[] args) {
        KeyValueStore store = new KeyValueStore(256); // 256MB limit
        CommandProcessor processor = new CommandProcessor(store);

        // Test commands
        System.out.println(processor.process("SET user:1 Alice"));    // +OK
        System.out.println(processor.process("GET user:1"));          // $5\r\nAlice
        System.out.println(processor.process("SET session:abc tok EX 60")); // +OK (60s TTL)
        System.out.println(processor.process("TTL session:abc"));     // :59 or :60 (remaining time truncates to whole seconds)
        System.out.println(processor.process("INCR visits"));         // :1
        System.out.println(processor.process("INCR visits"));         // :2
        System.out.println(processor.process("KEYS user:*"));         // *1\r\n$6\r\nuser:1
        System.out.println(processor.process("DBSIZE"));              // :3

        store.shutdown();
    }
}

text
Performance characteristics (rough, hardware-dependent estimates; benchmark on your own setup):
  GET/SET:    ~2-5M operations/second (single-threaded)
  Concurrent: ~500k operations/second (ReadWriteLock contention under load)
  For higher concurrent throughput: use ConcurrentHashMap.compute()
  instead of ReadWriteLock, or stripe locks across key hash buckets

Frequently Asked Questions

Q: Why use ReadWriteLock instead of synchronized or ConcurrentHashMap alone?

ConcurrentHashMap makes each individual operation thread-safe, but it does not make compound operations atomic. INCR, for example, is a read followed by a write, and two threads interleaving those steps would lose an update. KEYS * is another case: ConcurrentHashMap iterators are weakly consistent, so a scan that runs while other threads insert or delete may or may not reflect those changes. ReadWriteLock lets multiple readers (GET, TTL, EXISTS) proceed concurrently while writers (SET, DEL, INCR) get exclusive access. For read-heavy workloads this scales better than synchronized, which serializes all access. For production, consider striped locking (for example Google Guava's Striped<Lock>), which partitions locks by key hash and reduces contention between unrelated keys.
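
The striping idea can be sketched with plain JDK locks. This is an illustration of the technique, not Guava's implementation; the class name StripedLocks and the method withLock are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical helper: a fixed array of locks, selected by key hash.
class StripedLocks {
    private final ReentrantLock[] stripes;

    StripedLocks(int stripeCount) {
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    // The same key always maps to the same stripe; floorMod keeps the index
    // non-negative even when hashCode() is negative
    private ReentrantLock stripeFor(String key) {
        return stripes[Math.floorMod(key.hashCode(), stripes.length)];
    }

    // Runs the action while holding only the stripe that guards this key
    <T> T withLock(String key, Supplier<T> action) {
        ReentrantLock lock = stripeFor(key);
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }
}
```

Operations on keys in different stripes proceed in parallel, while two operations on the same key still serialize. Full-map operations such as KEYS * would need to acquire every stripe or accept a weakly consistent scan.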

Q: How does Redis handle TTL expiry more efficiently than a background sweep?

Redis combines lazy expiry (check on access) with probabilistic active expiry. Every 100ms, the active cycle samples 20 random keys from the set of keys that have a TTL and deletes the expired ones; if more than 25% of the sample had expired, it repeats immediately. This keeps expired keys from consuming memory without the cost of a full scan. To expire keys more precisely, maintain an index sorted by expiry time (in Java, a TreeMap<Long, Set<String>> mapping each expiry time to its keys) so that the sweeper only visits keys whose expiry time has passed.
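
A minimal sketch of such an expiry index follows. The class name ExpiryIndex and its methods are hypothetical, and the sketch assumes the caller serializes access (for example under the store's write lock) and removes a key's old index entry whenever its TTL changes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper: keys grouped by expiry time, sorted so due keys come first.
class ExpiryIndex {
    private final TreeMap<Long, Set<String>> byExpiry = new TreeMap<>();

    void add(String key, long expiresAtMillis) {
        byExpiry.computeIfAbsent(expiresAtMillis, t -> new HashSet<>()).add(key);
    }

    void remove(String key, long expiresAtMillis) {
        Set<String> keys = byExpiry.get(expiresAtMillis);
        if (keys != null && keys.remove(key) && keys.isEmpty()) {
            byExpiry.remove(expiresAtMillis);
        }
    }

    // Removes and returns every key whose expiry time is strictly before nowMillis,
    // matching the "now > expiresAt" test in ValueEntry.isExpired()
    List<String> pollExpired(long nowMillis) {
        NavigableMap<Long, Set<String>> due = byExpiry.headMap(nowMillis, false);
        List<String> expired = new ArrayList<>();
        for (Set<String> keys : due.values()) {
            expired.addAll(keys);
        }
        due.clear(); // headMap is a view, so this removes the due buckets from the index
        return expired;
    }
}
```

The sweeper then touches only the keys that are due, at the cost of keeping the index in sync on every SET, DEL, and expiry.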

Q: How would you add persistence (append-only log) like Redis AOF?

An append-only file (AOF) logs every write command. On startup, the server replays the log to reconstruct its state. To implement it, open a FileOutputStream in append mode and, after every SET/DEL/INCR command, write the command string to the file. For crash safety, call force(false) on the stream's FileChannel (obtained via getChannel()) after each write; this is the equivalent of fsync. Periodically compact the AOF by writing a new file that contains only the minimal SET commands needed to reconstruct the current state, dropping superseded and expired entries.
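
The append-and-sync step might look like the sketch below. It opens a FileChannel directly rather than going through FileOutputStream, and assumes one command per line with no newlines inside values; the class name AppendOnlyLog and its methods are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper: a line-oriented append-only command log.
class AppendOnlyLog implements AutoCloseable {
    private final Path path;
    private final FileChannel channel;

    AppendOnlyLog(Path path) throws IOException {
        this.path = path;
        this.channel = FileChannel.open(path,
            StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    }

    // Appends one command line, then forces it to the storage device
    synchronized void append(String commandLine) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap((commandLine + "\n").getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
        channel.force(false); // false: file content only, not metadata
    }

    // Returns the logged commands in write order, ready to be replayed
    List<String> readAll() throws IOException {
        return Files.readAllLines(path, StandardCharsets.UTF_8);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

On startup, each line from readAll() could be passed to CommandProcessor.process() to rebuild the store. Note that replaying SET ... EX restarts the TTL from the replay time, so a fuller design would log absolute expiry times. Forcing after every write is the safest and slowest policy; syncing on a timer trades a small window of possible loss for throughput.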

Q: What's the difference between this approach and using ConcurrentHashMap.compute() for atomic operations?

ConcurrentHashMap.compute(key, fn) runs fn atomically for that key and locks only the hash bin containing it, which is far more granular than a ReadWriteLock over the whole map. For INCR, compute is ideal: store.compute(key, (k, existing) -> new ValueEntry(String.valueOf(parse(existing) + 1), -1)), where parse stands for a helper that reads the current integer value and treats a missing entry as 0. The downside is that compute does not give a consistent view of the entire map, so operations like KEYS * still need coordination. A practical hybrid is to use ConcurrentHashMap with compute for single-key writes and a separate lock used only for full-map scans.
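
A compute-based INCR can be sketched as follows. To stay self-contained it stores plain String values rather than ValueEntry and ignores TTLs; the class name ComputeIncr is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: lock-free-for-the-caller INCR built on compute().
class ComputeIncr {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    void set(String key, String value) {
        store.put(key, value);
    }

    // The remapping function runs atomically for this key, so concurrent
    // increments cannot lose updates
    long incr(String key) {
        String updated = store.compute(key, (k, existing) -> {
            long current = existing == null ? 0L : Long.parseLong(existing);
            // addExact throws ArithmeticException instead of wrapping on overflow
            return String.valueOf(Math.addExact(current, 1));
        });
        return Long.parseLong(updated);
    }
}
```

If the stored value is not an integer, Long.parseLong throws NumberFormatException out of compute and the existing mapping is left unchanged.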