7450 字
15 分钟
Java并发编程之CompletableFuture异步编程实战
Java并发编程之CompletableFuture异步编程实战
在现代Java应用开发中,异步编程已经成为提升系统性能和响应速度的重要手段。CompletableFuture作为Java 8引入的强大异步编程工具,极大地简化了异步任务的编排和管理。本文将深入探讨CompletableFuture的核心特性和实战应用。
一、CompletableFuture简介
CompletableFuture实现了Future和CompletionStage接口,提供了函数式编程风格的异步编程能力。相比传统的Future,CompletableFuture支持:
- 链式调用组合多个异步操作
- 异常处理和恢复
- 多任务并行执行和结果合并
- 超时控制
二、核心API详解
1. 创建CompletableFuture
// 方式1:使用runAsync(无返回值)
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("异步任务执行中...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 方式2:使用supplyAsync(有返回值)
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello, CompletableFuture!";
});
// 方式3:直接创建已完成的状态
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("已完成");
2. 链式处理结果
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
// 模拟获取用户数据
return "User:张三";
}).thenApply(user -> {
// 转换数据
return user.toUpperCase();
}).thenCompose(user -> {
// 异步处理
return CompletableFuture.supplyAsync(() -> user + "-PROCESSED");
});
System.out.println(result.join()); // 输出: USER:张三-PROCESSED
3. 组合多个异步任务
// 场景:同时获取用户信息和订单信息,然后合并展示
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
simulateDelay(500);
return "用户:张三";
});
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
simulateDelay(800);
return "订单:ORD-2024-001";
});
// 方式1:等待两个都完成
CompletableFuture<String> combined = userFuture.thenCombine(orderFuture, (user, order) -> {
return user + " | " + order;
});
System.out.println(combined.join());
// 输出: 用户:张三 | 订单:ORD-2024-001
// 方式2:等待任意一个完成
CompletableFuture<Object> anyResult = CompletableFuture.anyOf(userFuture, orderFuture);
System.out.println(anyResult.join());
// 输出可能是: 用户:张三 (哪个先完成返回哪个)
// 方式3:等待所有完成
CompletableFuture<Void> allDone = CompletableFuture.allOf(userFuture, orderFuture);
allDone.join(); // 等待所有完成
System.out.println("所有任务已完成");
4. 异常处理
// 模拟可能出错的异步任务
CompletableFuture<String> riskyTask = CompletableFuture.supplyAsync(() -> {
if (System.currentTimeMillis() % 2 == 0) {
throw new RuntimeException("模拟异常: 网络连接失败");
}
return "任务成功完成";
});
// 方式1:使用exceptionally处理异常
CompletableFuture<String> withFallback = riskyTask.exceptionally(ex -> {
System.out.println("异常被捕获: " + ex.getMessage());
return "默认值: 使用备用方案";
});
System.out.println(withFallback.join());
// 方式2:使用handle统一处理正常和异常结果
CompletableFuture<String> handled = riskyTask.handle((result, ex) -> {
if (ex != null) {
System.out.println("发生错误: " + ex.getMessage());
return "错误后的恢复值";
}
return result + " (处理完成)";
});
System.out.println(handled.join());
三、实战场景:订单处理系统
下面是一个完整的电商订单异步处理示例:
import java.util.concurrent.*;
public class OrderProcessingSystem {
// 自定义线程池
private final ExecutorService executor = Executors.newFixedThreadPool(10);
/**
* 处理订单的主流程
*/
public CompletableFuture<String> processOrder(String orderId) {
return CompletableFuture.supplyAsync(() -> validateOrder(orderId), executor)
.thenCompose(this::deductInventory)
.thenCompose(this::processPayment)
.thenCompose(this::createShipment)
.thenApply(this::sendNotification)
.exceptionally(this::handleOrderError);
}
private String validateOrder(String orderId) {
simulateDelay(100);
System.out.println("[1/5] 订单 " + orderId + " 验证通过");
return orderId;
}
private CompletableFuture<String> deductInventory(String orderId) {
return CompletableFuture.supplyAsync(() -> {
simulateDelay(150);
System.out.println("[2/5] 订单 " + orderId + " 库存扣减成功");
return orderId;
}, executor);
}
private CompletableFuture<String> processPayment(String orderId) {
return CompletableFuture.supplyAsync(() -> {
simulateDelay(200);
if (Math.random() > 0.8) {
throw new RuntimeException("支付失败: 余额不足");
}
System.out.println("[3/5] 订单 " + orderId + " 支付成功");
return orderId;
}, executor);
}
private CompletableFuture<String> createShipment(String orderId) {
return CompletableFuture.supplyAsync(() -> {
simulateDelay(100);
System.out.println("[4/5] 订单 " + orderId + " 物流单创建成功");
return orderId;
}, executor);
}
private String sendNotification(String orderId) {
simulateDelay(50);
System.out.println("[5/5] 订单 " + orderId + " 通知已发送");
return "订单 " + orderId + " 处理完成!";
}
private String handleOrderError(Throwable ex) {
System.err.println("订单处理失败: " + ex.getMessage());
return "订单处理失败,请稍后重试";
}
private void simulateDelay(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void shutdown() {
executor.shutdown();
}
// 测试
public static void main(String[] args) {
OrderProcessingSystem system = new OrderProcessingSystem();
// 并发处理多个订单
CompletableFuture<String>[] futures = new CompletableFuture[5];
for (int i = 0; i < 5; i++) {
futures[i] = system.processOrder("ORD-2024-" + String.format("%03d", i + 1));
}
// 等待所有订单处理完成
CompletableFuture.allOf(futures).join();
for (CompletableFuture<String> future : futures) {
System.out.println(future.join());
}
system.shutdown();
}
}
四、最佳实践和注意事项
1. 线程池选择
// 不推荐使用默认的ForkJoinPool.commonPool()处理阻塞操作
// 推荐为不同场景创建专用线程池
// CPU密集型任务
ExecutorService cpuPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// IO密集型任务
ExecutorService ioPool = Executors.newCachedThreadPool();
// 定时任务
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
2. 避免常见陷阱
// ❌ 错误:在CompletableFuture中阻塞
CompletableFuture.supplyAsync(() -> {
try {
anotherFuture.get(); // 阻塞等待
} catch (Exception e) {
e.printStackTrace();
}
return result;
});
// ✅ 正确:使用thenCompose进行链式组合
CompletableFuture.supplyAsync(() -> initialTask())
.thenCompose(result -> anotherAsyncTask(result))
.thenApply(finalResult -> process(finalResult));
3. 超时控制
// Java 9+ 提供了orTimeout和completeOnTimeout
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 耗时操作
return fetchDataFromRemote();
}).orTimeout(5, TimeUnit.SECONDS) // 5秒超时
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
return "默认数据";
}
throw new RuntimeException(ex);
});
五、总结
CompletableFuture是Java异步编程的强大工具,它通过函数式编程风格简化了异步任务的编排和管理。掌握CompletableFuture的核心API和最佳实践,可以显著提升系统的并发处理能力和响应速度。
关键要点回顾:
- 使用
supplyAsync和runAsync创建异步任务 - 使用
thenApply、thenCompose进行链式处理 - 使用
thenCombine、allOf、anyOf组合多个任务 - 使用
exceptionally、handle进行异常处理 - 为阻塞操作指定自定义线程池
在实际项目中,合理使用CompletableFuture可以让代码更加简洁、高效,是Java开发者必须掌握的并发编程技能。
Java并发编程之CompletableFuture异步编程实战
https://www.zztzz.com.cn/posts/45/ 部分信息可能已经过时









