线程池知识点
一、顶层架构体系
核心接口与类
Executor(顶层接口)
└─ 只有 execute(Runnable) 方法,定义"任务执行"最核心的契约
│
├── ExecutorService(子接口)
│ ├─ submit(Callable/Runnable) 提交有返回值任务
│ ├─ invokeAll(List<Callable>) 批量提交并等待全部完成
│ ├─ invokeAny(List<Callable>) 批量提交,任一完成即返回
│ ├─ shutdown() 优雅关闭
│ ├─ shutdownNow() 强制关闭
│ └─ awaitTermination() 等待终止
│
├── ScheduledExecutorService(子接口)
│ ├─ schedule(task, delay, unit) 延迟执行
│ ├─ scheduleAtFixedRate() 固定频率
│ └─ scheduleWithFixedDelay() 固定延迟
│
├── AbstractExecutorService(抽象类)
│ └─ 实现 submit/invokeAll/invokeAny 等模板方法
│
└── Executors(工具类)
└─ 提供 newFixedThreadPool / newCachedThreadPool 等静态工厂方法
核心实现类
| 实现类 | 说明 |
|---|---|
ThreadPoolExecutor |
线程池核心实现,所有基础线程池的底层 |
ScheduledThreadPoolExecutor |
继承 ThreadPoolExecutor,增加定时/周期能力 |
ForkJoinPool |
分治任务线程池,Fork/Join 框架的基础 |
Executor 框架的作用
将任务的提交与任务的执行解耦:
// 提交任务的人不需要关心线程如何创建、如何管理
Executor executor = new ThreadPoolExecutor(...);
executor.execute(() -> System.out.println("任务被提交,具体谁执行不关心"));
二、线程池七大核心参数
public ThreadPoolExecutor(
int corePoolSize, // ① 核心线程数
int maximumPoolSize, // ② 最大线程数
long keepAliveTime, // ③ 空闲存活时间
TimeUnit unit, // ④ 时间单位
BlockingQueue<Runnable> workQueue, // ⑤ 阻塞队列
ThreadFactory threadFactory, // ⑥ 线程工厂
RejectedExecutionHandler handler // ⑦ 拒绝策略
)
参数详解
| 参数 | 含义 | 说明 |
|---|---|---|
| corePoolSize | 核心线程数 | 即使空闲也不会回收(除非 allowCoreThreadTimeOut(true))的常驻线程数 |
| maximumPoolSize | 最大线程数 | 线程池允许创建的最大线程数,包含核心线程 |
| keepAliveTime | 空闲存活时间 | 非核心线程空闲超过该时间后会被回收 |
| unit | 时间单位 | keepAliveTime 的单位(秒/毫秒/微秒等) |
| workQueue | 阻塞队列 | 核心线程满时,新任务放入队列等待 |
| threadFactory | 线程工厂 | 创建新线程的工厂,建议自定义给线程命名 |
| handler | 拒绝策略 | 队列和最大线程都满了时的处理方式 |
参数之间的关系
corePoolSize ≤ maximumPoolSize
当 workQueue 是无界队列时,maximumPoolSize 参数失效
(因为队列永远装得下,不会创建非核心线程)
三、四种内置常用线程池
1. FixedThreadPool
ExecutorService executor = Executors.newFixedThreadPool(5);
// 实际构造:
// new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>())
| 项目 | 值 |
|---|---|
| corePoolSize | 传入的 n |
| maximumPoolSize | = n |
| keepAliveTime | 0(无效,因为没有非核心线程) |
| 队列 | LinkedBlockingQueue(无界,默认 Integer.MAX_VALUE) |
适用场景: 负载稳定、任务量可控的场景
2. CachedThreadPool
ExecutorService executor = Executors.newCachedThreadPool();
// 实际构造:
// new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
// new SynchronousQueue<Runnable>())
| 项目 | 值 |
|---|---|
| corePoolSize | 0 |
| maximumPoolSize | Integer.MAX_VALUE |
| keepAliveTime | 60 秒 |
| 队列 | SynchronousQueue(容量为 0,不存任务) |
适用场景: 短时间、大量小任务的场景
3. SingleThreadExecutor
ExecutorService executor = Executors.newSingleThreadExecutor();
// 实际构造:
// new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>())
| 项目 | 值 |
|---|---|
| corePoolSize | 1 |
| maximumPoolSize | 1 |
| 队列 | LinkedBlockingQueue(无界) |
特点: 保证所有任务按顺序执行(FIFO),适合需要顺序执行的场景
4. ScheduledThreadPool
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
// 实际构造:
// new ScheduledThreadPoolExecutor(5)
// 内部:super(corePoolSize, Integer.MAX_VALUE, ...)
| 项目 | 值 |
|---|---|
| corePoolSize | 传入的 n |
| maximumPoolSize | Integer.MAX_VALUE |
| 队列 | DelayedWorkQueue |
适用场景: 定时任务、延迟执行、周期任务
阿里规范为什么禁止 Executors 创建线程池?
| 内置池 | 问题 | 后果 |
|---|---|---|
FixedThreadPool |
LinkedBlockingQueue 无界 | 任务堆积 → OOM |
CachedThreadPool |
maximumPoolSize = MAX_VALUE | 线程无限创建 → OOM |
SingleThreadExecutor |
LinkedBlockingQueue 无界 | 任务堆积 → OOM |
ScheduledThreadPool |
maximumPoolSize = MAX_VALUE | 线程无限创建 → OOM |
必须使用 new ThreadPoolExecutor(...) 自定义有界队列创建线程池。
四、线程池运行原理
任务提交流程(核心)
提交任务
│
▼
┌─ 核心线程数 < corePoolSize?──────是──→ 创建核心线程执行
│ ↑
否 │
│ │
▼ │
┌─ 阻塞队列未满?─────────────────是──→ 放入队列等待─────┘
│ ↑
否 │
│ │
▼ │
┌─ 当前线程数 < maximumPoolSize?───是──→ 创建非核心线程执行
│ ↑
否 │
│ │
▼ │
执行拒绝策略 ──────────────────────────────────┘
关键时机点
| 时机 | 条件 | 行为 |
|---|---|---|
| 创建核心线程 | 当前线程数 < corePoolSize | 即使有线程空闲也创建新线程 |
| 放入队列 | 核心线程都在忙 | 任务排队等待 |
| 创建非核心线程 | 队列满 & 线程数 < maximumPoolSize | 临时救急线程 |
| 执行拒绝 | 队列满 & 线程数 = maximumPoolSize | 调用 RejectedExecutionHandler |
| 回收非核心 | 空闲时间 > keepAliveTime | 终止线程 |
核心点理解
- 预创建核心线程:
prestartAllCoreThreads()可提前创建所有核心线程(默认是懒创建,有任务才建) - 核心线程回收:默认不会回收,可调用
allowCoreThreadTimeOut(true)让核心线程也参与回收 - 队列是缓冲池不是无底洞:用有界队列才能触发扩容和拒绝,保护系统
五、四大拒绝策略
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy(默认) | 抛 RejectedExecutionException |
核心业务,不想静默丢失任务 |
| CallerRunsPolicy | 提交任务的线程自己执行 | 降级保护,让提交者感受压力,自然放慢速度 |
| DiscardPolicy | 直接丢弃,不抛异常 | 允许丢任务的场景(如日志、监控上报) |
| DiscardOldestPolicy | 丢弃队列中最旧的任务,重新提交 | 追求最新数据,旧数据可以放弃(如实时排行) |
选型建议
核心业务(支付、订单) → AbortPolicy(必须感知失败)
│
▼
流量保护业务(秒杀、削峰) → CallerRunsPolicy(让上游自己执行,自然限流)
│
▼
可丢业务(日志、统计) → DiscardPolicy(丢了也无所谓)
│
▼
追求最新数据(价格、排行) → DiscardOldestPolicy(丢弃旧的排队中的)
六、生命周期
线程池的 5 种状态
RUNNING
│
├── shutdown() ──────────────────→ SHUTDOWN
│ (不接受新任务,已提交的继续执行)
│
└── shutdownNow() ───────────────→ STOP
(不接受新任务,中断正在执行的,返回未执行的)
RUNNING(运行中)
│
├── shutdown() → SHUTDOWN(关闭中,不接受新任务,已提交任务继续)
│ │
│ └── 队列为空 + 所有任务完成 → TIDYING → TERMINATED
│
└── shutdownNow() → STOP(要停止,不接受新任务,中断正在执行的,返回未执行的任务列表)
│
└── 线程池为空 → TIDYING → TERMINATED
| 状态 | 含义 |
|---|---|
| RUNNING | 正常运行,接受新任务,处理队列任务 |
| SHUTDOWN | 调用 shutdown(),不再接受新任务,但已提交的任务继续执行 |
| STOP | 调用 shutdownNow(),不再接受新任务,中断正在执行的任务,清空队列 |
| TIDYING | 过渡状态,任务全部结束,即将执行 terminated() 钩子方法 |
| TERMINATED | 完全终止,terminated() 执行完毕 |
shutdown() vs shutdownNow()
| 对比 | shutdown() | shutdownNow() |
|---|---|---|
| 接收新任务 | ❌ 拒绝 | ❌ 拒绝 |
| 已提交正在执行的 | ✅ 等待完成 | ❌ 尝试中断(Thread.interrupt()) |
| 队列中等待的任务 | ✅ 继续执行 | ❌ 返回未执行的任务列表 |
| 是否可以恢复 | ❌ | ❌ |
优雅关闭的标准写法
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
// 第一步:不再接收新任务
executor.shutdown();
try {
// 第二步:等待现有任务完成(最多等 60 秒)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 第三步:超时后强制关闭
executor.shutdownNow();
// 第四步:再等一会儿,看是否真的终止
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能完全终止");
}
}
} catch (InterruptedException e) {
// 第五步:当前线程被中断时,强制关闭
executor.shutdownNow();
Thread.currentThread().interrupt();
}
七、常用核心方法
execute() vs submit()
| 对比 | execute() | submit() |
|---|---|---|
| 参数 | Runnable | Runnable / Callable |
| 返回值 | void | Future(可获取异步结果) |
| 异常处理 | 直接抛到线程(当前线程感知) | 被 Future 捕获,调用 get() 时才抛出 |
| 定义 | Executor 接口 | ExecutorService 接口 |
// execute — 无返回值,异常直抛
executor.execute(() -> {
throw new RuntimeException("这个异常会被直接抛出");
});
// submit — 有返回值,异常被 Future 捕获
Future<String> future = executor.submit(() -> {
throw new RuntimeException("这个异常被 Future 捕获");
});
// future.get(); // 此时才抛出 ExecutionException(包装了原始异常)
核心方法一览
// === 提交任务 ===
void execute(Runnable command); // 提交无返回值任务
<T> Future<T> submit(Callable<T> task); // 提交有返回值任务
<T> Future<T> submit(Runnable task, T result); // 提交任务,指定返回值
<T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks); // 批量提交,全部完成返回
<T> T invokeAny(Collection<Callable<T>> tasks); // 批量提交,任一完成即返回
// === 关闭线程池 ===
void shutdown(); // 优雅关闭
List<Runnable> shutdownNow(); // 强制关闭(返回未执行任务)
boolean isShutdown(); // 是否已关闭
boolean isTerminated(); // 是否已终止
boolean awaitTermination(long timeout, TimeUnit unit); // 等待终止
// === 配置与监控 ===
void prestartAllCoreThreads(); // 预启动所有核心线程
void allowCoreThreadTimeOut(boolean value); // 允许核心线程超时回收
int getPoolSize(); // 当前线程数
int getActiveCount(); // 活跃线程数
long getTaskCount(); // 已执行 + 队列中任务数
long getCompletedTaskCount(); // 已完成任务数
int getQueue().size(); // 队列中等待的任务数
八、异步任务相关(Callable / Future / FutureTask)
Callable vs Runnable
| 对比 | Runnable | Callable |
|---|---|---|
| 方法签名 | void run() |
V call() throws Exception |
| 返回值 | 无 | 有(泛型) |
| 异常 | 不能抛受检异常 | 可以抛异常 |
| 引入版本 | JDK 1.0 | JDK 5 |
Future 常用方法
// 提交任务
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "结果";
});
// 核心方法
future.get(); // 阻塞等待结果(一直等)
future.get(3, TimeUnit.SECONDS); // 超时等待,超时抛 TimeoutException
future.isDone(); // 任务是否完成
future.isCancelled(); // 任务是否被取消
future.cancel(true); // 尝试取消任务
// true = 即使正在运行也中断
// false = 未启动才取消
FutureTask
FutureTask 同时实现了 Runnable 和 Future,既可以直接交给线程执行,又可以获取结果。
// 方式一:直接给 Thread
FutureTask<String> futureTask = new FutureTask<>(() -> {
return "通过 FutureTask 执行";
});
new Thread(futureTask).start();
String result = futureTask.get(); // 获取结果
// 方式二:提交给 Executor
FutureTask<String> task = new FutureTask<>(() -> "结果");
executor.execute(task); // execute 也能用!因为实现了 Runnable
String r = task.get();
// 方式三:一次性保证
// FutureTask 的 run() 只能执行一次,多次调用无效
// 适合做"缓存"——计算一次,结果复用
异常捕获
ExecutorService executor = Executors.newFixedThreadPool(2);
// 方式一:submit 任务的异常
Future<String> future = executor.submit(() -> {
throw new RuntimeException("业务异常");
});
try {
future.get(); // 抛出 ExecutionException
} catch (ExecutionException e) {
Throwable cause = e.getCause(); // 获取原始异常
System.out.println("异常原因: " + cause.getMessage());
}
// 方式二:execute 任务的异常
executor.execute(() -> {
throw new RuntimeException("异常会直接抛出到线程");
});
// 可以通过设置 UncaughtExceptionHandler 捕获
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, ex) -> {
System.out.println("线程 " + thread.getName() + " 异常: " + ex.getMessage());
});
return t;
};
九、CompletableFuture(JDK 8 增强异步)
为什么需要 CompletableFuture?
Future 的局限性:
get()是阻塞的,不能异步回调- 多个 Future 不能组合(无法编排任务先后顺序)
- 缺少异常处理的回调
- 不能手动完成
CompletableFuture 解决了以上所有问题。
9.1 创建异步任务
// 指定线程池(重要!默认用 ForkJoinPool.commonPool() 线程数 = CPU 核心数 - 1)
ExecutorService executor = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)
);
// 无返回值
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("异步执行,无返回值");
}, executor);
// 有返回值
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "异步结果";
}, executor);
// 手动完成
CompletableFuture<String> future3 = new CompletableFuture<>();
future3.complete("手动设置结果"); // 立即完成
future3.completeExceptionally(new Exception("异常")); // 异常完成
// 获取结果
String result = future2.get(); // 阻塞等待(抛受检异常)
String result2 = future2.join(); // 阻塞等待(抛非受检异常,推荐)
String result3 = future2.getNow("默认值"); // 未完成返回默认值
9.2 单任务回调编排
// thenApply — 对结果做转换(有返回值)
CompletableFuture.supplyAsync(() -> "hello", executor)
.thenApply(s -> s.toUpperCase()) // HELLO
.thenApply(s -> s + " WORLD") // HELLO WORLD
.thenAccept(System.out::println); // 消费结果
// thenAccept — 消费结果,无返回值
CompletableFuture.supplyAsync(() -> "结果", executor)
.thenAccept(r -> System.out.println("消费: " + r));
// thenRun — 不关心结果,任务完了就执行
CompletableFuture.supplyAsync(() -> "结果", executor)
.thenRun(() -> System.out.println("任务完成通知"));
// exceptionally — 异常兜底
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("出错了");
return "成功";
}, executor)
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "兜底结果";
})
.thenAccept(System.out::println);
// handle — 同时处理正常和异常
CompletableFuture.supplyAsync(() -> "正常结果", executor)
.handle((result, ex) -> {
if (ex != null) {
System.out.println("异常: " + ex.getMessage());
return "兜底结果";
}
return result;
});
9.3 多任务组合
// thenCompose — 任务依赖:A → B(展开嵌套)
CompletableFuture.supplyAsync(() -> "A", executor)
.thenCompose(a -> CompletableFuture.supplyAsync(() -> a + "B"));
// thenCombine — 合并两个并行任务的独立结果
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(1000); return "任务1结果";
}, executor);
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(2000); return "任务2结果";
}, executor);
task1.thenCombine(task2, (r1, r2) -> r1 + " + " + r2)
.thenAccept(System.out::println);
// allOf — 等待所有任务完成(需手动收集结果)
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.thenRun(() -> {
String result = f1.join() + f2.join() + f3.join();
System.out.println("全部完成: " + result);
});
// 收集结果的通用方式
public <T> CompletableFuture<List<T>> allOfList(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
// anyOf — 任意一个完成即可
CompletableFuture.anyOf(f1, f2, f3)
.thenAccept(result -> System.out.println("最快的任务: " + result));
9.4 超时控制(JDK 9+)
// 超时抛 TimeoutException
CompletableFuture.supplyAsync(() -> {
sleep(5000); return "结果";
}, executor).orTimeout(2, TimeUnit.SECONDS);
// 超时返回默认值
CompletableFuture.supplyAsync(() -> {
sleep(5000); return "结果";
}, executor).completeOnTimeout("默认值", 2, TimeUnit.SECONDS);
9.5 业务实战:并行查询 + 合并结果
// 场景:查询商品详情,需要同时获取基本信息 + 库存 + 价格
public CompletableFuture<ProductVO> queryProduct(Long productId) {
CompletableFuture<ProductInfo> infoFuture =
CompletableFuture.supplyAsync(() -> productService.getInfo(productId), executor);
CompletableFuture<Integer> stockFuture =
CompletableFuture.supplyAsync(() -> stockService.getStock(productId), executor);
CompletableFuture<BigDecimal> priceFuture =
CompletableFuture.supplyAsync(() -> priceService.getPrice(productId), executor);
return infoFuture
.thenCombine(stockFuture, (info, stock) -> {
ProductVO vo = new ProductVO();
vo.setInfo(info);
vo.setStock(stock);
return vo;
})
.thenCombine(priceFuture, (vo, price) -> {
vo.setPrice(price);
return vo;
})
.exceptionally(ex -> {
log.error("查询商品失败,productId: {}", productId, ex);
return new ProductVO(); // 降级返回空对象
});
}
9.6 注意事项
| 注意点 | 说明 |
|---|---|
| 默认线程池是坑 | ForkJoinPool.commonPool() 线程数 = CPU 核心数 - 1,IO 任务必须自定义线程池 |
| get() vs join() | 推荐 join(),抛非受检异常,更简洁 |
| 异常一定要处理 | 没有 exceptionally() 捕获,异常会被"吃掉",直到 get()/join() 才抛出 |
| thenApply vs thenApplyAsync | 前者可能由调用线程直接执行,后者总是提交到线程池 |
十、实战与调优
10.1 线程数计算公式
CPU 密集型(计算为主):
- 公式:
Nthreads = CPU 核心数 + 1 - 原因:CPU 一直在算,线程多了上下文切换反而降低性能
- +1 是为了补偿偶尔缺页中断导致线程暂停
IO 密集型(数据库、RPC、文件读写):
- 经验公式:
Nthreads = CPU 核心数 × 2 - 精确公式:
Nthreads = CPU 核心数 × (1 + 等待时间 / 计算时间) - 原因:IO 等待时 CPU 空闲,可以多开线程利用 CPU
混合型任务:
- 拆分为 CPU 密集 + IO 密集,分别用不同线程池
10.2 自定义线程池实操
public class ThreadPoolConfig {
// 业务线程池(IO 密集型)
public ThreadPoolExecutor ioExecutor() {
return new ThreadPoolExecutor(
10, // corePoolSize
20, // maximumPoolSize
60L, TimeUnit.SECONDS, // 空闲 60s 回收
new LinkedBlockingQueue<>(1000), // 有界队列
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("io-pool-" + counter.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 提交者自执行
);
}
// 计算线程池(CPU 密集型)
public ThreadPoolExecutor cpuExecutor() {
int cores = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
cores + 1,
cores + 1,
0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
r -> {
Thread t = new Thread(r);
t.setName("cpu-pool-" + t.getId());
return t;
},
new ThreadPoolExecutor.AbortPolicy()
);
}
}
10.3 线程池监控
// 通过 ThreadPoolExecutor 提供的方法监控
public class ThreadPoolMonitor {
public void monitor(ThreadPoolExecutor executor) {
System.out.println(String.format(
"线程池监控: [核心:%d, 当前:%d, 活跃:%d, 最大:%d, 队列:%d, 已完成:%d]",
executor.getCorePoolSize(),
executor.getPoolSize(),
executor.getActiveCount(),
executor.getMaximumPoolSize(),
executor.getQueue().size(),
executor.getCompletedTaskCount()
));
}
// 通过继承 ThreadPoolExecutor 自定义监控
public static class MonitoredThreadPool extends ThreadPoolExecutor {
public MonitoredThreadPool(int core, int max, long keepAlive, TimeUnit unit,
BlockingQueue<Runnable> queue, ThreadFactory factory,
RejectedExecutionHandler handler) {
super(core, max, keepAlive, unit, queue, factory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
// 记录任务开始时间(ThreadLocal)
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// 计算任务耗时,记录监控指标
if (t != null) {
System.err.println("任务执行异常: " + t.getMessage());
}
}
@Override
protected void terminated() {
super.terminated();
System.out.println("线程池已终止");
}
}
}
10.4 参数调优指南
| 参数 | 如何调优 |
|---|---|
| corePoolSize | CPU 密集→CPU 数+1;IO 密集→CPU 数×2(或按公式计算) |
| maximumPoolSize | 一般为核心数的 2~3 倍,具体看系统压力测试 |
| workQueue | 必须用有界队列,大小根据任务峰值 × 容忍延迟计算 |
| keepAliveTime | IO 密集型可以设长一点(60s),CPU 密集型短一点(10s) |
| threadFactory | 必须命名,便于排查问题 |
| handler | 核心业务用 AbortPolicy,流量保护用 CallerRunsPolicy |
10.5 业务场景选型
| 场景 | 线程池类型 | 队列 | 核心参数 |
|---|---|---|---|
| Web 接口服务 | IO 密集型 | ArrayBlockingQueue(1000) | core=CPU×2, max=core×2 |
| 大数据计算 | CPU 密集型 | LinkedBlockingQueue(100) | core=CPU+1, max=CPU+1 |
| 定时任务 | ScheduledThreadPool | — | core=根据定时任务数确定 |
| 异步通知推送 | 弹性 IO | SynchronousQueue | core=0, max=100, keepAlive=60s |
| 日志收集 | 可丢弃 | LinkedBlockingQueue(10000) | DiscardPolicy(不阻塞业务) |
十一、面试高频考点
Q1:ThreadPoolExecutor 的核心参数含义?
corePoolSize(常驻线程数)、maximumPoolSize(最大线程数)、keepAliveTime(空闲超时)、workQueue(阻塞队列)、threadFactory(线程工厂)、handler(拒绝策略)。详见第二章。
Q2:任务提交流程?
详见第四章流程图:先判断核心线程 → 再放队列 → 再创建非核心线程 → 最后执行拒绝策略。
Q3:拒绝策略怎么选?
核心业务(支付/订单)→ AbortPolicy;限流保护 → CallerRunsPolicy;可丢业务 → DiscardPolicy。详见第五章。
Q4:为什么禁止 Executors?
| 内置池 | 风险 |
|---|---|
| FixedThreadPool | 无界队列 → OOM |
| CachedThreadPool | 无限线程 → OOM |
| SingleThreadExecutor | 无界队列 → OOM |
| ScheduledThreadPool | 无限线程 → OOM |
Q5:如何回收核心线程?
调用 allowCoreThreadTimeOut(true),核心线程和普通线程一样受 keepAliveTime 控制。
Q6:shutdown() 和 shutdownNow() 区别?
shutdown() 不接新任务,处理完已有的才停;shutdownNow() 不接新任务,中断正在执行的,返回队列中未执行的。详见第六章。
Q7:execute() 和 submit() 区别?
execute() 无返回值,异常直接抛出;submit() 有 Future 返回值,异常被 Future 捕获,调用 get() 时才抛出。详见第七章。
Q8:如何设置线程数?
CPU 密集 → CPU 核心数 + 1;IO 密集 → CPU 核心数 × 2(精确公式:核心数 × (1 + 等待/计算))。详见第十章。
Q9:线程池的队列满了会怎么样?
如果当前线程数 < maximumPoolSize:创建非核心线程执行;如果当前线程数 = maximumPoolSize:执行拒绝策略。
Q10:submit 的任务怎么捕获异常?
future.get() 会抛出 ExecutionException,调用 getCause() 获取原始异常。或者在 afterExecute 钩子中捕获。