mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4mobile wallpaper 5mobile wallpaper 6
7450 字
15 分钟
Java并发编程之CompletableFuture异步编程实战

Java并发编程之CompletableFuture异步编程实战

在现代Java应用开发中,异步编程已经成为提升系统性能和响应速度的重要手段。CompletableFuture作为Java 8引入的强大异步编程工具,极大地简化了异步任务的编排和管理。本文将深入探讨CompletableFuture的核心特性和实战应用。

一、CompletableFuture简介

CompletableFuture实现了FutureCompletionStage接口,提供了函数式编程风格的异步编程能力。相比传统的FutureCompletableFuture支持:

  • 链式调用组合多个异步操作
  • 异常处理和恢复
  • 多任务并行执行和结果合并
  • 超时控制

二、核心API详解

1. 创建CompletableFuture

// 方式1:使用runAsync(无返回值)
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    System.out.println("异步任务执行中...");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

// 方式2:使用supplyAsync(有返回值)
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "Hello, CompletableFuture!";
});

// 方式3:直接创建已完成的状态
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("已完成");

2. 链式处理结果

CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
    // 模拟获取用户数据
    return "User:张三";
}).thenApply(user -> {
    // 转换数据
    return user.toUpperCase();
}).thenCompose(user -> {
    // 异步处理
    return CompletableFuture.supplyAsync(() -> user + "-PROCESSED");
});

System.out.println(result.join()); // 输出: USER:张三-PROCESSED

3. 组合多个异步任务

// 场景:同时获取用户信息和订单信息,然后合并展示

CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
    simulateDelay(500);
    return "用户:张三";
});

CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
    simulateDelay(800);
    return "订单:ORD-2024-001";
});

// 方式1:等待两个都完成
CompletableFuture<String> combined = userFuture.thenCombine(orderFuture, (user, order) -> {
    return user + " | " + order;
});
System.out.println(combined.join()); 
// 输出: 用户:张三 | 订单:ORD-2024-001

// 方式2:等待任意一个完成
CompletableFuture<Object> anyResult = CompletableFuture.anyOf(userFuture, orderFuture);
System.out.println(anyResult.join());
// 输出可能是: 用户:张三 (哪个先完成返回哪个)

// 方式3:等待所有完成
CompletableFuture<Void> allDone = CompletableFuture.allOf(userFuture, orderFuture);
allDone.join(); // 等待所有完成
System.out.println("所有任务已完成");

4. 异常处理

// 模拟可能出错的异步任务
CompletableFuture<String> riskyTask = CompletableFuture.supplyAsync(() -> {
    if (System.currentTimeMillis() % 2 == 0) {
        throw new RuntimeException("模拟异常: 网络连接失败");
    }
    return "任务成功完成";
});

// 方式1:使用exceptionally处理异常
CompletableFuture<String> withFallback = riskyTask.exceptionally(ex -> {
    System.out.println("异常被捕获: " + ex.getMessage());
    return "默认值: 使用备用方案";
});

System.out.println(withFallback.join());

// 方式2:使用handle统一处理正常和异常结果
CompletableFuture<String> handled = riskyTask.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("发生错误: " + ex.getMessage());
        return "错误后的恢复值";
    }
    return result + " (处理完成)";
});

System.out.println(handled.join());

三、实战场景:订单处理系统

下面是一个完整的电商订单异步处理示例:

import java.util.concurrent.*;

public class OrderProcessingSystem {
    
    // 自定义线程池
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    
    /**
     * 处理订单的主流程
     */
    public CompletableFuture<String> processOrder(String orderId) {
        return CompletableFuture.supplyAsync(() -> validateOrder(orderId), executor)
            .thenCompose(this::deductInventory)
            .thenCompose(this::processPayment)
            .thenCompose(this::createShipment)
            .thenApply(this::sendNotification)
            .exceptionally(this::handleOrderError);
    }
    
    private String validateOrder(String orderId) {
        simulateDelay(100);
        System.out.println("[1/5] 订单 " + orderId + " 验证通过");
        return orderId;
    }
    
    private CompletableFuture<String> deductInventory(String orderId) {
        return CompletableFuture.supplyAsync(() -> {
            simulateDelay(150);
            System.out.println("[2/5] 订单 " + orderId + " 库存扣减成功");
            return orderId;
        }, executor);
    }
    
