亲缘性线程池

   好久没写技术文章了,今天刚好赶上oncall,周六日就直接来公司了,顺带就写写文章。上次写的hotring的java实现和在内存中指针的设计,其实中间搞了好久发现java操作指针还是有一些问题,导致最后没继续写了。

今天介绍一些好玩的东西,亲缘性线程池

先说一个业务场景吧,比如我们业务中常见到的binlog消费,这儿有一个问题,对于某些业务来说,比如一些下单场景需要重新落库,肯定是需要先监听下单insert的消息处理后在进行update的处理,不可能单子都没下就直接处理update了,这种要么我们自己维护一个缓存,要么通过亲缘性线程池处理。

维护缓存其实很难搞的,因为你没法判断这个消息延迟多长时间算正常,总会留一个很hack的时间,业务上也不美观,且引入缓存还要多维护一个逻辑,比如缓存超时、挂了、oom这些怎么处理,都是很头大的。

这段话是我写完后补充的,发现自己写文档能力还是不行,以下代码可能光看代码难以看懂,我把链接放评论区可以自行参考,做项目时如果遇到多线程且考虑顺序性问题可以直接将其当成一个组件使用,最近可能相比之前来说面试偏卷,不是21、22年那会儿随便整俩 xx商城 项目就可以拿offer的时候了,感兴趣在评论区讨论,相比于xx商城来说这个可以帮助大家在面试时有个可以和面试官聊出一个比较亮的点吧,有其他业务场景也可以在评论区讨论,我做电商的,应该和大部分人的简历项目都相关吧哈哈。

亲缘性线程池

通过维护一个kv映射关系,将同样属性的任务放进一个队列中,
使用单个线程执行单个队列中的任务,达到亲缘性的目的。

其实说起来挺简单,自己使用一个数组维护线程池简单取模就可以实现,类似这样

执行流程 alt

类文件关系 虚线是建造器,实线代表实现或者继承关系 alt

先看一个简单的实现

		List<Person> personList = new ArrayList<>();
        personList.add(new Person(1, "a1"));
        personList.add(new Person(2, "b1"));
        personList.add(new Person(1, "a2"));
        personList.add(new Person(2, "b2"));
        personList.add(new Person(3, "c1"));
        personList.add(new Person(3, "c2"));

        ExecutorService[] services = new ExecutorService[10];
        IntStream.range(0, 9)
                .boxed()
                .forEach(i -> services[i] = Executors.newSingleThreadExecutor());
        personList.forEach(person -> {
            services[person.id % 10].execute(() -> {
                System.out.println(Thread.currentThread().getName() + "----" + person.data);
            });
        });

上面就已经简单实现一个亲缘性线程池了,也能保证任务执行的顺序性,但有一个场景无法满足,就是热点问题。

当有热点时数据倾斜导致热点数据延迟增高,因此需要考虑热点问题。

以下就是具体实现了,亲缘性线程池原理挺简单,主要是实现需要考虑锁竞争等问题比较复杂。

先定义一个亲缘key的接口,方便扩展实现

KeyAffinity

// 需要继承资源关闭和迭代器功能,在具体实现中方便管理特定的键关联性行为
public interface KeyAffinity<K, V> extends AutoCloseable, Iterable<V> {

    boolean init();
}

KeyAffinityExecutor

public interface KeyAffinityExecutor<K> extends KeyAffinity<K, ListeningExecutorService> {
 int DEFAULT_QUEUE_SIZE = 100;
  
  /**\
     * 创建亲缘性线程池
     * @param parallelism 并发度
     * @param queueSize 队列长度
     * @param threadName 线程名
     * @return KeyAffinityExecutor
     * @param <K> 定义亲缘key
     */
    static <K> KeyAffinityExecutor<K> newSerializerExecutor(int parallelism, int queueSize, String threadName) {
        return newKeyAffinityExecutor()
                .parallelism(parallelism)
                .executor(executor(threadName, queueSize))
                .build();
    }
}

这儿为了使代码美观用了建造者模式 executor()方法分两个,内部的是构建亲缘key对应的线程池,外部则是构建我们的亲缘线程池,模型其实是这样

affinityKey1 -> Executors1
affinityKey2 -> Executors2
affinityKey3 -> Executors3

每个亲缘key对应一个属于自己的线程池,这样这样可以借助官方的线程池给我们实现队列、每个key对应的线程数等(当前线程池核心线程数和最大线程数都是1,如果不是1无法保证顺序性,但后续准备用Future进行异步优化,实现多线程的顺序性)

// 创建线程池
 public static Supplier<ExecutorService> executor(String threadName, int queueBufferSize) {
        return new Supplier<>() {
            private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
                    .setNameFormat(threadName)
                    .build();

            @Override
            public ExecutorService get() {
                LinkedBlockingQueue<Runnable> queue;
                if (queueBufferSize > 0) {
                    queue = new LinkedBlockingQueue<>(queueBufferSize) {
                        @Override
                        public boolean offer(Runnable runnable) {
                            try {
                                put(runnable);
                                return true;
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            return false;
                        }
                    };
                } else {
                    queue = new LinkedBlockingQueue<>();
                }
                // 这儿返回的都是单线程容量为1的线程池
                return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, threadFactory);
            }
        };
    }

创建后就是执行,由于我们把key作为亲缘key判断哪个线程池执行当前任务,因此key最好实现equlas、hashcode等方法方便作比较。

