RocketMQ实战—6.生产优化及运维方案

大纲

1.RocketMQ集群如何进行权限机制的控制

2.如何对RocketMQ集群进行消息堆积的追踪

3.如何处理RocketMQ的百万消息积压问题

4.针对RocketMQ集群崩溃的金融级高可用方案

5.为RocketMQ增加消息限流功能保证其高可用

6.从Kafka迁移到RocketMQ的双写双读方案

1.RocketMQ集群如何进行权限机制的控制

(1)RocketMQ进行权限控制的必要性

(2)在RocketMQ中实现权限控制的步骤

(1)RocketMQ进行权限控制的必要性

如果一个公司有很多技术团队,每个技术团队都会使用RocketMQ集群中的部分Topic。那么此时可能就会有一个问题:如果订单团队使用的Topic,被商品团队不小心写入了错误的脏数据,那就可能会导致订单团队的Topic里的数据出错。

所以此时就需要在RocketMQ中引入权限功能,也就是规定好订单团队的用户只能使用"OrderTopic"。然后商品团队的用户只能使用"ProductTopic",各团队互相之间不能使用对方的Topic。

(2)在RocketMQ中实现权限控制的步骤

步骤一:首先需要在Broker端放一个额外的ACL权限控制配置文件。配置文件里面需要规定好权限,包括什么用户对哪些Topic有什么操作权限,这样各个Broker才知道每个用户的权限。

步骤二:然后在每个Broker的配置文件里需要设置aclEnable=true,开启权限控制。

步骤三:接着在每个Broker机器的目录下放一个plain_acl.yml的配置文件。这个目录是${ROCKETMQ_HOME}/store/config,这个配置文件的具体权限配置如下:

# 这个参数就是全局性的白名单
# 这里定义的ip地址,都是可以访问Topic的
globalWhiteRemoteAddresses:
- 13.21.33.*
- 192.168.0.*

# 这个accounts就是说,你在这里可以定义很多账号
# 每个账号都可以在这里配置对哪些Topic具有一些操作权限
accounts:

# 这个accessKey其实就是用户名的意思,比如我们这里叫做“订单技术团队”
- accessKey: OrderTeam

# 这个secretKey其实就是这个用户名的密码
secretKey: 123456

# 下面这个是当前这个用户名下哪些机器要加入白名单的
whiteRemoteAddress:

# admin指的是这个账号是不是管理员账号
admin: false

# 这个指的是默认情况下这个账号的Topic权限和ConsumerGroup权限
defaultTopicPerm: DENY
defaultGroupPerm: SUB

# 这个就是这个账号具体的堆一些账号的权限
# 下面就是说当前这个账号对两个Topic,都具备PUB|SUB权限,就是发布和订阅的权限
# PUB就是发布消息的权限,SUB就是订阅消息的权限
# DENY就是拒绝你这个账号访问这个Topic
topicPerms:
- CreateOrderInformTopic=PUB|SUB
- PaySuccessInformTopic=PUB|SUB

# 下面就是对ConsumerGroup的权限,也是同理的
groupPerms:
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
# 下面就是另外一个账号了,比如是商品技术团队的账号
- accessKey: ProductTeam
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*

# 如果admin设置为true,就是具备一切权限
admin: true

上面配置需要注意的是:如果一个账号没有对某个Topic显式指定权限,那么就会采用默认Topic权限。

步骤四:最后在生产者和消费者中,指定分配到的RocketMQ账号。这样,当生产者或消费者使用一个账号时,就只能访问有权限的Topic。

DefaultMQProducer producer = new DefaultMQProducer(
    "OrderProducerGroup",
    new AclClientRPCHook(new SessionCredentials(OrderTeam, "123456"))    
);

上面的代码就是在创建Producer时,传入了一个AclClientRPCHook实例。在AclClientRPCHook里就可以设置这个Producer的账号密码,对于创建Consumer也是同理。通过这样的方式就可以在Broker端设置好每个账号对Topic的访问权限,然后不同的技术团队使用不同的账号即可。

