28.JAVA异步编程:Executor框架
1.Executor与ExecutorService接口
java.util.concurrent.Executor接口是对任务的执行进行的抽象,该接口仅定义了如下方法:
void execute(Runnable command)
其中,command 参数代表需要执行的任务。Executor接口使得任务的提交方(相当于生产者)只需要知道它调用 Executor.execute 方法便可以使指定的任务被执行,而无须关心任务具体的执行细节:比如,任务是采用一个专门的工作者线程执行的,还是采用线程池执行的;采用什么样的线程池执行的;多个任务是以何种顺序被执行的。可见,Executor接口使得任务的提交能够与任务执行的具体细节解耦。
Executor接口比较简单,功能也十分有限:首先,它只能为客户端代码执行任务,而无法将任务的处理结果返回给客户端代码;其次,Executor 接口实现类内部往往会维护一些工作者线程,当我们不再需要一个 Executor 实例的时候,往往需要主动将该实例内部维护的工作者线程停掉以释放相应的资源,而 Executor 接口并没有定义相应的方法。
ExecutorService 接口继承自 Executor 接口,它解决了上述问题。ExecutorService 接口定义了几个submit 方法,这些方法能够接受 Callable 接口或者 Runnable 接口表示的任务并返回相应的 Future 实例,从而使客户端代码提交任务后可以获取任务的执行结果。ExecutorService 接口还定义了 shutdown()方法和 shutdownNow()方法来关闭相应的服务(比
如关闭其维护的工作者线程)。ThreadPoolExecutor 是 ExecutorService 的默认实现类。
2.实用工具类 Executors
Executors除了能够返回默认线程工厂 (Executors.defaultThreadFactory())、能够将 Runnable 实例转换为 Callable 实例( Executors.callable 方法)之外,还提供了一些能够返回ExecutorService 实例的快捷方法,这些 ExecutorService 实例往往使我们在不必手动创建ThreadPoolExecutor实例的情况下使用线程池 。
- Executors.newCachedThreadPool() 。该方法的返回值相当于:
new ThreadPoolExecutor(O, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
即一个核心线程池大小为 0, 最大线程池大小不受限,工作者线程允许的最大空闲时间为 60秒,内部以 SynchronousQueue 为 工作队列(以下称之为 workerQueue) 的一个线程池 。 这种配置意味着该线程池中的所有工作者线程在空闲了指定的时间后都可以被自动清理掉。由于该线程池的核心线程池大小为 0, 因此提交给该线程池执行的第一个任务会导致该线程池中的第一个工作者线程被创建并启动。后续继续给该线程池提交任务的时候,由于当前线程池
大小已经超过核心线程池大小 (0), 因此 ThreadPoolExecutor 此时会将任务缓存到工作队列之中(即调用 workerQueue.offer 方法)。SynchronousQueue 内部并不维护用于存储队列元素的实际存储空间 。
一个线程(生产者线程)在执行 SynchronousQueue.offer(E)的时候,如果没有其他线程(消费者线程)因执行 SynchronousQueue.take()而被暂停,那么 SynchronousQueue.offer(E)调用会直接返回 false, 即入队列失败。因此,在该线程池中的所有工作者线程都在执行任务,即无空闲工作者线程的情况下给其提交任务会导致该任务无法被缓存成功。而ThreadPoolExecutor 在任务缓存失败且线程池当前大小未达到最大线程池大小(这里的最大线程池大小实际上相当于不限)的情况下会创建并启动新的工作者线程。在极端的情况下,给该线程池每提交一个任务都会导致一个新的工作者线程被创建并启动,而这最终会导致系统中的线程过多,从而导致过多的上下文切换而使得整个系统被拖慢。因此,Executors.newCachedThreadPool()所返回的线程池适合于用来执行大量耗时较短且提交频率较高的任务。而提交频率较高且耗时较长的任务(尤其是包含阻塞操作的任务)则不适合用Executors.newCachedThreadPool()所返回的线程池来执行 。
- Executors.newFixedThreadPool(int nThreads)。该方法的返回值相当于:
new ThreadPoolExecutor(nThreads, nThreads,OL, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()};
即一个以无界队列为工作队列,核心线程池大小与最大线程池大小均为nThreads 且线程池中的空闲工作者线程不会被自动清理的线程池,这是一种线程池大小一旦达到其核心线程池大小就既不会增加也不会减少工作者线程的固定大小的线程池。因此,这样的线程池实例一旦不再需要,我们必须主动将其关闭 。
- Executors.newSingleThreadExecutor() 。该方法的返回值基本相当于 Executors.newFixedThreadPool(1)所返回的线程池。不过,该线程池并非ThreadPoolExecutor实例,而是一个封装了 ThreadPoolExecutor 实例且对外仅暴露 ExecutorService 接口所定义的方法的一个 ExecutorService 实例。该线程池便于我们实现单(多)生产者—单消费者模式。该线程池确保了在任意一个时刻只有一个任务会被执行,
这就形成了类似锁将原本并发的操作改为串行的操作的效果。因此,该线程池适合于用来执行访问了非线程安全对象而我们又不希望因此而引入锁的任务。该线程池也适合于用来执行 I/O操作,因为 I/O 操作往往受限于相应的 I/O 设备,使用多个线程执行同一种 I/O 操作(比如多个线程各自读取一个文件)可能并不会提高 I/O 效率,所以如果使用一个线程执行 I/O 足以满足要求,那么仅使用一个线程即可,这样可以保障程序的简单性以避免一些不必要的问题(比如死锁)。
3.异步任务的批量执行:CompletionService
尽管 Future 接口使得我们能够方便地获取异步任务的处理结果,但是如果需要一次性提交一批异步任务并获取这些任务的处理结果的话,那么仅使用 Future 接口写出来的代码将颇为烦琐。java.util.concurrent.CompletionService 接口为异步任务的批量提交以及获取这些任务的处理结果提供了便利。
CompletionService 接口定义的一个 submit 方法可用千提交异步任务,该方法的签名
与 ThreadPoolExecutor 的一个 submit 方法相同:
Future<V> submit(Callable<V> task)
若要获取批量提交的异步任务的处理结果,那么我们可以使用 CompletionService 接口专门为此定义的方法,其中的一个方法是:
Future<V> take()throws InterruptedException
该方法与 BlockingQueue.take()相似,它是一个阻塞方法,其返回值是一个已经执行结束的异步任务对应的 Future 实例,该实例就是提交相应任务时 submit(Callable)调用的返回值。如果 take()被调用时没有已执行结束的异步任务,那么 take()的执行线程就会被暂停,直到有异步任务执行结束。因此,我们批量提交了多少个异步任务,则多少次连续调用CompletionService.take()便可以获取这些任务的处理结果。
Java 标准库提供的 CompletionService 接口的实现类是 ExecutorCompletionService。ExecutorCompletionService 的一个构造器是:
ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
由此可见, ExecutorCompletionService 相当于 Executor 实例与 BlockingQueue 实例的一个融合体。其中, Executor 实例负责接收并执行异步任务,而 BlockingQueue 实例则用于存储已执行完毕的异步任务对应的 Future 实例 。 ExecutorCompletionService 会为其客户端提交的每个异步任务 (Callable 实例或者 Runnable 实例)都创建一个相应的 Future 实例,通过该实例其客户端代码便可以获取相应异步任务的处理结果。 ExecutorCompletionService 每执行完一个异步任务,就将该任务对应的 Future 实例存入其内部维护的 BlockingQueue 实例之中,而其客户端代码则可以通过 ExecutorCompletionService.take()调用来获取这个 Future 实例。
ExecutorService.invokeAll(Collection<? extends Callable> tasks)也能够用来批量提交异步任务,该方法能够并发执行 tasks 参数所指定的一批任务,但是该方法只有在 tasks参数所指定的一批任务中的所有任务都执行结束之后才返回,其返回值是一个包含各个任务对应的 Future 实例的列表 (List)。 因此,使用 invokeAII 方法提交批量任务的时候,任务提交方等待 invokeAII 方法返回的时间取决于这批任务中最耗时的任务的执行耗时。