Java(七):7.0 Java并发编程

7.0 Java并发编程

欢迎来到Java并发编程的世界。在正式踏上这段旅程之前,我们首先需要回答一个根本性的问题:在已经能够编写出功能完善的单线程程序的情况下,我们为什么还要投入精力去学习和使用看似复杂得多的并发编程呢,注意,该案例里的所有代码都不建议手敲,我们学过的Hutool中有提供并发相关的方法,我们会着重讲解那部分的语法点,对于前面的知识,我们可以理解里面的API,以后再来查阅即可!

7.1 [基础] 并发编程入门

7.1.1 为什么需要并发编程?

简单来说,驱动我们使用并发编程的根本原因,是对效率的极致追求。

充分利用硬件性能

随着硬件技术的飞速发展,现代计算机早已进入了多核CPU时代。这意味着我们的设备拥有多个能够独立执行计算任务的“大脑”。如果程序依旧是传统的单线程模型,那么在任何时刻,它最多只能利用其中一个“大脑”。无论硬件多么强大,其他的核心都将处于空闲状态,这无疑是对计算资源的巨大浪费。

并发编程允许我们将一个大任务拆分成多个子任务,并将这些子任务交给不同的线程,这些线程可以被操作系统调度到不同的CPU核心上实现真正的并行执行。

对于计算密集的任务(例如,大规模数据分析、图像渲染),并发能够成倍地缩短处理时间,带来性能的质变。

优化程序响应能力

程序在运行过程中,并非所有时间都在进行计算。很多时候,线程需要等待某些操作完成,例如读取磁盘文件、调用网络接口、等待数据库返回结果等。这些操作统称为I/O操作。

在单线程模型中,一旦线程发起I/O请求,它就会进入阻塞状态,必须等到操作完成后才能继续。如果这是一个图形界面的主线程,那么在阻塞期间,整个应用界面会卡住,无法响应用户操作,体验将非常糟糕。

并发编程可以完美解决这个问题。当一个线程因为I/O操作而阻塞时,CPU可以立即切换去执行其他线程,从而让应用保持流畅。

在需要与用户交互或处理网络请求的场景下,并发能够避免因等待而产生的“假死”现象,极大地提升应用的响应速度和吞吐量。

总而言之,我们学习和使用并发编程,并非为了炫技,而是解决实际工程问题的必要手段。它是通往高性能、高响应能力软件架构的必经之路。

7.1.2 [基础] 核心概念辨析

在深入学习之前,理清并发世界中的几个基础概念至关重要,它们是我们后续讨论的基石。

进程与线程

我们可以将进程想象成一个正在运行的应用程序,比如你打开的音乐播放器或浏览器。操作系统会为每个进程分配一块独立的内存空间,它们之间的数据默认是隔离的,互不干扰。

线程则是进程内部的一条执行路径。一个进程可以包含一个或多个线程,它们共享进程的内存资源(如代码、数据)。可以把进程比作一个工厂,而线程就是工厂里的流水线,多条流水线可以同时工作,共享工厂的资源来完成生产任务。

[面试题] 进程是操作系统进行资源分配的基本单位,而线程CPU进行任务调度的基本单位。一个进程至少拥有一个线程,即主线程。

并发与并行

这两个词在日常交流中经常被混用,但在技术领域它们有明确的区别。

  • 并发 :指的是在一个时间段内,多个任务都在向前推进。在单核CPU上,操作系统通过快速地在多个任务之间切换来实现并发,宏观上看,感觉像是所有任务在同时执行,但微观上任意时刻只有一个任务在被执行。这好比一个人在同时做饭、接电话和看孩子,他需要在几件事情之间不停切换。

  • 并行:指的是在同一时刻,有多个任务在真地同时执行。这必须依赖多核CPU才能实现,每个核心在同一瞬间处理一个独立的任务。这好比一个团队里有多个人,每个人都在同时炒自己面前的一盘菜。

并行是并发的一种更理想、更高效的实现形式。并发是逻辑层面的概念,而并行是物理层面的实现。我们的目标是编写并发程序,并利用多核硬件让它尽可能地并行执行。

同步与异步

这两个概念描述的是方法调用的行为模式。

  • 同步:同步调用就像去餐厅点餐,你点完菜后,必须在座位上一直等待,直到服务员把菜端上来,你才能去做别的事情(比如吃饭)。
  • 在程序中,一个同步方法调用,调用方必须等待该方法执行完毕并返回结果,才能继续执行下一步。
  • 异步:异步调用则像是点外卖,你下完单就可以立刻去做别的事情了,比如看电视、打游戏。外卖送到后,你会收到一个通知(比如电话或门铃)
    • 在程序中,一个异步方法调用会立即返回,调用方无需等待方法执行完成,可以在未来的某个时刻通过回调函数、状态查询或通知等方式获取结果。

7.1.3 [基础] 线程的生命周期与状态

一个线程从被创建到最终消亡,会经历一系列不同的状态,理解这些状态是诊断和调试并发问题的基础。在Java中,线程的生命周期被明确地定义为六种状态,它们都封装在Thread.State这个枚举类中。

img

线程的六种状态
  • 新建 (NEW)
    当一个Thread对象被创建出来,但还没有调用其start()方法时,它就处于新建状态。此时,它仅仅是一个普通的Java对象,操作系统还没有为它分配任何线程资源。

  • 可运行 (RUNNABLE)
    这是一个复合状态。一旦我们调用了线程的start()方法,它就进入了可运行状态。此时,它可能正在CPU上执行,也可能正在等待操作系统的CPU时间片分配。Java虚拟机将这两种情况(ReadyRunning)统一归为RUNNABLE状态。

  • 阻塞 (BLOCKED)
    当一个线程试图进入一个synchronized同步代码块或方法,但该代码块的锁已经被其他线程持有时,该线程就会进入阻塞状态。它会暂停执行,直到成功获取到锁,才会转为可运行状态。

  • 等待 (WAITING)
    当线程调用了没有设置超时时间的Object.wait()方法、Thread.join()方法或LockSupport.park()方法时,会进入该状态。处于等待状态的线程需要等待一个明确的外部信号才能被唤醒。例如,另一个线程调用了Object.notify()Object.notifyAll()

  • 计时等待 (TIMED_WAITING)
    等待状态类似,但这个状态是有时间限制的。当线程调用了带有超时参数的方法,如Thread.sleep(long millis)Object.wait(long timeout)Thread.join(long millis)时,就会进入此状态。线程会等待指定的时间,时间结束后会自动唤醒,或者在等待期间被其他线程提前唤醒。

  • 终止 (TERMINATED)
    当线程的run()方法执行完毕,或者因未捕获的异常而退出时,线程就进入了终止状态。这标志着线程的整个生命周期已经结束。

核心要点:线程的状态反映了它在特定时刻的活动情况。通过Thread.getState()方法,我们可以获取到线程的当前状态,这对于监控和分析多线程程序的行为非常有帮助。例如,如果发现大量线程处于阻塞状态,可能意味着存在严重的锁竞争。

7.1.4 [基础] 创建与管理线程

了解了线程的状态后,我们来看看如何实际地创建并控制一个线程。

两种核心方式的对比

在Java中,我们主要通过两种方式来定义一个线程所要执行的任务。

  • 实现 Runnable 接口
    这是最推荐的方式。我们创建一个类并实现Runnable接口,然后重写其run()方法。这个Runnable实例代表了一个“任务”,它可以被提交给一个Thread对象来执行。

  • 继承 Thread
    我们也可以创建一个类并直接继承Thread类,然后重写其run()方法。

[面试题] 为什么推荐使用实现Runnable接口的方式?

  1. 解耦:实现Runnable接口的方式将“任务”(Runnable)与“执行者”(Thread)分离开来,结构更清晰。一个任务可以被多个不同的线程执行。
  2. 避免单继承局限:Java类只支持单继承。如果我们的类继承了Thread,就无法再继承其他任何类,这在复杂的系统设计中限制了扩展性。实现接口则没有这个限制。
  3. 资源共享:多个线程可以共享同一个Runnable实例,从而方便地共享数据。
线程常用操作方法解析

下表简要概述了几个最核心的线程操作方法,以便快速查阅。

方法核心作用简要说明
start()启动新线程异步执行run()方法,这是正确的启动方式。
run()定义任务逻辑直接调用等于普通方法,不会创建新线程。
sleep(long)暂停当前线程静态方法,暂停期间不释放锁
join()等待目标线程结束调用t.join()会使当前线程等待t线程执行完毕。
interrupt()发送中断信号协作式中断,设置中断标志位或抛出InterruptedException

interrupt()机制取代了已被废弃的stop()方法,因为stop()方法会立即终止线程并释放所有锁,这可能导致对象状态不一致,是极其不安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.example;

import java.util.concurrent.TimeUnit;

/**
* Main 类,简洁演示核心线程操作方法。
*/
public class Main {

public static void main(String[] args) throws InterruptedException {
System.out.println("主线程: 开始演示.");

// 1. start() 和 run() - 演示启动新线程
Thread workerThread = new Thread(new WorkerTask("工作者"));
System.out.println("主线程: 启动工作线程...");
workerThread.start(); // 启动新线程

// 2. join() - 演示等待线程结束
System.out.println("主线程: 等待工作线程结束...");
workerThread.join(); // 等待 workerThread 完成
System.out.println("主线程: 工作线程已结束.");

System.out.println("\n主线程: 演示 sleep() 和 interrupt().");

// 3. sleep() 和 interrupt() - 演示暂停和中断
Thread interruptibleThread = new Thread(new InterruptibleTask("可中断"));
System.out.println("主线程: 启动可中断线程...");
interruptibleThread.start();

// 稍作等待让线程进入睡眠
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("主线程: 中断可中断线程...");
interruptibleThread.interrupt(); // 发送中断信号

// 等待可中断线程结束
interruptibleThread.join();
System.out.println("主线程: 可中断线程已结束.");

System.out.println("主线程: 演示结束.");
}

/**
* 简单的线程任务,模拟工作。
*/
static class WorkerTask implements Runnable {
private final String name;

WorkerTask(String name) {
this.name = name;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": " + name + " 开始工作.");
try {
TimeUnit.SECONDS.sleep(1); // 模拟工作耗时
System.out.println(Thread.currentThread().getName() + ": " + name + " 工作完成.");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ": " + name + " 工作被中断!");
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}

/**
* 一个会睡眠并可能被中断的任务。
*/
static class InterruptibleTask implements Runnable {
private final String name;

InterruptibleTask(String name) {
this.name = name;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": " + name + " 开始.");
try {
System.out.println(Thread.currentThread().getName() + ": " + name + " 将要睡眠 2 秒...");
// sleep() 暂停当前线程,不释放锁
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + ": " + name + " 睡眠结束."); // 如果未被中断,会打印此句
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ": " + name + " 的睡眠被中断!");
// 捕获到中断,可以做清理或停止操作。这里我们直接打印并允许线程结束。
// Thread.currentThread().interrupt(); // 如果需要让上层知道中断,可以重设中断状态
} finally {
System.out.println(Thread.currentThread().getName() + ": " + name + " 结束.");
}
}
}
}
守护线程

在Java中,线程分为用户线程和守护线程。守护线程是一种特殊的线程,其使命是为其他用户线程服务,典型的例子是Java垃圾回收(GC)线程。

核心特性:当一个Java虚拟机中不存在任何存活的用户线程时,虚拟机就会退出。这意味着,如果只剩下守护线程,它们会被立即终止,无论其任务是否执行完毕。我们可以通过thread.setDaemon(true)方法在线程启动前将其设置为守护线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.example;

import java.util.concurrent.TimeUnit;

/**
* Main 类,极简演示守护线程。
*/
public class Main {

public static void main(String[] args) throws Exception {
System.out.println("主线程: 启动守护线程...");

// 1. 创建并启动一个守护线程
Thread daemonThread = new Thread(new DaemonTask("守护者"), "DaemonThread");
daemonThread.setDaemon(true); // 设置为守护线程
daemonThread.start();

System.out.println("主线程: 守护线程已启动.");

// 模拟主线程(用户线程)的短暂活动后结束
TimeUnit.SECONDS.sleep(2);

System.out.println("主线程: 主线程即将结束.");
// 当主线程结束,且没有其他用户线程时,JVM会退出,守护线程也会立即终止。
}

/**
* 模拟一个守护线程的任务。
*/
static class DaemonTask implements Runnable {
private final String name;

DaemonTask(String name) {
this.name = name;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": " + name + " 作为守护线程启动.");
try {
// 守护线程通常是后台服务,会一直运行,直到用户线程结束
while (true) {
System.out.println(Thread.currentThread().getName() + ": " + name + " 正在服务...");
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ": " + name + " 被中断!");
Thread.currentThread().interrupt();
}
// 注意:由于守护线程会被非正常终止,这里的“守护任务结束”可能不会被打印。
System.out.println(Thread.currentThread().getName() + ": " + name + " 任务执行完毕.");
}
}
}

7.2 [核心] 线程安全与同步控制

Java并发安全问题的根源,均可归结于JMM(Java内存模型)在屏蔽硬件底层差异时,未能天然保证的三大特性。理解它们是掌握并发编程的前提。

7.2.1 [核心] 线程安全问题的根源

三大特性速查表

为了快速定位和理解并发问题,我们可以将这三大特性总结如下:

特性核心定义问题的根源典型场景 / 面试题
原子性操作不可分割,要么全做完,要么不做。复合操作(如 i++)在CPU层面由多条指令构成。i++ 在多线程下的计数错误。
**可见性 **一个线程对共享变量的修改,对其他线程立即可见。CPU多级缓存导致各线程工作内存数据不一致。while(!stop) 循环中,stop 标志的修改无法被读取线程看到。
有序性程序按代码的书写顺序执行。编译器和处理器的指令重排序优化。双重检查锁定(DCL)单例模式中获取到半初始化的对象。

特性详解与代码示例

下面我们结合代码,对每个特性进行深入剖析。

1. 原子性

[高频面试点] 讲解一下 i++ 的线程不安全问题。

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 示例:一个非线程安全的计数器
    public class UnsafeCounter {
    private int count = 0;

    // 该方法在多线程环境下调用会产生问题
    public void increment() {
    count++; // 这是一个典型的非原子操作
    }

    public int getCount() {
    return count;
    }
    }
  • 讲解:
    1. count++ 看似一步,实则在底层包含了 读取-修改-写入 三个独立的步骤。
    2. 问题: 在多线程环境下,两个线程可能同时读取到旧值(例如 5),各自计算得到 6,然后先后将 6 写回。我们期望的结果是 7,但实际结果却是 6,造成了数据更新的丢失。

2. 可见性