2.如何对RocketMQ集群进行消息堆积的追踪

(1)开启RocketMQ的消息轨迹功能的步骤

(2)配置好消息轨迹功能后消息轨迹的处理流程

(1)开启RocketMQ的消息轨迹功能的步骤

有时候需要了解一条消息的消息轨迹,来协助排查线上问题。比如想知道消息是什么时候从哪个Producer发出来的,什么时候进入到了哪个Broker的哪个Topic,什么时候被哪个Consumer消费的。此时就可以使用RocketMQ的消息轨迹功能,其配置步骤如下:

步骤一:首先在Broker的配置文件里开启消息轨迹追踪的功能,也就是设置traceTopicEnable = true。开启了该功能之后,启动Broker时就会自动创建出一个内部的Topic:RMQ_SYS_TRACE_TOPIC,这个Topic会存储所有消息的轨迹追踪数据。

步骤二:然后在发送消息和消费消息时开启消息轨迹追踪的功能。也就是在创建Producer和Consumer时,其构造函数的第二个参数enableMsgTrace设置为true。

DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup", true);

(2)配置好消息轨迹功能后消息轨迹的处理流程

当Broker、Producer、Consumer都配置好消息轨迹追踪后:

首先,Producer发送消息时就会上报这个消息的轨迹数据到RMQ_SYS_TRACE_TOPIC里。此时上报的数据包括:Producer的信息、发送消息的时间、消息是否发送成功、发送消息的耗时。

接着,消息到达Broker之后,Broker也会记录消息的轨迹数据。此时记录的数据包括:消息存储的Topic、消息存储的位置、消息的key和tags。

然后,消息被Consumer消费时,Consumer也会上报一些轨迹数据到RMQ_SYS_TRACE_TOPIC里。此时上报的数据包括:Consumer的信息、投递消息的时间、这是第几轮投递消息、消息消费是否成功、消费这条消息的耗时。

最后,如果想要查询消息轨迹,只需要在RocketMQ控制台里查询。其导航栏就有一个消息轨迹,在里面可以创建查询任务。可以根据messageId、message key或者Topic来查询。查询任务执行完毕后,就可以看到消息轨迹的界面了。在消息轨迹的界面里就会展示出Producer、Broker、Consumer上报的轨迹数据了。

3.如何处理RocketMQ的百万消息积压问题

(1)产生消息积压问题的案例背景

(2)直接丢弃消息来解决消息积压问题

(3)在旧Topic上扩容消费者来解决消息积压问题

(4)通过新Topic扩容消费者来解决消息积压问题

(5)消息积压问题的处理总结

(1)产生消息积压问题的案例背景

曾经有一个系统,它就是由生产者和消费者两部分组成的。生产者负责不停地把消息写入RocketMQ里,然后消费者负责从RocketMQ里消费消息。这个系统运行时是有高峰期和低谷期的,在晚上几个小时的高峰期内,大概会有100多万条消息进入RocketMQ。此外,消费者从RocketMQ里获取到消息后,会依赖NoSQL数据库进行一些业务逻辑的处理。

有一天晚上,消费者依赖的NoSQL数据库挂了,导致消费者没法继续从RocketMQ里消费数据进行处理。然后生产者在晚上几个小时的高峰期内,往RocketMQ写入了100多万条消息,这些消息都被积压了。

处理这种紧急的线上事故,一般有如下几种方案。

(2)直接丢弃消息来解决消息积压问题

如果这些消息是允许丢失的,那么此时可以紧急修改消费者的代码:在代码里对所有获取到的消息直接丢弃,不做任何处理。这样可以迅速让积压在RocketMQ里的百万消息被处理掉,只不过处理方式就是全部丢弃而已。

(3)在旧Topic上扩容消费者来解决消息积压问题

如果这些消息是不允许丢失的,那么可以先等待消费者依赖的NoSQL数据库恢复,恢复后就可以根据线上Topic的MessageQueue数量来决定如何处理。

