Java 中的 CyclicBarrier(循环屏障)是 java.util.concurrent
包提供的一个同步工具类,用于协调多个线程在某个“屏障点”(Barrier Point)等待彼此,直到所有线程都到达后再一起继续执行。它的核心特性是可重复使用(Cyclic),适用于需要多线程分阶段协作的场景。
1. 核心概念
- 屏障点(Barrier Point):
所有线程必须调用await()
方法到达屏障点后,才能继续执行后续任务。- 如果线程未全部到达,调用
await()
的线程会阻塞。 - 当最后一个线程到达屏障点时,所有被阻塞的线程会被唤醒。
- 如果线程未全部到达,调用
- 可重用性(Cyclic):
与CountDownLatch
不同,CyclicBarrier 的计数器可以重置并重复使用。通过reset()
方法,屏障可以进入下一轮同步。 - 屏障动作(Barrier Action):
可选参数,当所有线程到达屏障点时,会执行一个额外的Runnable
任务(例如合并结果或触发下一轮操作)。
2. 工作原理
CyclicBarrier 的底层实现基于 ReentrantLock 和 Condition,其核心逻辑如下:
- 初始化:
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程已到达屏障点");
});
parties=3
:表示需要 3 个线程到达屏障点。barrierAction
:当所有线程到达时执行的回调任务。
- 等待线程:
每个线程调用await()
方法到达屏障点:
barrier.await(); // 阻塞当前线程,直到所有线程到达
- 唤醒线程:
当所有线程到达屏障点后:
- 执行
barrierAction
(如果有)。 - 唤醒所有等待的线程,继续执行后续任务。
- 重置屏障:
调用reset()
方法重置屏障状态,允许下一轮同步:
barrier.reset(); // 重置计数器和状态
3. 核心方法
方法名 | 描述 |
---|---|
CyclicBarrier(int parties) | 构造函数,指定需要等待的线程数。 |
CyclicBarrier(int parties, Runnable barrierAction) | 构造函数,指定线程数和屏障动作。 |
int await() | 线程到达屏障点并阻塞,直到所有线程到达。返回当前线程的索引(0 表示最后一个线程)。 |
int await(long timeout, TimeUnit unit) | 带超时的等待,若超时未到达则抛出 TimeoutException 。 |
void reset() | 重置屏障状态,允许下一轮同步。 |
int getParties() | 返回需要等待的线程总数。 |
boolean isBroken() | 判断屏障是否被破坏(因中断或超时导致)。 |
4. 使用场景
CyclicBarrier 适用于以下典型场景:
- 并行计算:
将任务拆分为多个子任务并行执行,所有子任务完成后合并结果。
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("合并所有子任务结果");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
// 执行子任务
barrier.await();
}).start();
}
- 多线程测试:
确保所有线程同时触发某个操作(如模拟高并发请求)。
CyclicBarrier startBarrier = new CyclicBarrier(1000);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
startBarrier.await(); // 等待所有线程就位
simulateRequest();
}).start();
}
- 游戏服务器同步:
等待所有玩家完成当前回合后再广播结果。
CyclicBarrier turnBarrier = new CyclicBarrier(4, () -> {
broadcastTurnResult(); // 广播回合结果
});
players.forEach(player -> new Thread(() -> {
while (gameRunning) {
player.takeAction();
turnBarrier.await();
}
}).start());
5. 与 CountDownLatch 的区别
特性 | CyclicBarrier | CountDownLatch |
---|---|---|
计数器方向 | 递增(到达屏障后重置) | 递减(不可重置) |
可重用性 | 支持 reset() 方法,可重复使用 | 一次性工具,不可重置 |
等待机制 | 所有线程互相等待 | 主线程等待子线程 |
屏障动作 | 支持回调函数(barrierAction ) | 不支持 |
典型场景 | 多阶段并行计算、多线程同步 | 资源初始化、主线程等待子线程 |
6. 注意事项
- 屏障破坏(Broken Barrier):
- 如果任意线程在等待时被中断或超时,屏障会被标记为“broken”,所有等待线程抛出
BrokenBarrierException
。 - 可通过
isBroken()
检查状态,并用reset()
重置。
- 异常处理:
await()
方法可能抛出InterruptedException
(线程中断)、BrokenBarrierException
(屏障破坏)或TimeoutException
(超时)。
- 性能优化:
CyclicBarrier 基于锁(ReentrantLock
)实现,适合线程数较少的场景。高并发场景需谨慎使用。
7. 示例代码
场景:多线程计算并合并结果
import java.util.concurrent.*;
public class CyclicBarrierExample {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
int numThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(numThreads, () -> {
System.out.println("所有线程已到达屏障点,开始合并结果");
});
for (int i = 0; i < numThreads; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 正在执行任务...");
Thread.sleep((long) (Math.random() * 3000)); // 模拟任务执行时间
System.out.println(Thread.currentThread().getName() + " 已到达屏障点");
barrier.await(); // 等待所有线程
System.out.println(Thread.currentThread().getName() + " 继续执行后续任务");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
}
}
8. 总结
CyclicBarrier 是 Java 并发编程中的重要工具,通过“屏障点”机制实现多线程的协同与分阶段执行。它与 CountDownLatch 各有适用场景:
- CyclicBarrier:适合线程之间需要多次同步的场景(如多阶段任务)。
- CountDownLatch:适合一次性等待的场景(如主线程等待子线程完成)。
理解其设计原理和使用场景,有助于编写高效、可靠的并发程序。
THE END