[高频面试点] 什么是可见性问题?举例说明。

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 示例:一个可能无法停止的线程
    public class VisibilityProblem {
    // 若没有可见性保障,stop的修改对子线程可能永远不可见
    private static boolean stop = false;

    public static void main(String[] args) throws InterruptedException {
    new Thread(() -> {
    int i = 0;
    // 子线程从自己的工作内存中读取stop,可能一直是false
    while (!stop) {
    i++;
    }
    System.out.println("Loop finished, i = " + i);
    }).start();

    Thread.sleep(1000); // 确保子线程已启动并运行
    stop = true; // 主线程修改stop,但子线程可能看不到
    System.out.println("Main thread set stop to true.");
    }
    }
  • 讲解:
    1. 为了提高效率,执行循环的子线程可能会将 stop 变量的值 false 缓存到自己的高速工作内存中。
    2. 问题: 即使主线程在 1 秒后将主内存中的 stop 修改为 true,子线程也可能因为一直从自己的缓存读取数据而无法感知到这一变化,从而导致无限循环。

3. 有序性

[高频面试点] 请解释指令重排序,并说明它如何导致DCL(双重检查锁定)单例模式失效。

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 示例:线程不安全的双重检查锁定(DCL)
    public class UnsafeDCLSingleton {
    // instance 必须使用 volatile 修饰才能保证线程安全
    private static UnsafeDCLSingleton instance;

    private UnsafeDCLSingleton() {} // 私有构造

    public static UnsafeDCLSingleton getInstance() {
    if (instance == null) { // 第一次检查
    synchronized (UnsafeDCLSingleton.class) {
    if (instance == null) { // 第二次检查
    // 问题根源:该操作可能被重排序
    instance = new UnsafeDCLSingleton();
    }
    }
    }
    return instance;
    }
    }
  • 讲解:
    1. instance = new UnsafeDCLSingleton() 并非原子操作,它大致可分为三步:
      • A. 为对象分配内存空间。
      • B. 调用构造函数,初始化对象。
      • C. 将 instance 引用指向分配的内存地址。
    2. 正常的执行顺序是 A -> B -> C。但由于指令重排序,实际执行顺序可能是 A -> C -> B。
    3. 问题: 如果线程T1按 A->C->B 顺序执行,在完成C(instance引用已不为null)但还未执行B(对象未初始化)时,线程T2调用getInstance()。T2会发现instance不为null,从而直接返回一个尚未初始化的“半成品”对象,后续使用将引发严重错误。

7.2.2 [高频] synchronized 关键字

synchronized 是Java提供的内置锁机制,它是一种悲观锁可重入锁。其核心目标是为代码块或方法提供互斥访问,从而一次性解决原子性、可见性和有序性问题。

**悲观锁:**悲观锁认为并发访问总会发生冲突,因此在访问资源前就加锁,阻止其他线程访问,直到锁被释放。

可重入:可重入是指函数在执行过程中,允许被同一个线程再次调用而不会产生副作用或数据损坏,就像从未被调用过一样。

核心用法与锁对象(Monitor)

正确使用synchronized的关键在于理解**“锁住的是哪个对象”**。不同的使用方式,其锁定的对象(即Monitor)也不同。

使用方式 (Usage)锁定的对象 (Locked Object / Monitor)讲解
修饰实例方法当前类的实例对象 (this)当一个线程进入该方法时,它锁定了这个对象实例。其他线程无法同时访问该实例的任何其他synchronized方法。
修饰静态方法当前类的 Class 对象 (Xxx.class)它锁定的是整个,这是一个全局锁。无论有多少个实例,任何线程进入该方法都会阻止其他线程进入这个类的任何synchronized静态方法。
修饰代码块括号内 () 指定的任意对象这是最灵活的方式,可以精确控制锁的范围和粒度。锁定的就是括号里指定的那个对象实例。

代码示例与场景分析

1. 修饰实例方法

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 锁住的是 bankAccount 这个实例
    public class BankAccount {
    private double balance;

    // 锁是 this,即 BankAccount 的实例对象
    public synchronized void deposit(double amount) {
    // 这里的操作是线程安全的
    balance += amount;
    }
    }
  • 场景分析: 如果有两个线程同时操作accountA.deposit(100),它们会因为争抢accountA这个对象的锁而串行执行。但如果一个线程操作accountA.deposit(100),另一个线程操作accountB.deposit(100),它们之间不会有任何影响,因为它们获取的是不同实例(accountAaccountB)的锁。

2. 修饰静态方法

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    // 锁住的是整个 Logger class
    public class Logger {
    // 锁是 Logger.class 这个 Class 对象
    public static synchronized void log(String message) {
    // ... 写入日志文件
    }
    }
  • 场景分析: 无论有多少个地方调用Logger.log("..."),由于锁是Logger.class这个全局唯一的对象,所有调用都会串行执行,确保了日志写入操作不会交叉混乱。

3. 修饰代码块

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class FineGrainedLock {
    // 推荐做法:使用一个私有的、final的、不可变的对象作为锁
    private final Object lock = new Object();

    public void performAction() {
    // ... 非线程安全的代码 ...

    // 只对关键部分加锁,减小锁的粒度,提高性能
    synchronized (lock) {
    // 这里的操作是线程安全的
    }

    // ... 其他非线程安全的代码 ...
    }
    }
  • 场景分析: 这是推荐的用法,因为它将锁的范围限制在最小的必要代码片段上。使用一个专门的lock对象(而不是this)可以避免外部代码无意中获取了this的锁而导致死锁或性能问题,这是一种良好的封装实践。
[高频面试点] 深入理解 synchronized

Q1: synchronized 是可重入锁吗?如何实现的?

A: 是的,它是可重入的。

  • 表现: 一个线程在持有锁的情况下,可以自由进入由同一个锁保护的其他同步代码区,不会自己把自己锁死。
  • 实现原理: 每个对象监视器(Monitor)内部都有一个计数器。当一个线程首次获取锁时,计数器变为1,此后该线程每重入一次,计数器就加1。每次退出同步代码区,计数器减1。当计数器归零时,锁被完全释放。

Q2: synchronized 的底层是如何实现的?

A: 它依赖于Java对象头和对象的监视器(Monitor)实现。

  • 同步代码块: JVM通过编译后的字节码指令 monitorentermonitorexit 来支持。当执行monitorenter时,线程尝试获取对象的Monitor所有权。monitorexit则释放所有权。
  • 同步方法: 方法的实现则更为隐蔽,它依赖于方法访问标志位ACC_SYNCHRONIZED。当JVM调用一个有此标志的方法时,会自动执行获取锁、执行方法体、释放锁的操作。
  • 锁优化 (Java 6+): 为了提升性能,synchronized引入了锁升级机制,演进路径为:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁。锁会根据竞争情况自动升级,但通常不能降级。这使得synchronized在无竞争或低竞争场景下的性能得到极大提升。

7.2.3 [高频] volatile 关键字

如果说 synchronized 是重量级的“保险箱”,那么 volatile 就是轻量级的“公示牌”。它是一个变量修饰符,不提供互斥锁定,但能以更低的成本确保多线程环境下的可见性有序性

核心要点: volatile 不保证原子性。这是理解和使用它的关键前提。

两大核心作用

1. 保证可见性

  • 实现原理: 当一个线程写入 volatile 变量时,JMM会强制将该线程工作内存中的值立即刷新回主内存。当另一个线程读取 volatile 变量时,JMM会强制其使本地工作内存的缓存失效,并从主内存中重新加载最新值。
  • 效果: 通过这种机制,volatile 变量的任何修改,都能对其他线程立即可见,有效解决了CPU缓存导致的数据不一致问题。

2. 禁止指令重排

  • 实现原理: JMM会为 volatile 变量的读写操作插入特定的内存屏障
    • 在写操作前插入屏障,保证之前的任何操作都已完成。
    • 在写操作后插入屏障,保证其结果对其他线程可见。
    • 在读操作前插入屏障,保证本地缓存失效,从主存读取。
    • 在读操作后插入屏障,保证后续操作能看到 volatile 变量的值。
  • 效果: 这些屏障确保了 volatile 变量相关的操作不会被编译器或处理器随意重排序,从而维护了多线程环境下的程序逻辑。最经典的应用就是修复了双重检查锁定(DCL)单例模式的隐患。

[高频面试点] volatile vs. synchronized

这是Java并发面试中最经典的问题之一。下表清晰地展示了它们的区别:

对比维度volatilesynchronized
作用级别变量级别(Variable Level)代码块 / 方法级别(Block / Method Level)
保证的特性✅ 可见性<br>✅ 有序性<br>❌ 不保证原子性✅ 原子性<br>✅ 可见性<br>✅ 有序性
是否阻塞否,是一种非阻塞的同步机制是,未获取到锁的线程会进入阻塞状态
底层实现内存屏障(Memory Barrier)对象监视器(Monitor),涉及锁的获取与释放
性能开销较低,不涉及线程上下文切换较高,涉及锁竞争、上下文切换和调度
适用场景读多写少,且写入不依赖变量原值的场景需要保证原子性的复合操作场景

适用场景与避坑指南

Q: 既然 volatile 这么轻量,为什么不都用它呢?因为它不保证原子性,解释一下?

A: volatile 的“可见性”仅保证你每次拿到的都是最新值,但无法保护“读取-修改-写入”这个完整的过程不被打断。

  • 代码示例:
    1
    2
    3
    4
    5
    6
    7
    8
    public class VolatileAtomicityTest {
    // 使用 volatile 也无法保证 ++ 操作的原子性
    public volatile int count = 0;

    public void increment() {
    count++;
    }
    }
  • 避坑指南:
    1. 线程A读取 volatile count 的值为5
    2. 此时线程B也来读取 volatile count 的值,由于可见性保证,它也读到5
    3. 线程A在自己的工作内存中执行+1操作,得到6,并立即写回主存。
    4. 线程B也在自己的工作内存中执行+1操作,得到6,并立即写回主存。
    5. 最终结果: 主存中的count值为6,而不是期望的7volatile确保了每次读/写都是最新的,但无法阻止两个线程基于同一个旧值进行计算。

正确的应用场景:

  1. 状态标志位: 当一个线程修改状态,而其他线程依赖此状态来决定是否继续执行时,这是 volatile 的完美应用。

    1
    2
    volatile boolean shutdownRequested;

public void shutdown()

public void doWork() {
    while (!shutdownRequested) {
        // ... do stuff
    }
}
1
2
3
4
5
2.  **双重检查锁定(DCL)**: 在单例模式中,为 `instance` 变量加上 `volatile` 是必不可少的,它可以防止因指令重排而导致其他线程获取到未完全初始化的对象。