假设线上Topic有20个MessageQueue,然后只有4个消费者在消费,那么每个消费者会从5个MessageQueue里获取消息。此时如果仅仅依靠4个消费者来消费肯定是会继续积压消息的,毕竟RocketMQ里已经积压了百万消息了。

所以此时可以临时申请16台机器多部署16个消费者实例,然后让20个消费者同时消费。每个消费者消费一个MessageQueue的消息,此时消费的速度会提高5倍,积压的百万消息很快就会被处理完。

但是这里需要考虑消费者依赖的NoSQL数据库必须要能抗住临时增加5倍的读写压力,因为原来只有4个消费者在读写NoSQL,现在临时变成了20个消费者了。当处理完百万积压的消息后,就可以下线多余的16台机器了。

这是最常见的处理百万消息积压的办法。

(4)通过新Topic扩容消费者来解决消息积压问题

如果Topic总共就只有4个MessageQueue,然后只有4个消费者呢?这时就没办法扩容消费者了,因为加再多的消费者,还是只有4个MessageQueue,没法降低原来消费者的消费压力了。

所以此时需要临时修改那4个消费者的代码,让它们获取到消息后不依赖NoSQL,直接把消息写入一个新的Topic,这时候的速度是很快的,因为仅仅是读写RocketMQ而已。然后新的Topic会有20个MessageQueue,于是部署20台临时增加的消费者去消费新的Topic,消费新的Topic时才依赖NoSQL。通过将积压的消息转移到一个新的Topic,来解决无法扩容消费者的问题。

(5)消息积压问题的处理总结

如果MessageQueue比较多,可以直接扩容消费者,那么就直接临时增加消费者实例来扩容消费者。

如果MessageQueue比较少,不能直接扩容消费者,那么就把积压在原Topic的消息写入到新Topic。在消费新Topic的消息时,临时部署足够多的消费者实例,来实现间接扩容消费者。

4.针对RocketMQ集群崩溃的金融级高可用方案

金融级的系统如果依赖了RocketMQ集群,那么应该如何设计RocketMQ集群崩溃时的高可用方案?

通常会在发送消息到RocketMQ的系统中设计高可用的降级方案,这个降级方案的思路如下:

在发送消息到RocketMQ的代码里通过try catch捕获异常,如果发现异常就进行重试。如果连续重试3次还是失败,则说明RocketMQ集群可能彻底崩溃了。此时需要把这条重要的消息进行持久化:可以是数据库、本地磁盘文件、NoSQL存储。之后需要不停地尝试发送消息到RocketMQ,一旦发现RocketMQ集群恢复,则通过后台线程把之前持久化存储的消息查询出来,然后按顺序发送到RocketMQ,这样才能保证消息不会因为RocketMQ集群彻底崩溃而丢失。

注意:对消息进行持久化的时候要保证它的顺序。

只要使用这个方案,哪怕RocketMQ集群突然崩溃了,系统也不会丢失消息。这种高可用的方案设计,对于一些和金钱相关的金融系统、广告系统来说,是非常有必要的。

5.为RocketMQ增加消息限流功能保证其高可用

为什么要给RocketMQ增加限流功能保证其高可用性?因为限流功能可以对RocketMQ提供保护,避免因为Bug等原因导致短时间往RocketMQ写入大量数据而出现故障。

比如下面的代码,因为某些原因导致在while循环中向RocketMQ发消息。如果系统是部署在10多台机器上的,那么可能出现10多台机器都频繁往RocketMQ写消息,瞬间导致RocketMQ集群的TPS飙升,压垮RocketMQ集群。

try {
    //业务代码
    producer.send(message);
} catch (Exception e) {
    while (true) {
        producer.send(message);
    }
}

所以针对这种情况,一般会改造RocketMQ的源码。在Broker接收消息时,引入限流机制。只允许一秒内写入多少条消息,避免因为一些异常情况,导致RocketMQ集群挂掉。

6.从Kafka迁移到RocketMQ的双写双读方案

假设系统原来使用的MQ是Kafka,现在要从Kafka迁移到RocketMQ,那么这个迁移过程应该怎么做?

