锁
Unsafe类的park和unpark
在前文(原子变量 )中曾经详细介绍过Unsafe类的指针和CAS操作,这里再介绍一下它的park和unpark操作。
public native void park(boolean var1, long var2);
public native void unpark(Object var1);
- park方法用来阻塞一个线程,第一个参数用来指示后面的参数是绝对时间还是相对时间,true表示绝对时间,false表示从此刻开始后的相对时间.调用park的线程就阻塞在此处.
- upark用来释放某个线程的阻塞,线程用参数var1表示.例子如下:
public class UnsafeParkExam {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);
Thread t1 = new Thread() {
@Override
public void run() {
Thread.currentThread().setName("t1");
System.out.println(Thread.currentThread().getName() + " before park");
//park 100 seconds
unsafe.park(false, TimeUnit.NANOSECONDS.convert(100, TimeUnit.SECONDS));
System.out.println(Thread.currentThread().getName() + " after park");
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
Thread.currentThread().setName("t2");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " unpark t1");
unsafe.unpark(t1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread t3 = new Thread() {
@Override
public void run() {
Thread.currentThread().setName("t3");
System.out.println(Thread.currentThread().getName() + " park 5 seconds");
//park 5 seconds
unsafe.park(true, System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(5,TimeUnit.SECONDS));
System.out.println(Thread.currentThread().getName() + " after park");
}
};
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
LockSupport
直接使用Unsafe还是有诸多不便之处,因此lock包提供了一个辅助类LockSupport来封装了park和unpark,例子如下:
public class LockSupportExam {
public static void main(String[] args) {
Thread t1 = new Thread() {
@Override
public void run() {
Thread.currentThread().setName("t1");
System.out.println(Thread.currentThread().getName() + " before park");
//park 100 seconds
LockSupport.parkNanos(TimeUnit.NANOSECONDS.convert(100, TimeUnit.SECONDS));
System.out.println(Thread.currentThread().getName() + " after park");
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
Thread.currentThread().setName("t2");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " unpark t1");
LockSupport.unpark(t1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread t3 = new Thread() {
@Override
public void run() {
Thread.currentThread().setName("t3");
System.out.println(Thread.currentThread().getName() + " park 5 seconds");
//park 5 seconds
LockSupport.parkUntil(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(5,TimeUnit.SECONDS));
System.out.println(Thread.currentThread().getName() + " after park");
}
};
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
从例子中可以看出,使用LockSupport要比直接只用Unsafe更加便捷。除此之外,LockSupport还可以用来给线程设置一个Blocker对象,便于调试和检测线程,其原理是使用Unsafe的putObject方法直接设置Thread对象的parkBlocker属性,并在合适的时候读取这个Blocker对象,例子如下:
public class LockSupportBlockerExam {
public static void main(String[] args) {
Thread t3 = new Thread(() -> { Thread.currentThread().setName("t3"); System.out.println(Thread.currentThread().getName() + " park 5 seconds"); //park 5 seconds, set blocker Object blocker = new String("sss"); LockSupport.parkUntil(blocker, System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS)); System.out.println(Thread.currentThread().getName() + " after park"); }); t3.start(); try { Object t3_blocker = null; while (t3_blocker == null) { t3_blocker = LockSupport.getBlocker(t3); TimeUnit.MILLISECONDS.sleep(10); } System.out.println("t3 blocker is :" + t3_blocker); t3.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
锁的核心AQS同步器
毫不夸张的说,各种锁ReentrantLock、ReentrantReadWriteLock以及各种同步器诸如Semaphore、CountDownLatch等等,其核心都是AbstractQueuedSynchronizer(AQS同步器)。要了解AQS的原理,仅仅看代码和JDK文档是不够的,其原理在这一篇论文(http://gee.cs.oswego.edu/dl/papers/aqs.pdf)中详细阐述了,强烈建议仔细阅读此论文。
1 使用AQS写一个排它锁
由于AQS非常复杂,从源代码入手很难看懂,最好的学习方法是看它是如何被使用的。这里有一个非常简单的例子SimpleLock,实现了一个最简单的排它锁。当有线程获得锁时,其他线程只能等待;当这个线程释放锁时,另一个线程可以竞争获取锁。
public class SimpleLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int ignore) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int ignore) {
setState(0);
return true;
}
protected Sync() {
super();
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
private static class MyThread extends Thread {
private final String name;
private final SimpleLock lock;
private MyThread(String name, SimpleLock lock) {
this.name = name;
this.lock = lock;
}
@Override
public void run() {
try {
lock.lock();
System.out.println(name + " get the lock");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(name + " release the lock");
}
}
}
public static void main(String[] args) {
final SimpleLock mutex = new SimpleLock();
MyThread t1 = new MyThread("t1", mutex);
MyThread t2 = new MyThread("t2", mutex);
MyThread t3 = new MyThread("t3", mutex);
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("main thread exit!");
}
}
运行结果表明,通过简单的几行代码,就实行了一个锁的所有功能。
根据JUC(Java Util Concurrency)作者的建议,AQS的使用方法要遵循上面这个模式。一是使用一个内部类Sync来继承AQS,并实现AQS的相关方法,一般是tryAcquire和tryRelease(排它锁),或者tryAcquireShared和tryReleaseShared(共享锁);二是在内部使用一个代理模式来实现锁的功能,这样做的好处是可以让暴露出的同步、互斥方法名由程序员自行决定,例如各种锁可以使用lock、unlock,Semaphore可以使用acquire和release,CountDownLatch可以使用await和countDown。
2 AQS基本原理
要实现一个同步器,需要三个条件:
- 一个同步状态值,并可以原子化的操作这个状态值.显然,我们可以用CAS操作状态值
- 阻塞线程和解除阻塞线程的机制,可以用LockSupport来实现
- 维持一个阻塞线程的队列,并在队列的每个节点中存储线程的状态等信息
AQS同时满足了这三个条件,首先,它提供了如下的状态值,以及相应的操作方法:
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
state是一个volatile int类型,除了getter和setter方法外,还提供了一个CAS方法,提供原子化的比较更新操作。一般来说,AQS认为
- state为0时,同步器处于释放状态,多个线程此刻可以竞争获取同步器
- state不等于0时,同步器处于已获取状态,后续线程需进入队列,等待同步器(可重入的同步器允许获取同步器的线程再次进入同步器,并使用state进行计数)。
- 当然,很多情况下,程序员也可自己定义state的值的含义,特别是在实现读写锁时,需要将state一分为二的用。
其次,LockSupport提供了阻塞和解除阻塞的功能。因此,所有同步器的阻塞操作其实都是基于LockSupport的,也就是基于Unsafe的park和unpark方法的。
第三,AQS内部提供了一个Node类型,它是用来形成“线程等待队列”的节点类型,以及一个由Node类型组成的队列。此队列的原理留待后续章节细说。