    private CompletableFuture<String> processPayment(String orderId) {
        return CompletableFuture.supplyAsync(() -> {
            simulateDelay(200);
            if (Math.random() > 0.8) {
                throw new RuntimeException("支付失败: 余额不足");
            }
            System.out.println("[3/5] 订单 " + orderId + " 支付成功");
            return orderId;
        }, executor);
    }
    
    private CompletableFuture<String> createShipment(String orderId) {
        return CompletableFuture.supplyAsync(() -> {
            simulateDelay(100);
            System.out.println("[4/5] 订单 " + orderId + " 物流单创建成功");
            return orderId;
        }, executor);
    }
    
    private String sendNotification(String orderId) {
        simulateDelay(50);
        System.out.println("[5/5] 订单 " + orderId + " 通知已发送");
        return "订单 " + orderId + " 处理完成!";
    }
    
    private String handleOrderError(Throwable ex) {
        System.err.println("订单处理失败: " + ex.getMessage());
        return "订单处理失败,请稍后重试";
    }
    
    private void simulateDelay(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public void shutdown() {
        executor.shutdown();
    }
    
    // 测试
    public static void main(String[] args) {
        OrderProcessingSystem system = new OrderProcessingSystem();
        
        // 并发处理多个订单
        CompletableFuture<String>[] futures = new CompletableFuture[5];
        for (int i = 0; i < 5; i++) {
            futures[i] = system.processOrder("ORD-2024-" + String.format("%03d", i + 1));
        }
        
        // 等待所有订单处理完成
        CompletableFuture.allOf(futures).join();
        
        for (CompletableFuture<String> future : futures) {
            System.out.println(future.join());
        }
        
        system.shutdown();
    }
}

四、最佳实践和注意事项

1. 线程池选择

// 不推荐使用默认的ForkJoinPool.commonPool()处理阻塞操作
// 推荐为不同场景创建专用线程池

// CPU密集型任务
ExecutorService cpuPool = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors()
);

// IO密集型任务
ExecutorService ioPool = Executors.newCachedThreadPool();

// 定时任务
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

2. 避免常见陷阱

// ❌ 错误:在CompletableFuture中阻塞
CompletableFuture.supplyAsync(() -> {
    try {
        anotherFuture.get(); // 阻塞等待
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
});

// ✅ 正确:使用thenCompose进行链式组合
CompletableFuture.supplyAsync(() -> initialTask())
    .thenCompose(result -> anotherAsyncTask(result))
    .thenApply(finalResult -> process(finalResult));

3. 超时控制

// Java 9+ 提供了orTimeout和completeOnTimeout
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 耗时操作
    return fetchDataFromRemote();
}).orTimeout(5, TimeUnit.SECONDS) // 5秒超时
  .exceptionally(ex -> {
      if (ex instanceof TimeoutException) {
          return "默认数据";
      }
      throw new RuntimeException(ex);
  });

五、总结

CompletableFuture是Java异步编程的强大工具,它通过函数式编程风格简化了异步任务的编排和管理。掌握CompletableFuture的核心API和最佳实践,可以显著提升系统的并发处理能力和响应速度。

关键要点回顾:

  • 使用supplyAsyncrunAsync创建异步任务
  • 使用thenApplythenCompose进行链式处理
  • 使用thenCombineallOfanyOf组合多个任务
  • 使用exceptionallyhandle进行异常处理
  • 为阻塞操作指定自定义线程池

在实际项目中,合理使用CompletableFuture可以让代码更加简洁、高效,是Java开发者必须掌握的并发编程技能。

Java并发编程之CompletableFuture异步编程实战
https://www.zztzz.com.cn/posts/45/
作者
admin
发布于
2026-03-24
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时

封面
Sample Song
Sample Artist
封面
Sample Song
Sample Artist
0:00 / 0:00
💬Mizuki AI助手
呀~就是zzTzz大大闪闪发光的技术博客主页
标题已剧透:专注后端开发、主攻Java + Spring Boot的实战、踩坑与进阶小笔记~~
URL https://zztzz.com.cn/ 简洁有力,像一段优雅的代码!
Mizuki每次点开都忍不住小声赞叹:'zzTzz大人太厉害啦~'🧙‍♀️
需要我帮你找某类文章(比如JWT鉴权、Redis缓存)或读一篇入门指南吗?😊
05:17