Executor 框架详解

Executor框架最核心的类是ThreadPoolExecutor,它是Java线程池的实现类,通过Executors工具类,可以创建3种类型的ThreadPoolExecutor:
首先附上ThreadPoolExecutor的构造函数

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

FixedThreadPool

可重用==固定线程数==的线程池。实现源码:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

在这里,corePoolSizemaximumPoolSize都设置成了创建时指定的线程数nThread,当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的时间,超过这个时间后线程将会被终止。在这里设置为0L,意味着多余的空闲线程将会被立即终止。
FixedPoolThreadexecute()方法运行流程如下:

FiexdPoolThread运行流程

  • 如果当前运行的线程少于corePoolSize,则创建新的线程来执行任务。
  • 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  • 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行

由于它采用了LinkedBlockingQueue来作为线程池的工作队列,这是一个无界队列(队列容量为Integer.MAX_VALUE)。所以将会为线程池带来如下影响:

  1. 当线程池的线程数达到corePoolSize之后,新的任务将会在无界队列中等待,因此线程池中的线程不会超过corePoolSize

  2. 由于1,使用无界队列时maximumPoolSize将是一个无效参数;

  3. 由于1和2,使用无界队列时keepAliveTime将是一个无效参数
  4. 由于使用无界队列,运行中的FixedThreadPool(未执行shutdown()或shutdownNow())不会拒绝任务。

SingleThreadExecutor

使用单个worker线程的Executor,下面是源码实现:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutorcorePoolSizemaximumPoolSize都被设置成1,其他参数则与FixedThreadPool相同,它使用无界队列的影响与FixedThreadPool是同样的。下面是它的运行流程图:

SingleThreadExecutor运行流程

  • 如果当前没有线程在运行,则创建一个新的线程
  • 当线程池中有且仅有一个运行的线程,将任务加入LinkedBlockingQueue
  • 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务执行。

CachedThreadPool

一个会根据需要创建新线程的线程池,下面是源码实现:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

它的corePoolSize被设置为0,maximumPoolSize设置为Integer.MAX_VALUE,即maximumPool是无界的;把keepAliveTime设置成60L,意味着该线程池中空闲线程等待新任务的最长时间为60秒,超时则被会终止。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPoolmaximumPool是无界的,意味着,如果主线程提交任务的速度高于maximumPool中线程处理的速度,它将会不断的创建新的线程,直到耗尽所有CPU和内存资源。它的运行流程如图:

CachedThreadPool运行流程

  • 首先执行SynchronousQueue.offer(task),如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute执行成功,否则执行下面的操作。
  • maximumPool为空,或者其中没有空闲线程时,将没有线程执行poll操作,那么CachedThreadPool会创建一个新的线程,execute方法执行成功。
  • 在上面的步骤中,线程将任务执行完后,变为空闲状态,将会执行poll操作,空闲线程继续等待新的任务到来并配对执行,如果超过60s,则线程将会被终止。由于超过60秒的空闲线程将会被终止,所以长时间保持空闲的CachedThreadPool不会使用任何资源

CachedThreadPool的任务传递示意图:

任务传递

ScheduledThreadPoolExecutor

主要用来在给定的延迟之后运行任务,或者定期执行任务,下面是源码实现:

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

该类直接继承自ThreadPoolExecutor,为的是能直接使用已经实现的方法,而且,它新加了几个方法:

  • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
    //向定时任务线程池提交一个延时Runnable任务(仅执行一次)
  • public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
    //向定时任务线程池提交一个延时的Callable任务(仅执行一次)

  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,long period, TimeUnit unit)
    //向定时任务线程池提交一个固定时间间隔执行的任务

  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay, TimeUnit unit);
    //向定时任务线程池提交一个固定延时间隔执行的任务

也可以向其直接提交普通任务,相当于延时和周期都为0的任务。这里构造时的数值含义与前面的类似,就不赘述了。下面是基本运行流程图:

ScheduledThreadPoolExecutor运行流程

  • 主线程提交普通任务或有延时或周期的任务。并将任务加入具有优先性质的阻塞队列
  • 线程池中的线程每次通过take()方法获取任务,然后从任务的getDelay()方法获取应当延时的时间,当延时达到,执行任务。执行完毕后,判断这是不是一个周期任务,如果是,则继续延时执行任务,如果不是,则继续调用take方法获取任务,如果没有获取到任务,空闲线程将会被终止。

FutureTask

代表异步计算的结果,根据FutureTask.run()方法执行状况,可以分为下面三种状态:

  • 未启动:FutureTask.run()方法还没有被执行前
  • 已启动:FutureTask.run()方法正在执行
  • 已完成:FutureTask.run()方法正常结束、被取消、执行抛出异常、异常结束

状态转换

下面是FutureTask对应get(),cancel()方法执行示意图:

get&cancel

文章目录
  1. 1. FixedThreadPool
  2. 2. SingleThreadExecutor
  3. 3. CachedThreadPool
  4. 4. ScheduledThreadPoolExecutor
  5. 5. FutureTask
|