KeyAffinityExecutorRelation

 public void executeAffinity(K key, ThrowableRunnable<Exception> task) {
        requireNonNull(task);
        ThrowableRunnable<Exception> finalTask;
   		// 如果设置了skipDuplicate为true,则直接跳过重复任务,仅仅执行最新任务
        if (skipDuplicate) {
            Callable<Void> wrapCallable = wrapSkipTask(key, () -> {
                task.run();
                return null;
            });
            if (wrapCallable == null) {
                return;
            }
            finalTask = wrapCallable::call;
        } else {
            finalTask = task;
        }
        // 获取具体ExecutorService,核心就在select方法进行数据倾斜处理
        ListeningExecutorService service = select(key);
        boolean addCallback = false;
        try {
            service.execute(() -> {
                try {
                    finalTask.run();
                } catch (Throwable throwable) {
                    throwIfUnchecked(throwable);
                    throw new UncheckedExecutionException(throwable);
                }
            });
            addCallback = true;
        } finally {
            if (!addCallback) {
                finishCall(key);
            }
        }
    }



// wrapSkipTask 当前方法主要是进行任务包装成Callable并进行替换,保障每次获取最新任务
private <T> Callable<T> wrapSkipTask(K key, Callable<T> task) {
  		// 用数组主要是消除stream的不可变特性
        boolean[] firstAdd = {false};
        SubstituentCallable<?> result = substituentTaskMap.compute(key, (k, v) -> {
          	// 如果当前任务是null,则创建一个新的对象
            if (v == null) {
                v = new SubstituentCallable<>(key, task);
                firstAdd[0] = true;
            } else {
              	// 否则就替换为最新的任务
                v.callable = (Callable) task;
            }
            return v;
        });
        if (firstAdd[0]) {
            return (Callable<T>) result;
        }
        return null;
    }

KeyAffinityRelation

public V select(K key) {
        int thisCount = count.getAsInt();
  		// tryCheckValueRefCount 该逻辑就不细过了,主要是判断当前并发量是否高于任务数
  		//   1. 如果高于,则直接将任务添加到一个list中等待处理
  		//   2. 如果低于,则需要将多余元素进行移除,然后调用一个新的线程处理移除操作
  		//   3. 如果等于0则直接返回
        tryCheckValueRefCount(thisCount);
        KeyRef ref = mapping.compute(key, (k, v) -> {
            if (v == null) {
                if (usingRandom.test(thisCount)) {
                    do {
                        try {
                          	// 如果value为null,且使用随机生成策略,则随机生成一个KeyRef存储在内存的map中,然后返回该任务
                            v = new KeyRef(all.get(ThreadLocalRandom.current().nextInt(all.size())));
                        } catch (IndexOutOfBoundsException e) {
                            // ignore
                        }
                    } while (v == null);
                } else {
                  	// 否则则是找到并发度最小的keyRef进行返回
                    v = all.stream()
                            .min(Comparator.comparing(ValueRef::concurrency))
                            .map(KeyRef::new)
                            .orElseThrow(IllegalStateException::new);
                }
            }
            v.incrConcurrency();
            return v;
        });
        return ref.ref();
    }

以上的ValueRef、KeyRef都是对任务value进行包装的一个包装类,这么写的好处就是可以方便记录当前value、key的执行次数,也就是并发度。

上面是核心,具体代码行数较多就仅仅展示核心流程,感兴趣的可以去评论区上github clone下来看,当前有很多锁操作,且移除多余的任务那儿可能会导致new Thread()量很大导致线程爆炸,这儿都是可以优化的点,感兴趣一起交流。

以下就是自己写的一些用到的工具类方法了

public static boolean shutdownAndAwaitTermination(ExecutorService service, long timeout, TimeUnit unit) {
        long halfTimeout = unit.toNanos(timeout) / 2;
        service.shutdown();
        try {
            if (!service.awaitTermination(halfTimeout, TimeUnit.NANOSECONDS)) {
                service.shutdownNow();
                catchingIfFalse(service.awaitTermination(halfTimeout, TimeUnit.NANOSECONDS),
                        () -> logger.error("Oops. awaitTermination error"));
            }
        } catch (InterruptedException exception) {
            Thread.currentThread().interrupt();
            service.shutdown();
        }
        return service.isTerminated();
    }


public static void catchingIfFalse(boolean condition, Runnable runnable) {
        try {
            if (!condition) {
                runnable.run();
            }
        } catch (Exception e) {
            logger.error("[fail safe] ", e);
        }
    }


public static <T> CloseableSupplier<T> lazy(Supplier<T> delegate, boolean resetAfterCLose) {
        if (delegate instanceof CloseableSupplier) {
            return (CloseableSupplier<T>) delegate;
        }
        requireNonNull(delegate);
        return new CloseableSupplier<>(delegate, resetAfterCLose);
    }

看测试结果

亲缘性线程池执行结果:

alt

普通线程池执行结果: alt

可以看到普通线程是个无序的,这儿看上去有序是因为程序较为简单,但从线程异步结果可以看出来是无序的,但亲缘线程池可以保证同一个key分发到对应的线程上面去。

#Java##项目##面试内推#
全部评论
有点像RocketMQ的顺序消费,只不过RocketMQ会有rebalance过程,这个直接内部线程消费的感觉
1 回复 分享
发布于 2024-07-28 21:08 北京

相关推荐

点赞 评论 收藏
分享
死在JAVA的王小美:哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈哈,我也是,让我免了一轮,但是硬气拒绝了
点赞 评论 收藏
分享
评论
8
5
分享
牛客网
牛客企业服务