【RocketMQ|源码分析】namesrv启动&停止

简介

namesrv在是RocketMQ中一个十分重要的组件,它相当于是Kafka中的zookeeper,Spring Cloud Alibaba中的nacos。它的主要作用是为消息生产者和消息消费者提供关于topic的路由信息。namesrv具有存储路由信息,并且能够管理broker节点(路由注册,路由删除等功能)。

namesrv源码分析

namesrv环境准备

namesrv的启动入口函数在org.apache.rocketmq.namesrv.NamesrvStartup#main直接通过idea启动会提示需要在环境变量中配置ROCKETMQ_HOME

在idea的环境变量配置后,再次启动NamesrvStartup

启动成功后,idea控制台会输出成功提示

namesrv启动过程

namesrv启动过程可以分为如下三个阶段

  1. 解析命令行并创建配置
  2. 创建NamesrvController并启动
  3. 创建ControllerManager并启动

NamesrvStartup#main入口方法中调用了main0和controllerManagerMain

public static void main(String[] args) {
    main0(args);
    controllerManagerMain();
}

在main0方法的解析了命令行中的参数,并创建了NamesrvController类

public static NamesrvController main0(String[] args) {
    try {
        // 1. 使用Apache Commons CLI解析启动命令行中的配置
        parseCommandlineAndConfigFile(args);
        // 2. 创建一个NamesrvController类
        NamesrvController controller = createAndStartNamesrvController();
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
​
    return null;
}

parseCommandlineAndConfigFile解析命令行中的参数,生成CommandLine对象,如果命令行中包含-c选项,则会读取参数中的配置文件路径,并设置NamesrvConfig、NettyServerConfig、NettyClientConfig三个参数。

如果命令行中包含-p选项,则会打印所有可配置项,并直接退出,启动效果如下图所示。

public static void parseCommandlineAndConfigFile(String[] args) throws Exception {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
​
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());
    if (null == commandLine) {
        System.exit(-1);
        return;
    }
    // 创建配置
    namesrvConfig = new NamesrvConfig();
    nettyServerConfig = new NettyServerConfig();
    nettyClientConfig = new NettyClientConfig();
    // 默认端口9876
    nettyServerConfig.setListenPort(9876);
    // 解析配置文件
    if (commandLine.hasOption('c')) {
      // 省略部分代码
    }
​
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    if (commandLine.hasOption('p')) {
      // 打印所有配置项
    }
    // 必须要有ROCKETMQ_HOME环境变量
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
​
}

参数解析并配置完成之后就是创建并启动NamesrvController,可以从下面源码中日志看到,NamesrvController启动之后,namesrv就算启动了,由此可见NamesrvController在namesrv扮演者举足轻重的地位。

public static NamesrvController createAndStartNamesrvController() throws Exception {
​
    // 创建NamesrvController
    NamesrvController controller = createNamesrvController();
    // 启动NamesrvController
    start(controller);
    // netty服务器端配置
    NettyServerConfig serverConfig = controller.getNettyServerConfig();
    String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());
    log.info(tip);
    System.out.printf("%s%n", tip);
    return controller;
}

创建NamesrvController的方法也十分简单,将刚才配置解析创建好的三个配置namesrvConfig, nettyServerConfig, nettyClientConfig作为构造参数创建NamesrvController

public static NamesrvController createNamesrvController() {
​
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    return controller;
}

NamesrvController创建完成之后,就开始启动NamesrvController,NamesrvController启动方法位于NamesrvStartup#start,整个启动的过程分为两步,先初始化NamesrvController(controller.initialize),然后再启动NamesrvController(controller.start)。

下面源码中在NamesrvController初始化和启动之间还添加了一个JVM关闭的钩子函数,在JVM退出之前会先调用NamesrvController.shutdown方法关闭NamesrvController,及时释放资源。

public static NamesrvController start(final NamesrvController controller) throws Exception {
    // 省略部分代码
    // 初始化失败,则关闭shutdown namesrv
    boolean initResult = controller.initialize();
    // 初始化失败,则退出
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    // 添加jvm关闭钩子方法
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));
    // 启动controller
    controller.start();
​
    return controller;
}

NamesrvController初始化源码如下,它创建了通信用的组件,初始化发送请求和接受请求的线程池及队列等,具体细节在这里就不展开了

public boolean initialize() {
    // 加载kv config
    loadConfig();
    // 初始化NettyRemotingServer,NettyRemotingClient
    initiateNetworkComponents();
    // 初始化服务端请求处理线程池和客户端请求线程池
    initiateThreadExecutors();
    // 注册请求处理器
    registerProcessor();
    // 启动定时任务线程池
    startScheduleService();
    // 初始化ssl
    initiateSslContext();
    // 初始化初始化一个ZoneRouteRPCHook,处理根据topicname获取路由信息请求
    initiateRpcHooks();
    return true;
}

NamesrvController启动的过程启动了如下三个非常关键的组件

  • RemotingServer
  • RemotingClient
  • RouteInfoManager

public void start() throws Exception {
    // remotingServer启动
    this.remotingServer.start();
    // 省略部分代码
    this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()
        + ":" + nettyServerConfig.getListenPort()));
    // remotingClient启动
    this.remotingClient.start();
    // 文件监视服务
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    // 路由信息管理启动
    this.routeInfoManager.start();
}

至此,namesrv就算启动完成了。如果配置了enableControllerInNamesrv还会创建并启动ControllerManager。

ControllerManager是支持RocketMQ集群自动主从切换的组件,这里就不详细阐述了

public static ControllerManager controllerManagerMain() {
    try {
        if (namesrvConfig.isEnableControllerInNamesrv()) {
            return createAndStartControllerManager();
        }
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

将上述启动过程整理成思维导图如下

namesrv关闭

namesrv的关闭其实就是NamesrvController的关闭,关闭方法在org.apache.rocketmq.namesrv.NamesrvController#shutdown,方法中处理的是网络客户端的关闭、线程池关闭以及路由信息Manager关闭。

public void shutdown() {
    // 网络通信客户端关闭
    this.remotingClient.shutdown();
    // 网络通信服务端关闭
    this.remotingServer.shutdown();
    // 默认线程池关闭
    this.defaultExecutor.shutdown();
    // 客户端请求线程池关闭
    this.clientRequestExecutor.shutdown();
    // 定时调度线程池关闭
    this.scheduledExecutorService.shutdown();
    // 不健康broker扫描线程池
    this.scanExecutorService.shutdown();
    // 路由信息Manager关闭
    this.routeInfoManager.shutdown();
    // fileWatchService关闭
    if (this.fileWatchService != null) {
        this.fileWatchService.shutdown();
    }
}

总结

namesrv的启动和停止其实就是NamesrvController的启动和停止,NamesrvController启动时需要读取配置信息,创建配置类,再根据配置类创建和初始化NamesrvController。NamesrvController处理的是资源的回收。

#java##java研发##后端#
全部评论
原来是这样
点赞 回复 分享
发布于 2023-03-28 17:57 吉林

相关推荐

牛客339922477号:都不用reverse,直接-1。一行。啥送分题
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务