多线程与高并发

1.Callable和Runnable

 

I    Callable定义的方法是call,而Runnable定义的方法是run。

II   Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。

III  Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。

    Runnable 不做具体介绍

通过实现Callable接口来创建Thread线程:其中,Callable接口(也只有一个方法)定义如下:

public interface Callable<V>  
{  
    V call() throws Exception;  
}
步骤1:创建实现Callable接口的类SomeCallable<Integer>(略);

步骤2:创建一个类对象:

      Callable<Integer> oneCallable = new SomeCallable<Integer>();

步骤3:由Callable<Integer>创建一个FutureTask<Integer>对象:

      FutureTask<Integer> oneTask = new FutureTask<Integer>(oneCallable);

      注释:FutureTask<Integer>是一个包装器,它通过接受Callable<Integer>来创建,它同时实现了Future和Runnable接口。
步骤4:由FutureTask<Integer>创建一个Thread对象:

       Thread oneThread = new Thread(oneTask);

步骤5:启动线程:

       oneThread.start();

 

至此,一个线程就创建完成了。

例:

 

package reed.meituan.com;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class CallableAndFuture1 {
    public static void main(String[] args) throws Exception{
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        };
        FutureTask<Integer> futureTask = new FutureTask<Integer>(callable);
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}

callable和future,一个产生结果,一个拿到结果

FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值..

 下面来看另一种方式使用Callable和Future,通过ExecutorService的submit方法执行Callable,并返回Future,代码如下:

 

import java.util.Random;
import java.util.concurrent.*;
public class CallableAndFuture {
    public static void main(String[] args) throws Exception{
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        Future<Integer> future = threadPool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });
        Thread.sleep(5000);
        System.out.println(future.get());
    }
}

 

 

 

 

 

 

2.锁的相关概念介绍

1)可重入锁

  如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。

举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

  看下面这段代码就明白了:

class MyClass {

    public synchronized void method1() {

        method2();

    }

    

    public synchronized void method2() {

        

    }

}

上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,

假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。

而由于synchronized和Lock都具备可重入性,所以不会发生上述现象。

2)可中断锁

  可中断锁:顾名思义,就是可以相应中断的锁。

  在Java中,synchronized就不是可中断锁,而Lock是可中断锁。

  如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,

       我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。

  lockInterruptibly()可体现了Lock的可中断性。

     

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by fanqunsong on 2017/8/5.
 */
public class LockTest {

    public static void main(String[] args) {

        Thread t1 = new Thread(new RunIt());
        Thread t2 = new Thread(new RunIt());
        t1.start();
        t2.start();
        t2.interrupt();
    }
}
class RunIt implements Runnable{
    
    private static Lock lock = new ReentrantLock();

    @Override
    public void run() {

        try {
            //----------------------a
            //lock.lock();
            lock.lockInterruptibly();
            System.out.println(Thread.currentThread().getName() + " running");
            TimeUnit.SECONDS.sleep(5);
            lock.unlock();
            System.out.println(Thread.currentThread().getName()+" finishend");
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+ " interrupted");
        }
    }
}


如果a处 是lock.lock(); 
输出 
Thread-0 running 
(这里休眠了5s) 
Thread-0 finished 
Thread-1 running 
Thread-1 interrupted

 

 

 

============================ 

如果a处是lock.lockInterruptibly() 
Thread-0 running 
Thread-1 interrupted 
(这里休眠了5s) 
Thread-0 finished
 

3)公平锁

  公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。

  非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。

  在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。

  而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。

       在ReentrantLock中定义了2个静态内部类,一个是NotFairSync,一个是FairSync,分别用来实现非公平锁和公平锁。

  我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性:

        ReentrantLock lock = new ReentrantLock(true);

   如果参数为true表示为公平锁,为fasle为非公平锁。默认情况下,如果使用无参构造器,则是非公平锁。

 

4)读写锁

  读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。

  正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。

  ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。

  可以通过readLock()获取读锁,通过writeLock()获取写锁。

       在多线程开发中,经常会出现一种情况,我们希望读写分离。就是对于读取这个动作来说,可以同时有多个线程同

时去读取这个资源,但是对于写这个动作来说,只能同时有一个线程来操作,而且同时,当有一个写线程在操作这个资

源的时候,其他的读线程是不能来操作这个资源的,这样就极大的发挥了多线程的特点,能很好的将多线程的能力发挥。

 

