Java CompletableFuture: Mastering Async Callbacks

TT
Java CompletableFuture: Mastering Async Callbacks

Java CompletableFuture: Mastering Async Callbacks

Before CompletableFuture (Java 8), async code in Java required Future.get() which blocked the calling thread, or nested callbacks that produced unreadable code. CompletableFuture enables non-blocking composition: chain async operations, combine multiple futures, handle errors in-line, and set timeouts—all without blocking a thread while waiting.


The Problem with Blocking Futures

java
// Java 5 Future: get() blocks until complete
ExecutorService executor = Executors.newFixedThreadPool(10);
Future<String> future = executor.submit(() -> fetchUserFromDB(userId));
String user = future.get(); // ← BLOCKS calling thread

If 1,000 requests each block one thread waiting for a DB response, you need 1,000 threads—expensive and doesn't scale. CompletableFuture runs tasks asynchronously and composes results without blocking.


Creating CompletableFutures

java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Run task asynchronously using ForkJoinPool.commonPool() (default)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return fetchUserFromDB(userId); // runs in background thread
});

// Run with a custom executor (preferred in production)
ExecutorService executor = Executors.newFixedThreadPool(20);
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(
    () -> fetchUserFromDB(userId),
    executor
);

// Already completed (useful for testing)
CompletableFuture<String> done = CompletableFuture.completedFuture("result");

// Fire and forget (no return value)
CompletableFuture<Void> run = CompletableFuture.runAsync(
    () -> sendAnalyticsEvent(event),
    executor
);

Always use a custom executor in production. The default ForkJoinPool.commonPool() is shared across the entire JVM—if your async tasks block (DB calls, HTTP requests), they starve other users of the common pool.


Chaining Operations

java
// thenApply: transform result (sync, runs in same thread)
CompletableFuture<String> nameFuture = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .thenApply(user -> user.getName());  // transforms User → String

// thenApplyAsync: transform result (async, runs in executor)
CompletableFuture<String> asyncName = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .thenApplyAsync(user -> expensiveTransform(user), executor);

// thenCompose: flatMap — when the transformation itself is async
// Use this when the next step returns a CompletableFuture
CompletableFuture<Order> orderFuture = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .thenCompose(user -> fetchLatestOrder(user.getId()));
// ↑ avoids CompletableFuture<CompletableFuture<Order>>

// thenAccept: consume result, no return
CompletableFuture<Void> logFuture = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .thenAccept(user -> log.info("Fetched user: {}", user.getId()));

// thenRun: run after completion, no input or output
CompletableFuture<Void> notify = future
    .thenRun(() -> sendCompletionNotification());

Full Pipeline Example

java
public CompletableFuture<OrderConfirmation> processOrder(String userId, Cart cart) {
    return CompletableFuture
        // Step 1: fetch user
        .supplyAsync(() -> userService.findById(userId), executor)
        // Step 2: validate credit (async DB call)
        .thenCompose(user -> creditService.checkCredit(user, cart.getTotal()))
        // Step 3: transform credit result into an order
        .thenApply(creditResult -> orderFactory.create(creditResult, cart))
        // Step 4: persist order (async DB write)
        .thenCompose(order -> orderRepository.save(order))
        // Step 5: send confirmation (fire and don't wait)
        .thenApply(savedOrder -> {
            notificationService.sendConfirmation(savedOrder);
            return new OrderConfirmation(savedOrder.getId(), savedOrder.getTotal());
        });
}

Combining Multiple Futures

java
// allOf: wait for ALL to complete (no result — check each future individually)
CompletableFuture<User> userFuture = fetchUser(userId);
CompletableFuture<List<Order>> ordersFuture = fetchOrders(userId);
CompletableFuture<Preferences> prefsFuture = fetchPreferences(userId);

CompletableFuture<Void> all = CompletableFuture.allOf(userFuture, ordersFuture, prefsFuture);
all.thenRun(() -> {
    User user = userFuture.join();       // join() = get() but unchecked exception
    List<Order> orders = ordersFuture.join();
    Preferences prefs = prefsFuture.join();
    buildDashboard(user, orders, prefs);
});