首先要做到双写,也就是在所有的Producer系统中,同时往Kafka和RocketMQ写入消息。一般会让双写持续1周左右,因为MQ里面数据也就最多保留一周。当双写持续一周后,Kafka和RocketMQ里的数据基本一模一样了。

但光是双写还不够,还需要同时进行双读。在双写的同时,所有Consumer消费者都要同时从Kafka和RocketMQ里获取消息,并且用一模一样的逻辑进行处理。只不过从Kafka里获取到的消息还是执行核心的逻辑进行处理,落入数据库或者其他存储。而从RocketMQ里获取到的消息,虽然也用同样逻辑进行处理,但不会把处理结果落入数据库或其他存储。

Consumer消费消息时,需要统计每天从Kafka和RocketMQ读取和处理的消息量,以及记录对应的消息处理结果到某个临时存储中。这样一段时间后,就可以对比从Kafka和RocketMQ读取和处理的消息量是否一致、处理的消息结果是否一致。如果是,那么就可以进行正式切换了。

基本上对于类似中间件的迁移,都会采取这种双写双读的方案。双写一段时间后,再观察结果是否都一致。如果是,那么再进行切换。

后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

#大家都开始春招面试了吗# 1. 传统的 CSS 文件(全局样式)传统的 CSS 方案通常是将所有的样式放在一个或多个 CSS 文件中,通过  标签引入。在小型项目或单页面应用中,简单直接。使用场景:- 小型项目- 快速开发和原型设计- 个人项目优点:- 结构简单,易于理解。- 实现快速,适合小项目。缺点:- 不利于维护:当项目增大时,CSS 可能变得冗长且重复。- 样式冲突:没有隔离,容易出现不同组件间的样式污染。2. CSS 预处理器(如 Sass / LESS / Stylus)CSS 预处理器通过扩展 CSS,提供了变量、嵌套规则、混合宏(mixin)等功能,提升了样式的复用性和可维护性。使用场景:- 中型项目- 需要模块化和可扩展性的项目- 需要多次复用样式的项目优点:- 支持变量、嵌套、函数等增强功能,提高代码可维护性。- 提高代码的模块化与复用性。- 代码组织更加清晰,能避免重复的 CSS 代码。缺点:- 编译过程:需要将 Sass 或 LESS 编译成 CSS。- 学习曲线相对较陡(尤其对于初学者)。示例(Sass):// _variables.scss$primary-color: #3498db;$font-size: 16px;// _layout.scss.container {  width: 80%;  margin: 0 auto;}// main.scss@import 'variables';@import 'layout';body {  font-size: $font-size;}.header {  background-color: $primary-color;}3. BEM(块元素修饰符)命名法BEM 是一种 CSS 类命名方法论,适用于大型项目,帮助前端开发者更好地组织和管理样式。BEM 将样式拆分成更小的、功能化的组件,通过定义明确的命名规范来减少样式冲突和提高代码的可维护性。使用场景:- 大型项目,尤其是前端与后端分离时- 团队协作项目- 需要高度模块化和可复用的项目优点:- 命名规范清晰,避免命名冲突。- 样式组件化,提升可复用性。- 便于多人协作开发。缺点:- 代码量相对较多,类名冗长。- 初学者学习曲线较陡。示例:/* BEM 风格 */.button {  padding: 10px;  background-color: blue;}.button--primary {  background-color: #3498db;}.button__icon {  margin-right: 5px;}4. CSS-in-JSCSS-in-JS 是将 CSS 写在 JavaScript 代码中的一种方法,通常与 React 等框架一起使用。它将 CSS 作为 JavaScript 对象来定义,样式和组件逻辑耦合在一起,从而实现样式的动态计算和管理。使用场景:- React、Vue 等组件化框架项目- 需要根据组件状态动态修改样式- 小型项目或者需要样式和组件解耦的大型项目优点:- 样式与组件逻辑结合,易于管理和维护。- 动态生成样式,支持主题切换、响应式设计等功能。- 减少全局样式冲突,样式范围仅限于组件内部。缺点:- 对性能有一定影响(样式计算和注入)。- 需要配置或使用框架(如 styled-components、emotion)。示例(styled-components):// React + styled-components 示例import styled from 'styled-components';const Button = styled.button`  padding: 10px;  background-color: ${props => props.primary ? '#3498db' : '#ccc'};  color: white;  border: none;`;const App = () => (      Primary Button    Secondary Button  );5. CSS 模块化(CSS Modules)CSS 模块化方案允许将 CSS 写在独立的文件中,并且通过自动生成的唯一类名避免样式冲突。它通常与 Webpack 一起使用,支持作用域限定的 CSS。使用场景:- 现代前端开发,尤其是使用 Webpack 或 React/Vue 等框架时。- 需要避免全局样式污染的项目。优点:- 避免了全局命名空间污染。- 类名自动生成,保证唯一性和作用域隔离。- 支持组件化,提升可维护性。缺点:- 需要额外的配置和工具(如 Webpack loader)。- 类名经过编译后可能较长。示例(React + CSS Modules):// App.module.css.button {  padding: 10px;  background-color: #3498db;  color: white;}// App.jsximport React from 'react';import styles from './App.module.css';const App = () => (  Click Me);6. Tailwind CSS(原子化 CSS)Tailwind CSS 是一种实用工具优先(utility-first)CSS 框架,提供了大量的原子类(单一功能的 CSS 类),可以快速构建复杂的布局。使用场景:- 快速开发和原型设计- 喜欢“原子类”方式的开发者- 开发团队需要减少自定义样式的项目优点:- 提供了高度复用的类,减少了自定义 CSS 的需求。- 代码量少,开发速度快。- 通过配置定制化 Tailwind,适应项目需求。缺点:- 类名会非常多,容易使 HTML 代码显得冗长。- 学习成本较高,特别是对于不熟悉原子类概念的开发者。示例:      Click Me  7. Atomic CSS(原子化 CSS)原子化 CSS 和 Tailwind 类似,旨在通过提供小的、独立的 CSS 类来构建样式。每个类只做一件事,组合起来形成完整的样式。使用场景:- 大型项目,尤其是需要高效的布局和样式调整时。- 需要减少样式重复的项目。优点:- 高效的代码复用。- 提高样式的可维护性。- 可以轻松地修改布局和样式,而无需修改整个 CSS。缺点:- 类名较长,HTML 中的样式代码显得冗长。- 学习曲线较高。---用的多的CSS技术Flex典型用途:- 创建水平和垂直居中的布局- 实现响应式设计- 动态调整容器内部元素的排列媒体查询媒体查询是响应式设计的核心工具之一,它使得在不同设备上展示不同样式成为可能。常见用法是根据屏幕尺寸来调整布局。典型用途:- 响应式设计- 根据屏幕宽度调整页面布局或字体大小- 控制不同设备上的显示效果Position(定位)定位是 CSS 中的一个基本概念,通过 position 属性可以控制元素的定位方式。常见的有 static, relative, absolute, fixed 和 sticky 等。典型用途:- 创建浮动元素- 固定元素(如固定导航栏)- 在容器内相对定位元素Transitions(过渡效果)CSS 过渡效果可以让你在元素的属性发生变化时添加平滑的过渡效果,常用于按钮、图片、导航栏等的交互效果。典型用途:- 鼠标悬停时的平滑动画效果- 改变背景色、宽度、透明度等属性时的平滑过渡伪类和伪元素用于在不添加额外 HTML 元素的情况下,对元素的特定状态或部分应用样式。典型用途:- :hover, :focus, :active 等交互状态- ::before, ::after 等用于生成内容和装饰Overflow(溢出处理)overflow 属性用于处理内容溢出的情况,常见用途是创建可滚动的区域或隐藏溢出内容。典型用途:- 创建带滚动条的容器- 隐藏溢出内容
点赞 评论 收藏
分享
评论
1
1
分享

创作者周榜

更多
牛客网
牛客企业服务