面试官:如何开启一个线程开启大量线程会有什么问题,如何优化?
这道题想考察什么?
- 是否了解线程开启的方式?
- 开启大量线程会引起什么问题?为什么?怎么优化?
考察的知识点
- 线程的开启方式
- 开启大量线程的问题
- 线程池
考生应该如何回答
1、首先,关于如何开启一个线程,大多数人可能都会说3种,Thread、Runnable、Callback嘛!但事实却不是这样的。看JDK里怎么说的。
/** * ... * There are two ways to create a new thread of execution. One is to * declare a class to be a subclass of <code>Thread</code>. * The other way to create a thread is to declare a class that * implements the <code>Runnable</code> interface. * .... */ public class Thread implements Runnable{ }
Thread源码的类描述中有这样一段,翻译一下,只有两种方法去创建一个执行线程,一种是声明一个Thread的子类,另一种是创建一个类去实现Runnable接口。惊不惊讶,并没有提到Callback。
- 继承Thread类
public class ThreadUnitTest { @Test public void testThread() { //创建MyThread实例 MyThread myThread = new MyThread(); //调用线程start的方法,进入可执行状态 myThread.start(); } //继承Thread类,重写内部run方法 static class MyThread extends Thread { @Override public void run() { System.out.println("test MyThread run"); } } }
- 实现Runnable接口
public class ThreadUnitTest { @Test public void testRunnable() { //创建MyRunnable实例,这其实只是一个任务,并不是线程 MyRunnable myRunnable = new MyRunnable(); //交给线程去执行 new Thread(myRunnable).start(); } //实现Runnable接口,并实现内部run方法 static class MyRunnable implements Runnable { @Override public void run() { System.out.println("test MyRunnable run"); } } }
- 还是看看Callable是怎么回事吧
public class ThreadUnitTest { @Test public void testCallable() { //创建MyCallable实例,需要与FutureTask结合使用 MyCallable myCallable = new MyCallable(); //创建FutureTask,与Runnable一样,也只能算是个任务 FutureTask<String> futureTask = new FutureTask<>(myCallable); //交给线程去执行 new Thread(futureTask).start(); try { //get方法获取任务返回值,该方法是阻塞的 String result = futureTask.get(); System.out.println(result); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } //实现Callable接口,并实现call方法,不同之处是该方法有返回值 static class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(10000); return "test MyCallable run"; } } }
Callable的方式必须与FutureTask结合使用,我们看看FutureTask的继承关系
//FutureTask实现了RunnableFuture接口 public class FutureTask<V> implements RunnableFuture<V> { } //RunnableFuture接口继承Runnable和Future接口 public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
真相大白了,其实实现Callback接口创建线程的方式,归根到底就是Runnable方式,只不过它是在Runnable的基础上又增加了一些能力,例如取消任务执行等。
2、开启线程的几种方式算是答上了,那开启大量线程,或者说频繁开启线程到底会引起什么问题呢?
众所周知,在Java中,调用Thread的start方法后,该线程即置为就绪状态,等待CPU的调度。这个流程里有两个关注点需要去理解。
start内部怎样开启线程的?看看start方法是怎么实现的。
// Thread类的start方法 public synchronized void start() { // 一系列状态检查 if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { //调用start0方法,真正启动java线程的地方 start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { } } } //start0方法是一个native方法 private native void start0();
JVM中,native方法与java方法存在一个映射关系,Java中的start0对应c层的JVM_StartThread方法,我们继续看一下。
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_StartThread"); JavaThread *native_thread = NULL; bool throw_illegal_thread_state = false; { MutexLocker mu(Threads_lock); // 判断Java线程是否已经启动,如果已经启动过,则会抛异常。 if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) { throw_illegal_thread_state = true; } else { //如果没有启动过,走到这里else分支,去创建线程 //分配c++线程结构并创建native线程 jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread)); size_t sz = size > 0 ? (size_t) size : 0; //注意这里new JavaThread native_thread = new JavaThread(&thread_entry, sz); if (native_thread->osthread() != NULL) { native_thread->prepare(jthread); } } } ...... Thread::start(native_thread);
走到这里发现,Java层已经过渡到native层,但远远还没结束。
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) : Thread() { initialize(); _jni_attach_state = _not_attaching_via_jni; set_entry_point(entry_point); os::ThreadType thr_type = os::java_thread; thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread : os::java_thread; //根据平台,调用create_thread,创建真正的内核线程 os::create_thread(this, thr_type, stack_sz); } bool os::create_thread(Thread* thread, ThreadType thr_type, size_t req_stack_size) { ...... pthread_t tid; //利用pthread_create()来创建线程 int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread); ...... return true; }
pthread_create方法,第三个参数表示启动这个线程后要执行的方法的入口,第四个参数表示要给这个方法传入的参数。
static void *thread_native_entry(Thread *thread) {
......
//thread_native_entry方法的最下面的run方法,这个thread就是上面传递下来的参数,也就是JavaThread
thread->run();
......
return 0;
}
终于开始执行run方法了!!!
//thread.cpp类 void JavaThread::run() { ...... //调用内部thread_main_inner thread_main_inner(); } void JavaThread::thread_main_inner() { if (!this->has_pending_exception() && !java_lang_Thread::is_stillborn(this->threadObj())) { { ResourceMark rm(this); this->set_native_thread_name(this->get_thread_name()); } HandleMark hm(this); //注意:内部通过JavaCalls模块,调用了Java线程要执行的run方法 this->entry_point()(this, this); } DTRACE_THREAD_PROBE(stop, this); this->exit(false); delete this; }
一条U字型代码调用链总算结束了,高呼一声,原来start之后做了这么多事情呀,稍微总结一下。
-
Java中调用Thread的star方法,通过JNI方式,调用到native层。
-
native层,JVM通过pthread_create方法创建一个系统内核线程,并指定内核线程的初始运行地址,即一个方法指针。
-
在内核线程的初始运行方法中,利用JavaCalls模块,回调到java线程的run方法,开始java级别的线程执行。
线程开启后CPU调度会发生什么?
计算机的世界里,CPU会分为若干时间片,通过各种算法分配时间片来执行任务,有耳熟能详时间片轮转调度算法、短进程优先算法、优先级算法等。当一个任务的时间片用完,就会切换到另一个任务。在切换之前会保存上一个任务的状态,当下次再切换到该任务,就会加载这个状态, 这就是所谓的线程的上下文切换。
经过上面两个理论基础的回顾,开启大量线程引起的问题,总结起来,就两个字——开销。
- 消耗时间。 线程的创建和销毁都需要时间,当数量太大的时候,会影响效率。
- 消耗内存。 创建更多的线程会消耗更多的内存,这是毋庸置疑的。线程频繁创建与销毁,还有可能引起内存抖动,频繁触发GC,最直接的表现就是卡顿。长而久之,内存资源占用过多或者内存碎片过多,系统甚至会出现OOM。
- 消耗CPU。 在操作系统中,CPU都是遵循时间片轮转机制进行处理任务,线程数过多,必然会引起CPU频繁的进行线程上下文切换。这个代价是昂贵的,某些场景下甚至超过任务本身的消耗。
3、针对上面提及到的问题,我们自然需要进行优化。线程的本质是为了执行任务,在计算机的世界里,任务分大致分为两类,CPU密集型任务和IO密集型任务。
- CPU密集型任务,比如公式计算、资源解码等。这类任务要进行大量的计算,全都依赖CPU的运算能力,持久消耗CPU资源。所以针对这类任务,其实不应该开启大量线程。因为线程越多,花在线程切换的时间就越多,CPU执行效率就越低,一般CPU密集型任务同时进行的数量等于CPU的核心数,最多再加个1。
- IO密集型任务,比如网络读写、文件读写等。这类任务不需要消耗太多的CPU资源,绝大部分时间是在IO操作上。所以针对这类任务,可以开启大量线程去提高CPU的执行效率,一般IO密集型任务同时进行的数量等于CPU的核心数的两倍。
另外,在无法避免,必须要开启大量线程的情况下,我们也可以使用线程池代替直接创建线程的做法进行优化。线程池的基本作用就是复用已有的线程,从而减少线程的创建,降低开销。
在Java中,线程池的使用还是非常方便的,JDK中提供了现成的ThreadPoolExecutor类,我们只需要按照自己的需求进行相应的参数配置即可,这里提供一个示例。
/** * 线程池使用 */ public class ThreadPoolService { /** * 线程池变量 */ private ThreadPoolExecutor mThreadPoolExecutor; private static volatile ThreadPoolService sInstance = null; /** * 线程池中的核心线程数,默认情况下,核心线程一直存活在线程池中,即便他们在线程池中处于闲置状态。 * 除非我们将ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这时候处于闲置的核心 * 线程在等待新任务到来时会有超时策略,这个超时时间由keepAliveTime来指定。一旦超过所设置的超时时间,闲 * 置的核心线程就会被终止。 * CPU密集型任务 N+1 IO密集型任务 2*N */ private final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() + 1; /** * 线程池中所容纳的最大线程数,如果活动的线程达到这个数值以后,后续的新任务将会被阻塞。包含核心线程数+非* * 核心线程数。 */ private final int MAXIMUM_POOL_SIZE = Math.max(CORE_POOL_SIZE, 10); /** * 非核心线程闲置时的超时时长,对于非核心线程,闲置时间超过这个时间,非核心线程就会被回收。 * 只有对ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这个超时时间才会对核心线 * 程产生效果。 */ private final long KEEP_ALIVE_TIME = 2; /** * 用于指定keepAliveTime参数的时间单位。 */ private final TimeUnit UNIT = TimeUnit.SECONDS; /** * 线程池中保存等待执行的任务的阻塞队列 * ArrayBlockingQueue 基于数组实现的有界的阻塞队列 * LinkedBlockingQueue 基于链表实现的阻塞队列 * SynchronousQueue 内部没有任何容量的阻塞队列。在它内部没有任何的缓存空间 * PriorityBlockingQueue 具有优先级的无限阻塞队列。 */ private final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingDeque<>(); /** * 线程工厂,为线程池提供新线程的创建。ThreadFactory是一个接口,里面只有一个newThread方法。 默认为DefaultThreadFactory类。 */ private final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory(); /** * 拒绝策略,当任务队列已满并且线程池中的活动线程已经达到所限定的最大值或者是无法成功执行任务,这时候 * ThreadPoolExecutor会调用RejectedExecutionHandler中的rejectedExecution方法。 * CallerRunsPolicy 只用调用者所在线程来运行任务。 * AbortPolicy 直接抛出RejectedExecutionException异常。 * DiscardPolicy 丢弃掉该任务,不进行处理。 * DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务。 */ private final RejectedExecutionHandler REJECTED_HANDLER = new ThreadPoolExecutor.AbortPolicy(); private ThreadPoolService() { } /** * 单例 * @return */ public static ThreadPoolService getInstance() { if (sInstance == null) { synchronized (ThreadPoolService.class) { if (sInstance == null) { sInstance = new ThreadPoolService(); sInstance.initThreadPool(); } } } return sInstance; } /** * 初始化线程池 */ private void initThreadPool() { try { mThreadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, UNIT, WORK_QUEUE, THREAD_FACTORY, REJECTED_HANDLER); } catch (Exception e) { LogUtil.printStackTrace(e); } } /** * 向线程池提交任务,无返回值 * * @param runnable */ public void post(Runnable runnable) { mThreadPoolExecutor.execute(runnable); } /** * 向线程池提交任务,有返回值 * * @param callable */ public <T> Future<T> post(Callable<T> callable) { RunnableFuture<T> task = new FutureTask<T>(callable); mThreadPoolExecutor.execute(task); return task; } }