Page cover

Thread Pool

1. 线程池核心概念

Java 通过 Executor 框架(尤其是 ThreadPoolExecutor 类)提供了可配置可复用的线程管理机制,以避免频繁创建/销毁线程导致的开销,并能在任务激增时更好地控制系统负载。


2. 工作流程与执行顺序

下面以 ThreadPoolExecutor 为例,示意一个典型的任务提交与执行流程:

          ┌────────────────────────────────────────────────────────┐
          │                    提交任务(execute/submit)         │
          └────────────────────────────────────────────────────────┘

                    [当前运行线程数 < corePoolSize ?]
                                ├── Yes --> 创建新线程执行任务
                                ▼  No
                    [工作队列(workQueue)是否已满?]
                                ├── No --> 任务入队
                                ▼  Yes
                [当前运行线程数 < maximumPoolSize ?]
                                ├── Yes --> 创建新线程执行任务
                                ▼  No
          [触发 拒绝策略 (RejectedExecutionHandler)]

解读

  1. 小于核心线程数(corePoolSize)

    • 直接创建新线程来执行任务,无需排队。

  2. 达到或超过核心线程数

    • 先尝试把任务放入工作队列(workQueue)。

  3. 当队列已满且当前线程数还没达到 maximumPoolSize

    • 继续创建新线程(但不超过 max)。

  4. 若队列已满并且已达 maximumPoolSize

    • 执行拒绝策略(默认 AbortPolicy 会抛异常)。

*简而言之:*先用“核心线程”,再排队;排队满了就扩大线程数到最大值;再无法受理就拒绝。


3. 关键参数与配置

ThreadPoolExecutor 的常见构造方法(简化):

public ThreadPoolExecutor(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
    ...
}

下表概括各参数作用和调优思路:

参数

意义

典型调优策略

corePoolSize

核心线程数,除非允许超时空闲,否则即便空闲也会保留这些线程

- 通常与系统可用 CPU 核数、任务性质(CPU密集/IO密集)相关。<br/>- 避免设置过大导致线程切换。

maximumPoolSize

最大线程数,当工作队列塞满后可继续创建线程,直到达到这个上限

- 不宜过高,否则易引发严重上下文切换或 OOM。<br/>- 建议在合理评估负载后设置。

keepAliveTime

线程空闲时,超过此时间就会被终止(非核心线程默认会被回收;若开启核心线程超时,也回收空闲核心线程)

- 对波动较大的任务量可适当调高,以避免频繁创建/销毁线程。

workQueue

存放待执行任务的队列,通常使用 LinkedBlockingQueueSynchronousQueueArrayBlockingQueue

- 选择无界/有界队列取决于任务规模与内存限制。<br/>- 大小过大可能拖累响应时间;太小会频繁触发扩容或拒绝策略。

threadFactory

线程工厂,用于创建工作线程,可自定义线程名字、是否后台线程等

- 可自定义便于排查问题,比如给线程名加上业务标识。

handler (拒绝策略)

当任务无法受理时的处理方式,可选:<br/>• AbortPolicy(抛异常)<br/>• DiscardPolicy(丢任务)<br/>• DiscardOldestPolicy(丢最旧任务)<br/>• CallerRunsPolicy(在提交者线程执行)

- 结合业务需求选择合适策略。<br/>- 或自定义 handler,做降级/日志记录等操作。


4. 线程池监控与优化

4.1 监控方法

  1. ThreadPoolExecutor 提供的监控API

    • getPoolSize():当前池中线程总数

    • getActiveCount():正在执行任务的线程数

    • getQueue().size():队列中的等待任务数

    • getCompletedTaskCount():已完成任务总数

  2. JMX 监控

    • 通过 java.lang:type=Threading 或使用自定义 MBean 来实时观察线程使用情况。

  3. 外部监控工具

    • VisualVMJConsolePrometheus + Grafana,对线程池指标进行采集和可视化。

4.2 常见优化要点

  1. 合理设置核心线程数

    • 根据CPU核数、任务的计算/IO比重来估算,避免过多线程相互抢占 CPU。

  2. 限制最大线程数

    • 避免在流量高峰时创建海量线程,造成系统不稳定或 OOM。

  3. 选取合适队列

    • 大小队列可做“削峰填谷”;但太大可能导致过长延迟,太小容易频繁拒绝任务。

  4. 使用拒绝策略做“保护”

    • 高并发场景下,建议使用 CallerRunsPolicy 或自定义方案,让提交者承担一部分工作或拒绝低优先级任务。

  5. 核心线程超时

    • 通过 allowCoreThreadTimeOut(true) 在空闲时也回收核心线程,适合波动非常大的场景,节省资源。

  6. 命名线程

    • 自定义 ThreadFactory:给线程池的线程起易识别的名字,方便排查问题。


5. 小结

  1. 工作流程: 先用核心线程处理任务 → 队列排队 → 超过队列则扩容至最大线程数 → 再不行就拒绝。

  2. 关键参数

    • corePoolSizemaximumPoolSize 决定并发线程数量上下限;

    • workQueue 决定排队策略和长度;

    • 拒绝策略影响系统在饱和时如何应对。

  3. 监控与调优

    • 动态监控线程池运行状态(线程数、队列长度等);

    • 合理设置线程数、队列大小、拒绝策略,确保稳定性与吞吐量。

  4. 实战建议

    • 优先使用框架内置线程池(如 Executors.newFixedThreadPool / newCachedThreadPool 等),根据业务需求做更细调;

    • 对于核心业务,需定制 ThreadPoolExecutor 并结合监控与限流/降级措施,避免在极端情况下导致系统崩溃。

通过以上要点的掌握,可有效使用 Java 线程池完成并发任务的高效调度和资源管理,实现“高并发,低延迟”的应用目标。

6. 高级线程池用法

CachedThreadPool

  • 默认 最大 Interger. MAX_VALUE 线程数的线程池

  • ExecutorService service = Executors.newCachedThreadPool()

FixedThreadPool

  • 定长线程池,最优数量:Runtime.getRuntime().availableProcessors()

  • 不会释放线程

  • ExecutorService service Executors.newFixedThreadPool(10);

SingleThreadPool

  • 同时只有一个线程运行,可以按照指定顺序(FIFO, LIFO, 优先级)执行

  • ExecutorService service = Executors.newSingleThreadExecutor();

ScheduleThreadPool

  • 定时执行线程池

  • ExecutorService service = Executors.newScheduledThreadPool(10);

WorkStealingPool

  • Fork/Join

        public static void main(String[] args) throws IOException {
            // 根据cpu是几核来开启几个线程
            ExecutorService service = Executors.newWorkStealingPool();
            // 查看当前计算机是几核
            System.out.println(Runtime.getRuntime().availableProcessors());
            service.execute(new R(1000));
            service.execute(new R(2000));
            service.execute(new R(3000));
            service.execute(new R(1000));
            service.execute(new R(2000));
    
            // WorkStealing是精灵线程(守护线程、后台线程),主线程不阻塞,看不到输出。
            // 虚拟机不停止,守护线程不停止
            System.in.read();
        }
    
        static class R implements Runnable {
            int time;
    
            public R(int time) {
                this.time = time;
            }
    
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(time + ":" + Thread.currentThread().getName());
            }
        }
    1000:ForkJoinPool-1-worker-1
    1000:ForkJoinPool-1-worker-4
    2000:ForkJoinPool-1-worker-2
    2000:ForkJoinPool-1-worker-5
    3000:ForkJoinPool-1-worker-3

Last updated

Was this helpful?