ForkJoinPool 是 Java 7 引入的一个线程池实现,位于 java.util.concurrent
包中。它是专门为支持 分治算法(Divide-and-Conquer)和 并行任务 而设计的。ForkJoinPool 的核心思想是将一个大任务拆分成多个小任务(Fork),然后将这些小任务分配给多个线程并行执行,最后将结果合并(Join)。
1. ForkJoinPool 的核心特点
- 工作窃取算法(Work-Stealing Algorithm):
- 每个线程都有自己的任务队列。
- 当一个线程的任务队列为空时,它会从其他线程的任务队列中“窃取”任务来执行。
- 这种机制可以有效地平衡线程之间的负载,提高 CPU 利用率。
- 分治思想:
- 适用于递归任务,将大任务拆分成小任务,直到任务足够小可以直接解决。
- 适合处理可以分解的并行任务,如排序、搜索、图像处理等。
- 轻量级任务:
- ForkJoinPool 适合处理轻量级的任务,而不是 I/O 密集型任务。
2. ForkJoinPool 的核心类
- ForkJoinPool:
- 线程池的实现,用于管理和调度任务。
- 默认情况下,ForkJoinPool 的线程数等于 CPU 的核心数。
- ForkJoinTask:
- 表示一个可以被 ForkJoinPool 执行的任务。
- 通常使用它的两个子类:
- RecursiveAction:用于没有返回值的任务。
- RecursiveTask:用于有返回值的任务。
3. 使用 ForkJoinPool 的步骤
- 定义任务:
- 继承
RecursiveAction
或RecursiveTask
,并实现compute()
方法。 - 在
compute()
方法中,判断任务是否需要拆分,如果需要则拆分成子任务并调用fork()
,最后通过join()
合并结果。
- 继承
- 提交任务:
- 将任务提交到
ForkJoinPool
中执行。
- 将任务提交到
4. 代码示例
示例 1:使用 RecursiveAction
实现无返回值的任务
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class MyRecursiveAction extends RecursiveAction {
private final int threshold = 10;
private int start;
private int end;
public MyRecursiveAction(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= threshold) {
// 直接执行任务
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
}
} else {
// 拆分任务
int mid = (start + end) / 2;
MyRecursiveAction leftTask = new MyRecursiveAction(start, mid);
MyRecursiveAction rightTask = new MyRecursiveAction(mid, end);
leftTask.fork(); // 异步执行左任务
rightTask.fork(); // 异步执行右任务
leftTask.join(); // 等待左任务完成
rightTask.join(); // 等待右任务完成
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
MyRecursiveAction task = new MyRecursiveAction(0, 100);
pool.invoke(task); // 提交任务
}
}
示例 2:使用 RecursiveTask
实现有返回值的任务
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MyRecursiveTask extends RecursiveTask<Integer> {
private final int threshold = 10;
private int start;
private int end;
public MyRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= threshold) {
// 直接计算
int sum = 0;
for (int i = start; i < end; i++) {
sum += i;
}
return sum;
} else {
// 拆分任务
int mid = (start + end) / 2;
MyRecursiveTask leftTask = new MyRecursiveTask(start, mid);
MyRecursiveTask rightTask = new MyRecursiveTask(mid, end);
leftTask.fork(); // 异步执行左任务
int rightResult = rightTask.compute(); // 同步执行右任务
int leftResult = leftTask.join(); // 等待左任务完成
return leftResult + rightResult;
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
MyRecursiveTask task = new MyRecursiveTask(0, 100);
int result = pool.invoke(task); // 提交任务并获取结果
System.out.println("Result: " + result); // 输出 4950
}
}
5. ForkJoinPool 的常用方法
invoke(ForkJoinTask<T> task)
:- 提交任务并等待结果。
submit(ForkJoinTask<T> task)
:- 提交任务,返回一个
ForkJoinTask
,可以异步获取结果。
- 提交任务,返回一个
shutdown()
:- 关闭线程池,不再接受新任务。
awaitTermination(long timeout, TimeUnit unit)
:- 等待线程池中的任务执行完毕。
6. ForkJoinPool 的适用场景
- CPU 密集型任务:
- 适合处理可以分解的并行任务,如排序、搜索、矩阵计算等。
- 递归任务:
- 适合处理递归算法,如归并排序、快速排序等。
- 不适合 I/O 密集型任务:
- 由于 ForkJoinPool 的设计目标是最大化 CPU 利用率,因此不适合处理 I/O 密集型任务(如文件读写、网络请求等)。
7. 总结
- ForkJoinPool 是一个高效的线程池实现,专为分治算法和并行任务设计。
- 它通过工作窃取算法和任务拆分机制,能够充分利用多核 CPU 的性能。
- 适合处理 CPU 密集型的递归任务,但不适合 I/O 密集型任务。
- 通过
RecursiveAction
和RecursiveTask
,可以方便地实现任务的拆分与合并。
THE END
暂无评论内容