有趣的时间轮

前言

如果有一个需求,一个任务执行失败后需要重试,重试的次数和时间点是可配置的,那么需要怎么实现呢?一个不错的方法就是通过 时间轮 来实现。

时间轮是什么

时间轮(Time Wheel)是一种常见的定时任务调度算法。它通过将一段时间划分为若干个离散的时间槽,并按照固定的时间间隔顺时针转动,来触发和执行在特定时间点上应该执行的任务。

时间轮.png

常见时间轮应用场景

  • 定时任务调度:时间轮非常适合用于定时任务的调度。通过将任务按照触发时间划分到不同的槽中,可以实现对任务的精确触发和执行。例如,在分布式系统中,可以使用时间轮来实现定时任务的触发和调度。

  • 超时管理:在网络通信或分布式系统中,经常需要管理请求的超时情况。时间轮可以用于管理和处理超时任务。每个槽可以存放一个超时请求,并在达到超时时间时触发相应的操作,例如重新发送请求或进行异常处理。

  • 定时器:时间轮可以用于实现定时器功能。通过将计时任务加入时间轮的相应槽中,可以在预定的时间点触发执行定时任务。定时器广泛应用于各种需求,如批量处理、定时提醒、定时数据刷新等。

  • 调度器:时间轮可用于实现任务调度器。通过将任务根据其优先级划分到不同的槽中,可以实现按优先级顺序触发执行任务。这在一些需要优先处理紧急任务的场景中非常有用。

  • 缓存失效管理:在缓存系统中,需要管理缓存的失效时间。时间轮可以用于管理和处理缓存失效任务。每个槽可以存放一个缓存失效项,并在失效时间到达时触发相应的操作,例如更新缓存或重新加载数据。

通过合理使用时间轮,可以提高系统的任务调度效率和执行精度。

定时任务调度来看时间轮

xxl-job 应该大多数小伙伴都接触过,xxl-job 是一个分布式任务调度平台。其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

我们来看看时间轮在 xxl-job 中是如何使用的。

xxl-job中的时间轮

private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

没错这行代码就是 xxl-job 中的时间轮,本质就是一个 ConcurrentHashMap,key 为执行的秒数,value 为要执行的 job 的 id列表

那么ConcurrentHashMap中的数据是如何维护,如何管理的呢?

我们来看一下 xxl-job 中是如何做的, JobScheduleHelper 类。

xxl-job 源码

private Thread scheduleThread;

private Thread ringThread;

xxl-job 中用了两个线程:

  • scheduleThread 线程:预读,计算下一次调度时间,过期任务根据配置策略处理,过期5秒内任务放入线程池,未过期任务放入时间轮。
  • ringThread 线程:时间轮调度,时间轮转动触发任务调度。

scheduleThread线程

源码有些多,只保留了一些主要的代码。

scheduleThread = new Thread(new Runnable() {
    @Override
    public void run() {
        // 时间对齐,因为预读读的是每次读取现在开始的未来5s内的任务
        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);

        logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

        // 计算预读数量: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
        int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

        while (!scheduleThreadToStop) {

            // 扫描任务开始时间
            long start = System.currentTimeMillis();

            Connection conn = null;
            Boolean connAutoCommit = null;
            PreparedStatement preparedStatement = null;

            boolean preReadSuc = true;
            
            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
            connAutoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false);

            // 悲观锁。集群部署的话可能存在性能瓶颈
            preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
            preparedStatement.execute();

            // 预读开始
            long nowTime = System.currentTimeMillis();
            // 查询任务信息表,当前时间为基准,读未来5s需要执行的任务
            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
            if (scheduleList != null && scheduleList.size() > 0) {
                // 2、push time-ring
                for (XxlJobInfo jobInfo : scheduleList) {

                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                        // 2.1、当前任务过期大于5s
                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                        // 1、匹配过期策略
                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                            // 执行策略
                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
                        }

                        // 2、计算下一次调度时间
                        refreshNextValidTime(jobInfo, new Date());

                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        // 2.2、当前任务过期小于5s

                        // 1、生成任务线程放入线程池
                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());

                        // 2、计算下一次调度时间
                        refreshNextValidTime(jobInfo, new Date());

                        // 下一次调度时间在5s内
                        if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                            // 1、计算时间轮槽
                            int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                            // 2、任务放入时间轮
                            pushTimeRing(ringSecond, jobInfo.getId());

                            // 3、计算下一次调度时间
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                        }

                    } else {
                        // 2.3、任务未过期

                        // 1、计算时间轮槽
                        int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                        // 2、任务放入时间轮
                        pushTimeRing(ringSecond, jobInfo.getId());

                        // 3、计算下一次调度时间
                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                    }

                }
                // 3、更新任务的调度信息
                for (XxlJobInfo jobInfo : scheduleList) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                }
            } else {
                preReadSuc = false;
            }
            // tx stop

            long cost = System.currentTimeMillis() - start;

            if (cost < 1000) {  // scan-overtime, not wait
                // 本次调度执行时间小于1s,时间对齐,再进入下一轮调度
                TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
            }

        }

        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
    }
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();