package reed.thread;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
    public static void main(String[] args) {
        final Data data = new Data();
        for (int i = 0; i <3 ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int j = 0; j <5;j++) {
                        data.set(new Random().nextInt(100));
                    }
                }
            }
            ).start();
        }
        for (int i = 0; i <3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int j = 0; j <5;j++){
                        data.get();
                    }
                }
            }).start();
        }
    }
}
class Data{
    private int data;//共享数据1
    private ReadWriteLock rwl = new ReentrantReadWriteLock();
    public void set(int data){
        rwl.writeLock().lock();// 取到写锁
        try{
            System.out.println(Thread.currentThread().getName()+"准备写入数据");
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.data = data;
            System.out.println(Thread.currentThread().getName() + "写入" + this.data);
        }finally {
            rwl.writeLock().unlock();
        }
    }
    public void get() {
        rwl.readLock().lock();// 取到读锁
        try {
            System.out.println(Thread.currentThread().getName() + "准备读取数据");
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "读取" + this.data);
        } finally {
            rwl.readLock().unlock();// 释放读锁
        }
    }
}


运行结果
Thread-1准备写入数据
Thread-1写入57
Thread-4准备读取数据
Thread-3准备读取数据
Thread-5准备读取数据
Thread-4读取57
Thread-5读取57
Thread-3读取57
Thread-2准备写入数据
Thread-2写入64
Thread-2准备写入数据
Thread-2写入82
Thread-2准备写入数据
Thread-2写入26
Thread-4准备读取数据
Thread-5准备读取数据
Thread-3准备读取数据
Thread-5读取26
Thread-4读取26
Thread-3读取26
Thread-4准备读取数据
Thread-5准备读取数据
Thread-3准备读取数据
Thread-5读取26
Thread-3读取26
Thread-4读取26
Thread-3准备读取数据
Thread-5准备读取数据
Thread-4准备读取数据
Thread-5读取26
Thread-3读取26
Thread-4读取26
Thread-3准备读取数据
Thread-5准备读取数据
Thread-4准备读取数据
Thread-4读取26
Thread-5读取26
Thread-3读取26

 

 

 

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 使用读写锁,可以实现读写分离锁定,读操作并发进行,写操作锁定单个线程
 * 
 * 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
 * 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
 * @author
 *
 */
public class MyReentrantReadWriteLock {
	 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     
	    public static void main(String[] args)  {
	        final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock();
	         
	        new Thread(){
	            public void run() {
	                test.get(Thread.currentThread());
	                test.write(Thread.currentThread());
	            };
	        }.start();
	         
	        new Thread(){
	            public void run() {
	                test.get(Thread.currentThread());
	                test.write(Thread.currentThread());
	            };
	        }.start();
	         
	    }  
	    
	    /**
	     * 读操作,用读锁来锁定
	     * @param thread
	     */
	    public void get(Thread thread) {
	        rwl.readLock().lock();
	        try {
	            long start = System.currentTimeMillis();
	             
	            while(System.currentTimeMillis() - start <= 1) {
	                System.out.println(thread.getName()+"正在进行读操作");
	            }
	            System.out.println(thread.getName()+"读操作完毕");
	        } finally {
	            rwl.readLock().unlock();
	        }
	    }

	    /**
	     * 写操作,用写锁来锁定
	     * @param thread
	     */
	    public void write(Thread thread) {
	        rwl.writeLock().lock();;
	        try {
	            long start = System.currentTimeMillis();
	             
	            while(System.currentTimeMillis() - start <= 1) {
	                System.out.println(thread.getName()+"正在进行写操作");
	            }
	            System.out.println(thread.getName()+"写操作完毕");
	        } finally {
	            rwl.writeLock().unlock();
	        }
	    }
}

 

 

一个线程读的时候另外一个线程也可以读

 

5)条件变量condition

条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。