// Typed helper for collecting results (common pattern)
public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
    return CompletableFuture
        .allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream().map(CompletableFuture::join).toList());
}

// Usage
List<CompletableFuture<UserProfile>> profileFutures = userIds.stream()
    .map(id -> fetchProfile(id))
    .toList();
CompletableFuture<List<UserProfile>> allProfiles = allOf(profileFutures);

// anyOf: complete when THE FIRST one completes (result is Object — cast needed)
CompletableFuture<Object> fastest = CompletableFuture.anyOf(
    fetchFromCache(key),
    fetchFromDB(key),
    fetchFromAPI(key)
);
String result = (String) fastest.join();

Error Handling

java
// exceptionally: recover from error with a fallback value
CompletableFuture<User> safe = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .exceptionally(ex -> {
        log.warn("Failed to fetch user {}: {}", userId, ex.getMessage());
        return User.anonymous(); // fallback value
    });

// handle: transform either result or exception (always runs)
CompletableFuture<UserResponse> handled = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .handle((user, ex) -> {
        if (ex != null) {
            return UserResponse.error(ex.getMessage());
        }
        return UserResponse.success(user);
    });

// whenComplete: side effects on completion (doesn't change result)
CompletableFuture<User> withLogging = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .whenComplete((user, ex) -> {
        if (ex != null) {
            metrics.increment("user.fetch.error");
        } else {
            metrics.increment("user.fetch.success");
        }
        // Note: result is unchanged regardless of what happens here
    });

// Propagate specific exception types
CompletableFuture<Order> order = CompletableFuture
    .supplyAsync(() -> createOrder(cart), executor)
    .exceptionallyCompose(ex -> {
        if (ex.getCause() instanceof InsufficientCreditException) {
            return CompletableFuture.failedFuture(
                new ApiException(400, "Insufficient credit")
            );
        }
        return CompletableFuture.failedFuture(ex);
    });

Timeouts

java
// Java 9+: orTimeout — complete exceptionally after timeout
CompletableFuture<User> withTimeout = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .orTimeout(3, TimeUnit.SECONDS);
// Throws TimeoutException after 3 seconds if not complete

// completeOnTimeout — complete with default value after timeout
CompletableFuture<Recommendations> recs = CompletableFuture
    .supplyAsync(() -> fetchRecommendations(userId), executor)
    .completeOnTimeout(Recommendations.empty(), 500, TimeUnit.MILLISECONDS);
// Returns empty recommendations if service takes > 500ms (graceful degradation)

// Timeout with fallback using exceptionally
CompletableFuture<List<Product>> products = CompletableFuture
    .supplyAsync(() -> fetchProductsFromSlowService(), executor)
    .orTimeout(2, TimeUnit.SECONDS)
    .exceptionally(ex -> {
        if (ex instanceof TimeoutException) {
            return cachedProducts.getOrDefault(category, List.of());
        }
        throw new CompletionException(ex);
    });

Custom Executor Configuration

java
// For I/O-bound tasks (DB, HTTP): large pool, allow blocking
ExecutorService ioExecutor = Executors.newFixedThreadPool(
    50, // large pool: most threads will be blocked on I/O
    new ThreadFactoryBuilder()
        .setNameFormat("io-pool-%d")
        .setDaemon(true)
        .build()
);

// For CPU-bound tasks: sized to available processors
ExecutorService cpuExecutor = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors(),
    new ThreadFactoryBuilder()
        .setNameFormat("cpu-pool-%d")
        .build()
);

// Virtual threads (Java 21+): no pool sizing needed
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
// Virtual threads are cheap — one per task is fine even for thousands of tasks
// Perfect for I/O-bound workloads
CompletableFuture<User> user = CompletableFuture.supplyAsync(
    () -> fetchUserFromDB(userId),
    virtualExecutor
);

Practical Pattern: Parallel Service Calls

java
public class UserDashboardService {
    private final ExecutorService executor;
    private final UserService userService;
    private final OrderService orderService;
    private final RecommendationService recommendationService;

