有趣的时间轮
前言
如果有一个需求,一个任务执行失败后需要重试,重试的次数和时间点是可配置的,那么需要怎么实现呢?一个不错的方法就是通过 时间轮 来实现。
时间轮是什么
时间轮(Time Wheel)是一种常见的定时任务调度算法。它通过将一段时间划分为若干个离散的时间槽,并按照固定的时间间隔顺时针转动,来触发和执行在特定时间点上应该执行的任务。
常见时间轮应用场景
-
定时任务调度:时间轮非常适合用于定时任务的调度。通过将任务按照触发时间划分到不同的槽中,可以实现对任务的精确触发和执行。例如,在分布式系统中,可以使用时间轮来实现定时任务的触发和调度。
-
超时管理:在网络通信或分布式系统中,经常需要管理请求的超时情况。时间轮可以用于管理和处理超时任务。每个槽可以存放一个超时请求,并在达到超时时间时触发相应的操作,例如重新发送请求或进行异常处理。
-
定时器:时间轮可以用于实现定时器功能。通过将计时任务加入时间轮的相应槽中,可以在预定的时间点触发执行定时任务。定时器广泛应用于各种需求,如批量处理、定时提醒、定时数据刷新等。
-
调度器:时间轮可用于实现任务调度器。通过将任务根据其优先级划分到不同的槽中,可以实现按优先级顺序触发执行任务。这在一些需要优先处理紧急任务的场景中非常有用。
-
缓存失效管理:在缓存系统中,需要管理缓存的失效时间。时间轮可以用于管理和处理缓存失效任务。每个槽可以存放一个缓存失效项,并在失效时间到达时触发相应的操作,例如更新缓存或重新加载数据。
通过合理使用时间轮,可以提高系统的任务调度效率和执行精度。
定时任务调度来看时间轮
xxl-job 应该大多数小伙伴都接触过,xxl-job 是一个分布式任务调度平台。其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
我们来看看时间轮在 xxl-job 中是如何使用的。
xxl-job中的时间轮
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
没错这行代码就是 xxl-job 中的时间轮,本质就是一个 ConcurrentHashMap
,key 为执行的秒数,value 为要执行的 job 的 id列表。
那么ConcurrentHashMap
中的数据是如何维护,如何管理的呢?
我们来看一下 xxl-job 中是如何做的, JobScheduleHelper
类。
xxl-job 源码
private Thread scheduleThread;
private Thread ringThread;
xxl-job 中用了两个线程:
- scheduleThread 线程:预读,计算下一次调度时间,过期任务根据配置策略处理,过期5秒内任务放入线程池,未过期任务放入时间轮。
- ringThread 线程:时间轮调度,时间轮转动触发任务调度。
scheduleThread线程
源码有些多,只保留了一些主要的代码。
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
// 时间对齐,因为预读读的是每次读取现在开始的未来5s内的任务
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// 计算预读数量: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// 扫描任务开始时间
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// 悲观锁。集群部署的话可能存在性能瓶颈
preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();
// 预读开始
long nowTime = System.currentTimeMillis();
// 查询任务信息表,当前时间为基准,读未来5s需要执行的任务
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList != null && scheduleList.size() > 0) {
// 2、push time-ring
for (XxlJobInfo jobInfo : scheduleList) {
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、当前任务过期大于5s
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、匹配过期策略
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// 执行策略
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
}
// 2、计算下一次调度时间
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、当前任务过期小于5s
// 1、生成任务线程放入线程池
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
// 2、计算下一次调度时间
refreshNextValidTime(jobInfo, new Date());
// 下一次调度时间在5s内
if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、计算时间轮槽
int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
// 2、任务放入时间轮
pushTimeRing(ringSecond, jobInfo.getId());
// 3、计算下一次调度时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、任务未过期
// 1、计算时间轮槽
int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
// 2、任务放入时间轮
pushTimeRing(ringSecond, jobInfo.getId());
// 3、计算下一次调度时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、更新任务的调度信息
for (XxlJobInfo jobInfo : scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
long cost = System.currentTimeMillis() - start;
if (cost < 1000) { // scan-overtime, not wait
// 本次调度执行时间小于1s,时间对齐,再进入下一轮调度
TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
ringThread线程
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// 时间对齐
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
// 获取当前时间的秒
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 时间轮调度
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// 调度
for (int jobId: ringItemData) {
// 执行调度
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// 清除,避免重复执行
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
简单时间轮(Simple Time Wheel):
简单时间轮由一个环形数组和一个指针组成,每个槽存放一个链表。指针在固定时间间隔内按顺时针方向转动一个槽,触发并执行对应槽的任务。这种时间轮适用于任务触发比较频繁的场景。上文分析的 xxl-job 的时间轮就是这种。
时间刻度问题
现在如果按 1 秒为一个时间刻度,那么一天会有 86400 个刻度,如果添加一个任务 80000 秒后执行,那么其中大部分轮询都是空轮询,而且会浪费内存空间(每个时间刻度都有自己的任务队列)。
那么要怎么解决这个问题呢?
带round时间轮
在每个任务中添加一个 round 属性,时间轮每移动一个时间刻度,遍历任务队列取出 round = 0 的任务执行,然后将其余的任务 round - 1。
解决了内存空间的问题,不需要创建那么多的任务队列,但是每次转动都要扫描任务队列的所有任务,耗时是个问题。
分层时间轮(Hierarchical Time Wheel):
分层时间轮是对简单时间轮的扩展,将时间轮按照不同的精度划分为多个层级(Level)。每个层级的时间轮比前一个层级的时间间隔更大。当低层级的时间轮触发时,在高层级的时间轮中添加对应的任务,实现任务的延迟触发。这种时间轮适用于需要支持较长时间范围的任务调度,同时具有较高的触发精度。任务通过升级和降级来转移队列中的位置。Caffeine
中就用到了这种轮。
时间堆(Time Heap):
时间堆是通过优先队列实现的时间轮。任务按照触发时间的先后顺序插入到优先队列中,每次从队列中取出最近触发的任务执行。新增任务和删除任务的时间复杂度为 O(log n),适用于任务触发比较少且需要精确触发时间的场景。
时间轮与时间堆结合:
有些实现方式会将时间轮和时间堆结合起来使用,以平衡触发精度和时间复杂度。例如,可以使用时间轮作为大范围的调度器,支持快速触发任务;而对于需要更精确触发时间的任务,可以使用时间堆进行管理。