Netty源码—10.Netty工具之时间轮
大纲
1.什么是时间轮
2.HashedWheelTimer是什么
3.HashedWheelTimer的使用
4.HashedWheelTimer的运行流程
5.HashedWheelTimer的核心字段
6.HashedWheelTimer的构造方法
7.HashedWheelTimer添加任务和执行任务
8.HashedWheelTimer的完整源码
9.HashedWheelTimer的总结
10.HashedWheelTimer的应用
1.什么是时间轮
简单来说,时间轮是一个高效利用线程资源进行批量化调度的调度器。首先把大批量的调度任务全部绑定到同一个调度器上,然后使用这个调度器对所有任务进行管理、触发、以及运行,所以时间轮能高效管理各种延时任务、周期任务、通知任务。
时间轮是以时间作为刻度组成的一个环形队列,所以叫做时间轮。这个环形队列通过一个HashedWheelBucket[]数组来实现,数组的每个元素称为槽,每个槽可以存放一个定时任务列表,叫HashedWheelBucket。HashedWheelBucket是一个双向链表,链表的每个节点表示一个定时任务项HashedWheelTimeout。在HashedWheelTimeout中封装了真正的定时任务TimerTask。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度ticketDuration,其中时间轮的时间格的个数是固定的。
如下图示,有16个时间格(槽),假设每个时间格的单位是1s,那么整个时间轮走完一圈需要16s。每秒钟(即时间格的单位也可以为1ms、1min、1h等)指针会沿着顺时针方向转动一格。通过指针移动来获得每个时间格中的任务列表,然后遍历这个时间格内的双向链表的每个任务并执行,依此循环。
2.HashedWheelTimer是什么
Netty的HashedWheelTimer是一个粗略的定时器实现,之所以称为粗略的实现是因为该时间轮并没有严格准时地执行定时任务,而是在每隔一个时间间隔之后的时间节点执行,并执行当前时间节点之前到期的定时任务。
不过具体的定时任务的时间执行精度,可以通过调节HashedWheelTimer构造方法的时间间隔的大小来进行调节。在大多数网络应用的情况下,由于IO延迟的存在,所以并不会严格要求具体的时间执行精度。因此默认100ms的时间间隔可以满足大多数情况,不需要再花精力去调节该时间精度。
3.HashedWheelTimer的使用
public class HashedWheelTimerTest { //构建HashedWheelTimer时间轮 //最后通过HASHED_WHEEL_TIMER.newTimeout()方法把需要延迟执行的任务添加到时间轮中 private static final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer( new DefaultThreadFactory("demo-timer"),//threadFactory参数表示创建处理任务的线程工厂 100,//tickDuration参数表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格 TimeUnit.MILLISECONDS, 512,//ticksPerWheel参数表示时间轮上一共有多少个时间格,分配的时间格越多,占用内存空间就越大,这里是512 true//leakDetection参数表示是否开启内存泄漏检测 ); public static void main(String[] args) { System.out.println("延时任务提交"); //延时多久执行 long delay = 10L; HASHED_WHEEL_TIMER.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("延时任务触发"); } }, delay, TimeUnit.SECONDS); } }
4.HashedWheelTimer的运行流程
步骤一:初始化时间轮
步骤二:启动时间轮
步骤三:添加任务,保存到延时任务队列
步骤四:时间轮指针休眠阻塞,实现转动
步骤五:休眠结束,指针指向下一个时间格(槽)
步骤六:将已经取消的任务从对应的槽中移除
步骤七:将延时任务队列的任务添加到对应的槽中
步骤八:执行时间轮指针指向当前槽的到期任务
5.HashedWheelTimer中的关键字段
字段一:wheel
wheel是一个HashedWheelBucket数组,默认的数组大小是512。可以认为wheel是一个TimerTask的哈希表,它的哈希函数是任务的截止日期。所以每个时间轮的时间格数ticksPerWheel默认是512。
字段二:tickDuration
时间格跨度,默认100ms。
字段三:ticksPerWheel
时间轮的格子数,默认512。
字段四:maxPendingTimeouts
时间轮中任务的最大数量。
字段五:deadline
延时任务的截止时间,值为当前时间 + 延时任务的延时时间 - 时间轮启动时间。
字段六:tick
时间轮启动以来指针总的转动次数。
字段七:remainingRounds
槽中延时任务剩余的圈(轮)数,为0时则表示需要执行延时任务了。
6.HashedWheelTimer的构造方法
步骤一:构造参数校验及给实际执行延时任务的线程池taskExecutor赋值
步骤二:将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂
步骤三:初始化HashedWheelBucket数组wheel
步骤四:校验tickDuration和ticksPerWheel
步骤五:创建工作线程workerThread,用于指针转动和触发执行时间格里的延时任务
步骤六:给时间轮中延时任务的最大数量maxPendingTimeouts赋值
步骤七:检查HashedWheelTimer的实例数量,如果大于64则打印error日志
//4.1.73.Final public class HashedWheelTimer implements Timer { private final HashedWheelBucket[] wheel; private final int mask; private final long tickDuration; private final Thread workerThread; private final ResourceLeakTracker<HashedWheelTimer> leak; private final Worker worker = new Worker(); private final long maxPendingTimeouts; private final Executor taskExecutor; ... //Creates a new timer. //@param threadFactory 创建线程的工厂 //@param tickDuration 每格的时间间隔,默认100ms,0.1秒 //@param unit 时间单位,默认为毫秒 //@param ticksPerWheel 时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算 //@param leakDetection 如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略 //@param maxPendingTimeouts 最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1 //@param taskExecutor 任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它 //@throws NullPointerException if either of threadFactory and unit is null //@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0 public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值 checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); //2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算 //3.初始化时间轮wheel wheel = createWheel(ticksPerWheel); //mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能 mask = wheel.length - 1; //4.校验tickDuration和ticksPerWheel //Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); //防止溢出 //tickDuration * ticksPerWheel必须小于Long.MAX_VALUE if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } //tickDuration不能小于1ms if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } //5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; //6.给时间轮中任务的最大数量maxPendingTimeouts赋值 this.maxPendingTimeouts = maxPendingTimeouts; //7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } //初始化时间轮环形数组 //@param ticksPerWheel private static HashedWheelBucket[] createWheel(int ticksPerWheel) { //ticksPerWheel不能大于2^30 checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel"); //将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); //创建时间轮环形数组 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; } ... }
7.HashedWheelTimer添加任务和执行任务
(1)添加延时任务
(2)执行延时任务
(1)添加延时任务
步骤一:将需要执行的延时任务数pendingTimeouts+1
步骤二:如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
步骤三:启动工作线程,也就是启动时间轮
步骤四:计算被添加的延时任务的截止时间=当前时间+当前任务执行的延迟时间-时间轮启动的时间
步骤五:创建延时任务实例HashedWheelTimeout
步骤六:将延时任务实例添加到延时任务队列timeouts中
注意:添加时会将延时任务添加到延时任务队列timeouts中。这个延时任务队列timeouts将会在下一个滴答声中进行处理(指针的下一次转动)。在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket。
public class HashedWheelTimer implements Timer { private final AtomicLong pendingTimeouts = new AtomicLong(0);//需要执行的延时任务数 private final long maxPendingTimeouts; private volatile long startTime; private final CountDownLatch startTimeInitialized = new CountDownLatch(1); private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();//延时任务队列 ... //添加延时任务 //@param task 任务 //@param delay 延时时间 //@param unit 延时时间单位 @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { checkNotNull(task, "task"); checkNotNull(unit, "unit"); //1.将需要执行的延时任务数pendingTimeouts + 1 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); //2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } //3.启动工作线程,即启动时间轮 start(); //将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动) //在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket //4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } //5.创建延时任务实例HashedWheelTimeout HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); //6.将延时任务实例添加到延时任务队列中 timeouts.add(timeout); return timeout; } //Starts the background thread explicitly. //The background thread will start automatically on demand even if you did not call this method. //@throws IllegalStateException if this timer has been #stop() stopped already public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { //启动工作线程,即启动时间轮 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } //Wait until the startTime is initialized by the worker. while (startTime == 0) { try { //阻塞时间轮的工作线程 startTimeInitialized.await(); } catch (InterruptedException ignore) { //Ignore - it will be ready very soon. } } } ... }
(2)执行延时任务
步骤一:记录时间轮启动的时间startTime
步骤二:开始do while循环,唤醒被阻塞的start()方法,通知时间轮已经启动完毕
步骤三:阻塞等待下一次指针转动的时间
步骤四:计算当前指针指向的时间轮的槽位idx
步骤五:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
步骤六:获取当前指针指向的时间槽HashedWheelBucket
步骤七:遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中,根据延时时间计算对应的时间槽和remainingRounds圈数
步骤八:运行目前指针指向的时间槽中的链表的任务,通过taskExecutor线程池去执行到期的任务
步骤九:到期的和取消的延时任务从链表中移除并将pendingTimeouts--
步骤十:时间轮指针的总转动次数tick++,继续do while循环
步骤十一:清除时间轮中不需要处理的任务,保存到unprocessedTimeouts中
步骤十二:将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中
步骤十三:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
public class HashedWheelTimer implements Timer { private volatile long startTime; private final CountDownLatch startTimeInitialized = new CountDownLatch(1); ... //指针转动和执行延时任务的线程 private final class Worker implements Runnable { //用于记录未执行的延时任务 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); //总的tick数(指针嘀嗒的次数) private long tick; @Override public void run() { //1.记录时间轮启动的时间startTime startTime = System.nanoTime(); if (startTime == 0) { //我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0 startTime = 1; } //2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕 startTimeInitialized.countDown(); //一直执行do while循环,直到时间轮被关闭 do { //3.阻塞等待下一次指针转动的时间 //这里会休眠tick的时间,模拟指针走动 final long deadline = waitForNextTick(); if (deadline > 0) { //4.计算当前指针指向的时间轮槽位idx int idx = (int) (tick & mask); //5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1 processCancelledTasks(); //6.获取当前指针指向的时间槽HashedWheelBucket HashedWheelBucket bucket = wheel[idx]; //7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中 transferTimeoutsToBuckets(); //8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务 //9.到期的和取消的延时任务从链表中移除并将pendingTimeouts-- bucket.expireTimeouts(deadline); //10.时间轮指针的总转动次数tick++ tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); //Fill the unprocessedTimeouts so we can return them from stop() method. //11.清除时间轮中不需要处理的任务 for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } //12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中 //遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中 for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { //如果延时任务没被取消,记录到未执行的任务Set集合中 unprocessedTimeouts.add(timeout); } } //13.处理被取消的任务 processCancelledTasks(); } //将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置 //也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中 private void transferTimeoutsToBuckets() { //每次转移10w个延时任务 for (int i = 0; i < 100000; i++) { //从队列中出队一个延时任务 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { //all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { //Was cancelled in the meantime. continue; } //到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔 long calculated = timeout.deadline / tickDuration; //tick已经走了的时间格,到期一共还需要需要走多少圈 timeout.remainingRounds = (calculated - tick) / wheel.length; //如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行 final long ticks = Math.max(calculated, tick); //槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1 int stopIndex = (int) (ticks & mask); //根据索引该任务应该放到的槽 HashedWheelBucket bucket = wheel[stopIndex]; //将任务添加到槽中,链表末尾 bucket.addTimeout(timeout); } } //处理取消掉的延时任务 //将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1 private void processCancelledTasks() { for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { //all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } //从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来 private long waitForNextTick() { //deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔 long deadline = tickDuration * (tick + 1); for (;;) { //计算当前时间距离启动时间的时间间隔 final long currentTime = System.nanoTime() - startTime; //距离下一次指针跳动还需休眠多长时间 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; //到了指针调到下一个槽位的时间 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } try { //表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } //记录未执行的延时任务 public Set<Timeout> unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } } private static final class HashedWheelBucket { ... public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; //process all timeouts while (timeout != null) { HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { next = remove(timeout); if (timeout.deadline <= deadline) { //通过线程池执行任务 timeout.expire(); } else { //The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds --; } timeout = next; } } public HashedWheelTimeout remove(HashedWheelTimeout timeout) { HashedWheelTimeout next = timeout.next; //remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; } if (timeout == head) { //if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } else if (timeout == tail) { //if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } //null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; timeout.timer.pendingTimeouts.decrementAndGet(); return next; } ... } private static final class HashedWheelTimeout implements Timeout, Runnable { private final TimerTask task; private final HashedWheelTimer timer; ... public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { timer.taskExecutor.execute(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t); } } } ... } ... }
8.HashedWheelTimer的完整源码
//Netty时间轮 public class HashedWheelTimer implements Timer { static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class); private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(); private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean(); private static final int INSTANCE_COUNT_LIMIT = 64; private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1); private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1); private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); private final ResourceLeakTracker<HashedWheelTimer> leak; //指针转动和延时任务执行的线程 private final Worker worker = new Worker(); //worker任务封装的工作线程,用于指针转动和触发时间格里的延时任务的执行 private final Thread workerThread; public static final int WORKER_STATE_INIT = 0; public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; @SuppressWarnings({"unused", "FieldMayBeFinal"}) private volatile int workerState;//0 - init, 1 - started, 2 - shut down //每个时间格的时间跨度,默认为100ms private final long tickDuration; //时间轮(环形数组),HashedWheelBucket为每个时间格的槽 private final HashedWheelBucket[] wheel; private final int mask; private final CountDownLatch startTimeInitialized = new CountDownLatch(1); //延时任务队列,队列中为等待被添加到时间轮的延时任务 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue(); //保存已经取消的延时任务的队列 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue(); //记录当前的任务数 private final AtomicLong pendingTimeouts = new AtomicLong(0); //最大的任务数 private final long maxPendingTimeouts; //执行延时任务的线程池 private final Executor taskExecutor; //工作线程启动时间 private volatile long startTime; ////////////////////////// 构造器 start ////////////////////////// public HashedWheelTimer() { this(Executors.defaultThreadFactory()); } public HashedWheelTimer(long tickDuration, TimeUnit unit) { this(Executors.defaultThreadFactory(), tickDuration, unit); } public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) { this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel); } //使用默认的tickDuration(时间格跨度默认为100ms)和默认的ticksPerWheel(时间格总数默认为512)创建一个新的计时器(时间轮) public HashedWheelTimer(ThreadFactory threadFactory) { this(threadFactory, 100, TimeUnit.MILLISECONDS); } public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) { this(threadFactory, tickDuration, unit, 512); } public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { this(threadFactory, tickDuration, unit, ticksPerWheel, true); } public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) { this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1); } public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts, ImmediateExecutor.INSTANCE); } //Creates a new timer. //@param threadFactory 创建线程的工厂 //@param tickDuration 每格的时间间隔,默认100ms,0.1秒 //@param unit 时间单位,默认为毫秒 //@param ticksPerWheel 时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算 //@param leakDetection 如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略 //@param maxPendingTimeouts 最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1 //@param taskExecutor 任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它 //@throws NullPointerException if either of threadFactory and unit is null //@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0 public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值 checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); //2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算 //3.初始化时间轮wheel wheel = createWheel(ticksPerWheel); //mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能 mask = wheel.length - 1; //4.校验tickDuration和ticksPerWheel //Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); //防止溢出 //tickDuration * ticksPerWheel必须小于Long.MAX_VALUE if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } //tickDuration不能小于1ms if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } //5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; //6.给时间轮中任务的最大数量maxPendingTimeouts赋值 this.maxPendingTimeouts = maxPendingTimeouts; //7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } ////////////////////////// 构造器 end ////////////////////////// @Override protected void finalize() throws Throwable { try { super.finalize(); } finally { //This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. //If we have not yet shutdown then we want to make sure we decrement the active instance count. if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); } } } //初始化时间轮环形数组 //@param ticksPerWheel private static HashedWheelBucket[] createWheel(int ticksPerWheel) { //ticksPerWheel不能大于2^30 checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel"); //将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); //创建时间轮环形数组 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; } //将ticksPerWheel(时间轮上的时间格数)向上取值为2的次幂 private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; } //显式启动后台线程 //即使没有调用此方法,后台线程也会按需自动启动 //Starts the background thread explicitly. //The background thread will start automatically on demand even if you did not call this method. //@throws IllegalStateException if this timer has been #stop() stopped already public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { //启动工作线程,即启动时间轮 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } //Wait until the startTime is initialized by the worker. while (startTime == 0) { try { //阻塞时间轮的工作线程 startTimeInitialized.await(); } catch (InterruptedException ignore) { //Ignore - it will be ready very soon. } } } @Override public Set<Timeout> stop() { if (Thread.currentThread() == workerThread) { throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { //workerState can be 0 or 2 at this moment - let it always be 2. if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return Collections.emptySet(); } try { boolean interrupted = false; while (workerThread.isAlive()) { workerThread.interrupt(); try { workerThread.join(100); } catch (InterruptedException ignored) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } finally { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return worker.unprocessedTimeouts(); } //添加延时任务 //@param task 任务 //@param delay 延时时间 //@param unit 延时时间单位 @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { checkNotNull(task, "task"); checkNotNull(unit, "unit"); //1.将需要执行的延时任务数pendingTimeouts + 1 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); //2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } //3.启动工作线程,即启动时间轮 start(); //将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动) //在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket //4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } //5.创建延时任务实例HashedWheelTimeout HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); //6.将延时任务实例添加到延时任务队列中 timeouts.add(timeout); return timeout; } //Returns the number of pending timeouts of this Timer. public long pendingTimeouts() { return pendingTimeouts.get(); } private static void reportTooManyInstances() { if (logger.isErrorEnabled()) { String resourceType = simpleClassName(HashedWheelTimer.class); logger.error("You are creating too many " + resourceType + " instances. " + resourceType + " is a shared resource that must be reused across the JVM, " + "so that only a few instances are created."); } } //指针转动和延时任务执行的线程 private final class Worker implements Runnable { //用于记录未执行的延时任务 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); //总的tick数(指针嘀嗒的次数) private long tick; @Override public void run() { //1.记录时间轮启动的时间startTime startTime = System.nanoTime(); if (startTime == 0) { //我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0 startTime = 1; } //2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕 startTimeInitialized.countDown(); //一直执行do while循环,直到时间轮被关闭 do { //3.阻塞等待下一次指针转动的时间 //这里会休眠tick的时间,模拟指针走动 final long deadline = waitForNextTick(); if (deadline > 0) { //4.计算当前指针指向的时间轮槽位idx int idx = (int) (tick & mask); //5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1 processCancelledTasks(); //6.获取当前指针指向的时间槽HashedWheelBucket HashedWheelBucket bucket = wheel[idx]; //7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中 transferTimeoutsToBuckets(); //8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务 //9.到期的和取消的延时任务从链表中移除并将pendingTimeouts-- bucket.expireTimeouts(deadline); //10.时间轮指针的总转动次数tick++ tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); //Fill the unprocessedTimeouts so we can return them from stop() method. //11.清除时间轮中不需要处理的任务 for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } //12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中 //遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中 for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { //如果延时任务没被取消,记录到未执行的任务Set集合中 unprocessedTimeouts.add(timeout); } } //13.处理被取消的任务 processCancelledTasks(); } //将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置 //也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中 private void transferTimeoutsToBuckets() { //每次转移10w个延时任务 for (int i = 0; i < 100000; i++) { //从队列中出队一个延时任务 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { //all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { //Was cancelled in the meantime. continue; } //到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔 long calculated = timeout.deadline / tickDuration; //tick已经走了的时间格,到期一共还需要需要走多少圈 timeout.remainingRounds = (calculated - tick) / wheel.length; //如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行 final long ticks = Math.max(calculated, tick); //槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1 int stopIndex = (int) (ticks & mask); //根据索引该任务应该放到的槽 HashedWheelBucket bucket = wheel[stopIndex]; //将任务添加到槽中,链表末尾 bucket.addTimeout(timeout); } } //处理取消掉的延时任务 //将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1 private void processCancelledTasks() { for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { //all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } //从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来 private long waitForNextTick() { //deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔 long deadline = tickDuration * (tick + 1); for (;;) { //计算当前时间距离启动时间的时间间隔 final long currentTime = System.nanoTime() - startTime; //距离下一次指针跳动还需休眠多长时间 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; //到了指针调到下一个槽位的时间 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } try { //表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } //记录未执行的延时任务 public Set<Timeout> unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } } //延时任务 private static final class HashedWheelTimeout implements Timeout, Runnable { private static final int ST_INIT = 0; private static final int ST_CANCELLED = 1; private static final int ST_EXPIRED = 2; private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); private final HashedWheelTimer timer; private final TimerTask task; //任务执行的截止时间 = 当前时间 + 延时任务延时时间 - 时间轮启动时间 private final long deadline; @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) private volatile int state = ST_INIT; //剩下的圈(轮)数 //remainingRounds将由Worker.transferTimeoutsToBuckets()在HashedWheelTimeout被添加到正确的HashedWheelBucket之前计算和设置 long remainingRounds; //HashedWheelTimerBucket槽中的延时任务列表是一个双向链表 //因为只有workerThread会对它进行操作,所以不需要 synchronization / volatile HashedWheelTimeout next; HashedWheelTimeout prev; //当前延时任务所插入时间轮的哪个槽 HashedWheelBucket bucket; HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { this.timer = timer; this.task = task; this.deadline = deadline; } @Override public Timer timer() { return timer; } @Override public TimerTask task() { return task; } @Override public boolean cancel() { //only update the state it will be removed from HashedWheelBucket on next tick. if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { return false; } //If a task should be canceled we put this to another queue which will be processed on each tick. //So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way //we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible. timer.cancelledTimeouts.add(this); return true; } void remove() { HashedWheelBucket bucket = this.bucket; if (bucket != null) { bucket.remove(this); } else { timer.pendingTimeouts.decrementAndGet(); } } public boolean compareAndSetState(int expected, int state) { return STATE_UPDATER.compareAndSet(this, expected, state); } public int state() { return state; } @Override public boolean isCancelled() { return state() == ST_CANCELLED; } @Override public boolean isExpired() { return state() == ST_EXPIRED; } public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { timer.taskExecutor.execute(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t); } } } @Override public void run() { try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } } @Override public String toString() { final long currentTime = System.nanoTime(); long remaining = deadline - currentTime + timer.startTime; StringBuilder buf = new StringBuilder(192).append(simpleClassName(this)).append('(').append("deadline: "); if (remaining > 0) { buf.append(remaining).append(" ns later"); } else if (remaining < 0) { buf.append(-remaining).append(" ns ago"); } else { buf.append("now"); } if (isCancelled()) { buf.append(", cancelled"); } return buf.append(", task: ").append(task()).append(')').toString(); } } //存放HashedWheelTimeouts的桶 //这些数据存储在一个类似于链表的数据结构中,允许轻松删除中间的hashedwheeltimeout //HashedWheelTimeout本身作为节点,因此不需要创建额外的对象 //保存头结点和尾节点,方便于任务的提取和插入 private static final class HashedWheelBucket { //头结点 private HashedWheelTimeout head; //尾节点 private HashedWheelTimeout tail; //Add HashedWheelTimeout to this bucket. public void addTimeout(HashedWheelTimeout timeout) { assert timeout.bucket == null; timeout.bucket = this; if (head == null) { head = tail = timeout; } else { tail.next = timeout; timeout.prev = tail; tail = timeout; } } //Expire all HashedWheelTimeouts for the given deadline. public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; //遍历当前时间槽中的所有任务 while (timeout != null) { HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { //从链表中移除 next = remove(timeout); if (timeout.deadline <= deadline) { //延时任务到期,执行延时任务 timeout.expire(); } else { //The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } //如果延时任务取消,从链表中移除 } else if (timeout.isCancelled()) { next = remove(timeout); } else { //任务还没到期,剩余的轮数-1 timeout.remainingRounds --; } //将指针放置到下一个延时任务上 timeout = next; } } //删除槽中链表中的延时任务 public HashedWheelTimeout remove(HashedWheelTimeout timeout) { HashedWheelTimeout next = timeout.next; //remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; } if (timeout == head) { //if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } else if (timeout == tail) { //if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } //null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; timeout.timer.pendingTimeouts.decrementAndGet(); return next; } //Clear this bucket and return all not expired / cancelled Timeouts. public void clearTimeouts(Set<Timeout> set) { for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } } //头结点移除 private HashedWheelTimeout pollTimeout() { HashedWheelTimeout head = this.head; if (head == null) { return null; } HashedWheelTimeout next = head.next; if (next == null) { tail = this.head = null; } else { this.head = next; next.prev = null; } //null out prev and next to allow for GC. head.next = null; head.prev = null; head.bucket = null; return head; } } }
9.HashedWheelTimer的总结
一.时间轮的转动是单线程
但是时间轮中每个时间槽里的延时任务则是由线程池来执行的。
二.延时任务保存到JVM中没有做宕机备份
系统重启时延时任务将会丢失,无法恢复任务进行重新调度。
三.时间轮调度器的时间精度不是很高
对于精度要求特别高的调度任务可能不太适合,因为时间轮的精度取决于时间格的跨度大小。
四.时间轮指针的转动是使用Sleep来完成等待的
10.HashedWheelTimer的应用
(1)时间轮的应用场景
一.Dubbo、Netty、Kafka、Redission等中间件都用到了时间轮机制
二.订单关闭、确认收货、批量定时数据更新等都可以采用时间轮机制
(2)心跳检测
心跳机制会每隔固定的时间发送一个心跳包来检测客户端和服务端的连接状态,客户端发送心跳包用来告诉服务器其还正常运行。
比如在Dubbo中,需要有心跳机制来维持Consumer与Provider的长连接,默认的心跳间隔是60s。当Provider在3次心跳时间内没有收到心跳响应,会关闭连接通道。当Consumer在3次心跳时间内没有收到心跳响应,会进行重连。
在Dubbo的HeaderExchangeClient类中会向时间轮中提交该心跳任务:
一.发送心跳的时间轮
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL );
二.向时间轮中提交心跳任务
private void startHeartBeatTask(URL url) { //Client的具体实现决定是否启动该心跳任务 if (!client.canHandleIdle()) { AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); //计算心跳间隔, 最小间隔不能低于1s int heartbeat = getHeartbeat(url); long heartbeatTick = calculateLeastDuration(heartbeat); //创建心跳任务 this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); //提交到IDLE_CHECK_TIMER这个时间轮中等待执行, 等时间到了时间轮就会去取出该任务进行调度执行 IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); } }
(3)超时处理
在Dubbo中发起RPC调用时,通常会配置超时时间,当消费者调用服务提供者出现超时进行一定的逻辑处理。那么怎么检测任务调用超时了呢?我们可以利用定时任务。
每次发起RPC调用时创建一个Future,记录这个Future的创建时间与超时时间,后台有一个定时任务进行检测。当Future到达超时时间并且没有被处理时,就需要对这个Future执行超时逻辑处理。
(4)Redisson分布式锁续期
Redisson看门狗机制,通过时间轮定时给分布式锁续期。在获取锁成功后,Redisson会封装一个锁续期的延时任务放入到时间轮中。默认10秒检查一下,用于对获取到的锁进行续期,延长持有锁的时间。如果业务机器宕机了,那么续期的延时任务失效,也无法续期,锁会超时释放。
一.添加续期延时任务
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //这边newTimeout点进去发现就是往时间轮中提交了一个任务 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { //续期成功后继续调度, 又往时间轮中放一个续期任务 renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
二.lua续期代码
protected RFuture<Boolean> renewExpirationAsync(long threadId) { //通过lua脚本对锁进行续期 return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId) ); }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等