await*对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与

Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。

 

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Test{
	static class NumberWrapper {
		public int value = 1;
	}

	public static void main(String[] args) {
		//初始化可重入锁
		final Lock lock = new ReentrantLock();
		
		//第一个条件当屏幕上输出到3
		final Condition reachThreeCondition = lock.newCondition();
		//第二个条件当屏幕上输出到6
		final Condition reachSixCondition = lock.newCondition();
		
		//NumberWrapper只是为了封装一个数字,一边可以将数字对象共享,并可以设置为final
		//注意这里不要用Integer, Integer 是不可变对象
		final NumberWrapper num = new NumberWrapper();
		//初始化A线程
		Thread threadA = new Thread(new Runnable() {
			@Override
			public void run() {
				//需要先获得锁
				lock.lock();
				try {
					System.out.println("threadA start write");
					//A线程先输出前3个数
					while (num.value <= 3) {
						System.out.println(num.value);
						num.value++;
					}
					//输出到3时要signal,告诉B线程可以开始了
					reachThreeCondition.signal();
				} finally {
					lock.unlock();
				}
				lock.lock();
				try {
					//等待输出6的条件
					reachSixCondition.await();
					System.out.println("threadA start write");
					//输出剩余数字
					while (num.value <= 9) {
						System.out.println(num.value);
						num.value++;
					}

				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
			}

		});


		Thread threadB = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					lock.lock();
					
					while (num.value <= 3) {
						//等待3输出完毕的信号
						reachThreeCondition.await();
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					lock.unlock();
				}
				try {
					lock.lock();
					//已经收到信号,开始输出4,5,6
					System.out.println("threadB start write");
					while (num.value <= 6) {
						System.out.println(num.value);
						num.value++;
					}
					//4,5,6输出完毕,告诉A线程6输出完了
					reachSixCondition.signal();
				} finally {
					lock.unlock();
				}
			}

		});


		//启动两个线程
		threadB.start();
		threadA.start();
	}
}


输出结果

 

threadA start write
1
2
3
threadB start write
4
5
6
ThreadA start write
7
8
9

3.ThreadLocal

用处:保存线程的独立变量。对一个线程类(继承自Thread)

当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,

而不会影响其它线程所对应的副本。ThreadLocal类中有一个map,用于存储每一个线程的变量的副本,Map中元素的键为线程对象,

而值对应线程的变量副本,由于Key值不可重复,每一个“线程对象”对应线程的“变量副本”,而到达了线程安全。

同步与ThreadLocal是两种思路,前者是数据共享的思路,后者是数据隔离的思路。同步是以时间换空间,ThreadLocal是以空间换时间。

synchronized利用锁的机制,让变量或者代码块在同一时间内只能被某一个线程访问,ThreadLocal为每一个线程保存自己独立的副本,

使得每个线程在同一时刻访问的并不是同一个对象。

JDK 5 以后提供了泛型支持,ThreadLocal 被定义为支持泛型:  
public class ThreadLocal<T> extends Object  
T 为线程局部变量的类型。该类定义了 4 个方法:  
      1) protected T initialValue():  返回此线程局部变量的当前线程的“初始值”。线程第一次使用 get()  方法访问变量时将调用此方法,

            但如果线程之前调用了 set(T)  方法,则不会对该线程再调用 initialValue  方法。通常,此方法对每个线程最多调用一次,

           但如果在调用 get() 后又调用了 remove(),则可能再次调用此方法。  该实现返回 null;如果程序员希望线程局部变量具有 null  以外的值,

           则必须为 ThreadLocal 创建子类,并重写此方法。通常将使用匿名内部类完成此操作。  
     2)public T get():返回此线程局部变量的当前线程副本中的值。如果变量没有用于当前线程的值,则先将其初始化为调用 initialValue() 方法返回的值。  
     3)public void set(T value):将此线程局部变量的当前线程副本中的值设置为指定值。大部分子类不需要重写此方法,

           它们只依靠 initialValue()  方法来设置线程局部变量的值。  

     4)public void remove():移除此线程局部变量当前线程的值。如果此线程局部变量随后被当前线程读取,且这期间当前线程没有设置其值,

          则将调用其 initialValue()  方法重新初始化其值。这将导致在当前线程多次调用 initialValue  方法。 

下面是一个使用 ThreadLocal 的例子,每个线程产生自己独立的序列号。就是使用ThreadLocal存储每个线程独立的序列号复本,线程之间互不干扰。 

 

package sync;

/**
 * Created by fanqunsong on 2017/7/24.
 */
