Java 核心技术:并发编程
Java并发编程:从内存模型到并发工具的设计哲学
并发编程的核心挑战不在于"如何让多个线程同时跑",而在于"如何让多个线程正确地协作"。理解 Java 内存模型和并发工具的设计原理,是写出正确并发代码的前提。
并发编程是 Java 工程师的核心能力之一。它涉及从硬件层面的缓存一致性,到语言层面的内存模型,再到 JUC 工具类的 API 设计,是一个纵深很大的知识领域。
本文将从底层原理出发,逐层构建 Java 并发编程的知识体系。
一、硬件基础:CPU 缓存与一致性
1.1 为什么需要缓存
现代 CPU 的运算速度远超主内存的读写速度(差距约 100 倍)。为了弥补这一差距,CPU 引入了多级缓存(L1/L2/L3 Cache)。每个核心拥有独立的 L1/L2 缓存,L3 缓存为所有核心共享。
CPU Core 0 CPU Core 1
┌─────────┐ ┌─────────┐
│ L1 Cache│ │ L1 Cache│
│ L2 Cache│ │ L2 Cache│
└────┬────┘ └────┬────┘
└────────┬─────────┘
L3 Cache(共享)
│
主内存(RAM)
缓存的引入解决了性能问题,但带来了新问题:当多个核心各自缓存了同一块数据的副本,其中一个核心修改了数据,如何保证其他核心看到的是最新值?
1.2 MESI 缓存一致性协议
MESI 是最广泛采用的缓存一致性协议,每个缓存行处于四种状态之一:
| 状态 | 含义 | 对主内存 |
|---|---|---|
| M(Modified) | 当前核心修改了数据,与主内存不一致 | 需要写回 |
| E(Exclusive) | 当前核心独占数据,与主内存一致 | 无需写回 |
| S(Shared) | 多个核心共享数据,与主内存一致 | 无需写回 |
| I(Invalid) | 缓存行无效 | 需从主内存重新加载 |
当 Core 0 修改了处于 S 状态的缓存行时:
- Core 0 将缓存行状态改为 M
- 通过总线嗅探(Bus Snooping)通知其他核心
- 其他核心将对应缓存行标记为 I
- 其他核心下次读取该数据时,从 Core 0 的缓存或主内存重新加载
1.3 缓存行伪共享(False Sharing)
缓存行是缓存操作的最小单位,大小通常为 64 字节。如果两个无关的变量恰好落在同一缓存行中,一个变量的修改会导致另一个变量的缓存行也失效——这就是伪共享。
// 伪共享示例:head 和 tail 可能在同一缓存行
class Queue {
volatile long head; // 生产者频繁修改
volatile long tail; // 消费者频繁修改
}
Doug Lea 在 LinkedTransferQueue 中的解决方案——填充字节使变量独占一个缓存行:
// JDK 7 中的做法
class PaddedAtomicReference<T> extends AtomicReference<T> {
Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
}
// JDK 8+ 可以使用 @Contended 注解
@sun.misc.Contended
class QueueNode {
volatile long value;
}
二、Java 内存模型(JMM)
2.1 JMM 的抽象
Java 内存模型(Java Memory Model)定义了多线程如何通过共享内存进行通信的规则。它并不描述具体的硬件实现,而是提供了一组抽象的可见性和有序性保证。
线程 A 工作内存 线程 B 工作内存
┌──────────────┐ ┌──────────────┐
│ 变量副本 │ │ 变量副本 │
└──────┬───────┘ └──────┬───────┘
│ save/load │
└──────────┬───────────┘
主内存
┌──────────────┐
│ 共享变量 │
└──────────────┘
JMM 定义了 8 种内存交互操作:lock、unlock、read、load、use、assign、store、write。这些操作的组合规则保证了多线程程序的语义正确性。
2.2 三大并发问题
| 问题 | 描述 | 根源 |
|---|---|---|
| 可见性 | 一个线程修改了变量,其他线程看不到最新值 | CPU 缓存导致各线程工作内存不一致 |
| 原子性 | 一组操作被中断导致中间状态暴露 | 线程切换导致复合操作被打断 |
| 有序性 | 代码执行顺序与编写顺序不一致 | 编译器优化、CPU 指令重排序 |
2.3 volatile 的语义与实现
volatile 是 Java 中最轻量的同步机制,它提供两个保证:
- 可见性:对 volatile 变量的写操作对所有线程立即可见
- 有序性:禁止指令重排序(通过内存屏障实现)
但不保证原子性:volatile int count; count++ 并不是线程安全的,因为 count++ 是读-改-写三步操作。
硬件级实现:
在 x86 架构上,对 volatile 变量的写操作会生成一条带 LOCK 前缀的指令。LOCK 前缀的作用:
- 将当前处理器缓存行的数据写回主内存
- 使其他处理器中缓存该地址的缓存行失效(通过 MESI 协议)
// JIT 编译后的汇编(x86)
0x01a3de24: lock addl $0x0,(%esp) // LOCK 前缀指令
在 P6 及更新的处理器上,LOCK 不再锁总线,而是锁缓存行(Cache Locking),性能开销远小于总线锁。
2.4 happens-before 规则
JMM 通过 happens-before 关系定义了操作间的可见性保证。如果操作 A happens-before 操作 B,则 A 的结果对 B 可见。
| 规则 | 说明 |
|---|---|
| 程序顺序规则 | 同一线程中的操作,前面的 happens-before 后面的 |
| volatile 规则 | volatile 写 happens-before 后续的 volatile 读 |
| 锁规则 | unlock happens-before 后续对同一锁的 lock |
| 传递性 | 如果 A hb B,B hb C,则 A hb C |
| 线程启动规则 | Thread.start() happens-before 该线程的每个动作 |
| 线程终止规则 | 线程的所有动作 happens-before 其他线程检测到该线程终止 |
三、锁机制
3.1 synchronized vs Lock
Java 提供两种锁机制:内置锁(synchronized)和显式锁(java.util.concurrent.locks.Lock)。
| 维度 | synchronized | Lock |
|---|---|---|
| 实现层面 | JVM 内置(monitorenter/monitorexit) | Java API 层(基于 AQS) |
| 锁获取 | 阻塞式,不可中断 | 支持非阻塞 tryLock()、可中断 lockInterruptibly() |
| 锁释放 | 自动释放(退出同步块) | 必须在 finally 中手动 unlock() |
| 条件等待 | Object.wait()/notify() |
Condition.await()/signal(),支持多条件队列 |
| 公平性 | 不支持 | ReentrantLock(true) 支持公平锁 |
| 锁状态查询 | 不支持 | isLocked()、getHoldCount() 等 |
选择原则:优先使用 synchronized(JVM 持续优化,且不会忘记释放锁);需要高级特性(超时、中断、多条件、公平性)时选择 Lock。
3.2 Condition:精确的线程协作
Condition 是 Lock 的配套组件,它替代了 Object.wait()/notify() 机制,最大的优势是支持多个等待队列。
// 使用 Object 的 wait/notify:只有一个等待队列,notifyAll 会唤醒所有线程
// 使用 Condition:可以创建多个条件队列,signal 只唤醒特定队列中的线程
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition(); // 生产者等待队列
Condition notEmpty = lock.newCondition(); // 消费者等待队列
有界缓冲区实现(经典的生产者-消费者模型):
class BoundedBuffer<E> {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putIndex, takeIndex, count;
public void put(E e) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 缓冲区满,生产者等待
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
++count;
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 缓冲区空,消费者等待
E e = (E) items[takeIndex];
if (++takeIndex == items.length) takeIndex = 0;
--count;
notFull.signal(); // 通知生产者
return e;
} finally {
lock.unlock();
}
}
}
注意 await() 必须在 while 循环中调用,以防止虚假唤醒(Spurious Wakeup)。
3.3 ReadWriteLock:读写分离
当读操作远多于写操作时,使用排他锁会严重限制并发度。ReadWriteLock 允许多个线程同时持有读锁,但写锁是排他的。
| 锁状态 | 读锁请求 | 写锁请求 |
|---|---|---|
| 无锁 | 允许 | 允许 |
| 读锁已持有 | 允许(共享) | 阻塞 |
| 写锁已持有 | 阻塞 | 阻塞 |
ReentrantReadWriteLock 的设计决策:
- 写锁可降级为读锁:持有写锁的线程可以再获取读锁,然后释放写锁
- 读锁不可升级为写锁:防止死锁(多个读线程同时尝试升级会互相等待)
- 支持公平/非公平模式:非公平模式下,读锁可能"插队"导致写线程饥饿
四、JUC 并发工具类
java.util.concurrent 包提供了一组高级同步工具,用于解决常见的线程协调问题。
4.1 CountDownLatch:一次性倒计数门闩
语义:一个或多个线程等待其他线程完成一组操作后再继续执行。
CountDownLatch latch = new CountDownLatch(3); // 计数器初始值 3
// 工作线程
executor.submit(() -> {
doTask();
latch.countDown(); // 计数器 -1
});
// 等待线程
latch.await(); // 阻塞直到计数器归零
// 所有任务完成,继续执行
核心特征:
- 一次性:计数器归零后无法重置
- 底层基于 AQS 的共享模式实现
典型场景:服务启动时等待所有依赖组件初始化完成。
4.2 CyclicBarrier:可重用的屏障
语义:一组线程互相等待,直到所有线程都到达屏障点,然后同时继续执行。
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到齐,开始下一阶段"); // barrierAction
});
// 每个工作线程
executor.submit(() -> {
doPhase1();
barrier.await(); // 等待其他线程
doPhase2();
barrier.await(); // 可以重复使用
});
核心特征:
- 可重用:所有线程通过屏障后,计数器自动重置
- 支持 barrierAction:所有线程到齐时执行的回调
- 如果某个线程等待超时或被中断,屏障进入 Broken 状态,所有等待线程收到
BrokenBarrierException
4.3 Semaphore:信号量
语义:控制同时访问某个资源的线程数量。
Semaphore semaphore = new Semaphore(5); // 最多 5 个并发
executor.submit(() -> {
semaphore.acquire(); // 获取许可(可用许可 -1)
try {
accessResource();
} finally {
semaphore.release(); // 释放许可(可用许可 +1)
}
});
核心特征:
- 支持公平/非公平模式
tryAcquire()提供非阻塞获取- 许可数量可以动态增减(
release()可以在未acquire()的情况下调用)
4.4 三者对比
| 工具 | 核心语义 | 是否可重用 | 计数方向 | 典型场景 |
|---|---|---|---|---|
| CountDownLatch | 一个线程等待 N 个线程 | 否 | 递减至 0 | 主线程等待子任务完成 |
| CyclicBarrier | N 个线程互相等待 | 是 | 递增至 N | 多阶段并行计算 |
| Semaphore | 控制并发访问数量 | - | 许可的获取与释放 | 限流、资源池 |
五、生产者-消费者模式
生产者-消费者是并发编程中最经典的协作模式。Java 提供了从底层到高层的多种实现方式。
5.1 三种实现方式对比
| 实现方式 | 同步机制 | 通知粒度 | 复杂度 | 推荐度 |
|---|---|---|---|---|
| synchronized + wait/notify | 内置锁 | 全量唤醒(notifyAll) | 低 | 一般 |
| Lock + Condition | 显式锁 | 精确唤醒(signal) | 中 | 推荐 |
| BlockingQueue | 封装在队列内部 | 内部自动处理 | 最低 | 最推荐 |
为什么 BlockingQueue 是最佳选择:
BlockingQueue 将同步、等待、通知的逻辑完全封装在 put()/take() 方法内部,调用方无需关心并发细节:
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(100);
// 生产者
queue.put(task); // 队列满时自动阻塞
// 消费者
Task task = queue.take(); // 队列空时自动阻塞
5.2 BlockingQueue 的实现选型
| 实现类 | 底层结构 | 是否有界 | 锁策略 | 适用场景 |
|---|---|---|---|---|
ArrayBlockingQueue |
数组 | 有界 | 单锁 | 通用场景 |
LinkedBlockingQueue |
链表 | 可选有界 | 读写分离锁 | 吞吐量要求高 |
SynchronousQueue |
无容量 | 无 | CAS | 直接传递(线程池默认) |
PriorityBlockingQueue |
堆 | 无界 | 单锁 | 优先级调度 |
六、线程池
6.1 ThreadPoolExecutor 核心参数
new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 非核心线程空闲存活时间
TimeUnit.SECONDS,
workQueue, // 任务队列
threadFactory, // 线程工厂
rejectedHandler // 拒绝策略
);
任务提交流程:
提交任务
→ 当前线程数 < corePoolSize? → 创建核心线程执行
→ 任务队列未满? → 入队等待
→ 当前线程数 < maximumPoolSize? → 创建非核心线程执行
→ 以上都不满足 → 执行拒绝策略
6.2 拒绝策略
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy | 抛出 RejectedExecutionException |
默认策略,适合需要感知过载的场景 |
| CallerRunsPolicy | 由提交线程自己执行任务 | 反压效果,但可能导致提交线程阻塞 |
| DiscardPolicy | 静默丢弃任务 | 允许丢失的场景(如日志) |
| DiscardOldestPolicy | 丢弃队列中最旧的任务 | 实时性要求高、可接受旧数据丢失 |
6.3 生产阻塞型线程池
标准 ThreadPoolExecutor 使用 BlockingQueue.offer()(非阻塞)入队。队列满时不会阻塞提交线程,而是触发拒绝策略。
在某些场景下(如需要严格的背压机制),需要让提交线程在队列满时阻塞等待而非被拒绝。可通过自定义拒绝策略实现:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
coreSize, maxSize, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(capacity),
(runnable, pool) -> {
try {
// 队列满时,put() 会阻塞提交线程
pool.getQueue().put(runnable);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
);
这种方式的优势在于:复用 ThreadPoolExecutor 的线程管理能力,同时实现了生产者阻塞语义,避免了手工管理线程的复杂性。
6.4 线程池配置最佳实践
| 任务类型 | 核心线程数建议 | 队列选择 |
|---|---|---|
| CPU 密集型 | N_cpu + 1 |
小容量有界队列 |
| I/O 密集型 | N_cpu × 2 或更高 |
较大容量有界队列 |
| 混合型 | 拆分为 CPU 池和 I/O 池 | 各自独立配置 |
关键原则:
- 永远不要使用无界队列:
Executors.newFixedThreadPool()默认使用无界的LinkedBlockingQueue,可能导致 OOM - 为线程池命名:自定义
ThreadFactory,给线程添加有意义的名称前缀,便于排查问题 - 监控队列深度:线程池队列持续增长是系统过载的信号
总结
Java 并发编程的知识体系可以沿着三个层次理解:
- 硬件层:CPU 缓存、MESI 协议、缓存行伪共享——这是并发问题的物理根源
- 模型层:JMM、happens-before、volatile/synchronized 语义——这是 Java 对硬件差异的抽象屏蔽
- 工具层:Lock/Condition、CountDownLatch/CyclicBarrier/Semaphore、BlockingQueue、ThreadPoolExecutor——这是面向工程的并发编程基础设施
并发工具的选择不在于功能的强大,而在于语义的匹配。
synchronized足以解决大多数问题;BlockingQueue比手动的 wait/notify 更安全;标准ThreadPoolExecutor比自定义线程管理更可靠。优先选择高层抽象,只在确有需要时才下沉到底层机制。
本文介绍的 JUC 工具类(ReentrantLock、CountDownLatch、Semaphore 等)底层都依赖同一个框架——AbstractQueuedSynchronizer(AQS)。如果想深入理解这些工具"为什么这样工作",推荐阅读 《Java 核心技术:深入理解 AQS》。
Java 核心技术专栏