死磕JDK源码之Semaphore
源码分析
1 package java.util.concurrent; 2 3 import java.util.Collection; 4 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 5 6 /* 7 一个计数信号量,从概念上讲,信号量维护了一个许可集,在许可可用前会阻塞每一个acquire,直到许可可用才能获取 8 release会添加一个许可,从而可能释放一个正被阻塞的acquire 9 Semaphore经常用于限制可以访问某些资源(物理或逻辑的)的线程数目 10 11 class Pool { 12 private static final int MAX_AVAILABLE = 100; 13 private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); 14 15 public Object getItem() throws InterruptedException { 16 available.acquire(); 17 return getNextAvailableItem(); 18 } 19 20 public void putItem(Object x) { 21 if (markAsUnused(x)) 22 available.release(); 23 } 24 25 // Not a particularly efficient data structure; just for demo 26 27 protected Object[] items = ... whatever kinds of items being managed 28 protected boolean[] used = new boolean[MAX_AVAILABLE]; 29 30 protected synchronized Object getNextAvailableItem() { 31 for (int i = 0; i < MAX_AVAILABLE; ++i) { 32 if (!used[i]) { 33 used[i] = true; 34 return items[i]; 35 } 36 } 37 return null; // not reached 38 } 39 40 protected synchronized boolean markAsUnused(Object item) { 41 for (int i = 0; i < MAX_AVAILABLE; ++i) { 42 if (item == items[i]) { 43 if (used[i]) { 44 used[i] = false; 45 return true; 46 } else 47 return false; 48 } 49 } 50 return false; 51 } 52 } 53 54 semaphore初始化为1,那么最多只有一个可用的许可,可用作一个相互排斥的锁,也称为二进制信号量 55 56 Semaphore的构造方法可选地接受一个公平参数,当设置为false时,不对线程获取许可的顺序作任何保证,即插队是允许的,也就是说可以在等待的线程前为调用acquire 57 的线程分配一个许可,当设置为true时,Semaphore保证对于任何调用acquire的线程而言,都按照调用方法的顺序即FIFO来选择线程获得许可所以可能某个线程在另一个线 58 程之前调用了acquire,却在之后到达排序点,并且从方法返回时也类似 59 60 tryAcquire方法默认是非公平的 61 一般对于控制资源访问的信号量都初始化为公平的,以确保所有线程都可访问资源,非公平的方式会比公平的方式在吞吐量上更有优势 62 63 Semaphore还提供acquire及release方法一次获取或释放多个许可,但是要注意的是,如果设置为非公平的方式,这可能会增加不确定延期的风险(同时获取多个许可可能会产生竞争) 64 65 */ 66 // Semaphore也是通过AQS实现的 67 public class Semaphore implements java.io.Serializable { 68 private static final long serialVersionUID = -3222578661600680210L; 69 private final Sync sync; 70 71 abstract static class Sync extends AbstractQueuedSynchronizer { 72 private static final long serialVersionUID = 1192457210091910933L; 73 74 Sync(int permits) { 75 setState(permits); 76 } 77 78 final int getPermits() { 79 return getState(); 80 } 81 82 final int nonfairTryAcquireShared(int acquires) { 83 for (; ; ) { 84 int available = getState(); 85 int remaining = available - acquires; 86 if (remaining < 0 || 87 compareAndSetState(available, remaining)) 88 return remaining; 89 } 90 } 91 92 protected final boolean tryReleaseShared(int releases) { 93 for (; ; ) { 94 int current = getState(); 95 int next = current + releases; 96 if (next < current) // overflow 97 throw new Error("Maximum permit count exceeded"); 98 if (compareAndSetState(current, next)) 99 return true; 100 } 101 } 102 103 final void reducePermits(int reductions) { 104 for (; ; ) { 105 int current = getState(); 106 int next = current - reductions; 107 if (next > current) // underflow 108 throw new Error("Permit count underflow"); 109 if (compareAndSetState(current, next)) 110 return; 111 } 112 } 113 114 final int drainPermits() { 115 for (; ; ) { 116 int current = getState(); 117 if (current == 0 || compareAndSetState(current, 0)) 118 return current; 119 } 120 } 121 } 122 123 // 非公平方式 124 static final class NonfairSync extends Sync { 125 private static final long serialVersionUID = -2694183684443567898L; 126 127 NonfairSync(int permits) { 128 super(permits); 129 } 130 131 protected int tryAcquireShared(int acquires) { 132 return nonfairTryAcquireShared(acquires); 133 } 134 } 135 136 // 公平方式 137 static final class FairSync extends Sync { 138 private static final long serialVersionUID = 2014338818796000944L; 139 140 FairSync(int permits) { 141 super(permits); 142 } 143 144 protected int tryAcquireShared(int acquires) { 145 for (; ; ) { 146 if (hasQueuedPredecessors()) 147 return -1; 148 int available = getState(); 149 int remaining = available - acquires; 150 if (remaining < 0 || 151 compareAndSetState(available, remaining)) 152 return remaining; 153 } 154 } 155 } 156 157 // 创建具有给定的许可数和非公平的公平设置的Semaphore,注意给定的permits可能为负数,在这种情况下释放许可必须保证在获取许可之前 158 public Semaphore(int permits) { 159 sync = new NonfairSync(permits); 160 } 161 162 // true表示Semaphore保证按照FIFO的顺序对线程授予许可 163 public Semaphore(int permits, boolean fair) { 164 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 165 } 166 167 /* 168 从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断 169 获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到: 170 1.某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程 171 2.某些其他线程中断当前线程 172 如果当前线程被acquire方法使得中断状态设置为on或者在等待许可时被中断则抛出InterruptedException,并且清除当前线程的已中断状态 173 */ 174 public void acquire() throws InterruptedException { 175 sync.acquireSharedInterruptibly(1); 176 } 177 178 /* 179 从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断 180 获取一个许可后立即返回,并把许可数减1 181 如果没有可用的许可,当前线程会处于休眠状态直到:某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程 182 如果当前线程在等待许可时被中断,那么它会接着等待,但是与没有发生中断相比,为线程分配许可的时间可能改变 183 */ 184 public void acquireUninterruptibly() { 185 sync.acquireShared(1); 186 } 187 188 /* 189 仅在有可用的许可且在调用的时间内,那么会从semaphore中获取一个许可 190 获取一个许可并立即返回,返回值为true,当前的许可数-1,如果没有可用的许可,方法会立即返回false 191 即使semaphore是基于公平方式实现的,调用tryAcquire方法也会无视公平性,只要有许可,就会获得而不管是否有线程在排队等待 192 */ 193 public boolean tryAcquire() { 194 return sync.nonfairTryAcquireShared(1) >= 0; 195 } 196 197 /* 198 从semaphore中获取一个许可,仅在给定的时间内且当前线程未被中断 199 获得一个许可并立即返回,返回值为true,当前许可数-1 200 如果没有可用的许可,当前线程会处于休眠状态直到: 201 1.某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程 202 2.某些其他线程中断当前线程 203 3.超出等待时间 204 如果当前线程被acquire方法使得中断状态设置为on或者在等待许可时被中断则抛出InterruptedException,并且清除当前线程的已中断状态 205 如果超出给定等待时间会返回false,如果给定时间小于等于0,那么方法根本不会等待 206 timeout:等待获取许可的时间限制 unit:timeout的时间参数 207 */ 208 public boolean tryAcquire(long timeout, TimeUnit unit) 209 throws InterruptedException { 210 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 211 } 212 213 /* 214 释放一个许可,并把它放回semaphore中,可用许可数+1 215 如果任意线程试图获取许可,则选中一个线程并把释放的许可给它,不要求释放许可的线程必须通过调用acquire方法来获取许可 216 */ 217 public void release() { 218 sync.releaseShared(1); 219 } 220 221 /* 222 从semaphore中获取给定数量的许可,一直阻塞直到全部获取或者线程被中断 223 获取给定数量的许可并立即返回,许可数-给定数量 224 如果没有足够的可用许可,那么线程会处于休眠状态直到: 225 1.其他某些线程调用release释放许可,且当前线程是下一个被分配允许的线程并且可用许可的数目满足给定数量 226 2.某些其他线程中断当前线程 227 如果当前线程被acquire方法使得中断状态设置为on或者在等待许可时被中断则抛出InterruptedException,并且清除当前线程的已中断状态 228 */ 229 public void acquire(int permits) throws InterruptedException { 230 if (permits < 0) throw new IllegalArgumentException(); 231 sync.acquireSharedInterruptibly(permits); 232 } 233 234 // 从semaphore中获取给定数量的许可,一直阻塞直到全部获取 235 public void acquireUninterruptibly(int permits) { 236 if (permits < 0) throw new IllegalArgumentException(); 237 sync.acquireShared(permits); 238 } 239 240 // 从semaphore中获取给定数量的许可,仅在调用时间内所有许可可被获取 241 public boolean tryAcquire(int permits) { 242 if (permits < 0) throw new IllegalArgumentException(); 243 return sync.nonfairTryAcquireShared(permits) >= 0; 244 } 245 246 // 仅在给定的时间内且当前线程未被中断,那么会从semaphore中获取给定数量的许可 247 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) 248 throws InterruptedException { 249 if (permits < 0) throw new IllegalArgumentException(); 250 return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); 251 } 252 253 // 释放给定数量的许可,并放回semaphore中,可用许可数+给定数量 254 public void release(int permits) { 255 if (permits < 0) throw new IllegalArgumentException(); 256 sync.releaseShared(permits); 257 } 258 259 // 返回semaphore中可用的许可数 260 public int availablePermits() { 261 return sync.getPermits(); 262 } 263 264 // 获取并返回立即可用的所有许可 265 public int drainPermits() { 266 return sync.drainPermits(); 267 } 268 269 // 根据指定的缩减量减小可用许可的数目,不同于acquire,在许可变为可用的过程中,它不会阻塞等待 270 protected void reducePermits(int reduction) { 271 if (reduction < 0) throw new IllegalArgumentException(); 272 sync.reducePermits(reduction); 273 } 274 275 // 如果semaphore的公平设置为true,则返回true 276 public boolean isFair() { 277 return sync instanceof FairSync; 278 } 279 280 // 查询是否有线程正在等待获取,需要注意,因为可能同时发生取消,所以返回true并不保证有其他线程等待获取许可 281 public final boolean hasQueuedThreads() { 282 return sync.hasQueuedThreads(); 283 } 284 285 // 返回正在等待获取的线程的估计数量,仅是估计的数量,因为线程的数量在遍历的过程中可能会动态地变化 286 public final int getQueueLength() { 287 return sync.getQueueLength(); 288 } 289 290 // 返回一个collection,包含可能等待获取的线程 291 protected Collection<Thread> getQueuedThreads() { 292 return sync.getQueuedThreads(); 293 } 294 295 // 返回信号量的状态,即当前的许可数 296 public String toString() { 297 return super.toString() + "[Permits = " + sync.getPermits() + "]"; 298 } 299 }
典型用法
1 package jcip; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 7 public class SemaphoreTest { 8 private final static int threadCount = 20; 9 10 public static void main(String[] args) throws Exception { 11 ExecutorService exec = Executors.newCachedThreadPool(); 12 final Semaphore semaphore = new Semaphore(3); 13 for (int i = 0; i < threadCount; i++) { 14 final int threadNum = i; 15 exec.execute(() -> { 16 try { 17 semaphore.acquire(); //获取一个许可 18 fun(threadNum); 19 semaphore.release(); //释放一个许可 20 /* 21 semaphore.acquire(3); //获取多个许可 22 fun(threadNum); 23 semaphore.release(3); //释放多个许可 24 //对应结果为1-0-2-4-3-5-6-7-8-9-10-11-12-14-13-15-18-19-17-16 25 */ 26 } catch (Exception e) { 27 e.printStackTrace(); 28 } 29 }); 30 } 31 exec.shutdown(); 32 } 33 34 private static void fun(int threadNum) throws Exception { 35 System.out.print(threadNum + " "); 36 Thread.sleep(1000); 37 } 38 39 } 40 /* 41 0 1 2-3 4 5-6 8 7-10 11 9-12 14 13-15 16 17-18 19 42 */
1 package jcip; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.Semaphore; 6 import java.util.concurrent.TimeUnit; 7 8 public class SemaphoreTest2 { 9 private final static int threadCount = 20; 10 11 public static void main(String[] args) throws Exception { 12 ExecutorService exec = Executors.newCachedThreadPool(); 13 final Semaphore semaphore = new Semaphore(3); 14 for (int i = 0; i < threadCount; i++) { 15 final int threadNum = i; 16 exec.execute(() -> { 17 try { 18 if (semaphore.tryAcquire()) { //尝试获取一个许可 19 fun(threadNum); 20 semaphore.release(); //释放一个许可 21 } 22 /* 23 if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { //尝试获取一个许可 24 fun(threadNum); 25 semaphore.release(); //释放一个许可 26 } 27 //对应结果为1 3 0-2 4 6-7 5 8-9 11 12-10 13 15-16 28 */ 29 } catch (Exception e) { 30 e.printStackTrace(); 31 } 32 }); 33 } 34 exec.shutdown(); 35 } 36 37 private static void fun(int threadNum) throws Exception { 38 System.out.print(threadNum + " "); 39 Thread.sleep(1000); 40 } 41 42 43 } 44 /* 45 0 2 1 46 */