image.png

ringThread线程

ringThread = new Thread(new Runnable() {
    @Override
    public void run() {

        while (!ringThreadToStop) {

            // 时间对齐
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                
            try {
                // second data
                List<Integer> ringItemData = new ArrayList<>();
                // 获取当前时间的秒
                int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   
                // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                for (int i = 0; i < 2; i++) {
                    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                    if (tmpData != null) {
                        ringItemData.addAll(tmpData);
                    }
                }

                // 时间轮调度
                logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                if (ringItemData.size() > 0) {
                    // 调度
                    for (int jobId: ringItemData) {
                        // 执行调度
                        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                    }
                    // 清除,避免重复执行
                    ringItemData.clear();
                }
            } catch (Exception e) {
                if (!ringThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
    }
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();

简单时间轮(Simple Time Wheel):

简单时间轮由一个环形数组和一个指针组成,每个槽存放一个链表。指针在固定时间间隔内按顺时针方向转动一个槽,触发并执行对应槽的任务。这种时间轮适用于任务触发比较频繁的场景。上文分析的 xxl-job 的时间轮就是这种。

时间刻度问题

现在如果按 1 秒为一个时间刻度,那么一天会有 86400 个刻度,如果添加一个任务 80000 秒后执行,那么其中大部分轮询都是空轮询,而且会浪费内存空间(每个时间刻度都有自己的任务队列)。

那么要怎么解决这个问题呢?

带round时间轮

在每个任务中添加一个 round 属性,时间轮每移动一个时间刻度,遍历任务队列取出 round = 0 的任务执行,然后将其余的任务 round - 1。

解决了内存空间的问题,不需要创建那么多的任务队列,但是每次转动都要扫描任务队列的所有任务,耗时是个问题。

分层时间轮(Hierarchical Time Wheel):

分层时间轮是对简单时间轮的扩展,将时间轮按照不同的精度划分为多个层级(Level)。每个层级的时间轮比前一个层级的时间间隔更大。当低层级的时间轮触发时,在高层级的时间轮中添加对应的任务,实现任务的延迟触发。这种时间轮适用于需要支持较长时间范围的任务调度,同时具有较高的触发精度。任务通过升级和降级来转移队列中的位置。Caffeine中就用到了这种轮。 edGYujrkkn.jpg

时间堆(Time Heap):

时间堆是通过优先队列实现的时间轮。任务按照触发时间的先后顺序插入到优先队列中,每次从队列中取出最近触发的任务执行。新增任务和删除任务的时间复杂度为 O(log n),适用于任务触发比较少且需要精确触发时间的场景。

时间轮与时间堆结合:

有些实现方式会将时间轮和时间堆结合起来使用,以平衡触发精度和时间复杂度。例如,可以使用时间轮作为大范围的调度器,支持快速触发任务;而对于需要更精确触发时间的任务,可以使用时间堆进行管理。

全部评论

相关推荐

点赞 评论 收藏
分享
躺尸修仙中:因为很多92的也去卷中小厂,反正投递简历不要钱,面试不要钱,时间冲突就推,不冲突就面试积累经验
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务