```java
// 正确的DCL实现
private static volatile Singleton instance;

7.2.4 [进阶] JUC 锁机制:Lock 接口

synchronized 关键字虽然简单易用,但其功能相对单一,无法满足所有复杂的并发场景。为此,JUC提供了一套全新的锁机制——Lock接口,它将锁的获取和释放操作对象化,提供了比synchronized更丰富的控制能力。

核心实现:ReentrantLock

ReentrantLockLock 接口最主要的实现类,它提供了与 synchronized 相同的可重入性互斥性,但功能更为强大。

  • [避坑指南] 标准使用范式
    使用 Lock 必须手动释放锁,且为了保证在任何情况下(即使发生异常)锁都能被释放,必须将 unlock() 操作放在 finally 块中。这是铁律。

  • 代码示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class LockExample {
    private final Lock lock = new ReentrantLock();

    public void performAction() {
    // 1. 获取锁
    lock.lock();
    try {
    // 2. 将需要同步控制的代码放入 try 块
    System.out.println(Thread.currentThread().getName() + " 获取了锁,执行关键操作...");
    Thread.sleep(500); // 模拟业务耗时
    } catch (InterruptedException e) {
    // 处理中断
    } finally {
    // 3. 必须在 finally 块中释放锁
    lock.unlock();
    System.out.println(Thread.currentThread().getName() + " 释放了锁。");
    }
    }
    }
ReentrantLock 的高级特性

ReentrantLock之所以被称为 synchronized 的增强版,在于其提供了以下独有功能:

  1. 可中断获取锁 (lockInterruptibly): 线程在等待获取锁的过程中,可以响应中断信号,从而避免长时间无意义的等待。而等待synchronized锁的线程是无法被中断的。
  2. 可超时获取锁 (tryLock): tryLock(long time, TimeUnit unit) 允许线程在指定时间内尝试获取锁,如果超时仍未获取到,则会返回false,线程可以继续执行其他逻辑,这在处理死锁问题时非常有用。
  3. 公平性选择 (Fairness): ReentrantLock的构造函数new ReentrantLock(boolean fair)允许我们创建公平锁非公平锁
    • 公平锁: 严格按照线程请求的先后顺序(FIFO)分配锁。优点是不会产生饥饿现象,缺点是吞吐量较低。
    • 非公平锁 (默认): 允许新来的线程“插队”,直接尝试获取锁。优点是吞吐量更高,缺点是可能导致某些线程长时间获取不到锁(饥饿)。
    • synchronized 关键字则一直是非公平的。
读写锁:ReadWriteLock

在“读多写少”的场景下,如果依然使用ReentrantLocksynchronized这样的排他锁,会让所有读操作也串行执行,极大地限制了并发性能。ReadWriteLock正是为了解决这个问题而生。

  • 核心思想: 读写分离。
    • 读锁(共享锁): 多个线程可以同时持有读锁,并发读取数据。
    • 写锁(排他锁): 一次只能有一个线程持有写锁,且在持有期间,其他任何读、写线程都必须等待。
  • 代码示例 (ReentrantReadWriteLock是其标准实现):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // 使用读写锁实现一个线程安全的缓存
    public class Cache<K, V> {
    private final Map<K, V> map = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock rLock = rwLock.readLock(); // 获取读锁
    private final Lock wLock = rwLock.writeLock(); // 获取写锁

    public V get(K key) {
    rLock.lock(); // 加读锁,允许多个读线程并发
    try {
    return map.get(key);
    } finally {
    rLock.unlock();
    }
    }

    public void put(K key, V value) {
    wLock.lock(); // 加写锁,一次只允许一个写线程
    try {
    map.put(key, value);
    } finally {
    wLock.unlock();
    }
    }
    }

[高频][面试题] ReentrantLock 与 synchronized 的全方位对比
对比维度ReentrantLock (JUC Lock)synchronized (JVM Keyword)
实现层面JDK 层面实现的 APIJVM 内置的 关键字
锁的释放必须手动finally 块中调用unlock()释放JVM 自动释放(代码块执行完毕或异常退出)
功能特性功能丰富:可中断、可超时、可选择公平性、可绑定多个Condition功能单一:仅有非公平的可重入锁
性能Java 6后两者性能基本持平,官方更推荐synchronizedJava 6后经过锁升级等大量优化,性能不再是短板
使用便利性相对复杂,需要try-finally模板代码简单直观,不易出错
底层原理基于 AQS (AbstractQueuedSynchronizer) 框架基于对象头的 Monitormonitorenter/exit 指令

总结: 在现代Java开发中,synchronized因其简单、不易出错且性能优异,依然是绝大多数场景下的首选。只有在需要ReentrantLock提供的高级特性(如可中断、超时、公平锁)时,才考虑使用它。


7.2.5 [进阶] 原子类(Atomic Classes)

当我们需要对单个变量进行线程安全的操作时,使用synchronizedReentrantLock虽然可行,但显得过于“重”,因为它们会引起线程的阻塞和唤醒,开销较大。为此,JUC提供了一套“无锁”的解决方案——原子类,它们位于java.util.concurrent.atomic包下。

无锁编程的利器:CAS (Compare-And-Swap)

原子类的神奇之处在于,它们不使用锁,却能实现线程安全,其背后的核心技术就是CAS(比较并交换)

  • CAS思想: 这是一种乐观锁的实现。它假设在操作期间数据不会被其他线程修改,因此不会加锁。在真正要更新数据时,它会执行以下三个步骤:
  1. 读取内存中的当前值 V
    2. 计算出要更新的新值 N
    3. 在写入新值前,再次读取内存中的值,检查它是否依然是V。如果是,说明没有其他线程修改过,就将值更新为N。如果不是,说明在此期间数据已被修改,本次更新失败,然后通常会进行重试(自旋),直到成功为止。
  • 硬件支持: CAS操作并非由Java实现,而是依赖于CPU提供的一条原子指令(如cmpxchg),这保证了“比较并交换”这个过程本身是不可分割的,从而实现了高效的无锁并发控制。
常用原子类一览
原子类 (Atomic Class)作用常用方法
AtomicInteger以原子方式更新 intget(), set(), incrementAndGet(), getAndIncrement(), compareAndSet()
AtomicLong以原子方式更新 longAtomicInteger类似
AtomicBoolean以原子方式更新 booleanget(), set(), compareAndSet()
AtomicReference<V>以原子方式更新对象引用get(), set(), compareAndSet()
代码示例:原子计数器

让我们用AtomicInteger来重写之前那个线程不安全的计数器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AtomicCounter {
// 使用 AtomicInteger 替代 int
private final AtomicInteger count = new AtomicInteger(0);

// incrementAndGet() 是一个原子操作
public void increment() {
count.incrementAndGet();
}

public int getCount() {
return count.get();
}

public static void main(String[] args) throws InterruptedException {
final int threadCount = 1000;
AtomicCounter counter = new AtomicCounter();
ExecutorService executor = Executors.newCachedThreadPool();

// 1000个线程,每个线程调用1000次increment
for (int i = 0; i < threadCount; i++) {
executor.execute(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

// 期望结果是 1000 * 1000 = 1,000,000
System.out.println("Final Count: " + counter.getCount());
}
}
  • 讲解: count.incrementAndGet()方法内部封装了CAS操作。它会循环尝试“获取当前值,加1,然后比较并设置新值”,直到成功为止。整个过程没有使用任何锁,但却能保证在多线程环境下计数的准确性,性能远高于使用synchronized的版本。

[高频面试点] CAS 的 ABA 问题

Q: CAS 有什么缺点?什么是 ABA 问题?

A: CAS 的主要缺点是 ABA 问题

  • 定义: 一个变量的值原来是A,被另一个线程改成了B,然后又被改回了A。CAS检查时会发现它的值仍然是A,于是认为它没有被修改过,从而执行更新操作。但在某些业务场景下,这个“失而复得”的过程可能已经破坏了数据的一致性。
  • 比喻: 你去取款机取钱,余额是100元。你操作时卡了一下,此时你老婆用手机银行给你转了50元,余额变为150元,但她马上又消费了50元,余额又变回100元。当你恢复操作时,CAS检查发现余额还是100元,就认为没问题,继续你的取款操作。但实际上,账户的流水已经发生了变化。

Q: 如何解决 ABA 问题?

A: JUC 提供了 AtomicStampedReference 类来解决。

  • 原理: 它在CAS的基础上,额外增加了一个版本号(Stamp)。每次变量更新时,不仅要比较值,还要比较版本号。当值被修改时,版本号也会随之改变。这样,即使值从A变B再变回A,版本号也已经不同了,CAS操作就会失败。
1
2
3
4
// AtomicStampedReference<V> 构造时需要一个初始引用和一个初始版本号
AtomicStampedReference<String> stampedRef = new AtomicStampedReference<>("A", 1);
// 比较并设置时,需要同时提供期望的引用、新引用、期望的版本号、新版本号
boolean success = stampedRef.compareAndSet("A", "B", 1, 2);

7.2.6 [高频] ThreadLocal 的妙用与陷阱

ThreadLocal 提供了一种与众不同的解决并发问题思路。它不加锁,而是通过为每个线程提供一个独立的变量副本,来实现数据的线程隔离。这是一种“以空间换时间”的策略:为每个线程都分配一块内存,从而避免了因共享资源而产生的同步等待时间。

核心思想与底层原理
  • 一句话概括: ThreadLocal 使得访问某个变量的每个线程都有自己的、独立的初始值,线程之间互不影响。
  • 底层原理:
    1. 每个 Thread 对象内部都有一个名为 threadLocals 的成员变量,它的类型是 ThreadLocal.ThreadLocalMap
    2. 当我们调用 threadLocal.set(value) 时,实际上是获取当前线程ThreadLocalMap,然后以**threadLocal对象自身为Key**,以**value为Value**,存入这个Map中。
    3. 调用 threadLocal.get() 时,也是先获取当前线程的 ThreadLocalMap,再以 threadLocal 对象为Key,从中取出对应的Value。
    4. 关键: 数据实际上是存储在线程自己的Map里,ThreadLocal对象只是一个用来从中存取数据的“钥匙”。

妙用:实际应用场景
  1. 数据库连接管理: 在Web应用中,一个请求通常由一个线程处理。我们可以将该请求生命周期内使用的数据库连接存入ThreadLocal,这样该线程中的所有方法(无论调用链路多深)都能方便地获取到同一个连接,避免了频繁创建销毁连接的开销,也便于事务控制。

  2. 用户身份信息传递: 在处理用户请求时,可以将用户信息(如User对象)存入ThreadLocal。这样,业务逻辑的各个层面(Controller, Service, DAO)都能直接获取当前用户,无需在方法参数中层层传递。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class UserContextHolder {
    // 创建一个ThreadLocal来持有用户信息
    private static final ThreadLocal<User> holder = new ThreadLocal<>();

    public static void setUser(User user) {
    holder.set(user);
    }

    public static User getUser() {
    return holder.get();
    }

    public static void clear() {
    holder.remove();
    }
    }
  3. 保障线程不安全的工具类: SimpleDateFormat 是一个典型的线程不安全类。通过 ThreadLocal 为每个线程创建一个独立的实例,就可以在不加锁的情况下安全地使用它。(注:Java 8及以后版本,推荐使用线程安全的java.time.format.DateTimeFormatter)。


[高频][面试题] ThreadLocal 内存泄漏问题

这是ThreadLocal最核心,也是最高频的面试题。

Q: ThreadLocal 为什么会发生内存泄漏?

A: 这个问题的根源在于ThreadLocalMap的特殊设计和线程池的生命周期。

  1. 弱引用作Key: ThreadLocalMap中的每个Entry,其Key(即ThreadLocal对象)是一个弱引用(WeakReference),而其Value是强引用
  2. Key被回收: 当外部不再有对ThreadLocal对象的强引用时(例如myThreadLocal = null),在下一次GC时,这个弱引用的Key就会被回收,导致ThreadLocalMap中出现Key为nullEntry
  3. Value无法回收: 尽管Key变成了null,但这个Entry中的Value仍然被Entry对象本身强引用着,而Entry又被ThreadLocalMap强引用,ThreadLocalMap又被线程对象强引用。
  4. 内存泄漏: 如果这个线程是一个长生命周期的线程(比如线程池中的核心线程),它会一直存活。那么这个Key为null、但Value不为nullEntry就会一直存在于线程的Map中,无法被GC回收,从而造成内存泄漏
避坑指南:如何正确使用 ThreadLocal

Q: 如何防止 ThreadLocal 内存泄漏?

A: 核心原则是:确保在线程结束前,手动清理掉 ThreadLocal 变量

最佳实践: 在使用完ThreadLocal后,务必在finally块中调用其remove()方法。

  • 正确使用范式:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public void processBusiness() {
    // 获取一个数据库连接
    Connection conn = ConnectionManager.getConnection();
    // 将连接存入ThreadLocal
    threadLocalConnection.set(conn);
    try {
    // ... 执行业务逻辑,随时可以通过 threadLocalConnection.get() 获取连接
    } finally {
    // 最关键的一步:无论如何都要在finally中清理
    threadLocalConnection.remove();
    ConnectionManager.releaseConnection(conn);
    }
    }
  • 补充说明: 虽然ThreadLocalMap在调用get(), set(), remove()时,会“顺便”清理一些Key为nullEntry(所谓的“启发式清理”),但这并非100%可靠。我们绝不能依赖这种机制,必须养成手动调用remove()的习惯。

7.3 [核心] 线程间协作与通信

之前的章节,我们聚焦于如何通过锁等机制,防止多个线程在访问共享资源时互相干扰。这好比是为马路设置红绿灯,避免撞车。而本章我们将学习如何让线程之间互相通知、协调步调,共同完成一项任务。这就像是建立一套物流系统,让生产线和包装线能够无缝对接。

这个领域最经典、最基础的模型,就是“生产者-消费者”模型。

7.3.1 经典的“生产者-消费者”模型

想象一个场景:一个线程(生产者)负责生产数据(比如爬取网页、处理数据),另一个线程(消费者)负责处理这些数据(比如存入数据库、进行分析)。它们之间通过一个共享的缓冲区(比如一个队列)来传递数据。

  • 当缓冲区满了,生产者就应该停止生产,并等待消费者消费掉一些数据。
  • 当缓冲区空了,消费者就应该停止消费,并等待生产者生产出新的数据。

要实现这种精确的等待和唤醒,就需要用到Object类提供的三个基础方法:wait(), notify(), notifyAll()

传统线程通信方式:wait() / notify() / notifyAll()

这三个方法是Java底层线程通信的基石,定义在Object类中,意味着任何Java对象都可以充当“通信信箱”。

方法 (Method)核心作用 (Core Function)详细说明
wait()使当前线程进入等待状态,并立即释放它所持有的锁线程会进入该对象的“等待队列”(Wait Set),直到被notify/notifyAll唤醒或被中断。
notify()从“等待队列”中随机唤醒一个正在等待的线程。被唤醒的线程不会立即执行,而是进入“锁竞争队列”,只有重新获取到锁后才能继续执行。
notifyAll()唤醒“等待队列”中所有正在等待的线程。所有被唤醒的线程会一起去竞争锁。这是更推荐、更安全的方式,能有效避免“信号丢失”问题。

[核心规则] 这三个方法都必须在synchronized同步块中调用,并且synchronized锁定的对象必须与调用wait/notify的对象是同一个。否则会抛出IllegalMonitorStateException

代码示例

下面是一个使用wait/notifyAll实现的、固定容量的生产者-消费者模型的完整示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerTraditional {
private final Queue<String> buffer = new LinkedList<>();
private final int capacity;
private final Object lock = new Object(); // 我们的“通信信箱”

public ProducerConsumerTraditional(int capacity) {
this.capacity = capacity;
}

// 生产者
public void produce() throws InterruptedException {
int i = 0;
while (true) {
synchronized (lock) {
// [关键点1] 使用 while 循环判断条件
while (buffer.size() == capacity) {
System.out.println("缓冲区已满,生产者 " + Thread.currentThread().getName() + " 进入等待...");
lock.wait(); // 缓冲区满,释放锁并等待
}

String item = "Item-" + (i++);
buffer.add(item);
System.out.println("生产者 " + Thread.currentThread().getName() + " 生产了: " + item);

// [关键点2] 唤醒所有等待的线程(可能是消费者)
lock.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(500); // 模拟生产耗时
}
}

// 消费者
public void consume() throws InterruptedException {
while (true) {
synchronized (lock) {
while (buffer.isEmpty()) {
System.out.println("缓冲区为空,消费者 " + Thread.currentThread().getName() + " 进入等待...");
lock.wait(); // 缓冲区空,释放锁并等待
}

String item = buffer.poll();
System.out.println("消费者 " + Thread.currentThread().getName() + " 消费了: " + item);

lock.notifyAll(); // 唤醒所有等待的线程(可能是生产者)
}
TimeUnit.MILLISECONDS.sleep(1000); // 模拟消费耗时
}
}

public static void main(String[] args) {
ProducerConsumerTraditional pc = new ProducerConsumerTraditional(5);
// 创建一个生产者
new Thread(runnableWrapper(pc::produce), "P1").start();
// 创建两个消费者
new Thread(runnableWrapper(pc::consume), "C1").start();
new Thread(runnableWrapper(pc::consume), "C2").start();
}

// 辅助方法,用于将抛出受检异常的Lambda包装成Runnable
private static Runnable runnableWrapper(InterruptibleTask task) {
return () -> {
try {
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
}

@FunctionalInterface
interface InterruptibleTask {
void run() throws InterruptedException;
}
}

[高频][避坑指南] 为什么要用 while 循环判断条件而不是 if

这是使用wait/notify机制时最关键、最容易出错的地方。

Q: 在调用 wait() 之前,为什么必须用 while 循环检查条件,而不能用 if

A: 主要原因是为了防止 “虚假唤醒”

  1. 什么是虚假唤醒: JVM规范允许,一个线程在没有被任何notify/notifyAll调用的情况下,也可能从wait()状态中被“意外”唤醒。虽然这种情况非常罕见,但我们的代码必须具备防御能力。
  2. if 的隐患: 如果使用if (condition),当一个线程被虚假唤醒时,它会跳过if判断,直接执行后续代码,此时condition(如buffer.isEmpty())很可能仍然不满足,从而导致程序出错(比如从空队列中取元素)。
  3. while 的健壮性: 使用while (condition),当线程(无论是被正常唤醒还是虚假唤醒)醒来后,它会重新检查循环条件。如果条件仍然不满足,它会再次调用wait(),重新进入等待状态。这确保了只有在条件真正满足时,线程才会继续执行。

另一个重要原因:当有多个消费者时,notifyAll()会唤醒所有消费者。第一个抢到锁的消费者会消费掉数据。当其他消费者随后获得锁时,如果它们用的是if,就会直接尝试消费(但此时缓冲区已空),导致错误。而while会迫使它们重新检查缓冲区状态,发现已空,于是继续等待。

结论: while 循环是保证wait/notify模式正确性的“安全网”,必须严格遵守。


7.3.2 [进阶] Condition 接口

如果说ReentrantLocksynchronized的增强版,那么Condition就是Object.wait/notify的增强版。它与Lock紧密绑定,通常被称**Lock的黄金搭档**,提供了更精细、更强大的线程等待与唤醒控制能力。

核心概念与方法

Condition对象必须通过一个Lock对象来创建,调用lock.newCondition()即可。它的核心方法与Object的方法有着清晰的对应关系:

Object 方法Condition 对应方法核心作用
wait()await()使当前线程等待,并释放当前Condition关联的Lock
notify()signal()唤醒一个在当前Condition上等待的线程。
notifyAll()signalAll()唤醒所有在当前Condition上等待的线程。

关键区别Object的等待/通知机制是与对象监视器(锁)绑定的,一个锁只能有一个等待队列。而一个Lock对象可以创建多个Condition实例,每个Condition都拥有自己独立的等待队列。

核心优势:实现分组唤醒与精准通知

synchronized + notifyAll() 的一个痛点是,它无法区分等待的线程类型。在生产者-消费者模型中,当一个生产者生产完产品后调用notifyAll(),它会唤醒所有在等待的线程,这其中可能包含了其他生产者。这些被唤醒的生产者发现缓冲区依然是满的,只好再次进入等待,这造成了不必要的CPU开销和上下文切换。

Condition完美地解决了这个问题。我们可以为不同的条件创建不同的Condition对象:

  1. 创建一个notFull条件,专门给因缓冲区已满而等待的生产者使用。
  2. 创建一个notEmpty条件,专门给因缓冲区为空而等待的消费者使用。

这样,当消费者消费了一个产品后,它只需要调用notFull.signal()精准地唤醒一个生产者,而不会打扰到其他正在等待的消费者。反之亦然。

代码示例:使用 Condition 改造生产者-消费者模型

下面我们将使用ReentrantLock和两个Condition来重构之前的生产者-消费者代码,体验其精准通知的优雅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerWithCondition {
private final Queue<String> buffer = new LinkedList<>();
private final int capacity;

// 使用 ReentrantLock
private final Lock lock = new ReentrantLock();
// 创建两个 Condition,实现精准通知
private final Condition notFull = lock.newCondition(); // 生产者等待队列
private final Condition notEmpty = lock.newCondition(); // 消费者等待队列

public ProducerConsumerWithCondition(int capacity) {
this.capacity = capacity;
}

// 生产者
public void produce() throws InterruptedException {
int i = 0;
while (true) {
lock.lock(); // 获取锁
try {
while (buffer.size() == capacity) {
System.out.println("缓冲区已满,生产者 " + Thread.currentThread().getName() + " 在notFull上等待...");
notFull.await(); // 在 notFull 条件上等待
}

String item = "Item-" + (i++);
buffer.add(item);
System.out.println("生产者 " + Thread.currentThread().getName() + " 生产了: " + item);

// 精准唤醒:只唤醒在 notEmpty 条件上等待的消费者
notEmpty.signal();
} finally {
lock.unlock(); // 释放锁
}
TimeUnit.MILLISECONDS.sleep(500);
}
}

// 消费者
public void consume() throws InterruptedException {
while (true) {
lock.lock();
try {
while (buffer.isEmpty()) {
System.out.println("缓冲区为空,消费者 " + Thread.currentThread().getName() + " 在notEmpty上等待...");
notEmpty.await(); // 在 notEmpty 条件上等待
}

String item = buffer.poll();
System.out.println("消费者 " + Thread.currentThread().getName() + " 消费了: " + item);

// 精准唤醒:只唤醒在 notFull 条件上等待的生产者
notFull.signal();
} finally {
lock.unlock();
}
TimeUnit.MILLISECONDS.sleep(1000);
}
}

public static void main(String[] args) {
ProducerConsumerWithCondition pc = new ProducerConsumerWithCondition(5);
// 创建两个生产者
new Thread(runnableWrapper(pc::produce), "P1").start();
new Thread(runnableWrapper(pc::produce), "P2").start();
// 创建两个消费者
new Thread(runnableWrapper(pc::consume), "C1").start();
new Thread(runnableWrapper(pc::consume), "C2").start();
}

// 辅助方法,用于将抛出受检异常的Lambda包装成Runnable
private static Runnable runnableWrapper(InterruptibleTask task) {
return () -> {
try {
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
}

@FunctionalInterface
interface InterruptibleTask {
void run() throws InterruptedException;
}
}

总结: Condition 将线程的等待队列从锁中分离出来,使得我们可以根据不同的业务条件对线程进行分组管理和精准唤醒,从而实现更高效、更复杂的线程协作逻辑。在需要精细控制线程通信的场景下,Lock + Condition 的组合是比synchronized + wait/notify更优越的选择。


7.3.3 [进阶] JUC 并发工具类

除了锁和条件变量,JUC还提供了一系列用于控制和同步线程的辅助类,我们称之为同步器(Synchronizers)。它们可以帮助我们轻松实现如“倒计时”、“循环栅栏”、“资源限流”等常见并发模式。

1. CountDownLatch (倒计时门闩)
  • 核心思想: CountDownLatch 允许一个或多个线程等待其他一组线程完成操作。它就像一个倒计时器,只有当计时器归零时,等待的线程才能继续执行。

  • 应用场景: 主任务需要等待所有前置的子任务(如初始化、数据加载)全部完成后,才能开始执行。

  • 核心方法:

    • CountDownLatch(int count): 构造函数,设置初始计数值。
    • countDown(): 将计数值减 1。通常由完成任务的子线程调用。
    • await(): 阻塞当前线程,直到计数值变为 0。通常由主线程调用。
  • 特性: 一次性使用,计数值归零后无法重置。

  • 代码示例: 模拟一场赛跑,裁判(主线程)必须等待所有选手(子线程)都准备就绪后才能鸣枪发令。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
    int runnerCount = 5;
    // 裁判需要等待5名选手准备好
    CountDownLatch readyLatch = new CountDownLatch(runnerCount);
    ExecutorService executor = Executors.newFixedThreadPool(runnerCount);

    System.out.println("裁判:各位选手请准备...");

    for (int i = 1; i <= runnerCount; i++) {
    final int runnerId = i;
    executor.execute(() -> {
    try {
    System.out.println("选手 " + runnerId + " 正在准备...");
    Thread.sleep((long) (Math.random() * 2000) + 1000);
    System.out.println("选手 " + runnerId + " 准备就绪!");
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    } finally {
    readyLatch.countDown(); // 准备好了,计数值减1
    }
    });
    }

    // 裁判等待所有选手准备就绪
    readyLatch.await();
    System.out.println("所有选手准备完毕,比赛开始!");
    executor.shutdown();
    }
    }
2. CyclicBarrier (循环屏障)
  • 核心思想: CyclicBarrier 让一组线程能够相互等待,直到所有线程都到达某个公共的屏障点(Rendezvous Point),然后才能一起继续执行。

  • 应用场景: 多线程分块处理数据,需要等待所有线程处理完当前阶段后,再统一进入下一阶段。

  • 核心方法:

    • CyclicBarrier(int parties, Runnable barrierAction): 构造函数,parties是需要等待的线程数,barrierAction是一个可选的回调任务,在所有线程到达屏障后、释放它们之前,由最后一个到达的线程执行。
    • await(): 线程调用此方法表示已到达屏障,并开始等待其他线程。
  • 特性: 可循环使用。当所有线程越过屏障后,屏障会自动重置,可用于下一轮等待。

  • 代码示例: 模拟公司团建,所有员工(线程)必须先集合到公司门口(屏障),然后大巴车(barrierAction)才能出发。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CyclicBarrierExample {
    public static void main(String[] args) {
    int employeeCount = 4;
    // 创建一个需要4个线程到达的屏障,并在到达后执行一个任务
    CyclicBarrier barrier = new CyclicBarrier(employeeCount, () -> {
    System.out.println("所有人都到齐了,大巴车出发!");
    });
    ExecutorService executor = Executors.newFixedThreadPool(employeeCount);

    for (int i = 1; i <= employeeCount; i++) {
    final int empId = i;
    executor.execute(() -> {
    try {
    System.out.println("员工 " + empId + " 从家出发了...");
    Thread.sleep((long) (Math.random() * 3000) + 1000);
    System.out.println("员工 " + empId + " 到达公司门口,开始等待其他人...");
    barrier.await(); // 等待其他线程
    System.out.println("员工 " + empId + " 上车,一起出发!");
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    });
    }
    executor.shutdown();
    }
    }
3. Semaphore (信号量)
  • 核心思想: Semaphore 用于控制同时访问特定资源的线程数量。它内部维护了一组“许可证”。

  • 应用场景: 流量控制、限流,比如控制数据库连接池的最大连接数,或者限制同时下载文件的线程数。

  • 核心方法:

    • Semaphore(int permits): 构造函数,设置许可证的总数。
    • acquire(): 获取一个许可证,如果许可证已发完,则线程阻塞等待。
    • release(): 释放一个许可证,将其归还给信号量。
  • 特性: 必须保证release()被调用,通常放在finally块中。

  • 代码示例: 模拟一个只有3个停车位的停车场。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class SemaphoreExample {
    public static void main(String[] args) {
    // 停车场只有3个车位
    Semaphore parkingLot = new Semaphore(3);
    ExecutorService executor = Executors.newFixedThreadPool(10); // 10辆车

    for (int i = 1; i <= 10; i++) {
    final int carId = i;
    executor.execute(() -> {
    try {
    System.out.println("汽车 " + carId + " 到达停车场,试图停车。");
    parkingLot.acquire(); // 尝试获取一个许可证(车位)
    System.out.println("汽车 " + carId + " 成功停车!");
    Thread.sleep((long) (Math.random() * 4000) + 2000); // 停车时长
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println("汽车 " + carId + " 离开停车场。");
    parkingLot.release(); // 释放许可证(车位)
    }
    });
    }
    executor.shutdown();
    }
    }

[高频][面试题] CountDownLatch 与 CyclicBarrier 的区别
对比维度CountDownLatchCyclicBarrier
作用机制减法计数:一个或多个线程等待其他线程完成任务,计数值从 N->0。加法计数:一组固定数量的线程相互等待,直到所有线程都到达屏障点。
可重用性不可重用。计数值归零后,CountDownLatch就失效了。可循环使用。所有线程越过屏障后,屏障会自动重置,可用于下一轮。
使用场景一个主任务等待多个子任务完成(启动门)。多个子任务之间需要同步,分阶段执行(同步点)。
工作模式非对称:有await()的等待线程和countDown()的工作线程之分。对称:所有线程角色相同,都调用await()等待彼此。
回调函数无内置回调函数。可在构造时传入一个Runnable,在屏障触发时执行。

7.3.4 [面试题] 如何理解和避免死锁?

死锁是多线程环境下一个经典且危险的问题。它指的是两个或多个线程在执行过程中,因争夺资源而造成的一种互相等待的僵局。若无外力干涉,这些线程都将无法继续推进,导致程序“卡死”。

死锁的四个必要条件

一个死锁的发生,必须同时满足以下四个条件。只要破坏其中任意一个,就能有效预防死锁。

  1. 互斥条件
  • 一个资源在同一时刻只能被一个线程持有。如果其他线程请求该资源,则必须等待,直到资源被释放。这是产生死锁的根本前提。
  1. 请求与保持条件
  • 一个线程在已经持有了至少一个资源的情况下,又去请求其他线程正持有的资源,但因请求不到而阻塞,同时对自己已有的资源保持不放。
  1. 不可剥夺条件
  • 线程已获得的资源,在未使用完毕之前,不能被其他线程强行剥夺,只能由持有者自愿释放。
  1. 循环等待条件
  • 存在一个线程的资源等待环路。即线程T1等待线程T2的资源,线程T2等待线程T3的资源,…,而线程Tn又在等待线程T1的资源,形成一个闭环。
死锁的代码示例

下面是一个最经典的、由锁顺序不当导致的死锁场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class DeadlockExample {
private static final Object lockA = new Object();
private static final Object lockB = new Object();

public static void main(String[] args) {
// 线程1:先锁A,再尝试锁B
new Thread(() -> {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + " 持有 lockA,尝试获取 lockB...");
try { Thread.sleep(100); } catch (InterruptedException e) {}

synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + " 成功获取 lockA 和 lockB。");
}
}
}, "线程1").start();

// 线程2:先锁B,再尝试锁A
new Thread(() -> {
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + " 持有 lockB,尝试获取 lockA...");
try { Thread.sleep(100); } catch (InterruptedException e) {}

synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + " 成功获取 lockA 和 lockB。");
}
}
}, "线程2").start();
}
}
  • 执行过程分析:
    1. 线程1获取lockA
    2. 与此同时,线程2获取lockB
    3. 线程1在持有lockA的情况下,尝试获取lockB,但lockB被线程2持有,于是线程1进入等待。
    4. 线程2在持有lockB的情况下,尝试获取lockA,但lockA被线程1持有,于是线程2也进入等待。
    5. 此时,线程1等待线程2释放lockB,线程2等待线程1释放lockA,形成循环等待,死锁发生。

如何排查死锁

如果线上应用发生了死锁,我们可以使用JDK自带的工具进行诊断。

  1. jps + jstack 命令 (推荐)

    • 第一步: 找到Java进程ID (PID)

      1
      jps -l

      该命令会列出所有Java进程及其PID。

    • 第二步: 分析线程堆栈

      1
      jstack -l <PID>

      jstack会打印出指定Java进程的全部线程堆栈信息。如果存在死锁,它会在输出的末尾明确指出,并打印出死锁涉及的线程、它们持有的锁以及正在等待的锁。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      Found one Java-level deadlock:
      =============================
      "线程2":
      waiting to lock monitor 0x00007f... (a java.lang.Object),
      which is held by "线程1"
      "线程1":
      waiting to lock monitor 0x00007f... (a java.lang.Object),
      which is held by "线程2"

      Java stack information for the threads listed above:
      ... (详细堆栈) ...
  2. 图形化工具 (JConsole, VisualVM)

    • 连接到目标Java进程后,这些工具的“线程”选项卡通常有“检测死锁”的功能,可以一键发现并可视化展示死锁信息。
如何预防死锁

预防死锁的核心思想是破坏其四个必要条件之一。在实际开发中,最常用、最有效的是破坏“循环等待”条件。

策略破坏的条件具体做法
按序申请锁循环等待(最推荐) 规定所有线程必须按照一个固定的、全局的顺序来申请锁。例如,规定必须先申请lockA再申请lockB
使用带超时的锁不可剥夺使用Lock接口的tryLock(timeout)方法。线程在尝试获取锁时,如果超过了指定时间还未成功,就主动放弃已经持有的锁,然后等待一会再重试。
一次性申请所有资源请求与保持在进入同步代码前,一次性地获取所有需要的锁。但这在很多复杂场景下难以实现。

最佳实践: 在绝大多数情况下,保证所有线程以相同的顺序获取锁,是预防死锁最简单、最有效的策略。


7.4 [实战] 线程池与异步编程

在前面的章节中,我们学习了如何创建和管理单个线程 (new Thread())。然而,在真实世界的服务端应用中,为每一个到来的请求或任务都创建一个新线程,是一种非常原始且危险的做法。因为线程的创建和销毁是重量级操作,会消耗大量的系统资源。如果请求量巨大,无限制地创建线程会迅速耗尽服务器资源,导致系统崩溃。

为了解决这个问题,线程池应运而生。它是现代并发编程的基石,也是管理线程的最佳实践。

7.4.1 [核心] 为什么要使用线程池?

简单来说,手动管理线程的方式既“昂贵”又“失控”。而线程池通过对线程的复用和统一管理,带来了三大核心优势。

1. 降低资源消耗
  • 核心思想: 复用已创建的线程,避免了频繁创建和销毁线程所带来的高昂开销。
  • 讲解:
    • 线程的创建和销毁涉及到与操作系统内核的交互、内存栈的分配与回收,这些都是成本很高的操作。
    • 线程池在启动时会预先创建一定数量的线程,当任务到来时,直接从池中取一个空闲线程来执行。任务结束后,线程并不会被销毁,而是归还给池子,等待下一个任务。
  • 比喻: 就像开一家餐厅,线程池相当于雇佣了一批固定的厨师团队(核心员工)。而不是每来一位客人就临时招聘一位厨师,客人吃完再将其解雇。
2. 提高响应速度
  • 核心思想: 消除了线程创建的延迟。
  • 讲解:
    • 当一个新任务到来时,如果需要临时创建一个新线程,这个过程是需要时间的。
    • 而使用线程池,任务可以直接交给一个处于等待状态的空闲线程执行,省去了创建线程的步骤,从而让任务能够更快地得到处理,提升了系统的响应能力。
  • 比喻: 当客人点餐后,待命的厨师可以立即开始炒菜,而无需等待HR完成招聘流程。
3. 提高线程的可管理性
  • 核心思想: 对线程进行统一的分配、调优和监控。
  • 讲解:
    • 如果任由代码随意创建线程,这些线程会散落在应用的各个角落,处于一种“失控”状态,难以管理。
    • 线程池作为一个中央管理器,为我们提供了强大的控制能力:
      • 控制并发数: 可以精确控制池中核心线程数和最大线程数,防止因线程过多而耗尽系统资源。
      • 统一监控: 可以方便地获取线程池的运行状态,如活动线程数、任务队列大小、已完成任务数等,便于监控和调优。
      • 统一管理: 可以统一设置线程的属性(如线程名、是否为守护线程),并能安全、平滑地关闭整个线程池。

结论: 在任何需要处理大量异步任务或并发请求的场景下,使用线程池都不仅仅是一种优化,而是一种必需。它是构建健壮、高性能并发系统的标准做法。接下来,我们将深入探索线程池的核心实现——ThreadPoolExecutor


7.4.2 [核心] ThreadPoolExecutor 详解

ThreadPoolExecutor是JUC线程池框架中最核心、最底层的实现类。我们日常使用的Executors工具类创建的各种线程池,其内部几乎都是ThreadPoolExecutor的实例。因此,要精通线程池,就必须从理解它的构造函数和核心参数开始。

[高频][面试题] 七大核心参数

ThreadPoolExecutor最常用的构造函数有七个参数,每一个都深刻影响着线程池的行为。理解它们的含义是面试的绝对高频考点。

参数 (Parameter)类型 (Type)核心作用
corePoolSizeint核心线程数。线程池中保持存活的线程数,即使它们是空闲的。
maximumPoolSizeint最大线程数。线程池能容纳的同时执行的线程最大数量。
keepAliveTimelong空闲线程存活时间。当线程数大于corePoolSize时,多余的空闲线程在被销毁前等待新任务的最长时间。
unitTimeUnitkeepAliveTime的时间单位(如秒、毫秒)。
workQueueBlockingQueue<Runnable>工作队列。用于存放等待执行的任务的阻塞队列。
threadFactoryThreadFactory线程工厂。用于创建新线程。可自定义线程名、是否为守护线程等。
handlerRejectedExecutionHandler拒绝策略。当队列和线程池都满了,无法处理新任务时所采取的策略。

线程池工作流程

当一个新任务通过execute()方法提交给ThreadPoolExecutor时,它会遵循以下决策路径:

image-20250713152622620

  1. 判断核心线程数:检查当前运行的线程数是否小于corePoolSize

    • :直接创建一个新的核心线程来执行该任务,即使池中有其他空闲线程。
    • :进入步骤2。
  2. 尝试加入工作队列:尝试将任务添加到workQueue中。

    • 成功:任务进入队列等待被空闲线程执行。
    • 失败(队列已满):进入步骤3。
  3. 判断最大线程数:检查当前运行的线程数是否小于maximumPoolSize

    • :创建一个新的非核心线程(也叫“救急线程”)来执行该任务。
    • :进入步骤4。
  4. 执行拒绝策略:当前线程数已达到最大值,且队列也已满。此时线程池已超负荷,必须通过指定的RejectedExecutionHandler来拒绝该任务。


工作队列 workQueue 的选择

workQueue的选择对线程池的行为有决定性影响:

  • ArrayBlockingQueue: 基于数组的有界阻塞队列。必须指定容量。当队列满了之后,会触发创建非核心线程,直到达到maximumPoolSize。有助于防止资源耗尽。
  • LinkedBlockingQueue: 基于链表的阻塞队列。如果构造时不指定容量,则默认为Integer.MAX_VALUE,相当于一个无界队列
    • 注意: 使用无界队列时,任务会一直被添加到队列中,导致maximumPoolSize参数失效,因为线程数永远不会超过corePoolSize。如果任务生产速度远超消费速度,可能导致内存溢出(OOM)。
  • SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等待一个相应的移除操作。它会直接将任务“递交”给一个线程。如果没有空闲线程,就会触发创建新线程(直到maximumPoolsize),因此适合处理大量、耗时短的瞬时任务。Executors.newCachedThreadPool()就使用了它。
  • PriorityBlockingQueue: 一个支持优先级排序的无界队列。任务会根据其优先级被执行。
workQueue 类型特点说明适用场景
ArrayBlockingQueue基于数组的有界阻塞队列,必须指定容量,队列满后会创建非核心线程需要控制资源、防止内存溢出的场景
LinkedBlockingQueue基于链表的阻塞队列,默认无界(Integer.MAX_VALUE),可能导致 OOM任务量可控、不关心线程数上限的场景
SynchronousQueue不存储元素,每个插入操作必须等待移除,直接将任务 “递交” 给线程处理大量短时任务、需要快速响应的场景
PriorityBlockingQueue支持优先级排序的无界队列,任务按优先级执行需要按优先级处理任务的场景
拒绝策略 RejectedExecutionHandler
拒绝策略类型特点说明适用场景
ThreadPoolExecutor.AbortPolicy (默认)直接抛出RejectedExecutionException异常,阻止系统正常工作。适用于任务非常重要,不允许丢失,且出现异常时希望立即得到通知的场景。
ThreadPoolExecutor.CallerRunsPolicy“调用者运行”策略。该任务不会被丢弃,也不会被线程池执行,而是由提交该任务的线程(调用execute的线程)自己来执行。这是一种有效的“反压”机制,可以减慢任务提交者的速度。当希望限制任务提交者的速率,防止提交过多任务导致线程池过载时使用。
ThreadPoolExecutor.DiscardPolicy直接静默地丢弃任务,不抛出任何异常。适用于任务不重要,即使丢失也不会对系统产生重大影响,并且希望最大化吞吐量的场景。
ThreadPoolExecutor.DiscardOldestPolicy丢弃工作队列队首的(最旧的)一个任务,然后重新尝试提交当前任务。适用于希望优先执行最新提交的任务,并且可以容忍丢失部分旧任务的场景。

7.4.3 [避坑指南] 为什么不推荐使用 Executors 工具类?

Executors工具类提供了一系列静态工厂方法,如newFixedThreadPool(), newCachedThreadPool()等,它们能够让我们用一行代码就创建一个线程池,看起来非常方便。然而,这种便利背后隐藏着巨大的风险,尤其是在生产环境中。

核心论点: 阿里巴巴《Java开发手册》等业界权威规范中,都强制要求开发者通过ThreadPoolExecutor的构造函数来创建线程池,而不是使用Executors

两大主要隐患:OOM(内存溢出)风险

Executors工厂方法创建的线程池,其内部参数配置存在“陷阱”,可能导致在特定场景下耗尽系统资源。

1. newFixedThreadPoolnewSingleThreadExecutor 的风险

  • 内部实现:

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • 问题根源: 它们都使用了一个无界的LinkedBlockingQueue(默认容量为Integer.MAX_VALUE,约21亿)。

  • 潜在风险: 如果任务的生产速度持续快于线程池的处理速度,任务就会在队列中无限堆积。最终,这将耗尽应用的所有堆内存,导致**OutOfMemoryError**,使整个应用崩溃。

2. newCachedThreadPoolnewScheduledThreadPool 的风险

  • 内部实现:
    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
  • 问题根源: 它们允许创建的线程数量上限为Integer.MAX_VALUE
  • 潜在风险: newCachedThreadPool的设计是来一个任务就创建一个新线程(如果没有空闲线程)。如果瞬间涌入大量请求,线程池就会尝试创建海量的线程。每个线程都需要消耗一定的栈内存(通常是1MB左右)。这会迅速耗尽JVM进程的可用内存,导致**OutOfMemoryError: unable to create new native thread**,同样会导致系统崩溃。
最佳实践:手动创建 ThreadPoolExecutor

规避上述风险的唯一可靠方法,就是放弃Executors的便利,回归到ThreadPoolExecutor的构造函数,手动指定每一个参数。

这样做的好处是**“所见即所得”**,强迫我们开发者对线程池的每一个行为细节进行深入思考和控制。

  • 代码示例:一个配置合理的线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    package com.example;

    import java.util.concurrent.*;

    public class ThreadPoolBestPractice {

    public static ExecutorService createMyThreadPool() {
    // 获取CPU核心数作为参考
    int corePoolSize = Runtime.getRuntime().availableProcessors();
    int maximumPoolSize = corePoolSize * 2;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;

    // 关键:使用有界队列来防止资源耗尽
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);

    // 建议:使用自定义的ThreadFactory,便于给线程命名,方便排查问题
    ThreadFactory threadFactory = r -> new Thread("my-pool-" + r.hashCode());

    // 关键:选择一个合适的拒绝策略来处理过载任务
    RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

    // 手动创建ThreadPoolExecutor
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    keepAliveTime,
    unit,
    workQueue,
    threadFactory,
    handler
    );

    System.out.println("自定义线程池创建成功!");
    return executor;
    }
    }

结论: Executors 工具类是学习和简单测试时的“好朋友”,但对于严肃的生产级应用,它却是“危险的敌人”。养成手动创建和配置ThreadPoolExecutor的习惯,是每一位专业Java程序员必备的素养。


7.4.4 [进阶] 带返回值的任务:Callable 与 Future

到目前为止,我们提交给线程池的任务都是Runnable,它的run()方法没有返回值。那么,如果我们需要异步执行一个任务,并获取其执行结果,应该怎么做呢?JUC为此提供了CallableFuture

Callable<V> 接口

Callable接口可以看作是Runnable的增强版,它弥补了Runnable的两个核心短板。

  • Runnable的局限:

    1. run()方法没有返回值,无法获取任务执行结果。
    2. run()方法不能抛出受检异常,异常处理很麻烦。
  • Callable的增强:
    Callable是一个泛型接口,其唯一的call()方法签名如下:

    1
    V call() throws Exception;
    1. 可以有返回值: 方法返回一个V类型的结果。
    2. 可以抛出异常: 允许在方法签名中声明抛出异常,使得异常处理更加直接。
Future<V> 接口

当你将一个Callable任务提交给线程池时,由于任务是异步执行的,你不可能立即拿到结果。线程池会立刻返回一个Future对象。

Future对象就像是一张“提货单”或者一个“承诺”,它代表了未来某个时刻将会完成的任务的结果。我们可以通过这张“提货单”在未来的任意时刻去查询任务状态或提取最终结果。

  • Future 的核心方法:
方法 (Method)核心作用行为说明
V get()获取异步任务的执行结果。阻塞式。如果任务尚未完成,调用此方法的线程会一直阻塞,直到拿到结果。
V get(long timeout, TimeUnit unit)带超时地获取结果。在指定时间内阻塞等待。如果超时任务仍未完成,会抛出TimeoutException
boolean isDone()判断任务是否已完成非阻塞。可以用来轮询任务状态,避免因调用get()而无限期阻塞。
boolean cancel(boolean mayInterrupt)尝试取消任务。如果任务已完成或已被取消,则失败。否则成功取消。mayInterrupt参数表示是否要中断正在执行该任务的线程。
boolean isCancelled()判断任务是否已被取消非阻塞
三者关系与代码示例

ExecutorServicesubmit()方法可以将一个Callable任务提交到线程池,并返回一个Future对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.*;

public class CallableFutureExample {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 创建一个线程池
ExecutorService executor = Executors.newSingleThreadExecutor();

// 2. 创建一个Callable任务,该任务会计算1到100的和
Callable<Integer> myTask = () -> {
System.out.println("子线程 " + Thread.currentThread().getName() + " 开始计算...");
TimeUnit.SECONDS.sleep(2); // 模拟耗时计算
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
};

// 3. 提交Callable任务,获取Future对象(“提货单”)
System.out.println("主线程:提交任务给线程池。");
Future<Integer> future = executor.submit(myTask);

// 4. 主线程可以继续做其他事情...
System.out.println("主线程:我先去忙别的了...");
TimeUnit.SECONDS.sleep(1);
System.out.println("主线程:忙完了,现在来看看子线程的结果。");

// 5. 通过Future.get()获取结果(如果任务没执行完,这里会阻塞)
// ExecutionException 会包装Callable中抛出的原始异常
Integer result = future.get();
System.out.println("主线程:拿到任务执行结果 -> " + result);

// 6. 关闭线程池
executor.shutdown();
}
}
  • 关于FutureTask: 这是一个很巧妙的类,它同时实现了RunnableFuture接口。因此,你可以将一个Callable包装成FutureTask,然后像Runnable一样提交给线程池,同时这个FutureTask对象本身就可以用来获取结果。

Future 的局限性

尽管FutureCallable解决了有返回值和异常处理的问题,但它自身的设计也存在明显的局限性,这也是后来CompletableFuture诞生的原因。

  1. 阻塞式获取结果: Future的主要缺点是它的get()方法是阻塞的。这使得异步编程的优势大打折扣。虽然我们的任务是异步执行的,但为了获取结果,主线程往往还是得停下来等待,整个流程又变成了“同步”模式。

  2. 缺乏完成回调: 我们无法为Future任务的完成注册一个回调函数。也就是说,我们不能方便地实现“当任务完成后,自动执行下一个动作”这样的逻辑。我们只能通过isDone()轮询或者get()阻塞的方式来被动地等待任务完成。

  3. 组合能力弱: 对于多个Future任务,我们很难实现复杂的组合。比如,“等待两个Future都完成后,将它们的结果合并处理”,或者“等待多个Future中任意一个完成后就继续”等场景,Future接口本身并未提供优雅的支持。

这些局限性,促使了Java 8中更强大的现代异步编程工具——CompletableFuture的诞生。


7.4.5 [Java 8+] 现代异步编程:CompletableFuture

CompletableFuture (CF) 是对Future的革命性增强。如果说Future仅仅是一个异步结果的被动容器,那么CompletableFuture则是一个功能完备、可主动编排的异步任务“装配线”。

核心优势:彻底摆脱 Future.get() 的阻塞

CompletableFuture的核心设计理念,就是用一种非阻塞、事件驱动的方式来处理异步结果。

  1. 非阻塞 (Non-blocking): 它通过注册回调函数的方式,让你可以在任务完成时自动执行后续操作,而无需主线程傻傻地阻塞等待。
  2. 链式调用 (Fluent API): 它的API设计得像Stream一样,可以进行优雅的链式调用(.thenApply().thenAccept()...),将复杂的异步处理流程清晰地串联起来。
  3. 组合能力 (Combinable): 提供了强大的方法来组合多个异步任务,轻松实现“AND”、“OR”以及串行依赖等复杂的业务逻辑。
  4. 完善的异常处理: 提供了exceptionallyhandle等方法,可以优雅地在异步链中捕获和处理异常。

核心用法

1. 创建异步任务

创建CompletableFuture通常使用两个静态方法,它们默认使用全局的ForkJoinPool.commonPool()来执行任务。为了避免业务任务与框架任务(如并行流)互相干扰,强烈建议为其指定自定义的业务线程池

  • supplyAsync(Supplier<U> supplier, Executor executor): 用于执行有返回值的任务。
  • runAsync(Runnable runnable, Executor executor): 用于执行无返回值的任务。
1
2
3
4
5
6
7
8
// 建议:为你的业务创建一个专用的线程池
ExecutorService myBizExecutor = Executors.newFixedThreadPool(10);

// 创建一个有返回值的异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// ... 执行耗时操作
return "Hello, CompletableFuture";
}, myBizExecutor);

2. 结果处理与转换(then...系列)

这是CompletableFuture最强大的地方,它提供了一系列方法来注册任务完成后的回调。

方法作用参数类型返回类型
thenApply(Function fn)转换结果。接收上一步结果,处理后返回新结果Function<T, U>CompletableFuture<U>
thenAccept(Consumer action)消费结果。接收上一步结果,执行操作,无返回值Consumer<T>CompletableFuture<Void>
thenRun(Runnable action)执行动作。不关心上一步结果,仅在上一步完成后执行一个RunnableRunnableCompletableFuture<Void>
  • 代码示例:

    1
    2
    3
    4
    5
    6
    CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> "123") // 1. 异步提供字符串"123"
    .thenApply(Integer::parseInt) // 2. 将字符串结果转换为Integer
    .thenApply(i -> i * 10); // 3. 将Integer结果乘以10

    // thenAccept消费最终结果
    cf.thenAccept(result -> System.out.println("最终结果: " + result)); // 输出: 最终结果: 1230
  • 关于*Async变体: 上述每个方法几乎都有一个对应的*Async版本(如thenApplyAsync)。它们的区别在于,*Async版本可以让你指定一个线程池来执行当前的回调任务,从而实现更精细的线程调度。如果不指定,则默认使用ForkJoinPool.commonPool()

3. 组合多个任务

  • AND 关系 - thenCombine: 当两个CompletableFuture完成时,将它们的结果合并处理。

    1
    2
    3
    4
    5
    6
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "World");

    // 合并cf1和cf2的结果
    CompletableFuture<String> combined = cf1.thenCombine(cf2, (res1, res2) -> res1 + ", " + res2);
    System.out.println(combined.get()); // 输出: Hello, World
  • OR 关系 - applyToEither / acceptEither: 当两个CompletableFuture任意一个完成时,就使用它的结果进行下一步操作。

  • 等待全部 - allOf: 等待所有给定的CompletableFuture都执行完毕。返回CompletableFuture<Void>

  • 等待任一 - anyOf: 等待任意一个CompletableFuture执行完毕。返回CompletableFuture<Object>

实战场景:服务编排

CompletableFuture在微服务架构中的服务编排场景下大放异彩。假设为了展示一个商品详情页,你需要并行调用多个下游服务:

  1. 获取商品基本信息
  2. 获取商品价格信息
  3. 获取商品库存信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example;

import java.util.concurrent.*;

import static com.example.Main.ServiceOrchestration.*;

public class Main {

static class ServiceOrchestration {
// 模拟调用下游服务
static String getProductInfo(long id) { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {} return "商品信息"; }
static String getPriceInfo(long id) { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) {} return "价格信息"; }
static String getStockInfo(long id) { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) {} return "库存信息"; }


}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
long productId = 1L;
System.out.println("开始并行获取商品详情...");
long startTime = System.currentTimeMillis();
// 1. 并行发起三个异步调用
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> getProductInfo(productId), executor);
CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> getPriceInfo(productId), executor);
CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> getStockInfo(productId), executor);
CompletableFuture.allOf(productFuture, priceFuture, stockFuture).join(); // join()会阻塞等待,类似get()但抛出非受检异常
// 3. 组合结果
String result = String.format("【%s】\n【%s】\n【%s】",
productFuture.get(), priceFuture.get(), stockFuture.get());

System.out.println("成功获取所有信息:\n" + result);
System.out.println("总耗时: " + (System.currentTimeMillis() - startTime) + " ms"); // 耗时约等于最长的那个任务(2秒)

executor.shutdown();

}
}

结论: CompletableFuture 通过其强大的编排能力,将我们从Future.get()的阻塞泥潭中解放出来,使得编写高效、清晰、非阻塞的异步代码成为可能,是Java 8以后处理复杂异步场景的首选工具。


7.5 [Java 21+] 并发革命:虚拟线程 (Virtual Threads)

在深入学习虚拟线程的具体用法之前,我们必须首先回答一个根本性问题:长久以来,我们已经拥有了稳定可靠的线程池和功能强大的CompletableFuture,Java为何还要在21世纪20年代投入巨大精力,去创造一种全新的线程模型?

答案在于,我们过去所有的并发方案,都是在为一个“先天不足”的旧模型“打补丁”。而虚拟线程,则是要从根源上推翻这个旧模型。

7.5.1 为什么需要虚拟线程?平台线程的黄昏

[核心痛点] 平台线程的“重量级”本质

自Java诞生以来,我们所使用的java.lang.Thread实例,都被称为平台线程(Platform Threads)。它们的核心特点,也是其最大的局限,在于:

  1. 与操作系统线程的1:1映射

    • 每一个Java平台线程,都与一个操作系统(OS)内核线程一一对应。Java线程只是操作系统线程的一个薄层封装。这意味着,我们能创建的线程数量,直接受限于操作系统的能力。
  2. 高昂的资源成本

    • 内存成本: 操作系统为每个线程都预留了一大块独立的栈内存(在64位Linux上通常是1MB)。无论你的线程实际用不用得到,这1MB的空间都会被保留。因此,仅仅创建几千个线程,就可能消耗掉数GB的内存。
    • CPU成本: 线程的调度和上下文切换完全由操作系统内核负责。当线程数量增多时,CPU需要花费大量时间在不同线程之间切换状态,而不是执行真正的业务逻辑。这种开销是巨大的。

一个贴切的比喻:平台线程就像是重量级的洲际导弹发射井。它威力强大,但建造和维护成本极高,占地面积巨大,而且一个国家(操作系统)能拥有的数量也极其有限。

[历史包袱] I/O密集型应用在传统模型下的困境

现代大多数后端应用,本质上都是I/O密集型(I/O-intensive)的。这意味着程序的大部分时间不是在进行CPU计算,而是在等待:等待网络数据、等待数据库返回结果、等待文件读写完成。

在这样的背景下,最简单、最符合人类思维的编程模型是“一个请求一个线程(Thread-Per-Request)”:

1
2
3
4
5
6
7
8
9
10
11
// 理想中简单直观的代码
public void handleRequest(Request request) {
// 1. 调用用户服务(阻塞等待I/O)
UserInfo userInfo = userClient.getUser(request.getUserId());

// 2. 调用商品服务(阻塞等待I/O)
ProductInfo productInfo = productClient.getProduct(request.getProductId());

// 3. 渲染页面
renderPage(userInfo, productInfo);
}

这段代码清晰地反映了业务流程。但它有一个致命问题:在处理请求的过程中,线程会因为等待I/O而长时间阻塞

当这种简单模型遇上“昂贵”的平台线程,冲突便产生了。 如果有一万个用户同时请求,我们就需要一万个平台线程。按照我们之前的分析,这将瞬间耗尽服务器的所有资源,导致系统崩溃。

为了解决这个冲突,Java社区被迫走向了复杂的异步编程。我们使用CompletableFuture等工具,通过回调函数将业务逻辑拆得支离破碎,目的只有一个:决不能阻塞宝贵的平台线程

1
2
3
4
5
6
7
8
9
10
// 为了性能而变得复杂的代码
public CompletableFuture<Void> handleRequestAsync(Request request) {
CompletableFuture<UserInfo> userFuture = fetchUser(request.getUserId());
CompletableFuture<ProductInfo> productFuture = fetchProduct(request.getProductId());

return CompletableFuture.allOf(userFuture, productFuture)
.thenAccept(v -> {
renderPage(userFuture.join(), productFuture.join());
});
}

我们用性能换来了代码的复杂、调试的困难和逻辑的晦涩。整个生态都陷入了一种两难的境地:要么选择简单直观但无法扩展的代码,要么选择性能卓越但极其复杂的代码。

平台线程的时代,已经走到了它的黄昏。 我们需要一种新的模式,来终结这种痛苦的权衡。这就是虚拟线程诞生的历史使命:让我们能以同步的方式写代码,同时获得异步的性能。


7.5.2 虚拟线程的核心理念:M:N调度与协作

虚拟线程的解决方案,并非对平台线程进行修补,而是引入了一套全新的、从根本上不同的运作模式。

[革命性设计] 轻量级本质

虚拟线程的第一个颠覆性设计,就是它的“轻量”。

  • JVM管理的对象: 与平台线程不同,虚拟线程不再是操作系统线程的直接映射。它是一个纯粹由JVM在内部管理和调度的Java对象,存活于Java堆内存之上。
  • 极低的内存占用: 平台线程需要预先分配一个巨大的栈(Stack),而虚拟线程的栈则非常小(仅几百字节),并且可以根据需要动态增长和收缩。

这带来了什么?

因为创建和销毁一个虚拟线程的成本极低,几乎和创建一个普通Java对象相当,我们终于可以毫不吝啬地创建海量的虚拟线程——几万、几十万、甚至数百万个,而无需担心耗尽系统内存。

新的比喻:如果说平台线程是洲际导弹发射井,那么虚拟线程就是轻便的无人机。我们可以从一艘航空母舰(JVM)上,轻松地同时起飞成千上万架无人机(虚拟线程),让它们去执行各自的任务。

[底层揭秘] M:N调度模型

虚拟线程打破了与操作系统线程1:1的强绑定关系,采用了一种更为高效的M:N调度模型

  • M: 代表我们应用中创建的大量(M个)虚拟线程。
  • N: 代表JVM使用的一小部分(N个)平台线程,这些平台线程被称为载体线程(Carrier Threads)

JVM内置了一个调度器(默认是ForkJoinPool),它的工作就是将这M个虚拟线程,轮流“骑”到N个载体线程上去执行。通常,载体线程的数量N默认等于CPU的核心数。

[关键机制] 协作式调度:不阻塞载体线程

M:N调度只是基础,虚拟线程真正的魔法在于其协作式调度机制,它保证了宝贵的载体线程永不被无效阻塞。

让我们来看一下当一个虚拟线程遇到I/O阻塞时,发生了什么:

  1. 虚拟线程VT1正在载体线程PT1上运行。
  2. VT1的代码执行到一个阻塞I/O操作,比如socket.read()
  3. 魔法开始: JDK的网络库函数会通知JVM调度器:“VT1要开始等待数据了”。
  4. JVM调度器立即做出反应:
    • 卸载 (Unmount): 将VT1从载体线程PT1上“卸下”,并将其状态(主要是它那小小的栈)保存到堆内存中。
    • 换上 (Mount): 马上从等待执行的虚拟线程队列中,取出另一个就绪的虚拟线程VT2,“装载”到PT1上,让PT1继续执行VT2的任务。
  5. I/O完成: 当VT1等待的网络数据到达后,网络设备会通知操作系统,操作系统再通知JVM。
  6. VT1的状态从“等待”变回“就绪”,JVM调度器会在未来的某个时刻,当有空闲的载体线程时,再将它“装载”回去,从它上次中断的地方继续执行。

整个过程中,载体线程PT1几乎一刻也没有闲着。 它没有因为VT1的I/O等待而被阻塞,而是马上去服务其他虚拟线程了。

[核心优势总结] “同步的写法,异步的性能”

通过这套行云流水的“卸载-装回”机制,虚拟线程实现了惊人的效果:

开发者可以用最简单、最直观的同步阻塞方式来编写业务代码,而JVM则在底层自动将其转换为非阻塞的高性能执行模式。

我们不再需要在CompletableFuture的回调地狱中挣扎,也不再需要为线程池的配置而绞尽脑汁。我们可以重新回到那个清晰的“一个请求一个线程”模型,因为现在,我们拥有了几乎无限且廉价的线程供应。这就是虚拟线程为Java并发编程带来的解放。


7.5.3 API详解:创建与管理虚拟线程

尽管虚拟线程的内部机制非常复杂,但Java的设计者们为其配备了一套极其简洁和现代的API,旨在鼓励开发者采纳“一个任务一个线程”的新模式。

[基础用法] Thread.ofVirtual()

Java 21引入了一个新的Thread.Builder接口,用于以一种流畅的、链式调用的方式创建线程。Thread.ofVirtual()就是获取一个虚拟线程的构建器。

这是对传统new Thread(...)构造函数的现代替代方案。

  • 代码示例1:直接创建并启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    package com.example;

    public class VirtualThreadCreation {
    public static void main(String[] args) throws InterruptedException {
    // 直接调用 Thread.ofVirtual() 方法获取 Builder 实现,并链式调用 start
    Thread vt = Thread.ofVirtual().start(() -> {
    System.out.println("Hello from a virtual thread!");
    System.out.println("当前线程信息: " + Thread.currentThread());
    });

    vt.join(); // 同样可以 join
    }
    }

    输出会清晰地显示线程类型为VirtualThread

  • 代码示例2:使用构建器进行配置
    Thread.Builder允许我们在创建线程时设置名称等属性。

    1
    2
    package com.example;

public class VirtualThreadCreation {
public static void main(String[] args) throws InterruptedException {
// 使用构建器来创建具有特定名称模式的线程
Thread.Builder builder = Thread.ofVirtual().name(“my-worker-”, 0); // 名称会自动递增,如my-worker-0, my-worker-1…

    Runnable task = () -> System.out.println("执行任务的线程: " + Thread.currentThread().getName());

        Thread t1 = builder.start(task); // 创建并启动名为 my-worker-0 的虚拟线程
        Thread t2 = builder.start(task); // 创建并启动名为 my-worker-1 的虚拟线程

        t1.join();
        t2.join();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

##### **[推荐实践] `Executors.newVirtualThreadPerTaskExecutor()`**

虽然我们可以手动创建单个虚拟线程,但在处理大量并发任务时,使用`ExecutorService`仍然是更好的选择,因为它提供了更完善的生命周期管理。

为此,Java 21提供了一个全新的工厂方法:`Executors.newVirtualThreadPerTaskExecutor()`。

* **核心行为**:

* 这个方法创建的`ExecutorService`**不会复用任何线程**。
* 它为提交给它的**每一个任务**,都创建一个**全新的虚拟线程**来执行。

* **为什么这是推荐的方式?**

* 因为虚拟线程的创建成本极低,所以“池化”它们已经没有意义。为每个任务创建一个新的、干净的线程,反而简化了上下文管理,避免了使用`ThreadLocal`时可能产生的复杂问题。
* 这个`ExecutorService`封装了虚拟线程的创建细节,并提供了统一的任务提交和关闭接口,非常方便。

* **代码示例:处理批量I/O任务**
使用`try-with-resources`语法可以确保`ExecutorService`被自动、安全地关闭,这是处理短生命周期任务时的最佳实践。

```java
package com.example;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadExecutorExample {
public static void main(String[] args) {
// 使用try-with-resources确保executor被自动关闭
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {

// 提交10个模拟的I/O密集型任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("开始执行任务 #" + taskId + " in " + Thread.currentThread());
try {
// 模拟网络请求或数据库查询等I/O阻塞
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务 #" + taskId + " 执行完毕。");
});
}

} // 在这里,executor.close()会被自动调用,它会等待所有已提交的任务完成

