Netty源码实战(十) - 性能优化

1 性能优化工具类

1.1 FastThreadLocal

1.1.1 传统的ThreadLocal

ThreadLocal最常用的两个接口是set和get
最常见的应用场景为在线程上下文之间传递信息,使得用户不受复杂代码逻辑的影响

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
 t.threadLocals;

我们使用set的时候实际上是获取Thread对象的threadLocals属性,把当前ThreadLocal当做参数然后调用其set(ThreadLocal,Object)方法来设值
threadLocals是ThreadLocal.ThreadLocalMap类型的

每个线程对象关联着一个ThreadLocalMap实例,主要是维护着一个Entry数组
Entry是扩展了WeakReference,提供了一个存储value的地方
一个线程对象可以对应多个ThreadLocal实例,一个ThreadLocal也可以对应多个Thread对象,当一个Thread对象和每一个ThreadLocal***的时候会生成一个Entry,并将需要存储的值存储在Entry的value内

  • 一个ThreadLocal对于一个Thread对象来说只能存储一个值,为Object型
  • 多个ThreadLocal对于一个Thread对象,这些ThreadLocal和线程相关的值存储在Thread对象关联的ThreadLocalMap中
  • 使用扩展WeakReference的Entry作为数据节点在一定程度上防止了内存泄露
  • 多个Thread线程对象和一个ThreadLocal***的时候其实真实数据的存储是跟着线程对象走的,因此这种情况不讨论

我们在看看ThreadLocalMap#set:

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

for (Entry e = tab[i];
     e != null;
     e = tab[i = nextIndex(i, len)]) {
    ThreadLocal k = e.get();
    if (k == key) {
        e.value = value;
        return;
    }
    if (k == null) {
        replaceStaleEntry(key, value, i);
        return;
    }
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
    rehash();

每个ThreadLocal实例都有一个唯一的threadLocalHashCode初始值
上面首先根据threadLocalHashCode值计算出i,有下面两种情况会进入for循环:

  • 由于threadLocalHashCode &(len-1)对应的槽有内容,因此满足tab[i]!=null条件,进入for循环,如果满足条件且当前key不是当前threadlocal只能说明hash冲突了
  • ThreadLocal实例之前被设置过,因此满足tab[i]!=null条件,进入for循环

进入for循环会遍历tab数组,如果遇到以当前threadLocal为key的槽,即上面第(2)种情况,有则直接将值替换
如果找到了一个已经被回收的ThreadLocal对应的槽,也就是当key==null的时候表示之前的threadlocal已经被回收了,但是value值还存在,这也是ThreadLocal内存泄露的地方。碰到这种情况,则会引发替换这个位置的动作
如果上面两种情况都没发生,即上面的第(1)种情况,则新创建一个Entry对象放入槽中

private Entry getEntry(ThreadLocal key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);
}

当命中的时候,也就是根据当前ThreadLocal计算出来的i恰好是当前ThreadLocal设置的值的时候,可以直接根据hashcode来计算出位置,当没有命中的时候,这里没有命中分为三种情况:

  • 当前ThreadLocal之前没有设值过,并且当前槽位没有值。
  • 当前槽位有值,但是对于的不是当前threadlocal,且那个ThreadLocal没有被回收。
  • 当前槽位有值,但是对于的不是当前threadlocal,且那个ThreadLocal被回收了。

上面三种情况都会调用getEntryAfterMiss方法。调用getEntryAfterMiss方法会引发数组的遍历。

总结一下ThreadLocal的性能,一个线程对应多个ThreadLocal实例的场景中
在没有命中的情况下基本上一次hash就可以找到位置
如果发生没有命中的情况,则会引发性能会急剧下降,当在读写操作频繁的场景,这点将成为性能诟病。

1.1.2 实例


两个线程操作同一object 对象,显然非线程安全,但是由于使用了 FTL, 线程安全!

结果表明内存地址不同,并非操作同一个 object!

让T1每1s 中新生成一个 object 对象
T2验证当前 object 是否与之前状态相同

