Java多线程

image-20210116203349505

Java多线程基础常见面试题

本文参考了一些技术文章,并加上自己的理解整理出来的一些Java多线程基础常见面试题解答,会持续更新。供本人复习可用,路过的小伙伴要是感兴趣也可以看看,欢迎指错。

什么是进程和线程?

进程

==进程是正在运行的程序,它单独占有内存地址空间。==系统运行一个程序就是一个进程从创建,运行到消亡的过程。

为什么会有进程的出现呢?

进程让操作系统的并发成为了可能。因为在早期的批处理操作系统中,虽然用户可以将多个需要执行的程序写在磁带上交给计算机处理,但计算机的内存始终只有一个程序在运行,也就是串行运行方式,效率是非常低的,所以就有聪明的科学人提出了进程的概念。

此时CPU采用了时间片轮转的方式运行进程。当时间片结束时,当前进程还在运行,CPU会暂停这个进程运行,并运行下一个时间片的进程。(这叫上下文切换)。当时间片还未结束,当前进程就结束或者阻塞了,CPU也会立马切换,不用等时间片用完。

比方说,当我们启动main函数时就是启动了一个JVM进程,而main函数所在的线程就是这个进程中的一个线程,也称主线程

线程

线程是比进程还小的执行单位,它负责执行进程中的子任务。一个线程能够分割成多个线程来执行。

为什么我们还要使用多线程呢?

当随着需求的发展,我们不再想一个程序只实现一个功能,而是多个功能,这就需要进程背负多个子任务。但是这样只能逐个有序地去执行这些子任务,效率很低。比方说,一款视频软件在播放视频的时候,不能发评论,得等视频播放完才可以评论。这样是很不爽的。

因此有了线程,线程让进程内部的并发成了可能

而且还有以下好处:

  • 线程共享进程的堆和方法区资源,从而实现简单通信。而进程间存在内存隔离,数据共享和程序通信是比较复杂的。
  • 除了共享资源区域外,每个线程还拥有自己的程序计数器、虚拟机栈和本地方法栈,所以进行线程切换时开销要比切换进程小。因此线程也被叫做轻量级进程

它们的区别是什么?

本质的区别是进程是操作系统进行资源分配的基本单位,线程是操作系统进行调度的基本单位。

除此之外:

  • 进程单独占有一定的内存地址空间,通信复杂同步简单;而线程共享所属进程的数据,通信简单但同步复杂。
  • 进程的创建和销毁不仅要保存寄存器和栈信息,还要资源的分配回收以及页调度,开销比较大;而线程只需要保存前两者的信息,开销比较小。

什么是上下文切换?

上下文切换是指进程/线程从保存到再加载的过程

CPU通过为每个线程分配时间片实现任务并发执行功能,在切换的过程中,会保存上一个任务的状态,以保证下次切换回来时是正确的执行位置。

创建线程的方式有哪几种?

总共有以下四种:

  • 继承Thread类

    public class MyThread extends Thread{
        @Override
        public void run() {
            System.out.println("MyThread");
        }
    }
  • 实现Runnable接口

    实现接口会比继承Thread类灵活。

    //第一种方式
    public class MyThread2 implements Runnable{
    
        @Override
        public void run() {
            System.out.println("MyThread2");
        }
    
    }
    
    public class Demo2 {
        public static void main(String[] args) {
            //start方法是Thread类拥有的
            MyThread2 myThread2 = new MyThread2();
            new Thread(myThread2).start();
    
            //第二种方式:Java8之后 使用函数式接口
            new Thread(()->{
                System.out.println("MyThread2");
            }).start();
        }
    }
  • 实现Callable接口

​ Runnable方法没有返回值,而Callable接口的方法有并且能抛出异常。

​ 我们通过JDK文档找到了将Callable和Thread建立关联的类FutureTask

image-20201208195600999

​ 一般配合线程池一起使用,下面举个栗子:

public class CallableDemo {
    static class Task implements Callable<Integer>{

        @Override
        public Integer call() throws Exception {
            //模拟计算需要一秒
            Thread.sleep(1000);
            return 2;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> result = new FutureTask<>(task);
        executor.submit(result);
        //会阻塞当前线程,直到获取结果
        System.out.println(result.get());
    }
}

结果:

2

Runable与Callable的区别

  • Runnable ⾃Java 1.0以来⼀直存在,但 Callable 仅在Java 1.5中引⼊,⽬的就是为了来处理 Runnable 不⽀持的⽤例。 Runnable 接⼝不会返回结果或抛出检查异常,但是 Callable 接⼝可以。所以,如果任务不需要返回结果或抛出异常推荐使⽤ Runnable 接⼝,这样代码看起来会更加简洁。

  • ⼯具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。( Executors.callable(Runnable task )Executors.callable(Runnable task,Object resule) )。

  • 通过线程池来创建

Java线程的状态以及如何转换

我们知道操作系统的状态有三个:

  • ready (就绪状态)线程正在等待CPU的调度,即将进入running状态。
  • running (执行状态)线程正在使用CPU。
  • waiting (等待状态)线程经过等待事件或者正在等待其他资源。

Java线程有六大状态

它把ready和running状态合并成了RUNNABLE状态,把waiting状态拆分成了阻塞、等待和超时等待三个状态。

image-20201208172430200

wait方法和sleep方法的区别

  • sleep方法释放CPU资源, 没有释放锁;而wait方法释放CPU资源同时释放了锁。
  • sleep方法是Thread,而wait方法是Object的。
  • sleep方法通常用来暂停线程的运行,而wait方法通常用来通信与交互,跟notify()方法搭配。
  • wait可以指定时间,也可以不指定;而sleep必须指定时间。
  • wait必须放在同步块或者同步方法中,但sleep可以在任意位置。

Java线程间的通信有哪几种?

  • 线程同步 (按一定顺序执行)需要 synchronized锁

  • 等待、通知 wait() notify()

  • 信号量 用 volatile来实现

    • volatile可以实现内存的可见性
    • int volatile signal;
  • 管道通信 多半与I/O流有关,传输文件或者字符串

    • JDK提供了PipeWriter、PipeReader、PipeInputStream、PipeOutputStream
  • 其他方式

    • join() 相当于被启动的那个线程是宾客,启动它的那个线程是主人,等宾客走了,主人才会结束掉宴会打扫。

    • sleep()

      • 与 wait的区别
    • ThreadLocal类

      • 原理:JVM 中维护一个Map,这个Map的key 就是当前的线程对象,而value则是线程通过ThreadLocal.set方法保存的对象实例。当线程调用ThreadLocal.get方法时,ThreadLocal会根据当前线程对象的引用,取出Map中对应的对象返回。这样,ThreadLocal通过以各个线程对象的引用作为区分,从而将不同线程的变量隔离开来。

      • ThreadLocal的作用:实现线程范围内的局部变量,即ThreadLocal在一个线程中是共享的,在不同线程之间是隔离的。因为每个Thread对象都有它自己存放进ThreadLocalMap的值。

      • 适用场景:

        • 每个线程需要有自己单独的实例

        • 实例需要在多个方法***享,但不希望被多线程共享

        • 最常见的用来解决数据库连接、Session管理等。https://my.oschina.net/huangyong/blog/159725

          简单地说,就是多线程情况下,如果没有ThreadLocal,连接有可能被其他线程关闭。

    • InheritableThreadLocal

      • 不仅仅是当前线程可以存取副本值,而且它的子线程也可以存取这个副本值。

线程池

为什么要用线程池

池化技术,线程池、数据库连接池、Http连接池等等都是这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可用不需要等到线程创建就能执行。
  • 提高线程的可管理性。 使用线程池可以进行统一的分配,调优和监控。

7大参数

ThreadPoolExecutor有四个构造函数,有7大参数:

  • int corPoolSize 该线程池中核心线程数最大值

  • int maximumPoolSize 该线程池中线程总数最大值

  • long keepAliveTime 非核心线程闲置超时时长

  • TimeUnit unit keepAliveTime的单位

    TimeUnit是⼀个枚举类型 ,包括以下属性:

    NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 =

    1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒

    MINUTES : 分 HOURS : ⼩时 DAYS : 天

  • BlockingQueue workQueue 阻塞队列,维护着等待执行的Runnable任务对象。

  • ThreadFactory threadFactory 创建线程的工厂,用于批量创建线程,统一在创建线程时设置一些参数,如是否为守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。

  • RejectedExecutionHandler handler 拒绝处理策略


原理

线程池的状态

线程池本身有⼀个调度线程,这个线程就是⽤于管理布控整个线程池⾥的各种任务 和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

故线程池也有⾃⼰的状态。 ThreadPoolExecutor 类中定义了⼀个 volatile int 变量runState来表示线程池的状态 ,分别为RUNNING、SHURDOWN、STOP、TIDYING 、TERMINATED。

线程池创建后处于RUNNING状态。

调⽤shutdown()⽅法后处于SHUTDOWN状态,线程池不能接受新的任务,清 除⼀些空闲worker,会等待阻塞队列的任务完成。

调⽤shutdownNow()⽅法后处于STOP状态,线程池不能接受新的任务,中断 所有线程,阻塞队列中没有被执⾏的任务全部丢弃。此时,poolsize=0,阻塞队 列的size也为0。

当所有的任务已终⽌,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。

接着会执⾏terminated()函数。

ThreadPoolExecutor中有⼀个控制状态的属性叫ctl,它是⼀个AtomicInteger类型的变量。

线程池处在TIDYING状态时,执⾏完terminated()⽅法之后,就会由 TIDYING**-> TERMINATED**, 线程池被设置为TERMINATED状态。

主要任务处理流程

处理任务的核心方法是execute,我们看看JDK1.8源码中的ThreadPoolExecutor是如何处理线程任务的:

// JDK 1.8 
public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException(); 
 int c = ctl.get();
 // 1.当前线程数⼩于corePoolSize,则调⽤addWorker创建核⼼线程执⾏任务
 if (workerCountOf(c) < corePoolSize) {
     if (addWorker(command, true))
         return;
     c = ctl.get();
 }
 // 2.如果不⼩于corePoolSize,则将任务添加到workQueue队列。
    //workQueue.offer若是满了,会返回false
 if (isRunning(c) && workQueue.offer(command)) {
     int recheck = ctl.get();
 // 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执⾏拒绝策略。
 if (! isRunning(recheck) && remove(command))
     reject(command);
 // 2.2 线程池处于running状态,但是没有线程,则创建线程
 else if (workerCountOf(recheck) == 0)
     addWorker(null, false);
 }
 // 3.如果放⼊workQueue失败,则创建⾮核⼼线程执⾏任务,
 // 如果这时创建⾮核⼼线程失败(当前线程总数不⼩于maximumPoolSize时),就会执⾏拒绝策略。
 else if (!addWorker(command, false))
     reject(command);
}

大致过程:

image-20210115210242281

总结一下

  1. 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务 (让核心线程数量快速达到corePoolSzie)。注意,这一步需要获得全局锁
  2. 线程总数量 >= corePoolSize,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行。体现了线程复用
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是创建非核心线程去执行这个任务。注意,这一步需要获得全局锁
  4. 缓存队列满了,且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。

如何做到线程复用

我们知道,⼀个线程在创建的时候会指定⼀个线程任务,当执⾏完这个线程任务之 后,线程⾃动销毁。但是线程池却可以复⽤线程,即⼀个线程执⾏完线程任务后不 销毁,继续执⾏另外的线程任务。那么,线程池如何做到线程复⽤呢?

