RPC项目知识点
23秋招写了三个项目:一个RPC、一个springboot、一个算法,至于面试官会不会问,私企会问一些,国企像银行那种一般不会过问RPC项目,自己整理了一些知识点,大家有需要的可以看看,有点乱,大家见谅
都是些理论知识用于学习,面经大家可以搜索别的帖子,这个项目要是面试官深究就会很难,所以没有弄清楚,不要写上去
PS:感谢几位牛友的资料https://www.nowcoder.com/discuss/353159058410643456?sourceSSR=users
https://www.nowcoder.com/discuss/353157479641063424?sourceSSR=users
简介
做这个项目的原因
两个不同的服务器上的服务提供的方法不在一个内存空间,需要网络编程才能传递方法调用所需要的参数,方法调用的结果也需要通过网络编程来接收。如果手动网络编程来实现这个调用过程的话工作量大,因为需要考虑底层传输方式(TCP 还是UDP)、序列化方式等等方面。RPC可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。
原理
①服务端启动的时候先扫描,将服务名称及其对应的地址(ip+port)注册到注册中心,这样客户端才能根据服务名称找到对应的服务地址。
②客户端去注册中心找服务地址,有了服务地址之后,客户端就可以通过网络请求服务端了。
②客户端调用远程方法的时候,实际会通过创建代理对象来传输网络请求。
④客户端生成request对象(类名、方法名以及相关参数)作为消息体,拼接消息头,然后通过Netty传输过去。
⑤当客户端发起请求的时候,多台服务器都可以处理这个请求。通过负载均衡选一台服务器。
⑥服务器收到客户端传过来的信息后进行解码,看看客户端需要要干什么,然后反射调用方法得到result,把result封装成rpcMessage,再传回去。
⑦客户端收到rpcMessage,解码,拿到自己想要的结果;
背景
Dubbo、Motan、gRPC
Dubbo
微服务框架,为大规模微服务实践提供高性能 RPC 通信、流量治理、可观测性等解决方案, 涵盖 Java、Golang 等多种语言 SDK 实现。提供了从服务定义、服务发现、服务通信到流量管控等几乎所有的服务治理能力,支持 Triple 协议、应用级服务发现、Dubbo Mesh等特性。
Motan
新浪微博开源的一款 RPC 框架,据说在新浪微博正支撑着千亿次调用。不过笔者倒是很少看到有公司使用,而且网上的资料也比较少。
gRPC
Google 开源的一个高性能、通用的开源 RPC 框架。其由主要面向移动应用开发并基于 HTTP/2 协议标准而设计,基于 ProtoBuf 序列化协议开发,并且支持众多开发语言。
Thrift
Facebook 开源的跨语言的 RPC 通信框架,给 Apache ,由于其跨语言特性和出色的性能,在很多互联网公司得到应用。
对比
gRPC 和 Thrift 虽然支持跨语言的 RPC 调用,只提供了最基本的 RPC 框架功能,缺乏一系列配套的服务化组件和服务治理功能的支撑。
Dubbo 功能完善程度、生态系统还是社区活跃。Dubbo在国内有很多成功的案例比如当当网、滴滴等等,是经得起生产考验的成熟稳定的 RPC 框架。
服务端 暴露->扫描->注册
定义了两个注解:RcpService注册服务,RpcReference消费服务。生命周期都为 runtime,也就是程序运行期能够读取的注解。
RcpService注册服务,用于暴露服务
服务端的HelloServiceImpl实现类上面,将标有@RpcService的注解的bean进行缓存spring包里面实现扫描。
@RpcService(group = "test1", version = "version1") public class HelloServiceImpl implements HelloService{}
这个注解有两个属性:
version :服务版本,主要是为后续不兼容升级提供可能
group : 服务所在的组。主要用于处理一个接口有多个类实现的情况。
RpcService这个注解还使用了 Spring 提供的 @Component 注解。这样的话,使用 RpcService注解的类就会交由 Spring 管理(前提是被被@ComponentScan注解扫描到 )。
RpcScan自定义包扫描,来扫描自定义的注解
public @interface RpcScan { String[] basePackage(); }
两个启动类上面:@RpcScan(basePackage = {"github.javaguide"})
放在启动类上(main 方法所在的类),标识服务的扫描的包的范围,找到标记有 RpcService 的类,并注册。
扫描
【提供扫描的类】
CustomScanner类继承ClassPathBeanDefinitionScanner类;
使用父类的scan方法,来进行扫描,传入的参数就是包的类名(github.javaguide);
可以看到会先得到一个int,也就是已经存在的BeanDefinition,这个在AnnotatedBeanDefinitionReader中将讲过,也就是spring内置的几个bd;
然后通过doScan方法正式进行扫描;
返回的是通过扫描得到的类数;
【扫描】
registerBeanDefinitions类;
扫描被RpcService和Component注释的类;
RpcReference注册服务
注入属性 将代理对象注入 自动装配服务实现类
在客户端的HelloController类里面,
@RpcReference(version = "version1", group = "test1") private HelloService helloService;
HelloService 是接口,HelloServiceImpl是该接口的实现类。
CustomScannerRegistrar去扫描被@RpcService和Component注解的bean;
向zookeeper注册服务
在创建bean之前先查看类是否被注解,在SpringBeanPostProcessor这个类中,实现 BeanPostProcessor接口并且重写了里面的postProcessBeforeInitializtion方法和postProcessAfterInitialization方法。
【postProcessBeforeInitialization(Object bean, String beanName)】
Spring Bean 在实例化之前会调用 BeanPostProcessor 接口的 postProcessBefore Initialization()方法。在方法中获取RpcService注解,发布服务。
①入参的bean就是github.javaguide.serviceimpl.HelloServiceImpl;
②通过bean反射调用得到RpcService对象;
③创建RpcServiceConfig对象,把RpcService和bean的信息传进去,version,group,服务名;
④接口serviceProvider.publishService(rpcServiceConfig)发布服务;
【postProcessAfterInitialization(Object bean, String beanName)】
遍历类的属性上否有RpcReference注解,如果有的话,就通过反射将这个属性赋值即可。
①查看bean的字段中有没有RpcReference注解;
②获取代理对象RpcClientProxy;
③通过反射注入属性;
④返回bean;
服务发布
【ServiceProvider接口】
addService本地添加;
getService;
publishService连接zookeeper,实现真正的注册;
【ZkServiceProviderImpl】serviceProvider的实现类
变量:
Map<String, Object> serviceMap:代理对象名,方法;
Set<String> registeredService:代理对象名;
(zk包里面的)ServiceRegistry serviceRegistry:接口,用来连接zk;
方法:
addService(在本地记录一些信息);
getService(去map里面找);
publishService(连接zk,实现真正的注册)
publishService(RpcServiceConfig rpcServiceConfig)
调用serviceRegistry.registerService方法,传入方法名和地址;
进入zk包下面的ZkServiceRegistryImpl实现类,连接zk创建结点;
传输实体
RpcRequest implements Serializable
调用对象是要进行序列化的,所以实现序列化接口
serialVersionUID = 1905122041950251207L;序列化ID,为了保证序列化前和反序列化后的数据是一致的
private String requestId; //UUID.randomUUID().toString()
private String interfaceName;
private String methodName;
private Object[] parameters;
private Class<?>[] paramTypes;
private String version; 为后续不兼容情况的升级提供记录信息
private String group;处理一个接口有多个实现类
RpcResponse<T> implements Serializable
serialVersionUID = 715745410605631233L;//自定义序列化ID
private String requestId; //为了对应请求,这里也需要附带上请求的id
private Integer code;状态码 200 500
private String message; “成功”“失败”
private T data;消息体
在invoke方法里面会检查,rpcResponse是否为空,requestId是否相同,状态码。
RpcMessage
private byte messageType;
private byte codec;
private byte compress;
private int requestId; 使用ATOMIC_INTEGER生成
private Object data;
data里面还有一个requestId,使用UUID生成。
步骤:RpcRequest或RpcResponse -> RpcMessage -> 编码器
注册中心
--zookeeper理论知识--
为什么用zookeeper做注册中心?
作为注册中心而言,配置是不经常变动的,只有当新版本发布或者服务器出故障时会变动。CP 不合适于配置经常变动的,而 AP 在遇到问题时可以牺牲其一致性来保证系统服务的高可用性,既返回旧数据。
只是一个demo,环境稳定,流量小,不会遇到注册中心的实例(节点)半数以上都挂了的情况。所以在实际生产环境中,选择 Zookeeper 还是选择 Eureka ,这个就要取决于系统架构师对于业务环境的权衡了。
对比zookeeper和Eureka
CP,AP;
强一致性,高可用;
3个角色,平等;
选取leader时不可用,不保证强一致性。
数据节点
zk数据模型中的最小数据单元,数据模型是一棵树,由斜杠(/)分割的路径名唯一标识,数据节点可以存储数据内容及一系列属性信息,同时还可以挂载子节点,构成一个层次化的命名空间。
会话(Session)
zk客户端与zk服务器之间的一个TCP长连接,通过这个长连接,客户端能够使用心跳检测与服务器保持有效的会话,也能向服务器发送请求并接收响应,还可接收服务器的Watcher事件通知。
Session的sessionTimeout,是会话超时时间,如果这段时间内,客户端未与服务器发生任何沟通(心跳或请求),服务器端会清除该session数据,客户端的TCP长连接将不可用,这种情况下,客户端需要重新实例化一个Zookeeper对象。
事务 ZXID
事务是指能够改变Zookeeper服务器状态的操作,一般包括数据节点的创建与删除、数据节点内容更新和客户端会话创建与失效等操作。对于每个事务请求,zk都会为其分配一个全局唯一的事务ID,即ZXID,是一个64位的数字,高32位表示该事务发生的集群选举周期(集群每发生一次leader选举,值加1),低32位表示该事务在当前选择周期内的递增次序(leader每处理一个事务请求,值加1,发生一次leader选择,低32位要清0)。
事务日志
所有事务操作都是需要记录到日志文件中的,可通过 dataLogDir配置文件目录,文件是以写入的第一条事务zxid为后缀,方便后续的定位查找。zk会采取“磁盘空间预分配”的策略,来避免磁盘Seek频率,提升zk服务器对事务请求的影响能力。默认设置下,每次事务日志写入操作都会实时刷入磁盘,也可以设置成非实时(写到内存文件流,定时批量写入磁盘),但那样断电时会带来丢失数据的风险。
数据快照
数据快照是zk数据存储中另一个非常核心的运行机制。数据快照用来记录zk服务器上某一时刻的全量内存数据内容,并将其写入到指定的磁盘文件中,可通过dataDir配置文件目录。可配置参数snapCount,设置两次快照之间的事务操作个数,zk节点记录完事务日志时,会统计判断是否需要做数据快照(距离上次快照,事务操作次数等于snapCount/2~snapCount 中的某个值时,会触发快照生成操作,随机值是为了避免所有节点同时生成快照,导致集群影响缓慢)。
过半
所谓“过半”是指大于集群机器数量的一半,即大于或等于(n/2+1),此处的“集群机器数量”不包括observer角色节点。leader广播一个事务消息后,当收到半数以上的ack信息时,就认为集群中所有节点都收到了消息,然后leader就不需要再等待剩余节点的ack,直接广播commit消息,提交事务。选举中的投票提议及数据同步时,也是如此,leader不需要等到所有learner节点的反馈,只要收到过半的反馈就可进行下一步操作。
zookeeper服务节点挂掉之后,怎么删除它?
使用临时节点,会话失效,节点自动清除。
zookeeper集群节点宕机了怎么发现剔除的?
发现:watcher机制
剔除:临时节点
zookeeper心跳检测更新列表并利用watcher机制发给客户端
分布式协议[1]
ZAB是paxos的改版,Mysql是paxos、redis sentinel是raft、zookeeper是ZAB。
心跳机制
服务提供者定时向注册中心发送本机地址(心跳数据包),而注册中心的监控则维持一个channelId和具体地址的map,并且通过IdleHandler监听空闲事件,到达一定的空闲次数则认为不活跃,当不活跃时(这里的不活跃条件是5分钟内3次以上没有发送心跳包),zookeeper删除相应的url节点,但后续的逻辑没有继续做。
删掉结点后,通知客户端。
注册中心对于服务端掉线时怎么处理
移出ip链表,发送给客户端,等待服务器上线,重新连接;
集群
多装几个,修改配置文件文件;
因为是在一台机器上模拟集群,所以端口不能重复,这里用2181~2183,2287~2289,以及3387~3389相互错开;
在每个zk server配置文件的dataDir所对应的目录下,必须创建一个名为myid的文件,其中的内容必须与zoo.cfg中server.x 中的x相同;
【启动】ZooKeeper zk = new ZooKeeper(" 172.28.20.102:2181, 172.28.20.102:2182, 172.28.20.102:2183", 300000, new DemoWatcher());
数据模型
zk维护的数据主要有:客户端的会话(session)状态及数据节点(dataNode)信息。
--zookeeper实际使用--
注册过程
【Curator】
①创建CuratorFramework对象zkClient,zkClient:地址127.0.0.1:2181,重试策略,start连接zookeeper;
②先去本地路径map里面看看有没有创建过;
③创建结点,根结点:完整的服务名;子节点:IP+port;
④创建完了把路径添加到map中去
服务调用
①参数为RpcRequest对象,从中取出服务名,创建对象zkClient,连接zookeeper;
②获取结点下面的子节点,存到本地map中去,返回string的list;
③在本地Map<String, List<String>>中找;
④没找到,连接zookeeper,获取子节点路径列表,在map中保存;
⑤负载均衡策略选出一个子节点服务地址进行连接;
监听器
Zookeeper会通过心跳检测机制,来判断服务提供端的运行状态,来决定是否应该把这个服务从地址列表剔除。
监听器的作用在于监听某一节点,若该节点的子节点发生变化,比如增加减少,更新操作的时候,我们可以自定义回调函数。
一旦这个节点下的子节点发生变化,Zookeeper Server就会发送一个事件通知客户端。
客户端收到事件以后,就会把本地缓存的这个服务地址删除,这样后续就不会把请求发送到失败的节点上,完成服务下线感知。
为什么不选择Redis作为注册中心?
zookeeper临时节点自动宕机自动清除;
zookeeper服务容灾?zookeeper服务节点挂掉之后,怎么删除它?
容灾:在集群若干台故障后,整个集群仍然可以对外提供可用的服务。
一般配置奇数台去构成集群,以避免资源的浪费。
三机房部署是最常见的、容灾性最好的部署方案。
删除:使用临时节点,会话失效,节点自动清除。
zookeeper的问题
崩溃恢复无法提供服务、写的性能瓶颈是一个问题、选举过程速度缓慢、无法进行有效的权限控制;
负载均衡
【从现有服务地址list中选择一个,参数:List<String> serviceAddresses, RpcRequest rpcRequest】
背景
系统中的某个服务的访问量大,将这个服务部署在了多台服务器上,当客户端发起请求的时候,多台服务器都可以处理这个请求。如何正确选择处理该请求的服务器就很关键。负载均衡为了避免单个服务器响应同一请求,容易造成服务器宕机、崩溃等问题。
随机int
一致性哈希
【原理】hash空间组成一个虚拟的圆环,将各个服务器使用 Hash 函数进行哈希,可以选择服务器的IP或主机名作为关键字进行哈希,从而确定每台机器在哈希环上的位置。将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针寻找,第一台遇到的服务器就是其应该定位到的服务器。
【优点】对于节点的增减都只需重定位环空间中的一小部分数据,只有部分缓存会失效,不至于将所有压力都在同一时间集中到后端服务器上,具有较好的容错性和可扩展性。
【结构】一个服务名对应一棵树,ConcurrentHashMap<String,ConsistentHashSelec
tor(TreeMap)>,服务名和TreeMap的对应。
【插入】ConsistentHashSelector中有TreeMap<Long, String>(<哈希值,地址>)。传入一个装着地址的list,对每一个地址生成160个虚拟结点,求MD5,再求hash,最后存放在树中。
【查询】入参为rpcRequest,拿到服务名,去Map里面找对应的selector;如果为空,则新建;如果不为空,对服务名求MD5,拿到hashcode。每个hashcode对应一棵树。
【TreeMap】<哈希值Long,地址String>,到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 String。找到了就返回ip + port,没找到就返回第一个。
dubbo的负载均衡算法
①RandomLoadBalance:根据权重随机选择(对加权随机算法的实现);
②LeastActiveLoadBalance:最小活跃数负载均衡。初始状态下所有服务提供者的活跃数均为 0(每个服务提供者的中特定方法都对应一个活跃数),每收到一个请求后,对应的服务提供者的活跃数 +1,当这个请求处理完之后,活跃数 -1。
Dubbo 就认为谁的活跃数越少,谁的处理速度就越快,性能也越好,这样的话,我就优先把请求给活跃数少的服务提供者处理;
③ConsistentHashLoadBalance:一致性哈希;
④RoundRobinLoadBalance:加权轮询负载均衡,加权轮询就是在轮询的基础上,让更多的请求落到权重更大的服务提供者上。
序列化协议
不使用JDK自带
①不支持跨语言调用;
②性能差:相比于其他序列化框架性能更低,主要原因是序列化之后的字节数组体积较大,导致传输成本加大。
为什么选Kryo?
Kryo 是专门针对 Java 语言序列化方式并且性能非常好,并且 Dubbo 官网的一篇文章中提到说推荐使用 Kryo 作为生产环境的序列化方式。
序列化协议对比[2]
Kryo、Hessian、Protobuf
Kryo特点
Kryo由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的字节码体积。
因为 Kryo 不是线程安全的。使用 ThreadLocal 来存储 Kryo 对象,一个线程一个 Kryo 实例。
I / O
【I/O】linux系统内核read()、write()函数,read把内核缓冲区中的数据复制到用户缓冲区中,write把用户缓冲区的数据写入到内核缓冲区中。
【阻塞和非阻塞】进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,是一种读取或者写入操作函数的实现方式。用户进程发起read操作,阻塞则会一直等待内存缓冲区数据完整后再解除阻塞;非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。
【同步/异步】是用户线程与内核的交互方式。①同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;②异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。
非阻塞IO怎么判断数据是否准备好
轮询
BIO NIO AIO
【BIO】服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。
应用程序发起 read 调用后,会一直阻塞,直到内核把数据拷贝到用户空间。
【NIO】应用程序会一直发起 read 调用,等待数据从内核空间拷贝到用户空间的这段时间里,线程依然是阻塞的,直到在内核把数据拷贝到用户空间。
【IO多路复用(异步阻塞)】多路网络连接可以复用一个I/O线程。应用程序不断进行 I/O 系统调用轮询数据是否已经准备好是十分消耗 CPU 资源的。IO 多路复用模型中,线程首先发起 select 调用,询问内核数据是否准备就绪,阻塞等待select系统调用返回。等内核把数据准备好了返回一个ready,用户线程再发起 read 调用。read 调用的过程(数据从内核空间 -> 用户空间)还是阻塞的。
【AIO异步非阻塞】基于事件和回调机制实现的,发起IO操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作,Proactor。
适用场景
BIO方式适用于连接数目比较小,长请求且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,下载一个大文件;
NIO的适用场景:高并发数目多,比较轻,高访问量,短请求,聊天服务器,弹幕系统,服务器间通讯;
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂。
多路复用
IO多路复用的本质就是通过系统内核缓冲IO数据,让单个线程可以监视多个文件描述符(FD),一旦某个描述符读就绪或者写就绪,可以通知程序进行相应的读写操作,也就是使用单个线程同时处理多个网络连接IO,它的原理就是select、poll、epoll不断轮询所负责的socket,当某个socket有数据达到了,就通知用户进程。
select
过程是阻塞的。仅知道有几个I/O事件发生了,但并不知道具体是哪几个socket连接有I/O事件,还需要轮询去查找,时间复杂度为O(n),处理的请求数越多,所消耗的时间越长。
【select函数执行流程】①从用户空间拷贝fd_set(注册的事件集合)到内核空间;②遍历所有fd文件,并将当前进程挂到每个fd的等待队列中,当某个fd文件设备收到消息后,会唤醒设备等待队列上睡眠的进程,那么当前进程就会被唤醒;③如果遍历完所有的fd没有I/O事件,则当前进程进入睡眠,当有某个fd文件有I/O事件或当前进程睡眠超时后,当前进程重新唤醒再次遍历所有fd文件。
【缺点】①单个进程所打开的FD是有限制的,通过 FD_SETSIZE 设置,默认1024;②每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大。
poll
poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的。
epoll epoll_create() , epoll_ctl() , epoll_wait()
事件驱动机制,修改主动轮询为被动通知,当有事件发生时,被动接收通知。所以epoll模型注册套接字后,主程序可做其他事情,当事件发生时,接收到通知后再去处理。epoll会把哪个流发生哪种I/O事件通知我们。epoll是事件驱动,即每个事件关联上fd,每当fd就绪,系统注册的回调函数就会被调用,将就绪的fd放到readyList里面,是基于红黑树实现的。
【流程】①通过epoll_create() 函数创建一个文件,返回一个文件描述符(Linus系统一切对象皆为文件)fd ② 创建socket接口号4,绑定socket号与端口号,监听事件,标记为非阻塞。通过epoll_ctl() 函数将该socket号 以及 需要监听的事件(如listen事件)写入fd中。③循环调用epoll_wait() 函数进行监听,返回已经就绪事件序列的长度(返回0则说明无状态,大于0则说明有n个事件已就绪)。例如如果有客户端进行连接,则,再调用accept()函数与4号socket进行连接,连接后返回一个新的socket号,且需要监听读事件,则再通过epoll_ctl()将新的socket号以及对应的事件(如read读事件)写入fd中,epoll_wait()进行监听。循环往复。
【优点】不需要再遍历所有的socket号来获取每一个socket的状态,只需要管理活跃的连接。即监听在通过epoll_create()创建的文件中注册的socket号以及对应的事件。只有产生就绪事件,才会处理,所以操作都是有效的,为O(1)。
epoll的水平触发和边缘触发
LT(水平触发):当被监控的fd上有IO事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!
ET(边缘触发):当被监控的fd上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。
NIO
NIO组件
一个线程对应一个selector,一个selector对应多个channel(连接),每个channel 都会对应一个Buffer;
buffer:可以读写数据的内存块;
channel:用于数据的读写;
selector:
NIO的缺点
①NIO的类库和API繁杂,使用麻烦,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
②工作量和难度都非常大。
③JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK 1.6版本修复了该问题,但是直到JDK 1.7版本该问题仍旧存在,只不过该BUG发生概率降低了一些而已,它并没有得到根本性解决。
Reactor线程模型
Reactor模式基于事件驱动,分为单Reactor单线程模型、单Reactor多线程模型、主从Reactor多线程模型。
单reactor单线程
【工作原理】
服务器端用一个线程通过多路复用搞定所有的I0操作(建立连接,读、写等)。但是如果客户端连接数量较多,将无法支撑,NIO案例就属于这种模型。
【特点】
模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。只有一个线程,无法完全发挥多核CPU的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
单reactor多线程
【原理】
一个线程负责管理连接,一组线程负责处理IO操作。
【特点】
充分的利用多核cpu 的处理能力,多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。
主从 Reactor 多线程
①主线程 MainReactor对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件;
②当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor;
③SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理;
④当有新事件发生时,subreactor 就会调用对应的handler处理;
⑤handler 通过read 读取数据,分发给后面的worker 线程处理;
⑥worker 线程池分配独立的worker 线程进行业务处理,并返回结果;
⑦handler 收到响应的结果后,再通过send 将结果返回给client;
Reactor 和 Proactor区别
①Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。
②主要区别就是真正的读取和写入操作是有谁来完成的,Reactor中需要应用程序自己读取或者写入数据,而Proactor模式中,应用程序不需要进行实际的读写过程,它只需要从缓存区读取或者写入即可,操作系统会读取缓存区或者写入缓存区到真正的IO设备。
Netty基础理论知识
对Netty的认识
①基于NIO的client-server框架,使用它可以快速简单地开发网络应用程序。
②简化了TCP和UDP套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。
③支持多种协议如FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。
Netty优点
①高并发:Netty 是一款基于 NIO开发的网络通信框架,对比于 BIO,并发性能得到了很大提高,修复了已经发现 NIO BUG。
②传输快:传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,实现了更高效率的传输。
③封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用调用接口。
④API使用简单,开发门槛低;功能强大,预置了多种编解码功能,支持多种主流协议;
零拷贝[3]
①Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
②Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。
③Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。
Netty 高性能
①IO线程模型:同步非阻塞,用最少的资源做更多的事。
②内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
③内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
④可靠性,链路有效性检测:链路空闲检测机制,读/写空闲超时机制;
⑤内存保护机制:通过内存池重用ByteBuf;ByteBuf的解码保护;优雅停机:不再接收新消息、退出前的预处理操作、资源的释放操作。
⑥串行无锁化设计:消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。
⑦高性能序列化协议:支持 protobuf 等高性能序列化协议。
⑧安全性:SSL V2和V3,TLS,SSL单向认证、双向认证和第三方CA认证。
⑨TCP参数配置:SO_RCVBUF和SO_SNDBUF:通常建议值为128K或者256K;SO_TCPNODELAY:NAGLE算法通过将缓冲区内的小封包自动相连,组成较大的封包,阻止大量小封包的发送阻塞网络,从而提高网络应用效率。但是对于时延敏感的应用场景需要关闭该优化算法
Netty 和 Tomcat 的区别
作用不同:Tomcat 是 Servlet 容器,可以视为 Web 服务器,而 Netty 是异步事件驱动的网络应用程序框架和工具用于简化网络编程,例如TCP和UDP套接字服务器。
协议不同:Tomcat 是基于 http 协议的 Web 服务器,而 Netty 能通过编程自定义各种协议,因为 Netty 本身自己能编码/解码字节流,所有 Netty 可以实现,HTTP 服务器、FTP 服务器、UDP 服务器、RPC 服务器、WebSocket 服务器、Redis 的 Proxy 服务器、MySQL 的 Proxy 服务器等等。
Netty 和 Socket的区别
【socket】
Socket编程主要涉及到客户端和服务端两个方面,首先是在服务器端创建一个服务器套接字(ServerSocket),并把它附加到一个端口上,服务器从这个端口监听连接。
客户端请求与服务器进行连接的时候,根据服务器的域名或者IP地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行操作。
【socket缺点】
①需对传输的数据进行解析,转化成应用级的数据;
②对开发人员的开发水平要求高;
③相对于Http协议传输,增加了开发量;
Netty 发送消息方式
Netty 有两种发送消息的方式:
①直接写入 Channel 中,消息从 ChannelPipeline 当中尾部开始移动;
②(用的这个)写入和 ChannelHandler 绑定的 ChannelHandlerContext 中,消息从 ChannelPipeline 中的下一个 ChannelHandler 中移动。
Netty线程模型
Netty线程模型
Netty主要基于主从Reactors多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。
①Netty抽象出两组线程池,Boss负责客户端的连接,Worker专门负责网络的读写,都是NioEventLoopGroup;
②NioEventLoopGroup一个事件循环组,一个组中含有多个事件循环,每一个事件循环是NioEventLoop;
③NioEventLoop 表示不断循环的执行处理任务的线程,每个NioEventLoop都有一个 selector,用于监听绑定在其上的socket的网络通讯。还有一个TaskQueue;
④Boss里面的NioEventLoop循环
轮询accept事件;
处理accept事件,与client建立连接,生成 NioScocketChannel,并将其注册到某个workerNioEventLoop上的selector;
处理任务队列的任务,即runAllTasks;
⑤Worker里面的NioEventLoop循环
轮询read/write事件;
处理i/o事件,在NioScocket;
Channel处理;处理任务队列的任务,即runAllTasks;
⑥每个WorkerNioEventLoop处理业务时,会使用pipeline,pipeline 中包含了channel,即通过pipeline可以获取到对应通道,管道中维护了很多的处理器;
Netty异步模型
①异步和同步的区别是:当一个异步过程开始时,调用者是不能立刻得到结果的。是调用完成之后通过回调来通知调用者的。
②Netty的IO操作是异步的,Bind、 Write、 Connect 等操作会返回一个Channel Future。
③调用者虽然不能直接获得结果,但是可以通过Future-Listener机制监听,让调用者获得结果。
④Netty模型是建立在future - callback 机制的,callback 就是回调。Future的核心思想是,假设一个方法fun计算过程非常耗时,等待fun 返回显然不合适。那么可以在调用fun 的时候,立马返回一个Future, 后续可以通过Future 去监控方法fun 的处理过程(即: Future-Listener 机制)。
处理方法
①客户端:通过ChannelFuture 接口的addListener() 方法注册一个ChannelFuture Listener ,当操作执行成功或者失败时,监听就会自动触发返回结果。
②服务器:可以通过ChannelFuture接口的sync() 方法让异步的操作编程同步的。
Netty核心组件
一个线程- > 一个EventLoop - > 多个channel - > 一个pepiline - > 很多handler
Bytebuf (字节容器)
通过字节流进行传输的。
Channel (网络读写操作抽象类)
通过Channel可以进行I/O操作,客户端成功连接服务端,就会新建一个Channel 同该用户端进行绑定,为每个Channel分配一个EventLoop。
Bootstrap启动类
设置线程组,设置通道NioSocketChannel,连接超时时间,handler(pipeline、心跳IdleStateHandler、编码器、解码器、处理器),connect某个端口(连接TCP),添加监听器,连接成功获得channel,放到CompletableFuture中。
Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,来实现所有的网络交换。
ServerBootstrap服务端启动类
设置2个线程组,设置通道NioServerSocketChannel,设置保持活动连接状态,childHandler(pipeline、心跳IdleStateHandler、编码器、解码器、处理器),启动服务器(并绑定bind端口),异步改为同步,生成了一个 ChannelFuture 对象。
ServerBootstarp在调用 bind()方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。
EventLoopGroup,NioEventLoopGroup
EventLoop 的主要作用实际就是责监听网络事件并调用事件处理器进行相关I/0操作(读写)的处理,处理连接的生命周期中所发生的事件。客户端一个线程组,服务器两个,boss接收客户端的连接,worker专门负责网络的读写。
Channel-Pepiline-handler
1个Channel 包含1个ChannelPipeline。1个ChannelPipeline上可以有多个ChannelHandler。ChannelHandler是消息的具体处理器,主要负责处理客户端/服务端接收和发送的数据。
ChannelFuture
Netty中所有的I/O操作都为异步的,不能立刻得到操作结果。法一(客户端):通过ChannelFuture 接口的addListener() 方法注册一个ChannelFutureListener ,当操作执行成功或者失败时,监听就会自动触发返回结果。法二(服务器):可以通过ChannelFuture接口的sync() 方法让异步的操作编程同步的。
客户端channel中收到Response后,会自动先进入pipeline中add的Decoder()解码(从bytebuf到Object),解码后的object数据再进入handler强转成Response类型后续处理
Netty实际应用
客户端
客户端NettyRpcClient继承RpcRequestTransport
【NettyRpcClient()】
①配置一个启动类,一个线程组;
②配置channel为NioSocketChannel;
③配置handler,对应一个pipeline;
④配置pipeline,心跳机制(0,5,0),编码器,解码器,处理器。(如果 5 秒内没有数据发送到服务器,则发送心跳请求)
【doConnect(InetSocketAddress):Channel】用于连接服务端并返回对应的Channel,使用completableFuture接收。知道了服务端的地址之后,可以通过NettyClient 成功连接服务端了,(有了Channel 之后就能发送数据到服务端了)。
(异步变同步)通过ChannelFuture 接口的addListener() 方法注册一个Channel FutureListener ,当操作执行成功或者失败时,监听就会自动触发返回结果。
【sendRpcRequest(rpcRequest):CompletableFuture】发RpcRequest到服务端
①创建返回值CompletableFuture;
②根据rpcRequest获得服务地址,已经负载均衡了;
③调用getChannel,获取服务器地址相关通道;
④新建新建rpcMessage,传入rpcRequest,指定序列化、压缩方式;
⑤channel.writeAndFlush(rpcMessage),channel数据冲刷,发动数据到服务器;
【getChannel(inetSocketAddress):Channel】连接复用
根据地址去channelProvider的map里面找,Map<String, Channel> channelMap,找到了就复用,没找到就连接服务器返回新的channel,并且保存channel。
客户端NettyClientHandler继承ChannelInboundHandlerAdapter
【NettyClientHandler()】单例生成未被处理的请求UnprocessedRequests,单例生成NettyRpcClient。
【channelRead(ChannelHandlerContext ctx, Object msg)】读取服务器发送的消息。
①ctx:上下文对象, 含有管道pipeline , 通道channel, 地址;
②Object msg: 就是服务器发送的数据;
③判断信息的类型,如果是心跳就简单收下。如果是response,就去UnprocessedRequests的map里面记录这个request已经被处理了。
【userEventTriggered(ChannelHandlerContext ctx, Object evt)】心跳机制。判断信息是不是心跳机制,是就返回ping,刷新到channel中去。
【exceptionCaught(ChannelHandlerContext ctx, Throwable cause)】处理客户端消息发生异常时调用,关闭ctx。
客户端UnprocessedRequests
Map<String, CompletableFuture<RpcResponse<Object>>> 键:requestId 值:feature
用于存放未被服务端处理的请求(建议限制map容器大小,避免未处理请求过多OOM),处理完了就complete。
【put(String requestId, CompletableFuture<RpcResponse<Object>> future)】客户端发送的时候,把还没有处理的请求存进来。
【complete(RpcResponse<Object> rpcResponse)】客户端收到之后,把其从map中删掉。
客户端ChannelProvider 连接复用
Map<String, Channel> 键:服务端实例地址和序列化算法的字符串表示,值: Channel 对象
当客户端请求到一个服务端地址时,会到 map 里面查询是否已经建立过连接,如果建立过连接则查看状态是否存活,存活则直接使用该 Channel 对象进行传输。没活着就从map里面删掉。连接未建立则建立连接后再将连接放入。
服务器
服务器NettyRpcServer
【端口号】
【注册中心registerService】
【registerService(RpcServiceConfig rpcServiceConfig)】调用ServiceProvider.publ ishService(rpcServiceConfig)完成服务注册;
【start()】
①设置端口号,得到主机地址;
②配置一个启动类,两个线程组;
③配置channel为NioSocketChannel;
④TCP开启Nagle算法,尽可能的发送大数据快,减少网络传输。设置保持活动连接状态。
⑤配置handler,对应一个pipeline;
配置pipeline,心跳机制(30,0,0),编码器,解码器,处理器。(30 秒之内没有收到客户端请求的话就关闭连接);
⑥绑定端口并且同步生成了一个 ChannelFuture 对象,sync异步变同步。
服务器NettyRpcServerHandler继承ChannelInboundHandlerAdapter
【作用】
将标有@RpcService的注解的bean进行缓存spring包里面实现扫描,标在了HelloServiceImpl上面。@RpcService(group = "test1", version = "version1")。
接收客户端的请求。
根据传递进来的beanName从缓存中查找。
通过反射调用bean中的方法 handler包中的RpcRequestHandler类。
给客户端响应。
【channelRead(ChannelHandlerContext ctx, Object msg)】即发送又接收
ctx:上下文对象, 含有管道pipeline , 通道channel, 地址;
Object msg: 就是服务器发送的数据;
①将信息msg转为RpcMessage类型;
②判断信息的类型,如果是心跳就简单返回pong。
③不是就把msg转成RpcRequest;
④Object result = rpcRequestHandler.handle(rpcRequest)反射调用,拿到结果。
⑤ctx.writeAndFlush(rpcMessage).addListener将结果写入到缓存,并刷新。
RpcRequestHandler
【handle】
①去注册中心找,得到实现类Object;
②反射调用invokeTargetMethod;
【invokeTargetMethod】
传入request和实现类,反射调用拿到result;
异步改为同步
Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future 和ChanellFutures, 他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
自定义协议
步骤:RpcRequest或RpcResponse -> RpcMessage -> 编码器
通过设计协议,我们定义需要传输哪些类型的数据,并且还会规定每一种类型的数据应该占多少字节。这样我们在接收到二级制数据之后,就可以正确的解析出我们需要的数据。
编码器 RpcMessage变为ByteBuf
encode(ChannelHandlerContext ctx, RpcMessage a, ByteBuf out)
ctx:上下文对象, 含有管道pipeline , 通道channel, 地址
【自定义编码器 负责网络传输出去的消息,将消息格式转换为字节数组,然后写入到字节数据的容器ByteBuf对象中。】
【信息头】(魔数4:筛选来到服务端的数据包,识别出这个数据包并非是遵循自定义协议)(版本1:version)(信息长度4:总长度,头有16B+body)(信息类型1:心跳还是普通)(序列化类型1)(压缩类型1:GZIP)(requestid4:ATOMIC_INTEGER递增)
【信息体】byte[]
如果是心跳,写body为ping或者是pong;
不是心跳,序列化,压缩,求长度,记录长度。
解码器 ByteBuf变为RpcMessage
decode(ChannelHandlerContext ctx, ByteBuf in)
【背景】TCP 是面向连接的,面向流的,提供高可靠性服务。发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的。
【长度】继承LengthFieldBasedFrameDecoder,它们处理粘拆包的主要思想是在生成的数据包中添加一个长度字段,用于记录当前数据包的长度。
--lengthFieldOffset字段长度偏移量:magic code 是 4B,version 是 1B,所以值为 5;
--lengthFieldLength字段长度中的字节数:full length为4B,所以值为 4;
--lengthAdjustment添加到长度字段值的补偿值,负数:full length包括所有数据并读取前9个字节,所以左边的长度是(fullLength-9)。 所以值是-9;
--initialBytesToStrip跳过的字节数:我们将手动检查魔数和版本,所以不要剥离任何字节,所以值为 0;
【过程】
调用父类的解码方法,传入(ctx,in),得到ByteBuf对象in;
检查魔数和版本号;
拿到长度,信息类型,序列化号,压缩号,requestId;
新建RpcMessage对象,设置序列化号,压缩号,requestId;
判断信息类型,如果是心跳的ping,设置RpcMessage的data为pong,返回;
判断信息类型,如果是心跳的pong,设置RpcMessage的data为ping,返回;
都不是,用减法求出body的长度bodyLength;
根据bodyLength生成byte[],提取in对应的长度;
解压,反序列化,变成RpcRequest或者RpcResponse;
设置RpcMessage的data为RpcRequest或者RpcResponse;
序列化
现在项目里的编码器是通过SerializationType的getName方法获得序列化方式,然后通过扩展类加载器得到序列化类的全限定类名加载类文件,创建序列化对象,这个时候的对象就是KryoSerializer类的对象,然后serializer调用serialize就用的是kryo序列化。
未完待续...
#你的秋招进展怎么样了#