显然,每个线程拿到的对象都是线程独享的!
某线程对变量的修改不影响其他线程!
通过对象隔离优化了程序性能!

1.1.3 Netty FastThreadLocal源码解析

1.1.3.1 创建


创建时重写一下初始值方法


实际上在构造FastThreadLocal实例的时候就决定了这个实例的索引

index 为 private 且非 static, 说明每个实例都有该值

再看看索引的生成相关代码

index 从0开始计数


nextIndex是InternalThreadLocalMap父类的一个全局静态的AtomicInteger类型的对象,这意味着所有的FastThreadLocal实例将共同依赖这个指针来生成唯一的索引,而且是线程安全的
Netty重新设计了更快的FastThreadLocal,主要实现涉及

  • FastThreadLocalThread
  • FastThreadLocal
  • InternalThreadLocalMap

FastThreadLocalThread是Thread类的简单扩展,主要是为了扩展threadLocalMap属性

FastThreadLocal提供的接口和传统的ThreadLocal一致,主要是set和get方法,用法也一致
不同地方在于FastThreadLocal的值是存储在InternalThreadLocalMap这个结构里面的,传统的ThreadLocal性能槽点主要是在读写的时候hash计算和当hash没有命中的时候发生的遍历,我们来看看FastThreadLocal的核心实现

InternalThreadLocalMap实例和Thread对象一一对应
UnpaddedInternalThreadLocalMap维护着一个数组:

这个数组用来存储跟同一个线程关联的多个FastThreadLocal的值,由于FastThreadLocal对应indexedVariables的索引是确定的,因此在读写的时候将会发生随机存取,非常快。

另外这里有一个问题,nextIndex是静态唯一的,而indexedVariables数组是实例对象的,因此我认为随着FastThreadLocal数量的递增,这会造成空间的浪费

1.1.3.2 get方法实现

获取 ThreadLocalMap





首先拿到当前线程,再判断是否为 FTL 线程快速获取否则慢速获取

  • 让我们先分析一下 slowGet方法

    首先会获取一个 ThreadLocal 变量

    拿到 JDK 的 ThreadLocal 变量,用于给每个线程拿到InternalThreadLocalMap变量,所以过程较慢,该方法称为 slowGet 可想而知!

    由于在创建 ThreadLocal 时,并没有重写 initValue 方法,所以可能为 null
  • 接下啦看 fastGet 方法



直接通过索引取出对象



通过每个线程独享的 ThreadLocalMap 对象借助在 JVM 中每个 FTL 的唯一索引

1.2 轻量级对象池 Recycler

1.2.1 Recycler的使用



所以不使用 new 而是直接复用

Netty使用

1.2.2 Recycler的创建

  • 创建方式为直接new 一个 Recycler 对象,然后重写 newObject 方法
  • 转到构造方法

再看看每个Recycler 的结构是如何的

  • 每个Recycler 中对应每条线程都持有一个 Stack 对象
  • 下面图示说明

看下 Stack对象 的 element 参数,一些默认处理器的数组,该数组实际存放对象池,每个处理器都包装了一个对象,handler 可被外部对象引用.,从而回收该对象

  • 参数列表

  • 其中,radiomask 控制对象回收的比率,所以并非每次调用recycler 都会发生回收

  • maxcapacity 池子最大元素容量

  • 线程1残留的对象会缓存到线程2中继续释放
    所以 maxdelayedqueues 也就是可以缓存对象的线程数.如若再有个线程3,而队列结构在线程2,那3会直接抛弃1的残留对象.

  • availablesharedcapacity:线程1中创建的对象能够在其他线程中缓存的对象的最大个数.

    以上即为 Stack 对象所有成员变量.

下面回到Recycler的构造方法,看其入参.



该数值即为Stack 数组元素能有多少个

  • 再看看如下构造方法的参数.
  • 默认值为2
  • 看看新的构造器的 radio 参数
  • 默认值8
  • 两倍CPU核心数
  • 自然该值为7

1.2.3 回收对象到 Recycler

