设计一个线程池是一个经典的面试题,涉及并发编程、资源管理、任务调度等多个方面。以下是设计一个简化版线程池的详细思路和实现方案。
1. 需求分析
核心功能
- 任务提交:支持提交任务(Runnable 或 Callable)。
- 任务执行:使用线程池中的线程执行任务。
- 线程管理:动态管理线程的数量(核心线程、最大线程)。
- 任务队列:存储待执行的任务。
- 拒绝策略:当任务队列满时,处理新提交的任务。
非功能需求
- 高性能:高效地执行任务。
- 资源控制:限制线程数量,避免资源耗尽。
- 可扩展性:支持动态调整线程池参数。
2. 系统设计
核心组件
- 任务队列:存储待执行的任务(如
BlockingQueue
)。 - 工作线程:执行任务的线程。
- 线程池管理器:管理线程的生命周期和任务调度。
线程池参数
- 核心线程数(corePoolSize):线程池中保持的最小线程数。
- 最大线程数(maximumPoolSize):线程池中允许的最大线程数。
- 任务队列(workQueue):存储待执行的任务。
- 拒绝策略(RejectedExecutionHandler):当任务队列满时,处理新提交的任务。
3. 详细设计
任务队列
- 使用
BlockingQueue
实现任务队列,支持阻塞操作。 - 常用实现:
LinkedBlockingQueue
、ArrayBlockingQueue
。
工作线程
- 每个工作线程从任务队列中获取任务并执行。
- 如果任务队列为空,线程等待新任务。
线程池管理器
- 负责创建、销毁线程。
- 根据任务队列的状态动态调整线程数量。
拒绝策略
- 当任务队列满且线程数达到最大值时,执行拒绝策略。
- 常用策略:
- 抛出异常(
AbortPolicy
)。 - 丢弃任务(
DiscardPolicy
)。 - 丢弃队列中最旧的任务(
DiscardOldestPolicy
)。 - 由提交任务的线程执行任务(
CallerRunsPolicy
)。
- 抛出异常(
4. 关键问题与解决方案
1. 线程生命周期管理
- 解决方案:
- 使用
ThreadFactory
创建线程。 - 使用
volatile
变量控制线程状态。
- 使用
2. 任务调度
- 解决方案:
- 使用
BlockingQueue
实现任务队列。 - 工作线程从队列中获取任务并执行。
- 使用
3. 动态调整线程数量
- 解决方案:
- 当任务队列满时,创建新线程(不超过最大线程数)。
- 当线程空闲时,销毁多余线程(不低于核心线程数)。
4. 拒绝策略
- 解决方案:
- 提供多种拒绝策略,允许用户自定义。
5. 示例实现
线程池核心类
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class MyThreadPool {
private final BlockingQueue<Runnable> workQueue; // 任务队列
private final Thread[] workers; // 工作线程数组
private final AtomicInteger threadCount = new AtomicInteger(0); // 当前线程数
public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
this.workers = new Thread[maximumPoolSize];
// 初始化核心线程
for (int i = 0; i < corePoolSize; i++) {
createWorker();
}
}
// 提交任务
public void execute(Runnable task) {
if (threadCount.get() < workers.length) {
// 如果线程数未达到最大值,创建新线程
createWorker();
}
// 将任务加入队列
workQueue.offer(task);
}
// 创建工作线程
private void createWorker() {
Thread worker = new Thread(() -> {
while (true) {
try {
// 从队列中获取任务
Runnable task = workQueue.take();
task.run(); // 执行任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
workers[threadCount.getAndIncrement()] = worker;
worker.start();
}
// 关闭线程池
public void shutdown() {
for (Thread worker : workers) {
if (worker != null) {
worker.interrupt();
}
}
}
}
测试代码
public class Main {
public static void main(String[] args) {
MyThreadPool threadPool = new MyThreadPool(2, 4, new LinkedBlockingQueue<>(10));
// 提交任务
for (int i = 0; i < 10; i++) {
int taskId = i;
threadPool.execute(() -> {
System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
threadPool.shutdown();
}
}
6. 拒绝策略实现
自定义拒绝策略
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable task, MyThreadPool executor);
}
// 抛出异常策略
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, MyThreadPool executor) {
throw new RuntimeException("Task rejected: " + task.toString());
}
}
// 调用者执行策略
public class CallerRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task, MyThreadPool executor) {
task.run(); // 由提交任务的线程执行
}
}
在线程池中使用拒绝策略
public void execute(Runnable task) {
if (threadCount.get() < workers.length) {
createWorker();
}
if (!workQueue.offer(task)) {
rejectedExecutionHandler.rejectedExecution(task, this); // 执行拒绝策略
}
}
7. 总结
- 核心组件:任务队列、工作线程、线程池管理器。
- 动态调整:根据任务队列状态动态调整线程数量。
- 拒绝策略:提供多种策略处理任务队列满的情况。
通过以上设计,可以实现一个简化版的线程池,支持高效的任务调度和执行。
THE END
暂无评论内容