System.out.println("所有任务已提交,主线程退出。");
}
}

总结: Executors.newVirtualThreadPerTaskExecutor()是我们在虚拟线程时代处理并发任务的“首选武器”。它让我们彻底告别了对传统线程池的复杂配置(核心数、最大数、队列容量、拒绝策略等),让并发编程回归到只关注业务逻辑本身的简单与纯粹。


7.5.4 最佳实践与适用场景分析

虚拟线程虽然强大,但它并非解决所有并发问题的“银弹”。理解其适用边界,并根据任务特性做出正确的选择,是衡量一个高级Java工程师能力的重要标准。

[场景辨析] I/O密集型 vs. CPU密集型

这是决定是否使用虚拟线程的黄金法则

  1. **I/O密集型: **虚拟线程的主场

    • 定义: 任务的大部分时间都在等待I/O操作完成,CPU处于空闲状态。
    • 示例:
      • Web服务器处理HTTP请求。
      • 调用RPC、微服务接口。
      • 访问数据库、Redis缓存。
      • 读写文件、操作消息队列。
    • 为何适用: 在这些场景下,虚拟线程的协作式调度机制能发挥最大威力。当一个虚拟线程等待I/O时,它会让出宝贵的平台载体线程给其他任务使用,从而让CPU得到充分利用,系统可以用极少的平台线程支撑起海量的并发连接。
  2. CPU密集型: 平台线程池的阵地

    • 定义: 任务需要持续占用CPU进行大量的数学或逻辑运算,几乎没有等待时间。
    • 示例:
      • 复杂的科学计算、数据分析。
      • 视频编码、图像渲染。
      • 大规模的数据排序、加密解密。
    • 为何不适用: 对于CPU密集型任务,程序的瓶颈在于CPU核心的数量,而不是线程数量。即使你创建一百万个虚拟线程去执行计算,它们最终还是要抢占仅有的那几个(比如8个或16个)CPU核心。在这种情况下,虚拟线程的调度优势无法体现,反而可能因为额外的调度开销而略微降低性能。
    • 最佳实践: 对于CPU密集型任务,仍然应该使用一个大小与CPU核心数相当的平台线程池 (Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()))。这能确保每个CPU核心都有一个线程在满负荷工作,同时避免了过多的线程上下文切换开销。