1.2.3.1 同线程回收

  • 客户端开始调用

  • Recycler抽象类
  • 将当前对象压栈
  • 如下,首先判断当前线程,thread 即为S tack 对象中保存的成员变量,若是创建该 stack 的线程,则直接压栈Stack 中,若不是再 pushlater.先分析 pushnow.

  • 首先验证两个 id,由于默认初始值为0,所以通过判断.

  • 接下来将其都赋值为第三个 id,该值在整个Recycler 中都是唯一确定值

紧接着判断当前 size是否已到 maxcapacity,若达到上限,直接丢弃该对象即直接 return;否则继续判断 drop 处理器


首先判断,若该对象之前未被回收过,继续判断;
至今已经回收了多少个对象,其中 rm 为7,即111(二进制表示),即每隔8个对象,就会来此判断一次,将其与7进行与运算后,若不为0,则返回 true,表示只回收八分之一的对象.

继续回到 pushnow 的流程,接下来判断 size 是否等于数组的容量.
因为 element 是一个数组,并不是一开始就创建maxcapacity 容量大小,若容量不够了,则进行两倍大小扩容,再将其加入数组.

1.2.3.2 异线程回收对象

  • 本节食用指南
1.2.3.2.1 获取 WeakOrderQueue(以下简称WOQ)

由前面 pushnow 进入同线程回收, pushlater 即进入异线程回收过程.

  • 先看看这么个东西是啥
    其类型就很神奇了,首先是个FTL,即每个线程都有一个 map,map 的key=stack 表示对于不同线程,对不同 stack 来说对应于不同的WOQ.
    那么它为何要定义成一个 map 结构呢,假设有3个线程T1/2/3;
    T1创建的对象肯可能跑到T3中回收,T2中创建的对象也可能到了T3回收.
    那么元素就是T1以及T2的WOQ.

假设当前在T2中,接下呢就通过get(this)拿到T1的WOQ,其中的 this 指的就是T1的 Stack.
然后若 queue==null,即表示T2从未回收过T1的对象,接下来开始判断
当前的即T2已经回收过的线程数 size,若不小于 mDQ,说明已经不能再回收其他线程的对象了!
给WOQ设置 dummy 标志,即对应下面的若下次看到一个线程标志了 dummy 直接return;什么也不做.

以上即为第一个过程,从FTL中拿一个Stack 对应的WOQ.

1.2.3.2.2 创建 WeakOrderQueue

若之前没拿到呢,那就直接创建一个WOQ吧!

  • 接下来让我们看看一个线程创建WOQ是如何与待回收对象的Stack 进行绑定的.

    其中的 this 即为 stack,是在T1中维护的,thread 即表示当前线程T2.
    allocat就是为了给当前线程T2分配一个在T1中的Stack 对应的一个WOQ.

    首先判断,T1中的 Stack还能否再分配LINK_capacity 个内存,若不能直接返回 null;
    若可以,就直接 new 一个WOQ.
    让我们具体看看其实现.

    此函数意义为:该 Stack 允许外部线程给它缓存多少个对象
    经过CAS操作设置该值为Stack 可为其他线程共享的回收对象的个数.

容量足够,则直接创建一个WOQ,下面来看看其数据结构.

一个链表结构.将其 handle与Link 进行分离,极大地提升了性能,
因为不必判断当前T2能否回收T1的对象,而只需判断当前的L ink 中是否有空的,则可直接将 hande 塞进去.因为在前面一次性的判断过,从T1中是否能批量分配这么多对象(以减少很多操作的频率).


使用同步,将WOQ插到Stack 的头部.

1.2.3.2.3 将对象追加到 WeakOrderQueue
  • 一开始呢,就是这么创建一个WOQ,默认有16个 handle

  • T2已经拿到queue,接着就是添加元素.


首先设置 上次回收 id.

  • 该 id 为WOQ的 id,所以是以WOQ为基础的

然后拿到尾指针,获取Link 的长度,若已经等于 link_capacity,说明已经不可写了;
继续判断 想办法看看T1是否还能再分配一个Link来保存待回收的对象.
不允许,则直接丢弃;
允许,则直接创建Link并重新赋值 tail 节点.

