使用场景
Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead,and they provide a means of bounding and managing the resources,including threads, consumed when executing a collection of tasks.
根据上面官方的解释,线程池主要是为了解决下面的两个问题
- 提高执行大量异步任务时的性能
- 管理执行异步任务的线程
关键类
-
ThreadPoolExecutor
线程池的核心类,实现了对线程的创建、执行、回收等各种管理功能。
-
Executors
线程池的工具类,提供了多种静态方法用来快速创建不同类型的线程池。
关键参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
上面是 ThreadPoolExecuto r的构造器,其中定义了线程池的几个关键参数。
corePoolSize
线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize 。如果当前线程数为 corePoolSize 继续提交的任务被保存到阻塞队列中,等待被执行。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,直到当前线程数等于 maximumPoolSize。
keepAliveTime
当线程池中的线程数大于 corePoolSize 时,多余线程的存活时间。如果线程空闲的时间达到keepAliveTime,则会被销毁,直到线程池中的线程数不超过 corePoolSize 。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,keepAliveTime 参数也会起作用,直到线程池中的线程数为0。
unit
keepAliveTime 参数的时间单位。
workQueue
任务缓存队列,用来存放等待执行的任务。如果当前线程数为 corePoolSize ,继续提交的任务就会被保存到任务缓存队列中,等待被执行。这里的 BlockingQueue 可以有以下几种选择:
-
SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长。
-
LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为 Integer.MAX_VALUE ,即为无界队列,按 FIFO 顺序执行任务。如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。
-
ArrayBlockingQueue:基于数组结构的有界阻塞队列,构造时需指定容量大小,按FIFO顺序执行任务。可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
-
PriorityBlockingQueue: 会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限。
-
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
threadFactory
线程工厂,创建新线程时使用的线程工厂。
handler
任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:
-
AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认策略。
-
CallerRunsPolicy:由调用execute方法的线程执行该任务。
-
DiscardPolicy:丢弃任务,但是不抛出异常。
-
DiscardOldestPolicy:丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义拒绝策略,如记录日志或持久化存储不能处理的任务。
线程池状态
// 线程池的运行状态,总共有5个状态,用高3位来表示
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
从上面的源码可以知道线程池有以下 5 个状态,但是线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态与工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量,对ctl的操作主要参考以下几个函数
// 通过与的方式,获取ctl的高3位,也就是线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//通过与的方式,获取ctl的低29位,也就是线程池中工作线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//通过或的方式,将线程池状态和线程池中工作线程的数量打包成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
//SHUTDOWN状态的值是0,比它大的均是线程池停止或清理状态,比它小的是运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
RUNNING
高3位为111,接受新任务并处理阻塞队列中的任务
SHUTDOWN
高3位为000,不接受新任务但会处理阻塞队列中的任务
STOP
高3位为001,不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务
TIDYING
高3位为010,所有任务都已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法
TERMINATED
高3位为011,terminated()方法已经执行结束
复用原理
runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//复用的关键在这个while 循环,只要满足 task != null(Worker线程第一次使用是满足) || getTask() !=null 这 2 个条件就能一直复用这个Worker线程
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//真正执行提交的任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask 方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果线程池处于shutdown状态,并且队列为空,或者线程池处于stop或者terminate状态,在线程池数量-1,返回null,回收线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//标识当前线程在空闲时,是否应该超时回收
// 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask()方法的返回情况:
- 线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象
- 线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象
- 线程池状态大于等于STOP,返回null,回收线程
- 线程池处于SHUTDOWN状态,并且阻塞队列为空,返回null,回收线程
- worker数量大于maximumPoolSize,返回null,回收线程
- 线程空闲时间超时,返回null,回收线程
代码示例
PriorityBlockingQueue
public static void main(String[] args) throws Exception {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 3, 0, TimeUnit.MICROSECONDS, new PriorityBlockingQueue<>(3));
//故意乱序
poolExecutor.execute(new OrderTask(3));
poolExecutor.execute(new OrderTask(1));
poolExecutor.execute(new OrderTask(2));
}
//实现 Comparable 接口 ,也可不实现构造PriorityBlockingQueue 传入一个比较器即可
static class OrderTask implements Runnable, Comparable<OrderTask> {
//排序依据
int order;
public OrderTask(int order) {
this.order = order;
}
@Override
public void run() {
System.out.println("this taks order = " + order);
}
@Override
public int compareTo(OrderTask o) {
return order - o.order;
}
}
//out 按order排序再执行的
this taks order = 1
this taks order = 2
this taks order = 3
CallerRunsPolicy
public static void main(String[] args) throws Exception {
//构造一个核心线程和最大线程数都是 1 的线程池,而且任务队列容量也是1 这样的话该线程池就只能最多处理 2 个任务
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
//提交 3 个任务 会有一个被拒绝
for (int i = 0; i < 3; i++) {
poolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
}
//out 在 CallerRunsPolicy 这个策略下超出线程池处理能力的任务会在 execute 方法调用者的线程里执行 ,这里就是 main 线程。
main
pool-1-thread-1
pool-1-thread-1
ForkJoinPool
public static void main(String[] args) throws Exception {
List<String> tses = new ArrayList<>();
for (int i = 0; i < 45; i++) {
tses.add(i + ".ts");
}
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<List<String>> forkJoinTask = new TseqTask(tses, 10);
ForkJoinTask<List<String>> task = forkJoinPool.submit(forkJoinTask);
List<String> res = task.get();
System.out.println(res);
}
static class TseqTask extends RecursiveTask<List<String>> {
private List<String> tses;
private int splitSize;
public TseqTask(List<String> tses, int splitSize) {
this.tses = tses;
this.splitSize = splitSize;
}
@Override
protected List<String> compute() {
if (tses.size() <= splitSize) {
//此处模拟耗时操作
return tses;
}
List<TseqTask> subTasks = new ArrayList<>();
for (int i = 0; i < tses.size(); i += splitSize) {
int to = i + splitSize > tses.size() ? tses.size() : i + splitSize;
TseqTask subTsTask = new TseqTask(tses.subList(i, to), splitSize);
subTasks.add(subTsTask);
subTsTask.fork();
}
List<String> res = new ArrayList<>();
for (TseqTask subTask : subTasks) {
try {
res.addAll(subTask.get());
} catch (Exception e) {
e.printStackTrace();
}
}
return res;
}
}