Nacos服务注册原理分析

在分布式服务中,原来的单体服务会被拆分成一个个微服务,服务注册实例到注册中心,服务消费者通过注册中心获取实例列表,直接请求调用服务。

服务是如何注册到注册中心,服务如果挂了,服务是如何检测?带着这些问题,我们从源码上对服务注册进行简单的源码分析。

版本 2.1.1

  • Nacos Server:2.1.1
  • spring-cloud-starter-alibaba:2.1.1.RELEASE
  • spring-boot:2.1.1.RELEASE

方便统一版本,客户端和服务端版本号都为2.1.1

客户端

启动nacos服务注册和发现需要添加maven依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>${latest.version}</version>
</dependency>

根据maven依赖找到对应的spring.factories文件:

spring.factories文件里找到启动配置类信息,SpringBoot服务启动时会将这些配置类信息注入到bean容器中。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

服务注册的核心配置类为:NacosDiscoveryAutoConfiguration,该类配置三个bean对象:

  • NacosServiceRegistry
  • NacosRegistration
  • NacosAutoServiceRegistration

NacosAutoServiceRegistration

NacosAutoServiceRegistration继承了抽象类AbstractAutoServiceRegistrationAbstractAutoServiceRegistration抽象类又实现了ApplicationListener接口。

实现ApplicationListener接口的方法,会在Spring容器初始化完成之后调用onApplicationEvent方法:

public void onApplicationEvent(WebServerInitializedEvent event) {
  bind(event);
}

调用bind方法:

public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(((ConfigurableWebServerApplicationContext) context)
					.getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort());
    // 调用 start 方法
		this.start();
	}

调用了start方法:

public void start() {
  if (!isEnabled()) {
    if (logger.isDebugEnabled()) {
      logger.debug("Discovery Lifecycle disabled. Not starting");
    }
    return;
  }
  if (!this.running.get()) {
    this.context.publishEvent(
        new InstancePreRegisteredEvent(this, getRegistration()));
    register();
    if (shouldRegisterManagement()) {
      registerManagement();
    }
    this.context.publishEvent(
        new InstanceRegisteredEvent<>(this, getConfiguration()));
    this.running.compareAndSet(false, true);
  }

}

调用了register方法,最终调用的是NacosServiceRegistry类的register方法。

NacosServiceRegistry

根据上文可知,服务器启动后调用NacosServiceRegistry类的register方法,该方法实现将实例注册到服务端

public void register(Registration registration) {
  if (StringUtils.isEmpty(registration.getServiceId())) {
    log.warn("No service to register for nacos client...");
    return;
  }

  String serviceId = registration.getServiceId();
  String group = nacosDiscoveryProperties.getGroup();
  // 创建实例
  Instance instance = getNacosInstanceFromRegistration(registration);

  try {
    // 注册实例 
    namingService.registerInstance(serviceId, group, instance);
    log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
        instance.getIp(), instance.getPort());
  }
  catch (Exception e) {
    log.error("nacos registry, {} register failed...{},", serviceId,
        registration.toString(), e);
  }
}

创建实例,然后通过namingService.registerInstance方法注册实例,然后查看registerInstance方法:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        // 封装心跳包
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        // 发送心跳包
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    // 发送实例 
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

registerInstance主要做两件事:

  • 发送心跳包

beatReactor.addBeatInfo使用定时服务,每隔5s向服务端发送一次心跳请求,通过http请求发送心跳信息,路径为/v1/ns/instance/beat

心跳请求定时任务使用线程池ScheduledThreadPoolExecutor.schedule(),而该方法只会调用一次,定时任务的实现是在每次请求任务只会再调用一次ScheduledThreadPoolExecutor.schedule(),简单说就是nacos在发送心跳的时候,会调用schedule方法,在schedule要执行的任务中,如果正常发送完心跳,会再次调用schedule方法。

那为什么不直接调用周期执行的线程池ScheduledThreadPoolExecutor.scheduleAtFixedRate()?可能是由于发送心跳服务发生异常后,定时任务还会继续执行,但是周期执行的线程池遇到报错后也不会重复调用执行的任务。