[模式回归] “一个任务一个线程”(Thread-Per-Task)模型

在虚拟线程时代,我们应该重新拥抱这个最古老、最直观的并发模型。

  • 旧时代的困境: 由于平台线程的昂贵,我们不得不放弃这个模型,转而使用复杂的线程池和异步回调。
  • 新时代的回归: 虚拟线程的廉价特性,使得为每一个到来的请求或任务都创建一个新的虚拟线程,成为了最高效、最简单的选择。

编码范式转变:

  • 过去: 我们需要精心管理一个线程池,任务来了先提交到队列,然后由池中有限的线程去处理。
  • 现在 (对于I/O密集型任务): 直接使用 Executors.newVirtualThreadPerTaskExecutor(),或者手动 Thread.ofVirtual().start()。不再需要池化,不再需要队列,代码逻辑回归到线性的、同步的、易于理解和调试的状态。
[性能考量] 虚拟线程并非银弹
  1. 它不能让单个操作变快: 虚拟线程解决的是吞吐量的问题,而不是延迟的问题。它能让你的服务器同时处理更多的请求,但不会让单个数据库查询变得更快。
  2. 它不等于无限的CPU: 虚拟线程的魔力在于“等待”时让出CPU。如果你的所有任务都在疯狂计算,那么系统的瓶颈依然是CPU本身。
  3. 它依赖于现代化的JDK库: 虚拟线程的协作式调度能力,依赖于JDK内部的I/O、网络和并发库都已为虚拟线程进行了适配。如果你使用了某些旧的、未适配的第三方库或者自己编写的JNI本地方法,它们在阻塞时可能仍然会“钉住”平台线程。