public class SequenceNumber {
     // 定义匿名子类创建ThreadLocal的变量 
    private static ThreadLocal<Integer>seqNum = new ThreadLocal<Integer>(){
     // 覆盖初始化方法 
        public Integer initialValue() {
            return 0;
        }
    };
   // 下一个序列号 
public int getNextNum() {
    seqNum.set(seqNum.get() + 1);
    return seqNum.get();

}

private static class TestClient extends Thread {
    private SequenceNumber sn;

    public TestClient(SequenceNumber sn) {
        this.sn = sn;
    }
    public void run() {
        for (int i = 0; i < 3; i++) {
            System.out.println("thread[" + Thread.currentThread().getName()   + "] sn[" + sn.getNextNum() + "]");

        }

    }
}
public static void main(String[] args) {

    SequenceNumber sn = new SequenceNumber();
    TestClient t1 = new TestClient(sn);
    TestClient t2 = new TestClient(sn);
    TestClient t3 = new TestClient(sn);
    t1.start();
    t2.start();
    t3.start();

    }

}


 

 

 

 ThreadLocal 是如何实现为每个线程保存独立的变量的副本的呢?通过查看它的源代码,我们会发现,是通过把当前“线程对象”当作键,变量作为值存储在一个 Map 中。

 

private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
  createMap(t, value);
        return value;
        }

 

 

 

 

 

 

 4.线程池

 

1)初识线程池:

 

根据系统自身的环境情况,有效的限制执行线程的数量,使得运行效果达到最佳。线程主要是通过控制执行的线程的数量,超出数量的线程排队等候,等待有任务执行完毕,再从队列最前面取出任务执行。

 

2)线程池作用:

 

减少创建和销毁线程的次数,每个工作线程可以多次使用

 

可根据系统情况调整执行的线程数量,防止消耗过多内存

 

3)使用

 

ExecutorService:线程池接口

 

ExecutorService pool = Executors.常见线程

 

eg:ExecutorService pool = Executors.newSingleThreadExecutor();

 

 

4)常见线程池

 

①newSingleThreadExecutor

 

单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务

 

②newFixedThreadExecutor(n)

 

固定数量的线程池,没提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行

 

③newCacheThreadExecutor(推荐使用)

 

可缓存线程池,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。

 

④newScheduleThreadExecutor

 

大小无限制的线程池,支持定时和周期性的执行线程

 

5)实例

public class MyThread extends Thread {

    @Override
 public void run() {
        System.out.println(Thread.currentThread().getName()+"执行中。。。");
    }
}


public class TestFixedThreadPool {

    public static void main(String[] args) {

        ExecutorService pool = Executors.newFixedThreadPool(5);

        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);
        pool.shutdown();
    }

}

 

 

运行结果

 

 

pool-1-thread-1执行中。。。

pool-1-thread-3执行中。。。

pool-1-thread-4执行中。。。

pool-1-thread-5执行中。。。

pool-1-thread-2执行中。。。

 

 

5.阻塞队列 BlockingQueue

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。

一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,

负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,

直到一个生产线程把一个对象丢进队列。

BlockingQueue 的方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

四组不同的行为方式解释:

       1)抛异常:如果试图的操作无法立即执行,抛一个异常。

       2)特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。

       3)阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

       4)超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。

            返回一个特定值以告知该操作是否成功(典型的是 true / false)。

 

BlockingQueue 的实现

BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):

  • ArrayBlockingQueue

    ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

示例:

public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue){
        this.queue=queue;
    }

    public void run() {

        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

------------------------------------------------------------------------

public class Consumer implements Runnable {
    
    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
 public void run() {

        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

----------------------------------------------------------------------------------

public class BlockingQueueExample {
    
    public static void main(String[] args) throws Exception{
        BlockingQueue queue = new ArrayBlockingQueue(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }

}

 

 

 

 

 

6.闭锁 CountDownLatch

1)类介绍

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数 初始化 CountDownLatch。

由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,

await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。 

一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行

2)使用场景

在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 这个时候就可以使用CountDownLatch。

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

应用实例:

 

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException{

        final CountDownLatch begin = new CountDownLatch(1);

        final CountDownLatch end = new CountDownLatch(10);

        final ExecutorService exec = Executors.newFixedThreadPool(10);

        for (int index = 0; index <10 ; index++) {

            final int NO = index + 1;

            Runnable run = new Runnable() {
                @Override
  public void run() {
                    try {
                        begin.await();
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("NO."+NO+"arrived");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }
                }
            };
            exec.submit(run);
        }
        System.out.println("Game Start");
        begin.countDown();
        end.await();
        System.out.println("Game Over");
        exec.shutdown();
    }

}

 

 

 

 

7.CyclicBarrier

 

CyclicBarrier 翻译过来叫循环栅栏。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。

当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。

 

CyclicBarrier 的使用并不难,但需要主要它所相关的异常。除了常见的异常,CyclicBarrier.await() 方法会抛出一个独有的 BrokenBarrierException。

这个异常发生在当某个线程在等待本 CyclicBarrier 时被中断或超时或被重置时,其它同样在这个 CyclicBarrier 上等待的线程便会受到 BrokenBarrierException。

意思就是说,同志们,别等了,有个小伙伴已经挂了,咱们如果继续等有可能会一直等下去,所有各回各家吧。

 

CyclicBarrier.await() 方法带有返回值,用来表示当前线程是第几个到达这个 Barrier 的线程。

 

和 CountDownLatch 一样,CyclicBarrier 同样可以可以在构造函数中设定总计数值。与 CountDownLatch 不同的是,CyclicBarrier 的构造函数还可以接受一个 Runnable,会在 CyclicBarrier 被释放时执行。

 

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        executor.submit(new Thread(new Runner(barrier, "1号选手")));
        executor.submit(new Thread(new Runner(barrier, "2号选手")));
        executor.submit(new Thread(new Runner(barrier, "3号选手")));
        executor.shutdown();
    }
}
class Runner implements Runnable{
    private CyclicBarrier barrier;
    private String name;
    public Runner(CyclicBarrier barrier, String name) {
        this.barrier = barrier;
        this.name = name;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(1000 * (new Random()).nextInt(8));
            System.out.println(name + " 准备好了...");
            // barrier的await方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(name+"起跑");
    }
}

运行结果

 

3号选手 准备好了...
2号选手 准备好了...
1号选手 准备好了...
1号选手起跑
3号选手起跑
2号选手起跑

 

CountDownLatch : 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。

CyclicBarrier       : N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
这样应该就清楚一点了,对于CountDownLatch来说,重点是那个“一个线程”, 是它在等待, 而另外那N的线程在把“某个事情”做完之后可以继续等待,可以终止。而对于CyclicBarrier来说,重点是那N个线程,他们之间任何一个没有完成,所有的线程都必须等待。

CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.

而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.

 

8.交换机Exchanger

 

 

9.Semaphore

我们以一个停车场运作为例来说明信号量的作用。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦。

以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。

在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。信号量是一个非负整数,表示了当前公共资源的可用数目

(在上面的例子中可以用空闲的停车位类比信号量),当一个线程要使用公共资源时(在上面的例子中可以用车辆类比线程),首先要查看信号量,

如果信号量的值大于1,则将其减1,然后去占有公共资源。如果信号量的值为0,则线程会将自己阻塞,直到有其它线程释放公共资源。

在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),

要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

 

实例

 

public class SemaphoreTest {

    private Semaphore semaphore = new Semaphore(3);
    private Random random = new Random();

    class TaskDemo implements Runnable{
        private String id;
        TaskDemo(String id){
            this.id=id;
        }

        @Override
 public void run() {

            try {
                semaphore.acquire();
                System.out.println("Thread " + id + " is working");
                Thread.sleep(random.nextInt(1000));
                semaphore.release();
                System.out.println("Thread "+id+" is over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {

        SemaphoreTest semaphoreTest = new SemaphoreTest();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(semaphoreTest.new TaskDemo("a"));
        executorService.submit(semaphoreTest.new TaskDemo("b"));
        executorService.submit(semaphoreTest.new TaskDemo("c"));
        executorService.submit(semaphoreTest.new TaskDemo("d"));
        executorService.submit(semaphoreTest.new TaskDemo("e"));
        executorService.submit(semaphoreTest.new TaskDemo("f"));
        executorService.shutdown();
    }
}

 

 

 

 

 

运行结果

Thread a is working

Thread b is working

Thread c is working

Thread a is over

Thread d is working

Thread d is over

Thread e is working

Thread b is over

Thread f is working

Thread f is over

Thread e is over

Thread c is over

全部评论

相关推荐

totoroyyw:千年老妖😂
投递华为等公司10个岗位
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务