原来,ThreadPoolExecutor在创建线程时,会将线程封装成⼯作线程worker,并放 ⼊⼯作线程组中,然后这个worker反复从阻塞队列中拿任务去执⾏。

我们来看源码进行分析:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                //1. 若core是true,证明要创建的线程是核心线程,跟核心线程的阈值比较;
                //2, 若core是false,证明要创建的线程不是核心线程,跟最大阈值比较
                wc >= (core ? corePoolSize : maximumPoolSize))
                //超过,则返回false;然后上个方法再调用拒绝策略
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //1. 创建一个Worker对象
        w = new Worker(firstTask);
        //2.实例化一个Thread对象
        final Thread t = w.thread;
        if (t != null) {
            //3.线程池全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //4.启动这个线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker()上半部分是判断是否超过阈值,若超过则返回fasle;

下半部分是创建worker对象,并初始化一个Thread对象,然后启动它。

我们接着来看worker类:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;


    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); //线程任务就是自己
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    } //当t.start时,这个run方***被jvm调用

    ...其余代码省略

Worker类继承了Runnable接口,所以Worker也是一个线程任务,在构造器中创建了一个线程,而这个线程的任务就是自己。所以addWorker的下半部分的t.start()时会触发Worker类的runWorker(this)被jvm调用。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //worker的任务,也就是我们传进去的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        //1. 线程启动后,通过unlock去释放锁
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //2. Worker执行我们传进去的任务,或者自己去任务队列取任务
            while (task != null || (task = getTask()) != null) { //会不断的去拿任务
                //2.1 进行加锁操作,保证线程不被其他线程中断,除非线程池被中断
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //2.2 检查线程池状态,如果处于中断状态,当前线程被中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while循环中,worker会不断地调用getTask()方法从阻塞队列中获取任务然后调用task.run()执行任务,从而达到线程复用的目的。只要getTask()不返回null,此线程就不会退出。

最后看下getTask()方法

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //1. allowCoreThreadTimeOut默认是false,就是核心线程即使空闲也不会被销毁
        //timed为true的情况有 核心线程只能超时等待 或 目前worker数大于核心线程数
        //若为true 核心线程在keepAliveTime内仍空闲则会被销毁
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //2. 如果运行线程超过了最大线程数 或 超时后设置了true
        //这时候任务队列已空,会去递减Worker数量,并返回null,这时候runWorker()结束,线程被回收
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //3. 若为true 超时 poll返回null;
            //若为false 核心线程会被卡在take()方法里,并被挂起,等待任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take(); //两个方法都是拿任务
            if (r != null)
                //返回任务
                return r;
            //拿不到任务,设置为true 在下一轮循环递减worker数
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

核心线程的会一直卡在workQueue.take方法里,被挂起,不会占用CPU资源,直到拿到Runnable然后返回(当然还得看allowCoreThreadTimeOut,如果它设置为true,那么核心线程就会去调用poll方法,因为poll方法可能会返回null,所以这时候核心线程满足超时条件也会被销毁)。

⾮核⼼线程会workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) ,如果 超时还没有拿到,下⼀次循环判断compareAndDecrementWorkerCount就会返 回 null ,Worker对象的 run() ⽅法循环体的判断为 null ,任务结束,然后线程被系统回收 。

4种拒绝策略

拒绝处理策略,线程数量⼤于最⼤线程数就会采⽤拒绝处理策略,四种拒绝处理的策略为 :

  1. ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。

  2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。

  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执⾏程序(如果再次失败,重复此过程)。

  4. ThreadPoolExecutor.CallerRunsPolicy:由调⽤线程处理该任务。

4种常见的线程池

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