创建完后,拿到其写指针,即 tail 的长度(0).所以 tail 节点也已经又有了足够的存储空间,将 handle 追加进去.再将该 handle 的 stack 指针重置为 null,因为已经不属于原来的 stack 了.
最后,写指针+1.

1.2.4 从 Recycler 获取对象

本节分析若当前 stack 为空

若当前线程T1去获取对象,若 stack 中有对象,则直接拿出.T1所拥有的对象即为T1拥有的 stack 中的对象,若发现其中为空,会尝试与 和T1的 stack 关联的WOQ中的 T1创建的,但是在其他线程中去回收的对象.那么,T1中对象不足,就需要在其他线程中去回收.

其中的 cusor 指针即当前所需要回收的对象

  • 弹栈获取元素
  • 若 size 为0,则从其他线程回收
    若已经回收到则直接 return true.没有则重置两个指针,将 cusor 指向头结点,意味着准备从头开始回收.

接下来具体分析这段长代码

		boolean scavengeSome() {
            WeakOrderQueue prev;
            // 先拿到 cusor
            WeakOrderQueue cursor = this.cursor;
            // cusor 节点无对象
            if (cursor == null) {
                prev = null;
                // 指向头结点
                cursor = head;
                // 头结点依旧为空,已经没有与之关联的WOQ,直接返回 false.
                if (cursor == null) {
                    return false;
                }
            } else {
                prev = this.prev;
            }

            boolean success = false;
            // 此处 do/while 循环只为去寻找与 stack 关联的WOQ,看看到底能不能碰到一个对象.
            do {
            	// transfer 即为了将WOQ中的对象传输到 stack 中.成功获取则结束循环!
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }
				
				// 没有回收成功,则看往 cusor 的下一个节点
                WeakOrderQueue next = cursor.next;
                // owner 为与当前WOQ关联的一个线程(对应图中的T4)
                // 为空,说明T4已经不存在!随后即,做一些善后清理工作
                if (cursor.owner.get() == null) {
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    // 判断节点中是否还有数据
                    if (cursor.hasFinalData()) {
                    	// 就需要想办法将数据传输到 stack 中
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }
                    // 处理完该节点后,即将其删除,通过传统的指针的删除方法
                    if (prev != null) {
                        prev.setNext(next);
                    }
                  //  T4还存活,继续看后继节点.
                } else {
                    prev = cursor;
                }

                cursor = next;
			// cusor 为空时,诶就结束循环啦!
            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }
  • 下面看传输方法
        // transfer as many items as we can from this queue to the stack, returning true if any were transferred
        @SuppressWarnings("rawtypes")
        boolean transfer(Stack<?> dst) {
            // 先找到头结点
            Link head = this.head;
            if (head == null) {
            	// WOQ无数据,直接返回 false
                return false;
            }

			// 说明当前Link 的所有数据已被取走.
            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                this.head = head = head.next;
            }
            
			// 从该索引开始取对象
            final int srcStart = head.readIndex;
            int srcEnd = head.get();
           // 当前Link 要传输到 Stack 的对象个数.
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

			// dst 为当前Stack
            final int dstSize = dst.size;
            // 将对象都传输到Stack 所需容量.
            final int expectedCapacity = dstSize + srcSize;

			 // 所需容量大于Stack 的大小,
            if (expectedCapacity > dst.elements.length) {
            	// 因此进行扩容
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                // 可传输的最后一个对象
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle element = srcElems[i];
                    //为0说明未被回收过
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    reclaimSpace(LINK_CAPACITY);

                    this.head = head.next;
                }

                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                // The destination stack is full already.
                return false;
            }
        }

1.3 小结


参考

Java读源码之Netty深入剖析

全部评论

相关推荐

在评审的大师兄很完美:像这种一般就是部门不匹配 转移至其他部门然后挂掉 我就是这样被挂了
点赞 评论 收藏
分享
害怕一个人的小黄鸭胖乎乎:笑死了,没有技术大牛,招一堆应届生,不到半年,代码就成屎山了
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务