线程到线程池的进阶之路
Thread与Runnable、Callable介绍
Runnable 是一个接口,该接口中只包含了一个run()方法
public interface Runnable { public abstract void run(); }
Thread 是一个类。Thread本身就实现了Runnable接口。
public class Thread implements Runnable {}
Thread 是类,而Runnable是接口;Thread本身是实现了Runnable接口的类。我们知道“一个类只能有一个父类,但是却能实现多个接口”,因此Runnable具有更好的扩展性。
此外,Runnable还可以用于“资源的共享”。即,多个线程都是基于某一个Runnable对象建立的,它们会共享Runnable对象上的资源。
通常,建议通过“Runnable”实现多线程!
Callable 接口:
Callable接口与Runnable接口相比,还有一个很大的不同:Callable接口的实例不能作为Thread线程实例的target来使用;而Runnable接口实例可以作为Thread线程实例的target构造参数,开启一个Thread线程。问题来了,Java中的线程类型,只有一个Thread类,没有其他的类型。如果Callable实例需要异步执行,就要想办法赋值给Thread的target成员,一个Runnable类型的成员。为此,Java提供了在Callable实例和Thread的target成员之间一个搭桥的类——FutureTask类。
FutureTask类的构造函数的参数为Callable类型,实际上是对Callable类型的二次封装,可以执行Callable的call方法。FutureTask类间接地继承了Runnable接口,从而可以作为Thread实例的target执行目标。
Future介绍
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方***阻塞直到任务返回结果。
Future类位于java.util.concurrent包下;在Future接口中声明了5个方法,下面依次解释每个方法的作用:
- cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
- isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
- isDone方法表示任务是否已经完成,若任务完成,则返回true;
- get()方法用来获取执行结果,这个方***产生阻塞,会一直等到任务执行完毕才返回;
- get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
也就是说Future提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
FutureTask介绍
FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,可以把它理解为是可以返回结果的Runnable。
使用FutureTask的优势有:
- 可以获取线程执行后的返回结果;
- 提供了超时控制功能。
- 它实现了Runnable接口和Future接口
什么是异步计算呢?也就是说,在让该任务执行时,不需要一直等待其运行结束返回结果,而是可以先去处理其他的事情,然后再获取返回结果。例如你想下载一个很大的文件,这时很耗时的操作,没必要一直等待着文件下载完,你可以先去吃个饭,然后再回来看下文件是否下载完成,如果下载完成就可以使用了,否则还需要继续等待。
FutureTask的状态
FutureTask内部有这样几种状态:
private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
看名字应该很好理解了,当创建一个FutureTask对象是,初始的状态是NEW,在运行时状态会转换,有4中状态的转换过程:
- NEW -> COMPLETING -> NORMAL:正常执行并返回;
- NEW -> COMPLETING -> EXCEPTIONAL:执行过程中出现了异常;
- NEW -> CANCELLED;执行前被取消;
- NEW -> INTERRUPTING -> INTERRUPTED:取消时被中断。
使用FutureTask
下面看一下具体的使用过程:FutureTask实现了Runnable接口,所以需要实现run方法
public class FutureTaskTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 100; i++) { result += i; } return result; } }); executor.execute(future); System.out.println(future.get()); } }
FutureTask构造方法
FutureTask有两个构造方法:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
第二种构造方法传入一个Runnable对象和一个返回值对象,因为Runnable是没有返回值的,所以要通过result参数在执行完之后返回结果。
run方法
FutureTask实现了Runnable接口,所以需要实现run方法。
run方法的执行过程
- 只有state为NEW的时候才执行任务;
- 执行前要设置runner为当前线程,使用CAS来设置是为了防止竞争;
- 如果任务执行成功,任务状态从NEW转换为COMPLETING,如果执行正常,设置最终状态为NORMAL;如果执行中出现了异常,设置最终状态为EXCEPTIONAL;
- 唤醒并删除Treiber Stack中的所有节点;
- 如果调用了cancel(true)方法进行了中断,要确保在run方法执行结束前的状态是INTERRUPTED。
这里涉及到3个比较重要的方法:setException,set和handlePossibleCancellationInterrupt。
get方法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
这两个方法类似,首先判断状态,如果s <= COMPLETING
,说明任务已经执行完毕,但set方法或setException方法还未执行结束(还未设置状态为NORMAL或EXCEPTIONAL),这时需要将当前线程添加到waiters中并阻塞。
第二种get提供了超时功能,如果在规定时间内任务还未执行完毕或者状态还是COMPLETING,则获取结果超时,抛出TimeoutException。而第一种get会一直阻塞直到state > COMPLETING
。
FutureTask总结
FutureTask的执行过程和获取返回值的过程,要注意以下几个地方:
- FutureTask是线程安全的,在多线程下任务也只会被执行一次;
- 注意在执行时各种状态的切换;
- get方法调用时,如果任务没有结束,要阻塞当前线程,法阻塞的线程会保存在一个Treiber Stack中;
- get方法超时功能如果超时未获取成功,会抛出TimeoutException;
- 注意在取消时的线程中断,在run方法中一定要保证结束时的状态是INTERRUPTED,否则在cancel方法中可能没有执行interrupt,造成中断的泄露。
Executor框架接口
Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。
J.U.C中有三个Executor接口:
- Executor:一个运行新任务的简单接口;
- ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;
- ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。
Executor接口
public interface Executor { void execute(Runnable command); }
Executor接口只有一个execute方法,用来替代通常创建或启动线程的方法。例如,使用Thread来创建并启动线程的代码如下:
Thread t = new Thread(); t.start();
使用Executor来启动线程执行任务的代码如下:
Thread t = new Thread(); executor.execute(t);
对于不同的Executor实现,execute()方法可能是创建一个新线程并立即启动,也有可能是使用已有的工作线程来运行传入的任务,也可能是根据设置线程池的容量或者阻塞队列的容量来决定是否要将传入的线程放入阻塞队列中或者拒绝接收传入的线程。
ExecutorService接口
ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。
ScheduledExecutorService接口
ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。
Java线程池:ThreadPoolExecutor
ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。
线程池的运行状态
线程池一共有五种状态, 分别是:
RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方***使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方***使线程池进入到该状态;
TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
下图为线程池的状态转换过程:
ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
构造方法中的字段含义如下:
corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行以下判断:
- 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
- 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
- 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
maximumPoolSize:最大线程数量;
workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;
workQueue
:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
直接切换:这种方式常用的队列是SynchronousQueue,但现在还没有研究过该队列,这里暂时还没法介绍;
使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
使用有界队列
:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
- 如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量, 但这样也会降低线程处理任务的吞吐量。
- 如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
- 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler
:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
execute方法
execute()方法用来提交任务
在执行execute()方法时如果状态一直是RUNNING时,执行过程如下:
- 如果
workerCount < corePoolSize
,则创建并启动一个线程来执行新提交的任务; - 如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中; - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务; - 如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);
,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0
时执行addWorker(null, false);
也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
execute方法执行流程如下:
addWorker方法
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) { ... }
启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
Worker类(这是一个类)
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ //构造方法 Worker(Runnable firstTask) { setState(-1); // -1:禁止在执行任务前对线程进行中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 运行方法*/ public void run() { runWorker(this); } }
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);
来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
如果正在执行任务,则不应该中断线程;
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方***使用tryLock方法来判断线程池中的线程是否是空闲状态;
之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方***中断正在运行的线程。
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
- getTaskCount:线程池已经执行的和未执行的任务总数;
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
- getPoolSize:线程池当前的线程数量;
- getActiveCount:当前线程池中正在执行任务的线程数量。
通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。
线程池进阶:ScheduledThreadPoolExecutor
自JDK1.5开始,JDK提供了ScheduledThreadPoolExecutor类来支持周期性任务的调度。在这之前的实现需要依靠Timer和TimerTask或者其它第三方工具来完成。但Timer有不少的缺陷:
- Timer是单线程模式;
- 如果在执行任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;
- Timer的任务调度是基于绝对时间的,对系统时间敏感;
- Timer不会捕获执行TimerTask时所抛出的异常,由于Timer是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。
ScheduledThreadPoolExecutor继承ThreadPoolExecutor来重用线程池的功能,它的实现方式如下:
- 将任务封装成ScheduledFutureTask对象,ScheduledFutureTask基于相对时间,不受系统时间的改变所影响;
- ScheduledFutureTask实现了
java.lang.Comparable
接口和java.util.concurrent.Delayed
接口,所以有两个重要的方法:compareTo和getDelay。compareTo方法用于比较任务之间的优先级关系,如果距离下次执行的时间间隔较短,则优先级高;getDelay方法用于返回距离下次任务执行时间的时间间隔; - ScheduledThreadPoolExecutor定义了一个DelayedWorkQueue,它是一个有序队列,会通过每个任务按照距离下次执行时间间隔的大小来排序;
- ScheduledFutureTask继承自FutureTask,可以通过返回Future对象来获取执行的结果。
通过如上的介绍,可以对比一下Timer和ScheduledThreadPoolExecutor:
Timer | ScheduledThreadPoolExecutor |
---|---|
单线程 | 多线程 |
单个任务执行时间影响其他任务调度 | 多线程,不会影响 |
基于绝对时间 | 基于相对时间 |
一旦执行任务出现异常不会捕获,其他任务得不到执行 | 多线程,单个任务的执行不会影响其他线程 |
所以,在JDK1.5之后,应该没什么理由继续使用Timer进行任务调度了。
ScheduledThreadPoolExecutor的构造方法
ScheduledThreadPoolExecutor有3中构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
因为ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以这里都是调用的ThreadPoolExecutor类的构造方法。
第一种构造方法示例:
scheduler = new ScheduledThreadPoolExecutor(20, threadFactory("BDP-Default-Scheduler-Thread-", true)) scheduler.setMaximumPoolSize(20) scheduler.setKeepAliveTime(5, TimeUnit.MINUTES)
scheduleAtFixedRate方法
该方法设置了执行周期,下一次执行时间相当于是上一次的执行时间加上period,它是采用已固定的频率来执行任务:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ... }
总结
ScheduedThreadPoolExecutor的实现,主要介绍了以下方面:
- ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以它也是一个线程池,也有coorPoolSize和workQueue,ScheduledThreadPoolExecutor特殊的地方在于,自己实现了优先工作队列DelayedWorkQueue;
- ScheduedThreadPoolExecutor实现了ScheduledExecutorService,所以就有了任务调度的方法,如schedule,scheduleAtFixedRate和scheduleWithFixedDelay,同时注意他们之间的区别;
- 内部类ScheduledFutureTask继承自FutureTask,实现了任务的异步执行并且可以获取返回结果。同时也实现了Delayed接口,可以通过getDelay方法获取将要执行的时间间隔;
- 周期任务的执行其实是调用了FutureTask类中的runAndReset方法,每次执行完不设置结果和状态。
- 详细分析了DelayedWorkQueue的数据结构,它是一个基于最小堆结构的优先队列,并且每次出队时能够保证取出的任务是当前队列中下次执行时间最小的任务。同时注意一下优先队列中堆的顺序,堆中的顺序并不是绝对的,但要保证子节点的值要比父节点的值要大,这样就不会影响出队的顺序。
总体来说,ScheduedThreadPoolExecutor的重点是要理解下次执行时间的计算,以及优先队列的出队、入队和删除的过程,这两个是理解ScheduedThreadPoolExecutor的关键。