CacheThreadPool 的运⾏流程如下:

  1. 提交任务进线程池。

  2. 因为corePoolSize为0的关系,不创建核⼼线程,线程池最⼤为Integer.MAX_VALUE。

  3. 尝试将任务添加到SynchronousQueue队列。

  4. 如果SynchronousQueue⼊列成功,等待被当前运⾏的线程空闲后拉取执⾏。 如果当前没有空闲线程,那么就创建⼀个⾮核⼼线程,然后从SynchronousQueue拉取任务并在当前线程执⾏。

  5. 如果SynchronousQueue已有任务在等待,⼊列操作将会阻塞。

当需要执⾏很多短时间的任务时,CacheThreadPool的线程复⽤率⽐较⾼, 会显 著的提⾼性能。⽽且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占⽤很多资源。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

核⼼线程数量和总线程数量相等,都是传⼊的参数nThreads,所以只能创建核⼼线 程,不能创建⾮核⼼线程。因为LinkedBlockingQueue的默认⼤⼩是Integer.MAX_VALUE,故如果核⼼线程空闲,则交给核⼼线程处理;如果核⼼线程 不空闲,则⼊列等待,直到核⼼线程空闲。

CachedThreadPool的区别:

  • 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核 ⼼线程。 ⽽CachedThreadPool因为corePoolSize=0,所以只会创建⾮核⼼线程。

  • 在 getTask() ⽅法,如果队列⾥没有任务可取,线程会⼀直阻塞在LinkedBlockingQueue.take() ,线程不会被回收CachedThreadPool会在60s后收回。

  • 由于线程不会被回收,会一直卡着阻塞,所以没有任务的情况下,newFixedThreadPool占用资源更多

  • 都几乎不会触发拒绝策略,但是原理不同。newFixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;newCachedThreadPool是因为线程池很大,几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

有且仅有⼀个核⼼线程( corePoolSize == maximumPoolSize=1),使⽤了LinkedBlockingQueue(容量很⼤),所以,不会创建⾮核⼼线程。所有任务按照 先来先执⾏的顺序执⾏。如果这个唯⼀的线程不空闲,那么新来的任务会存储在任 务队列⾥等待执⾏。

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize
                                                              return new ScheduledThreadPoolExecutor(corePoolSize);
                                                              }
                                                              //ScheduledThreadPoolExecutor():
                                                              public ScheduledThreadPoolExecutor(int corePoolSize) {
                                                                  super(corePoolSize, Integer.MAX_VALUE,
                                                                        DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                                                                        new DelayedWorkQueue());
                                                              }

四种常⻅的线程池基本够我们使⽤了,但是《阿⾥把把开发⼿册》不建议我们直接 使⽤Executors类中的线程池,⽽是通过 ThreadPoolExecutor 的⽅式,这样的处理 ⽅式让写的同学需要更加明确线程池的运⾏规则,规避资源耗尽的⻛险。 但如果你及团队本身对线程池⾮常熟悉,⼜确定业务规模不会⼤到资源耗尽的程度 (⽐如线程数量或任务队列⻓度可能达到Integer.MAX_VALUE)时,其实是可以使⽤JDK提供的这⼏个接⼝的,它能让我们的代码具有更强的可读性。

阻塞队列

BlockingQueue接口提供了线程安全的队列访问方式,让我们不用担心在多线程环境下存、取共享变量的线程安全问题。

阻塞队列提供了四组不同的⽅法⽤于插⼊、移除、检查元素:

image-20210116133934057

  • 抛出异常:如果试图的操作⽆法⽴即执⾏,抛异常。当阻塞队列满时候,再往队列⾥插⼊元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列⾥获取元素时会抛出NoSuchElementException异常 。

  • 返回特殊值:如果试图的操作⽆法⽴即执⾏,返回⼀个特殊值,通常是true / false。

  • ⼀直阻塞:如果试图的操作⽆法⽴即执⾏,则⼀直阻塞或者响应中断。

  • 超时退出:如果试图的操作⽆法⽴即执⾏,该⽅法调⽤将会发⽣阻塞,直到能 够执⾏,但等待时间不会超过给定值。返回⼀个特定值以告知该操作是否成功,通常是 true / false