结论: 在开始一个新项目或重构旧项目时,首先要做的就是分析其核心任务的类型。如果是典型的Web应用或微服务,那么大胆地采用虚拟线程将极大地简化你的代码并提升系统吞吐能力。如果它是一个计算中心或数据处理引擎,那么传统的平台线程池依然是你的不二之选。


7.5.5 [高频][避坑指南] 虚拟线程的陷阱与注意事项

虚拟线程虽然极大地简化了并发编程,但它并非没有“脾气”。如果不了解其内在的一些限制,我们可能会在不经意间写出性能不佳甚至有问题的代码。

[头号陷阱] synchronized 的“钉住”问题

这是使用虚拟线程时最重要、最需要警惕的一个问题。

  • 什么是“钉住”?
    “钉住”是指,当一个虚拟线程在执行某些特定代码时,它会被强制性地“钉”在它的载体平台线程上。在此期间,JVM调度器无法将这个虚拟线程从其载体上卸载,即使它遇到了I/O阻塞。

  • synchronized 为何会导致钉住?
    synchronized关键字的实现,在底层与操作系统线程的某些原生数据结构(如监视器Monitor)紧密相关。当一个虚拟线程进入synchronized代码块时,为了维持锁的正确性,JVM无法安全地将其与底层的平台线程分离。

  • 灾难性后果
    如果在synchronized同步块内部执行了一个阻塞I/O操作,灾难就发生了:

    1. 虚拟线程被“钉住”在载体线程上。
    2. 虚拟线程开始等待I/O,进入阻塞状态。
    3. 由于无法被卸载,其身下的载体平台线程也随之被阻塞。这完全违背了虚拟线程设计的初衷!一个宝贵的平台线程就此被无效占用,整个系统的高吞吐能力会因此受到严重影响。如果多个虚拟线程都发生这种情况,会迅速耗尽载体线程池,使虚拟线程的优势荡然无存。
  • 代码示例(错误示范):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 错误实践:在虚拟线程中,于synchronized块内执行阻塞I/O
    public class PinningProblem {
    private final Object lock = new Object();

    public void badBlockingOperation() throws IOException {
    synchronized (lock) {
    // 当前虚拟线程已被“钉住”到其载体平台线程上
    System.out.println(Thread.currentThread() + " 进入同步块,即将阻塞...");

    // 这个阻塞的I/O操作,将会阻塞宝贵的平台线程!
    System.in.read();

    System.out.println(Thread.currentThread() + " 阻塞结束,退出同步块。");
    }
    }
    }
  • [解决方案] 优先使用 JUC Lock
    在虚拟线程环境中,应始终优先使用 java.util.concurrent.locks.ReentrantLock 来替代 synchronized 关键字。
    ReentrantLock及其它JUC包下的锁,是纯Java实现的,它们被设计为“虚拟线程友好”的,在加锁和解锁时会与JVM调度器协作,不会导致“钉住”问题。