    public CompletableFuture<Dashboard> buildDashboard(String userId) {
        // Fire all three requests in parallel
        CompletableFuture<User> userFuture = CompletableFuture
            .supplyAsync(() -> userService.getUser(userId), executor)
            .orTimeout(2, TimeUnit.SECONDS);

        CompletableFuture<List<Order>> ordersFuture = CompletableFuture
            .supplyAsync(() -> orderService.getRecentOrders(userId, 10), executor)
            .orTimeout(2, TimeUnit.SECONDS)
            .exceptionally(ex -> List.of()); // degraded: empty orders on failure

        CompletableFuture<List<Product>> recsFuture = CompletableFuture
            .supplyAsync(() -> recommendationService.getFor(userId), executor)
            .completeOnTimeout(List.of(), 500, TimeUnit.MILLISECONDS); // non-critical: 500ms max

        // Combine when all complete
        return CompletableFuture.allOf(userFuture, ordersFuture, recsFuture)
            .thenApply(v -> Dashboard.builder()
                .user(userFuture.join())
                .orders(ordersFuture.join())
                .recommendations(recsFuture.join())
                .build());
    }
}

Sequential execution: 2s + 2s + 0.5s = 4.5 seconds
Parallel with allOf: max(2s, 2s, 0.5s) = 2 seconds


CompletableFuture vs Virtual Threads (Java 21)

java
// CompletableFuture style (works on all Java 8+)
CompletableFuture<User> cf = CompletableFuture
    .supplyAsync(() -> fetchUser(userId), executor)
    .thenCompose(user -> fetchOrders(user.getId()))
    .thenApply(orders -> buildResponse(orders));

// Virtual threads style (Java 21+, structured concurrency in preview)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    StructuredTaskScope.Subtask<User> userTask = scope.fork(() -> fetchUser(userId));
    StructuredTaskScope.Subtask<List<Order>> ordersTask = scope.fork(() -> fetchOrders(userId));
    
    scope.join().throwIfFailed();
    
    return buildResponse(userTask.get(), ordersTask.get());
}

Virtual threads with structured concurrency are more readable for new code on Java 21+. CompletableFuture remains the standard for Java 8–20 and when you need complex combinators (allOf, anyOf, thenCompose chains).


Frequently Asked Questions

Q: What is the difference between thenApply and thenCompose?

thenApply is like map—it transforms the result with a synchronous function: T → U. thenCompose is like flatMap—it chains to another async operation: T → CompletableFuture<U>. Use thenCompose when the next step is itself async (a DB call, HTTP request); use thenApply when the transformation is synchronous (parsing, mapping). Using thenApply with a function that returns a CompletableFuture produces CompletableFuture<CompletableFuture<T>>—a double-wrapped future that almost certainly isn't what you want.

Q: When should I use join() vs get()?

Both block until the future completes. get() throws checked exceptions (ExecutionException, InterruptedException); join() throws unchecked CompletionException. In async chains (inside thenApply, thenCompose lambdas), use join() because lambdas can't declare checked exceptions. In code that can handle interruption (e.g., in a thread pool that should stop cleanly), use get() so InterruptedException can be caught and handled. Neither should be called in a reactive/async thread—they block the calling thread.

Q: How do I propagate MDC (logging context) or request-scoped data across async boundaries?

Thread-local storage (including SLF4J MDC) doesn't transfer to new threads automatically. Wrap your executor to copy the context:

java
ExecutorService wrappedExecutor = new MDCPropagatingExecutor(executor);
// Or manually:
Map<String, String> mdc = MDC.getCopyOfContextMap();
CompletableFuture.supplyAsync(() -> {
    MDC.setContextMap(mdc); // restore in the new thread
    return doWork();
}, executor);

Q: What happens to exceptions thrown inside thenApply or thenCompose?

Exceptions thrown inside stage callbacks are wrapped in a CompletionException and propagate to the next exceptional stage (exceptionally, handle, whenComplete). If no exceptional stage exists, the exception surfaces when you call join() or get(). This means you must have error handling somewhere in the chain or call site—otherwise exceptions are silently swallowed (the future just remains in a failed state). Always add .whenComplete or .exceptionally at the end of production chains to log failures even if you don't recover from them.