BlockingQueue的实现类

ArrayBlockingQueue

数组结构组成的有界阻塞队列。内部结构是数组。

public ArrayBlockingQueue(int capacity, boolean fair){
 //..省略代码 }

可以初始化队列⼤⼩, 且⼀旦初始化不能改变。构造⽅法中的fair表示控制对象的 内部锁是否采⽤公平锁,默认是⾮公平锁。

LinkedBlockingQueue

链表结构组成的有界阻塞队列。。内部结构是链表,具有链表的特性。默认队列的 ⼤⼩是 Integer.MAX_VALUE ,也可以指定⼤⼩。此队列按照先进先出的原则对元素进⾏排序。

DelayQueue

该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞。

PriorityBlockingQueue

基于优先级的无界阻塞队列。

SynchronousQueue

没有任务容量,每个put必须等待一个take,反之亦然。

需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。

以下⽅法的返回值,可以帮助理解这个队列:

iterator() 永远返回空,因为⾥⾯没有东⻄

peek() 永远返回null

put() 往queue放进去⼀个element以后就⼀直wait直到有其他thread进来把这个element取⾛。

offer() 往queue⾥放⼀个element后⽴即返回,如果碰巧这个element被另⼀个

thread取⾛了,offer⽅法返回true,认为offer成功;否则返回false。

take() 取出并且remove掉queue⾥的element,取不到东⻄他会⼀直等。

poll() 取出并且remove掉queue⾥的element,只有到碰巧另外⼀个线程正在往

queue⾥offer数据或者put数据的时候,该⽅法才会取到东⻄。否则⽴即返回null。

isEmpty() 永远返回true

remove()&removeAll() 永远返回false

注意

PriorityBlockingQueue不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用时要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有可用堆内存空间。对于默认大小的LinkedBlockingQueue也是一样的。

阻塞队列的原理

   /** The queued items */
    //数据元素数组
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    //下一个待取出元素索引
    int takeIndex;

    /** items index for next put, offer, or add */
    //下一个待添加元素索引
    int putIndex;

    /** Number of elements in the queue */
    //元素个数
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    //内部锁
    final ReentrantLock lock;

    /** Condition for waiting takes */
    //消费者监视器
    private final Condition notEmpty;

    /** Condition for waiting puts */
    //生产者监视器
    private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
 //..省略其他代码
 lock = new ReentrantLock(fair);
 notEmpty = lock.newCondition();
 notFull = lock.newCondition();
}

public Condition newCondition() {
    return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}

put操作源码

public void put(E e) throws InterruptedException {
 checkNotNull(e);
 final ReentrantLock lock = this.lock;
 // 1.⾃旋拿锁
 lock.lockInterruptibly();
 try {
 // 2.判断队列是否满了
 while (count == items.length)
     // 2.1如果满了,阻塞该线程,并标记为notFull线程,
     // 等待notFull的唤醒,唤醒之后继续执⾏while循环。
     notFull.await();
     // 3.如果没有满,则进⼊队列
     enqueue(e);
 } finally {
     lock.unlock();
 }
}
private void enqueue(E x) {
 // assert lock.getHoldCount() == 1;
 // assert items[putIndex] == null;
 final Object[] items = this.items;
 items[putIndex] = x;
 //入完后队列已满,索引置0
 if (++putIndex == items.length)
     putIndex = 0;
 //元素个数加1
 count++;
 // 4 唤醒⼀个等待的线程
 notEmpty.signal();
}