线程任务BeatTaskrun方法,,每次执行会先判断isStopped,如果是false,说明心跳停止,就不会触发下次执行任务。如果使用定时任务scheduleAtFixedRate,即使心跳停止还会继续执行任务,造成资源不必要浪费。

  • 注册实例

registerService主要封装实例信息,比如ipportservicename,将这些信息通过http请求发送给服务端。路径为/v1/ns/instance

根据上面流程,查看以下的流程图:

服务端

服务端就是注册中心,服务注册到注册中心,在https://github.com/alibaba/nacos/releases/tag/2.1.1下载源码部署到本地,方便调式和查看,部署方式详见我的另外一篇文章Nacos 源码环境搭建

服务端主要接收两个信息:心跳包实例信息

心跳包

客户端向服务请求的路径为/v1/ns/instance/beat,对应的服务端为InstanceController类的beat方法:

@PutMapping("/beat")
@Secured(action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    // 判断是否有心跳,存在心跳就转成RsInfo
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
            serviceName, namespaceId);
    // 获取实例信息
    BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
    builder.setRequest(request);
    int resultCode = getInstanceOperator()
            .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
    result.put(CommonParams.CODE, resultCode);
    // 下次发送心跳包间隔
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
            getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

handleBeat方法中执行线程任务ClientBeatProcessorV2run方法,延长lastHeartBeatTime时间。注册中心会定时查询实例,当前时间 - lastHeartBeatTime > 设置时间(默认15秒),就标记实例为不健康实例。如果心跳实例不健康,发送通知给订阅方,变更实例。

服务端在15秒没有收到心跳包会将实例设置为不健康,在30秒没有收到心跳包会将临时实例移除掉。

实例注册

客户端请求的地址是/nacos/v1/ns/instance, 对应的是服务端是在InstanceController类。找到类上对应的post请求方法上。

注册流程:

InstanceController#register ——>InstanceOperatorClientImpl#registerInstance ——>ClientOperationServiceProxy#registerInstance ——>EphemeralClientOperationServiceImpl#registerInstance

创建 Service

服务注册后,将服务存储在一个双层map集合中:

private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

通过是否存在ephemeral,true,走AP模式,否则走CP模式。

Nacos 默认就是采用的AP模式使用Distro协议实现。实现的接口是EphemeralConsistencyService对节点信息的持久化主要是调用put方法,

会先写入到DataStore中:

public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
         // 数据持久化到缓存中
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }

    notifier.addTask(key, DataOperation.CHANGE);
}

总结

  • 从依赖上找到需要启动的是要加载的服务注册类NacosDiscoveryAutoConfiguration,主要配置三个对象 NacosServiceRegistryNacosRegistrationNacosAutoServiceRegistration
  • NacosServiceRegistry类的register方法,封装实例和心跳信息 通过http请求,定时发送发送心跳包,默认时间间隔是5秒。通过http请求,发送实例信息。
  • 服务端 接收到心跳请求,更新心跳包最新时间。服务端在15秒没有收到心跳包会将实例设为不健康,在30秒没有收到心跳包会将临时实例移除掉。接收到服务注册接口,通过ephemeral判断是否走AP还是走CP,AP模式使用Distro协议。通过调用EphemeralConsistencyService接口实现,持久化实例信息。

参考

  • Nacos源码之注册中心的实现
  • Nacos 服务注册源码分析
全部评论
谢谢大佬的分享
点赞 回复 分享
发布于 2023-02-09 10:12 广东
可以,干货满满
点赞 回复 分享
发布于 2023-02-09 10:30 重庆

相关推荐