[其他要点] 其他注意事项
  1. 谨慎使用 ThreadLocal

    • 考量: 虚拟线程的生命周期通常很短(一个任务的长度)。虽然这天然地帮助我们避免了传统线程池中因线程复用而导致的ThreadLocal内存泄漏,但如果我们在极短时间内创建了数百万个虚拟线程,并且每个线程都关联了一个ThreadLocal变量,那么这些变量累积起来可能会对GC造成一定的压力。
    • 建议: 保持ThreadLocal中存放的对象尽可能小。在新代码中,优先考虑通过方法参数传递上下文,而不是依赖ThreadLocal
  2. 警惕本地方法调用

  • 如果虚拟线程调用了一个本地方法(JNI),而这个本地方法内部执行了阻塞操作,那么它同样会“钉住”载体线程。因为JVM无法看到C/C++等本地代码的内部,无法对其进行协作式调度。
    * 建议: 在与包含JNI的库交互时,需格外小心,并查阅其文档是否对虚拟线程兼容。
  1. 不要池化虚拟线程

    • 这是一个反模式。反复强调:虚拟线程被设计为用后即焚的,它们非常廉价,无需池化。
    • 试图创建一个“虚拟线程池”来复用虚拟线程,是完全没有必要的,反而会增加代码的复杂性。直接使用Executors.newVirtualThreadPerTaskExecutor(),享受“一个任务一个新线程”的简单与高效。

本章总结: 虚拟线程是Java并发迈向未来的关键一步,它极大地简化了I/O密集型应用的开发。要用好它,核心在于两点:

明确其适用场景(I/O密集型),并坚决避免“钉住”陷阱(优先使用ReentrantLock替代synchronized

掌握了这些,你就能在新的并发时代中游刃有余。


7.6 [工具] 使用 Hutool 简化并发编程

在掌握了JUC的底层原理和复杂API之后,我们常常会思考一个问题:在日常开发中,有没有更便捷、更不易出错的方式来使用这些强大的并发工具?答案是肯定的。

Hutool是一个“瑞士军刀”般的Java工具库,它极大地简化了Java的各种常用操作。其并发模块(包含在hutool-core中)正是为了解决原生JUC API“繁琐”和“易错”的痛点而设计的。

7.6.1 Hutool 并发工具概述与设计哲学

设计哲学

Hutool的并发工具,其设计初衷可以归结为以下几点:

  1. 极致简化: 将原生API(如ThreadPoolExecutor的长构造函数)进行高度封装,提供一目了然、一行调用的静态方法或链式构建器。
  2. 内建最佳实践: Hutool的工具在设计上就遵循了业界推荐的最佳实践。例如,它创建的线程池默认使用有界队列,从源头上帮助开发者避免了使用原生Executors时可能遇到的OOM风险。
  3. 提升代码可读性与开发效率: 大量使用链式调用(Fluent API),让代码逻辑像自然语言一样流畅;将繁琐的try-catch等样板代码内置,让开发者能更专注于业务本身。
核心组件概览

本章我们将重点介绍Hutool并发包中的几个核心利器:

  • ThreadUtil: 线程相关的静态工具类。它提供了大量便捷方法,如快速在线程池中执行任务、安全地休眠线程、创建线程池等。
  • ExecutorBuilder: 线程池构建器。这是对ThreadPoolExecutor构造函数最优雅的封装,通过链式调用的方式,让配置一个参数复杂的线程池变得清晰易读。
  • AsyncUtil: 异步工具类。提供了一些简化CompletableFuture等异步操作的静态方法。
  • ConcurrencyTester: 并发测试器。一个非常实用的小工具,可以用极简的代码模拟高并发场景,用于快速验证代码的线程安全性或进行简单的性能基准测试。
引入Hutool依赖

要在项目中使用这些功能,首先需要添加Hutool的Maven依赖。对于并发工具,我们通常只需要引入其核心包hutool-core即可。

1
2
3
4
5
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.27</version>
</dependency>

接下来,我们将从ThreadUtilExecutorBuilder开始,逐一学习如何运用这些工具来“降维打击”日常的并发编程任务。


7.6.2 线程与线程池的快捷方式

1. ThreadUtil - 线程操作万能工具箱

ThreadUtil是Hutool中用于线程相关操作的静态工具类,它将大量原本需要繁琐API调用和异常处理的逻辑,封装成了一行行的便捷方法。

  • 常用方法速查表
方法名功能描述
execute(Runnable task)**(主力)**快速将任务提交到全局共享线程池,适合临时的、非核心的异步任务。
sleep(long millis)以非受检异常方式封装Thread.sleep(),代码更简洁,无需显式try-catch
newExecutor(...)快速创建线程池,提供了比原生Executors更安全的默认值。
newNamedThreadFactory(...)**(推荐)**创建带自定义名称前缀的线程工厂,极大方便问题排查。
getStackTrace(Thread t)获取指定线程的堆栈信息,可用于日志记录和问题分析。
ThreadUtil 实战场景详解

场景1:execute - 快速执行“用后即焚”的异步任务

背景:在一个用户注册成功的主流程中,我们需要异步发送一封欢迎邮件。这个操作不应阻塞注册流程,但重要性不是最高,也没必要为此专门创建一个完整的线程池。ThreadUtil.execute正是为此而生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;

public class ThreadUtilScene1 {
public static void main(String[] args) {
System.out.println("主线程:用户注册流程开始...");

// 模拟注册耗时
ThreadUtil.sleep(1000);
String userEmail = "user_" + RandomUtil.randomString(6) + "@example.com";
System.out.println("主线程:用户 " + userEmail + " 注册成功!");

// 使用ThreadUtil.execute()异步发送邮件
ThreadUtil.execute(() -> {
System.out.println("异步任务:开始为 " + userEmail + " 发送欢迎邮件...");
// 模拟邮件发送耗时
ThreadUtil.sleep(2000);
System.out.println("异步任务:欢迎邮件已发送至 " + userEmail);
});

System.out.println("主线程:注册成功响应已返回给用户,无需等待邮件发送完成。");
}
}
// 输出 (顺序可能略有不同):
// 主线程:用户注册流程开始...
// 主线程:用户 user_xxxxxx@example.com 注册成功!
// 主线程:注册成功响应已返回给用户,无需等待邮件发送完成。
// 异步任务:开始为 user_xxxxxx@example.com 发送欢迎邮件...
// 异步任务:欢迎邮件已发送至 user_xxxxxx@example.com

小结execute是处理简单、临时性异步逻辑的最佳选择。一行代码即可实现,但需注意其使用的是全局共享线程池,不适用于执行关键业务或可能长时间阻塞的任务。


场景2:sleep & newNamedThreadFactory - 编写更专业、更可维护的线程代码

背景:我们需要创建一个专门处理定时上报任务的线程池。为了在日志和监控中清晰地识别这些线程,我们需要为它们命名。同时,在任务循环中需要使用sleep来控制上报频率,我们希望代码尽可能简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.example;

import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ThreadUtilScene2 {
public static void main(String[] args) {
// 1. 使用ThreadUtil创建带名字的线程工厂
// 线程名将是 "metric-reporter-0", "metric-reporter-1", ...
// 第二个参数是是否开启守护线程
ThreadFactory namedThreadFactory = ThreadUtil.newNamedThreadFactory("metric-reporter-", false);

// 2. 创建一个单线程的线程池,并使用我们自定义的工厂
ExecutorService executor = Executors.newSingleThreadExecutor(namedThreadFactory);

System.out.println("启动监控数据上报任务...");

executor.execute(() -> {
int reportCount = 0;
while (reportCount < 3) {
System.out.println(Thread.currentThread().getName() + ": 上报第 " + (++reportCount) + " 次监控数据...");
// 模拟数据上报...

// 3. 使用ThreadUtil.sleep,代码更整洁
// 对比原生: try { Thread.sleep(3000); } catch (InterruptedException e) { ... }
System.out.println("上报完成,休眠3秒...");
ThreadUtil.sleep(3000);
}
System.out.println(Thread.currentThread().getName() + ": 任务完成。");
});

executor.shutdown();
}
}
// 输出:
// 启动监控数据上报任务...
// metric-reporter-0: 上报第 1 次监控数据...
// 上报完成,休眠3秒...
// metric-reporter-0: 上报第 2 次监控数据...
// 上报完成,休眠3秒...
// metric-reporter-0: 上报第 3 次监控数据...
// 上报完成,休眠3秒...
// metric-reporter-0: 任务完成。

小结newNamedThreadFactory是创建专业线程池不可或缺的一步,它能极大提升系统的可维护性。而ThreadUtil.sleep则是在日常编码中提升代码整洁度的小技巧。


2. ExecutorBuilder - 优雅地构建专业线程池

我们已经知道,直接使用ThreadPoolExecutor的构造函数来创建线程池是最佳实践,但这带来了记忆大量参数和顺序的负担。ExecutorBuilder通过优雅的链式API,完美地解决了这一问题。

  • 常用方法速查表
方法名功能描述
create()**(入口)**静态工厂方法,获取一个ExecutorBuilder实例,开启构建之旅。
setCorePoolSize(int)设置核心线程数。
setMaxPoolSize(int)设置最大线程数。
setKeepAliveTime(...)设置非核心线程的空闲存活时间。
setWorkQueue(...)**(关键)**设置工作队列。这是决定线程池行为的核心配置。
setThreadFactory(...)设置线程工厂,通常与ThreadUtil.newNamedThreadFactory配合使用。
setHandler(...)设置拒绝策略,用于处理线程池超载时的任务。
build()**(出口)**完成所有配置,构建并返回最终的ExecutorService实例。
ExecutorBuilder 实战场景详解

场景1:构建一个标准的、高可用的业务线程池

背景:我们需要为系统中的核心业务(例如处理订单、执行交易)创建一个专用的、健壮的线程池。它必须具备:明确的线程数限制、有界的任务队列、合理的拒绝策略以及可识别的线程名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.example;

import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.thread.ThreadUtil;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorBuilderScene1 {
public static void main(String[] args) {
System.out.println("准备构建一个生产级的业务线程池...");

// 使用ExecutorBuilder的链式API进行配置
ExecutorService bizExecutor = ExecutorBuilder.create()
// 设置核心线程数为CPU核心数
.setCorePoolSize(Runtime.getRuntime().availableProcessors())
// 设置最大线程数为核心线程数的2倍
.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2)
// 设置非核心线程空闲60秒后被销毁
.setKeepAliveTime(60L, TimeUnit.SECONDS)
// 设置容量为500的有界队列,防止OOM
.setWorkQueue(new ArrayBlockingQueue<>(500))
// 使用Hutool工具创建带名字的线程工厂,方便定位问题
.setThreadFactory(ThreadUtil.newNamedThreadFactory("biz-order-handler-", false))
// 设置拒绝策略为“调用者运行”,这是一种有效的流量削峰/反压机制
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
// 完成构建
.build();

System.out.println("生产级线程池构建完成!\n" + bizExecutor);

// 提交一个任务测试
bizExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 开始执行核心业务...");
ThreadUtil.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 核心业务执行完毕。");
});

// 在实际应用中,线程池会作为单例一直运行,这里为了演示而关闭
bizExecutor.shutdown();
}
}
// 输出 (线程名中的数字可能不同):
// 准备构建一个生产级的业务线程池...
// 生产级线程池构建完成!
// java.util.concurrent.ThreadPoolExecutor@...[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
// biz-order-handler-0 开始执行核心业务...
// biz-order-handler-0 核心业务执行完毕。

小结ExecutorBuilder通过链式调用将ThreadPoolExecutor的复杂配置过程变得类型安全、顺序无关且极易阅读,是创建生产级线程池无可争议的最佳方式。


场景2:利用ExecutorBuilder的智能、安全的默认值

背景:有时我们只需要一个简单的线程池,不想配置所有细节,但又担心原生Executors的OOM陷阱。ExecutorBuilder在这里再次展现了它的优势。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.example;

import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.thread.ThreadUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecutorBuilderScene2 {
public static void main(String[] args) {
System.out.println("使用ExecutorBuilder的默认配置创建一个相对安全的线程池...");

// 只配置核心和最大线程数,让其他参数使用Hutool的默认值
ExecutorService saferExecutor = ExecutorBuilder.create()
.setCorePoolSize(2)
.setMaxPoolSize(4)
.build();

// 打印出线程池的实际信息,观察其队列类型和容量
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) saferExecutor;
System.out.println("线程池类型: " + threadPoolExecutor.getClass().getName());
System.out.println("工作队列类型: " + threadPoolExecutor.getQueue().getClass().getName());
System.out.println("工作队列容量: " + threadPoolExecutor.getQueue().remainingCapacity());

saferExecutor.shutdown();
}
}
// 输出:
// 使用ExecutorBuilder的默认配置创建一个相对安全的线程池...
// 线程池类型: java.util.concurrent.ThreadPoolExecutor
// 工作队列类型: java.util.concurrent.LinkedBlockingQueue
// 工作队列容量: 1024

小结:从输出可见,当不指定工作队列时,ExecutorBuilder默认使用了一个容量为1024的有界队列,而不是Integer.MAX_VALUE。这个设计决策极大地提升了安全性,有效防止了因任务堆积导致的内存溢出,同时保持了创建过程的简洁。


7.6.3 异步任务与协调工具

本节我们将学习Hutool中用于简化FutureCompletableFuture操作的工具。虽然原生JUC的功能已经很强大,但Hutool在一些常见协作模式上,提供了更为简洁的“语法糖”。

1. AsyncUtil - 异步操作简化器

AsyncUtil (cn.hutool.core.thread.AsyncUtil) 提供了一系列静态方法,旨在用更少的代码处理异步任务的结果获取和等待。

  • 常用方法速查表
方法名功能描述
waitAll()**(推荐)**阻塞等待所有给定的CompletableFuture执行完毕。是对CompletableFuture.allOf(...).join()的封装,代码更简洁。
get(Future<T> future)以非受检异常的方式获取Future的结果,帮你省去编写try-catch块的麻烦。
waitAny()等待任意一个任务执行完毕
AsyncUtil 实战场景详解

场景1:waitAll - 更优雅地等待多个并行任务完成

背景:再次回到我们的服务编排场景。主线程需要并行发起对“商品信息”、“价格信息”、“库存信息”三个服务的调用,并且必须等待这三个调用全部成功返回后,才能进行最终的数据聚合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.example;


import cn.hutool.core.thread.AsyncUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncUtilScene1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);

System.out.println("主线程:开始并行调用多个下游服务...");

// 1. 并行发起三个异步调用
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1000);
return "商品基本信息";
}, executor);

CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(2000);
return "商品价格";
}, executor);

CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1500);
return "商品库存";
}, executor);

System.out.println("主线程:所有服务已发起,等待全部完成...");

// 2. 使用AsyncUtil.waitAll优雅地等待
// 对比原生JDK: CompletableFuture.allOf(productFuture, priceFuture, stockFuture).join();
AsyncUtil.waitAll(productFuture, priceFuture, stockFuture);

System.out.println("主线程:所有服务均已返回!");

// 3. 获取并聚合结果
String result = StrUtil.format("聚合结果:\n - {}\n - {}\n - {}",
productFuture.get(), priceFuture.get(), stockFuture.get());

System.out.println(result);

executor.shutdown();
}
}
// 输出:
// 主线程:开始并行调用多个下游服务...
// 主线程:所有服务已发起,等待全部完成...
// (等待约2秒后)
// 主线程:所有服务均已返回!
// 聚合结果:
// - 商品基本信息
// - 商品价格
// - 商品库存

小结AsyncUtil.waitAll为“等待所有异步任务完成”这一常见模式提供了一个语义更清晰、代码更简洁的静态方法。它虽然只是对原生API的简单封装,但在提升代码可读性上效果显著。


7.6.4 并发测试好帮手:ConcurrencyTester

在编写了一个声称是“线程安全”的类或方法后,我们如何快速、方便地验证它呢?传统的方式是手动编写测试代码,创建多个线程,并使用CountDownLatch等同步器来确保它们尽可能地同时开始执行,代码相对繁琐。

Hutool的ConcurrencyTester (cn.hutool.core.thread.ConcurrencyTester) 就是为了解决这个问题而生的。

由来与用途

ConcurrencyTester 的核心使命是用最简单的API,模拟出高并发的测试场景。它主要用于:

  1. 验证线程安全性: 直观地暴露代码在并发访问下可能出现的竞态条件(Race Condition)和数据不一致问题。
  2. 进行简单性能测试: 粗略地衡量一段代码在多线程并发执行下的性能表现。

它是一个纯粹的测试辅助工具,不应用于生产业务逻辑中。

ConcurrencyTester 核心API

它的API极其精简:

方法名功能描述
ConcurrencyTester(int threadCount)**(入口)**构造函数,创建一个测试器,并指定需要模拟的并发线程数。
test(Runnable runnable)**(核心)**执行并发测试。它会创建并启动threadCount个线程,并使用内部的CyclicBarrier确保所有线程准备就绪后,尽可能地同时开始执行runnable中的逻辑。
getInterval()获取从第一个线程开始到最后一个线程结束的总耗时**(单位:毫秒)**。
ConcurrencyTester 实战场景详解

场景1:直观验证 i++ 的线程不安全性

背景:在7.2.1节,我们从理论上分析了i++操作并非原子性,在多线程下会导致计数错误。现在,我们将用ConcurrencyTester来实际复现这个问题,并与使用AtomicInteger的线程安全版本进行对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.example.hutool;

import cn.hutool.core.thread.ConcurrencyTester;

import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyTesterScene1 {

// 1. 一个非线程安全的计数器
static class UnsafeCounter {
private int count = 0;
public void increment() {
count++;
}
public int getCount() {
return count;
}
}

// 2. 一个使用AtomicInteger实现的线程安全计数器
static class SafeCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}

public static void main(String[] args) {
int threadCount = 1000; // 模拟1000个线程并发
int loopCountPerThread = 1000; // 每个线程执行1000次
long expectedValue = (long) threadCount * loopCountPerThread;

// --- 测试非线程安全的计数器 ---
UnsafeCounter unsafeCounter = new UnsafeCounter();
ConcurrencyTester unsafeTester = new ConcurrencyTester(threadCount);
unsafeTester.test(() -> {
for (int i = 0; i < loopCountPerThread; i++) {
unsafeCounter.increment();
}
});

System.out.println("--- 非线程安全测试 ---");
System.out.println("期望结果: " + expectedValue);
System.out.println("实际结果: " + unsafeCounter.getCount());
System.out.println("总耗时: " + unsafeTester.getInterval() + " ms\n");

// --- 测试线程安全的计数器 ---
SafeCounter safeCounter = new SafeCounter();
ConcurrencyTester safeTester = new ConcurrencyTester(threadCount);
safeTester.test(() -> {
for (int i = 0; i < loopCountPerThread; i++) {
safeCounter.increment();
}
});

System.out.println("--- 线程安全测试 ---");
System.out.println("期望结果: " + expectedValue);
System.out.println("实际结果: " + safeCounter.getCount());
System.out.println("总耗时: " + safeTester.getInterval() + " ms");
}
}
// 一次典型的输出:
// --- 非线程安全测试 ---
// 期望结果: 1000000
// 实际结果: 999457
// 总耗时: 51 ms
//
// --- 线程安全测试 ---
// 期望结果: 1000000
// 实际结果: 1000000
// 总耗时: 102 ms

小结ConcurrencyTester以一种无可辩驳的方式,清晰地暴露了UnsafeCounter的线程安全问题。它的结果通常不等于期望值,而SafeCounter则总能得到正确的结果。这证明了ConcurrencyTester是验证代码线程安全性的一个极其有效的“试金石”。