多线程与高并发
1.Callable和Runnable
I Callable定义的方法是call,而Runnable定义的方法是run。
II Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
III Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。
Runnable 不做具体介绍
通过实现Callable接口来创建Thread线程:其中,Callable接口(也只有一个方法)定义如下:
public interface Callable<V>
{
V call() throws Exception;
}
步骤1:创建实现Callable接口的类SomeCallable<Integer>(略);
步骤2:创建一个类对象:
Callable<Integer> oneCallable = new SomeCallable<Integer>();
步骤3:由Callable<Integer>创建一个FutureTask<Integer>对象:
FutureTask<Integer> oneTask = new FutureTask<Integer>(oneCallable);
注释:FutureTask<Integer>是一个包装器,它通过接受Callable<Integer>来创建,它同时实现了Future和Runnable接口。
步骤4:由FutureTask<Integer>创建一个Thread对象:
Thread oneThread = new Thread(oneTask);
步骤5:启动线程:
oneThread.start();
至此,一个线程就创建完成了。
例:
package reed.meituan.com;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class CallableAndFuture1 {
public static void main(String[] args) throws Exception{
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};
FutureTask<Integer> futureTask = new FutureTask<Integer>(callable);
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
callable和future,一个产生结果,一个拿到结果
FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值..
下面来看另一种方式使用Callable和Future,通过ExecutorService的submit方法执行Callable,并返回Future,代码如下:
import java.util.Random;
import java.util.concurrent.*;
public class CallableAndFuture {
public static void main(String[] args) throws Exception{
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future<Integer> future = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt(100);
}
});
Thread.sleep(5000);
System.out.println(future.get());
}
}
2.锁的相关概念介绍
1)可重入锁
如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。
举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。
看下面这段代码就明白了:
class MyClass {
public synchronized void method1() {
method2();
}
public synchronized void method2() {
}
}
上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,
假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。
而由于synchronized和Lock都具备可重入性,所以不会发生上述现象。
2)可中断锁
可中断锁:顾名思义,就是可以相应中断的锁。
在Java中,synchronized就不是可中断锁,而Lock是可中断锁。
如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,
我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。
lockInterruptibly()可体现了Lock的可中断性。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by fanqunsong on 2017/8/5.
*/
public class LockTest {
public static void main(String[] args) {
Thread t1 = new Thread(new RunIt());
Thread t2 = new Thread(new RunIt());
t1.start();
t2.start();
t2.interrupt();
}
}
class RunIt implements Runnable{
private static Lock lock = new ReentrantLock();
@Override
public void run() {
try {
//----------------------a
//lock.lock();
lock.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + " running");
TimeUnit.SECONDS.sleep(5);
lock.unlock();
System.out.println(Thread.currentThread().getName()+" finishend");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+ " interrupted");
}
}
}
如果a处 是lock.lock();
输出
Thread-0 running
(这里休眠了5s)
Thread-0 finished
Thread-1 running
Thread-1 interrupted
============================
如果a处是lock.lockInterruptibly()
Thread-0 running
Thread-1 interrupted
(这里休眠了5s)
Thread-0 finished
3)公平锁
公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。
非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。
在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。
而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。
在ReentrantLock中定义了2个静态内部类,一个是NotFairSync,一个是FairSync,分别用来实现非公平锁和公平锁。
我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性:
ReentrantLock lock = new ReentrantLock(true);
如果参数为true表示为公平锁,为fasle为非公平锁。默认情况下,如果使用无参构造器,则是非公平锁。
4)读写锁
读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。
正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。
ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。
可以通过readLock()获取读锁,通过writeLock()获取写锁。
在多线程开发中,经常会出现一种情况,我们希望读写分离。就是对于读取这个动作来说,可以同时有多个线程同
时去读取这个资源,但是对于写这个动作来说,只能同时有一个线程来操作,而且同时,当有一个写线程在操作这个资
源的时候,其他的读线程是不能来操作这个资源的,这样就极大的发挥了多线程的特点,能很好的将多线程的能力发挥。
package reed.thread;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest {
public static void main(String[] args) {
final Data data = new Data();
for (int i = 0; i <3 ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j <5;j++) {
data.set(new Random().nextInt(100));
}
}
}
).start();
}
for (int i = 0; i <3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j <5;j++){
data.get();
}
}
}).start();
}
}
}
class Data{
private int data;//共享数据1
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public void set(int data){
rwl.writeLock().lock();// 取到写锁
try{
System.out.println(Thread.currentThread().getName()+"准备写入数据");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
System.out.println(Thread.currentThread().getName() + "写入" + this.data);
}finally {
rwl.writeLock().unlock();
}
}
public void get() {
rwl.readLock().lock();// 取到读锁
try {
System.out.println(Thread.currentThread().getName() + "准备读取数据");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "读取" + this.data);
} finally {
rwl.readLock().unlock();// 释放读锁
}
}
}
运行结果
Thread-1准备写入数据
Thread-1写入57
Thread-4准备读取数据
Thread-3准备读取数据
Thread-5准备读取数据
Thread-4读取57
Thread-5读取57
Thread-3读取57
Thread-2准备写入数据
Thread-2写入64
Thread-2准备写入数据
Thread-2写入82
Thread-2准备写入数据
Thread-2写入26
Thread-4准备读取数据
Thread-5准备读取数据
Thread-3准备读取数据
Thread-5读取26
Thread-4读取26
Thread-3读取26
Thread-4准备读取数据
Thread-5准备读取数据
Thread-3准备读取数据
Thread-5读取26
Thread-3读取26
Thread-4读取26
Thread-3准备读取数据
Thread-5准备读取数据
Thread-4准备读取数据
Thread-5读取26
Thread-3读取26
Thread-4读取26
Thread-3准备读取数据
Thread-5准备读取数据
Thread-4准备读取数据
Thread-4读取26
Thread-5读取26
Thread-3读取26
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 使用读写锁,可以实现读写分离锁定,读操作并发进行,写操作锁定单个线程
*
* 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
* 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
* @author
*
*/
public class MyReentrantReadWriteLock {
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock();
new Thread(){
public void run() {
test.get(Thread.currentThread());
test.write(Thread.currentThread());
};
}.start();
new Thread(){
public void run() {
test.get(Thread.currentThread());
test.write(Thread.currentThread());
};
}.start();
}
/**
* 读操作,用读锁来锁定
* @param thread
*/
public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行读操作");
}
System.out.println(thread.getName()+"读操作完毕");
} finally {
rwl.readLock().unlock();
}
}
/**
* 写操作,用写锁来锁定
* @param thread
*/
public void write(Thread thread) {
rwl.writeLock().lock();;
try {
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行写操作");
}
System.out.println(thread.getName()+"写操作完毕");
} finally {
rwl.writeLock().unlock();
}
}
}
一个线程读的时候另外一个线程也可以读
5)条件变量condition
条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
await*对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与
Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test{
static class NumberWrapper {
public int value = 1;
}
public static void main(String[] args) {
//初始化可重入锁
final Lock lock = new ReentrantLock();
//第一个条件当屏幕上输出到3
final Condition reachThreeCondition = lock.newCondition();
//第二个条件当屏幕上输出到6
final Condition reachSixCondition = lock.newCondition();
//NumberWrapper只是为了封装一个数字,一边可以将数字对象共享,并可以设置为final
//注意这里不要用Integer, Integer 是不可变对象
final NumberWrapper num = new NumberWrapper();
//初始化A线程
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
//需要先获得锁
lock.lock();
try {
System.out.println("threadA start write");
//A线程先输出前3个数
while (num.value <= 3) {
System.out.println(num.value);
num.value++;
}
//输出到3时要signal,告诉B线程可以开始了
reachThreeCondition.signal();
} finally {
lock.unlock();
}
lock.lock();
try {
//等待输出6的条件
reachSixCondition.await();
System.out.println("threadA start write");
//输出剩余数字
while (num.value <= 9) {
System.out.println(num.value);
num.value++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
while (num.value <= 3) {
//等待3输出完毕的信号
reachThreeCondition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
lock.lock();
//已经收到信号,开始输出4,5,6
System.out.println("threadB start write");
while (num.value <= 6) {
System.out.println(num.value);
num.value++;
}
//4,5,6输出完毕,告诉A线程6输出完了
reachSixCondition.signal();
} finally {
lock.unlock();
}
}
});
//启动两个线程
threadB.start();
threadA.start();
}
}
输出结果
threadA start write
1
2
3
threadB start write
4
5
6
ThreadA start write
7
8
9
3.ThreadLocal
用处:保存线程的独立变量。对一个线程类(继承自Thread)
当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,
而不会影响其它线程所对应的副本。ThreadLocal类中有一个map,用于存储每一个线程的变量的副本,Map中元素的键为线程对象,
而值对应线程的变量副本,由于Key值不可重复,每一个“线程对象”对应线程的“变量副本”,而到达了线程安全。
同步与ThreadLocal是两种思路,前者是数据共享的思路,后者是数据隔离的思路。同步是以时间换空间,ThreadLocal是以空间换时间。
synchronized利用锁的机制,让变量或者代码块在同一时间内只能被某一个线程访问,ThreadLocal为每一个线程保存自己独立的副本,
使得每个线程在同一时刻访问的并不是同一个对象。
JDK 5 以后提供了泛型支持,ThreadLocal 被定义为支持泛型:
public class ThreadLocal<T> extends Object
T 为线程局部变量的类型。该类定义了 4 个方法:
1) protected T initialValue(): 返回此线程局部变量的当前线程的“初始值”。线程第一次使用 get() 方法访问变量时将调用此方法,
但如果线程之前调用了 set(T) 方法,则不会对该线程再调用 initialValue 方法。通常,此方法对每个线程最多调用一次,
但如果在调用 get() 后又调用了 remove(),则可能再次调用此方法。 该实现返回 null;如果程序员希望线程局部变量具有 null 以外的值,
则必须为 ThreadLocal 创建子类,并重写此方法。通常将使用匿名内部类完成此操作。
2)public T get():返回此线程局部变量的当前线程副本中的值。如果变量没有用于当前线程的值,则先将其初始化为调用 initialValue() 方法返回的值。
3)public void set(T value):将此线程局部变量的当前线程副本中的值设置为指定值。大部分子类不需要重写此方法,
它们只依靠 initialValue() 方法来设置线程局部变量的值。
4)public void remove():移除此线程局部变量当前线程的值。如果此线程局部变量随后被当前线程读取,且这期间当前线程没有设置其值,
则将调用其 initialValue() 方法重新初始化其值。这将导致在当前线程多次调用 initialValue 方法。
下面是一个使用 ThreadLocal 的例子,每个线程产生自己独立的序列号。就是使用ThreadLocal存储每个线程独立的序列号复本,线程之间互不干扰。
package sync;
/**
* Created by fanqunsong on 2017/7/24.
*/
public class SequenceNumber {
// 定义匿名子类创建ThreadLocal的变量
private static ThreadLocal<Integer>seqNum = new ThreadLocal<Integer>(){
// 覆盖初始化方法
public Integer initialValue() {
return 0;
}
};
// 下一个序列号
public int getNextNum() {
seqNum.set(seqNum.get() + 1);
return seqNum.get();
}
private static class TestClient extends Thread {
private SequenceNumber sn;
public TestClient(SequenceNumber sn) {
this.sn = sn;
}
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println("thread[" + Thread.currentThread().getName() + "] sn[" + sn.getNextNum() + "]");
}
}
}
public static void main(String[] args) {
SequenceNumber sn = new SequenceNumber();
TestClient t1 = new TestClient(sn);
TestClient t2 = new TestClient(sn);
TestClient t3 = new TestClient(sn);
t1.start();
t2.start();
t3.start();
}
}
ThreadLocal 是如何实现为每个线程保存独立的变量的副本的呢?通过查看它的源代码,我们会发现,是通过把当前“线程对象”当作键,变量作为值存储在一个 Map 中。
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
4.线程池
1)初识线程池:
根据系统自身的环境情况,有效的限制执行线程的数量,使得运行效果达到最佳。线程主要是通过控制执行的线程的数量,超出数量的线程排队等候,等待有任务执行完毕,再从队列最前面取出任务执行。
2)线程池作用:
减少创建和销毁线程的次数,每个工作线程可以多次使用
可根据系统情况调整执行的线程数量,防止消耗过多内存
3)使用
ExecutorService:线程池接口
ExecutorService pool = Executors.常见线程
eg:ExecutorService pool = Executors.newSingleThreadExecutor();
4)常见线程池
①newSingleThreadExecutor
单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
②newFixedThreadExecutor(n)
固定数量的线程池,没提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行
③newCacheThreadExecutor(推荐使用)
可缓存线程池,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。
④newScheduleThreadExecutor
大小无限制的线程池,支持定时和周期性的执行线程
5)实例
public class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行中。。。");
}
}
public class TestFixedThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.shutdown();
}
}
运行结果
pool-1-thread-1执行中。。。
pool-1-thread-3执行中。。。
pool-1-thread-4执行中。。。
pool-1-thread-5执行中。。。
pool-1-thread-2执行中。。。
5.阻塞队列 BlockingQueue
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。
一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,
负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,
直到一个生产线程把一个对象丢进队列。
BlockingQueue 的方法
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
四组不同的行为方式解释:
1)抛异常:如果试图的操作无法立即执行,抛一个异常。
2)特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
3)阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
4)超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。
返回一个特定值以告知该操作是否成功(典型的是 true / false)。
BlockingQueue 的实现
BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):
- ArrayBlockingQueue
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
示例:
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue){
this.queue=queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
------------------------------------------------------------------------
public class Consumer implements Runnable {
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
----------------------------------------------------------------------------------
public class BlockingQueueExample {
public static void main(String[] args) throws Exception{
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
6.闭锁 CountDownLatch
1)类介绍
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数 初始化 CountDownLatch。
由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,
await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。
一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行
2)使用场景
在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 这个时候就可以使用CountDownLatch。
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。
应用实例:
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException{
final CountDownLatch begin = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(10);
final ExecutorService exec = Executors.newFixedThreadPool(10);
for (int index = 0; index <10 ; index++) {
final int NO = index + 1;
Runnable run = new Runnable() {
@Override
public void run() {
try {
begin.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("NO."+NO+"arrived");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
};
exec.submit(run);
}
System.out.println("Game Start");
begin.countDown();
end.await();
System.out.println("Game Over");
exec.shutdown();
}
}
7.CyclicBarrier
CyclicBarrier 翻译过来叫循环栅栏。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。
当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。
CyclicBarrier 的使用并不难,但需要主要它所相关的异常。除了常见的异常,CyclicBarrier.await() 方法会抛出一个独有的 BrokenBarrierException。
这个异常发生在当某个线程在等待本 CyclicBarrier 时被中断或超时或被重置时,其它同样在这个 CyclicBarrier 上等待的线程便会受到 BrokenBarrierException。
意思就是说,同志们,别等了,有个小伙伴已经挂了,咱们如果继续等有可能会一直等下去,所有各回各家吧。
CyclicBarrier.await() 方法带有返回值,用来表示当前线程是第几个到达这个 Barrier 的线程。
和 CountDownLatch 一样,CyclicBarrier 同样可以可以在构造函数中设定总计数值。与 CountDownLatch 不同的是,CyclicBarrier 的构造函数还可以接受一个 Runnable,会在 CyclicBarrier 被释放时执行。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "1号选手")));
executor.submit(new Thread(new Runner(barrier, "2号选手")));
executor.submit(new Thread(new Runner(barrier, "3号选手")));
executor.shutdown();
}
}
class Runner implements Runnable{
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name + " 准备好了...");
// barrier的await方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name+"起跑");
}
}
运行结果
3号选手 准备好了...
2号选手 准备好了...
1号选手 准备好了...
1号选手起跑
3号选手起跑
2号选手起跑
CountDownLatch : 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。
CyclicBarrier : N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
这样应该就清楚一点了,对于CountDownLatch来说,重点是那个“一个线程”, 是它在等待, 而另外那N的线程在把“某个事情”做完之后可以继续等待,可以终止。而对于CyclicBarrier来说,重点是那N个线程,他们之间任何一个没有完成,所有的线程都必须等待。
CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.
而CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
8.交换机Exchanger
9.Semaphore
我们以一个停车场运作为例来说明信号量的作用。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦。
以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。
在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。信号量是一个非负整数,表示了当前公共资源的可用数目
(在上面的例子中可以用空闲的停车位类比信号量),当一个线程要使用公共资源时(在上面的例子中可以用车辆类比线程),首先要查看信号量,
如果信号量的值大于1,则将其减1,然后去占有公共资源。如果信号量的值为0,则线程会将自己阻塞,直到有其它线程释放公共资源。
在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
实例
public class SemaphoreTest {
private Semaphore semaphore = new Semaphore(3);
private Random random = new Random();
class TaskDemo implements Runnable{
private String id;
TaskDemo(String id){
this.id=id;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Thread " + id + " is working");
Thread.sleep(random.nextInt(1000));
semaphore.release();
System.out.println("Thread "+id+" is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SemaphoreTest semaphoreTest = new SemaphoreTest();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(semaphoreTest.new TaskDemo("a"));
executorService.submit(semaphoreTest.new TaskDemo("b"));
executorService.submit(semaphoreTest.new TaskDemo("c"));
executorService.submit(semaphoreTest.new TaskDemo("d"));
executorService.submit(semaphoreTest.new TaskDemo("e"));
executorService.submit(semaphoreTest.new TaskDemo("f"));
executorService.shutdown();
}
}
运行结果
Thread a is working
Thread b is working
Thread c is working
Thread a is over
Thread d is working
Thread d is over
Thread e is working
Thread b is over
Thread f is working
Thread f is over
Thread e is over
Thread c is over