总结put流程:

  1. 所有执行put操作的线程竞争lock锁,拿到lock锁的线程进入下一步,没有拿到的自旋竞争锁
  2. 判断阻塞队列是否满了,如果满了就调用await方法阻塞这个线程,并标记为notFull(生产者)线程,同时释放lock锁,等待被消费者线程唤醒。
  3. 如果没有满,则调用enqueue方法将元素put进阻塞队列。注意这一步还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
  4. 唤醒一个标记为notEmpty(消费者)的线程。

take操作源码:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //若当前数据为空,线程被阻塞
        while (count == 0)
            notEmpty.await();
        //取数据
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //唤醒一个生产者线程
    notFull.signal();
    return x;
}

总结take操作的流程:

  1. 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到的自旋竞争锁。
  2. 判断阻塞队列是否为空,如果为空,则调用await方法阻塞这个线程,并标记为notEmpty(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。
  3. 如果没有空,则调用dequeue方法。注意这一步还有一种情况是第二步阻塞的线程被唤醒且又拿到了lock锁的线程。
  4. 唤醒一个标记为notFull(生产者)线程。

注意

  1. put和tack操作都需要先获取锁,没有获取到锁的线程会被挡在第⼀道⼤⻔之外⾃旋拿锁,直到获取到锁。

  2. 就算拿到锁了之后,也不⼀定会顺利进⾏put/take操作,需要判断队列是否可⽤(是否满/空),如果不可⽤,则会被阻塞,并释放锁。

  3. 在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执⾏,否则,⾃旋拿锁,拿到锁了再while判断队列是否可⽤(这也是为什么不⽤if判断,⽽使⽤while判断的原因)。

使用场景

生产者-消费者模式

public class ProducerAndConsumerDemo {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);

    public static void main(String[] args) {
        ProducerAndConsumerDemo demo = new ProducerAndConsumerDemo();
        Producer producer = demo.new Producer();
        Consumer consumer = demo.new Consumer();

        producer.start();
        consumer.start();

    }
    class Producer extends Thread{
        @Override
        public void run() {
            produce();
        }

        private void produce(){
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向队列插入一个元素,队列剩余空间"+queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread{
        @Override
        public void run() {
            consume();
        }

        private void consume(){
            while(true){
                try {
                    queue.take();
                    System.out.println("从队列取出一个元素,队列剩余"+queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

}

手写BlockingQueue

package bq;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyBlockingQueue {
    //数据元素数组
    private Object[] items;
    //元素个数
    private int count;
    //内部锁
    private ReentrantLock lock;
    //下一个待添加元素索引
    private int toPutIndex;
    //下一个待取出元素索引
    private int toTakeIndex;
    //生产者
    private Condition notFull;
    //消费者
    private Condition notEmpty;



    MyBlockingQueue(int capacity, boolean fair){
        this.lock = new ReentrantLock(fair);
        this.items = new Object[capacity];
        this.notFull = lock.newCondition();
        this.notEmpty = lock.newCondition();
    }

    public void put(Object o) throws InterruptedException {
        if(o == null) throw new  NullPointerException();
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try{
            while(count == items.length)
                notFull.await();
            enqueue(o);
            System.out.println("放入了"+o);
        }finally {

            lock.unlock();
        }
    }

    public void enqueue(Object o){
        Object[] items = this.items;
        items[toPutIndex] = o;
        if(++toPutIndex == items.length)
            toPutIndex = 0;
        count++;
        notEmpty.signal();
    }

    public Object take() throws InterruptedException {
        ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try{
            while(count == 0)
                notEmpty.await();
            return dequeue();

        }finally {
            lock.unlock();
        }
    }

    public Object dequeue(){
        Object[] items = this.items;
        Object result = items[toTakeIndex];
        items[toTakeIndex] = null;
        if(++toTakeIndex == items.length)
            toTakeIndex = 0;
        count--;
        notFull.signal();
        return result;
    }

    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10,true);
        new Thread(()->{
            try {
                for (int i = 0; i < 10; i++) {
                    myBlockingQueue.put(i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(()->{

            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(2000);
                    System.out.println("拿出了"+myBlockingQueue.take());

                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
    }
}

结果:

放入了0
放入了1
拿出了0
放入了2
放入了3
拿出了1
放入了4
放入了5
放入了6
拿出了2
放入了7
放入了8
拿出了3
放入了9
拿出了4
拿出了5
拿出了6
拿出了7
拿出了8
拿出了9

CAS

当涉及到悲观锁和乐观锁时,悲观锁思想的实现有很多种我们熟悉的,比如Java中的synchronizeReentrenlock,以及数据库中的行锁和表锁等。

而乐观锁有CAS多版本机制,乐观锁总是假设对共享资源的访问不会有冲突,线程可以不停地执行,无需加锁也无需等待。而一旦多个线程发生冲突,乐观锁通常是使用一种称为CAS的技术来保证线程执行的安全性。

CAS的概念

CAS是一种原子操作,从CPU层面保证了它的原子性。

当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新。其余会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

Java实现CAS的原理 —— 原子类

JUC当中原子类有四类:

基本类型

使⽤原⼦的⽅式更新基本类型

  • AtomicInteger :整形原⼦类

  • AtomicLong :⻓整型原⼦类

  • AtomicBoolean :布尔型原⼦类

数组类型

使⽤原⼦的⽅式更新数组⾥的某个元素

  • AtomicIntegerArray :整形数组原⼦类

  • AtomicLongArray :⻓整形数组原⼦类

  • AtomicReferenceArray :引⽤类型数组原⼦类

引⽤类型

  • AtomicReference :引⽤类型原⼦类

  • AtomicStampedReference :原⼦更新带有版本号的引⽤类型。该类将整数值与引⽤关联起来,可⽤于解决原⼦的更新数据和数据的版本号,可以解决使⽤ CAS 进⾏原⼦更新时可能出现的 ABA 问题。

  • AtomicMarkableReference :原⼦更新带有标记位的引⽤类型

对象的属性修改类型

  • AtomicIntegerFieldUpdater :原⼦更新整形字段的更新器

  • AtomicLongFieldUpdater :原⼦更新⻓整形字段的更新器

  • AtomicReferenceFieldUpdater :原⼦更新引⽤类型字段的更新器

AtomicInteger类的源码分析

 // setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用)
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

//CAS操作修改值
 public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

//unsafe类的方法 
public final int getAndSetInt(Object var1, long var2, int var4) {
    int var5;
    do {
        //通过value这个对象字段偏移量和AtomicInteger这个类对象得到的主内存的值
        var5 = this.getIntVolatile(var1, var2);
        //跟工作内存的比较,失败则自旋
    } while(!this.compareAndSwapInt(var1, var2, var5, var4));
    //返回旧值
    return var5;
}

AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。

CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址。另外 value 是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。

CAS造成的问题

ABA问题 可以用版本号解决,或者是使用Java的atomic包里提供的一个类AtomicStampedReference来解决问题。

循环时间长开销大

CAS多与自旋结合。如果自旋CAS长时间不成功,会占用大量的CPU资源。

可以用pause指令解决

只能保证一个共享变量的原子性

有两种解决方案:

  1. 使用JDK1.5开始就提供的AtomicReference类保证对象之间的原子性,把变量放到一个对象里边进行CAS操作。
  2. 使用锁。
全部评论

相关推荐

比亚迪汽车新技术研究院 硬件工程师 总包21左右 硕士
点赞 评论 收藏
分享
像好涩一样好学:这公司我也拿过 基本明确周六加班 工资还凑活 另外下次镜头往上点儿
点赞 评论 收藏
分享
Noob1024:一笔传三代,人走笔还在
点赞 评论 收藏
分享
服从性笔试吗,发这么多笔,现在还在发。
蟑螂恶霸zZ:傻 x 公司,发两次笔试,两次部门匹配挂,
投递金山WPS等公司10个岗位 >
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务