12-07 16:52
已编辑
蚌埠坦克学院 Java
timeline技术1面&nbsp;11.22技术2面&nbsp;+&nbsp;hr&nbsp;11.28这家公司很好,作息规律&nbsp;10-10点半弹性打卡&nbsp;晚上6点下班&nbsp;双休,秋招hc少,很多都转正。语言kotolin&nbsp;+&nbsp;akka等&nbsp;要转。一、技术1面1.&nbsp;自我介绍(名字、学校、专业、项目)2.&nbsp;问有没有offer、到岗时间3.&nbsp;项目1:我负责的模块有哪些(流媒体、权限模块)4.&nbsp;说完之后,我讲流媒体模块的功能&nbsp;怎么设计&nbsp;(分片上传,流媒体转码)5.&nbsp;流媒体模块经过优化的(从之间传-》分片、从限定上传格式-》使用ffpeg转码)6.&nbsp;流媒体哪部分使用了redis,我视频id和视频在minio的访问地址,存入redis提升性能。7.&nbsp;还问了:项目服务器集群,5台流媒体模块(计算资源)、3台(机构、用户)、2台(minio以及redis存储相关)8.&nbsp;问:集群能不能横向扩展?&nbsp;(使用微服务架构、后台管理模块通过nacos&nbsp;+&nbsp;远程调用通信、流媒体模块计算资源使用消息队列&nbsp;+&nbsp;xxljob定时任务,扩展机器主要提供资源、任务统一调度)9.&nbsp;问:项目难点&nbsp;&nbsp;(分片相关的,数据格式转换的、团队合作,需求调整,整体比较复杂)10.&nbsp;问:遇到难点,怎么解决(你的技术思路是怎么来的)&nbsp;(第一从学习项目里面的某些场景获取、第二从csdn或者技术书籍上面)11.&nbsp;rpc框架部分&nbsp;(项目描述,是自己开发,主要是为了实现轻量化的自定义的rpc框架、结合自己的使用需求定制)12.&nbsp;rpc:zookeeper的选举策略(没回答上来)13.&nbsp;问:redis的使用(项目1&nbsp;用了、rpc用在服务列表保存部分)14.&nbsp;问:rpc的难点(netty的LTC解码器解决粘包半包,然后自定义协议,实现调用命令的传输)15.&nbsp;八股:redis的使用16.&nbsp;八股:集合,用过哪些(线程安全&nbsp;+&nbsp;线程不安全的集合)17.&nbsp;八股:ConcurrentHashMap的底层原理(我是按照1.7&nbsp;和&nbsp;1.8的介绍)分段锁和细粒度锁,以及链表&nbsp;|&nbsp;链表&nbsp;+&nbsp;红黑树18.&nbsp;八股:内存模型&nbsp;happensBefore规则,(我没回答上来,但是说了大概是干嘛的)19.&nbsp;八股:垃圾回收算法&nbsp;+&nbsp;G1的整体思路20.&nbsp;八股:AQS抽象队列同步器,作用,干嘛的,底层框架,实现锁机制21.&nbsp;自我评价(觉得自己怎么样)我主要针对&nbsp;公司的兴趣(游戏)、我对技术的兴趣(喜欢开发)、喜欢钻研了解新东西22.&nbsp;反问(部门是具体做什么的、面试流程&nbsp;2技术&nbsp;+&nbsp;1hr)&nbsp;语言(Java&nbsp;kotolin&nbsp;netty&nbsp;redis&nbsp;zookeeper)做的是slg游戏(策略游戏)23.&nbsp;问:我平时玩啥?我说steam的策略游戏&nbsp;+&nbsp;fps&nbsp;(主要文明6&nbsp;钢铁雄心&nbsp;游族的大皇帝)二、技术2面面试官:感觉是hr自我介绍(老样子&nbsp;+&nbsp;项目)&nbsp;同时说对游戏比较感兴趣介绍一下项目(我主要说了第一个,流媒体,图文,权限模块,以及具体的实现思路&nbsp;技术选型等内容,包括后续的测试&nbsp;和&nbsp;上线运维&nbsp;CICD&nbsp;devops&nbsp;以及Jenkins相关的设计问:我了解游戏开发的工作吗?问:我玩哪些游戏吗?(SLG策略游戏,我说游族的页游还有手游)问:我想在这个实习里面学到什么,获取什么东西?反问:最后一面(是,hr面)反问:12月12-15到岗可以吗?(回答,需要和主管商量)综合评价:面试不难,主要是自我介绍,加项目的介绍,后续看视频(基本是商业化面试的思路,复述了一遍)#游族##面经##牛客创作赏金赛#
查看27道真题和解析 牛客创作赏金赛
点赞 评论 收藏
分享
评论
2
10
分享
牛客网
牛客企业服务