一篇文章快速掌握redis、mp、es、rocketmq使用

仿b站前后端分离微服务项目,

实现了以下功能:

视频的上传、查看与上传时获取封面

视频的点赞、评论、可同时新增和删除多个收藏记录的收藏、多功能的弹幕

用户的个人信息查看编辑、用户之间的关注

用户的个人主页权限修改、查看、由个人主页权限动态决定的用户个人主页内容的获取

手机号、邮箱、图形验证码的多种方式登录

支持临时会话的服务器为代理的一对一实时私聊

基于讯飞星火的文生文、文生图、(全网首发)智能PPT

关注up动态视频、评论、点赞、私聊消息的生成与推送

基于es实现的视频和用户的聚合搜索、推荐视频

网关的路由和统一鉴权与授权

基于双token的七天内无感刷新token

防csrf、xss、抓包、恶意上传脚本攻击

统一处理异常和泛型封装响应体、自定义修改响应序列化值

简易的仿redis缓存读取与数据过期剔除实现

xxl-job+ redis+ rocketmq+ es+ 布隆过滤器的自定义es与mysql数据同步

slueth+zipkin的多服务间请求链路追踪

集中多服务日志到一个文件目录下与按需添加特定内容入日志

多服务的详细接口文档

项目地址LABiliBili,github地址GitHub - aigcbilibili/aigcbilibili: 仿bilibili前后端实现,演示地址https://labilibili.com/video/演示.mp4,如果大家觉得有帮助的话可以去github点个小星星♪(・ω・)ノ

下面是项目中redis、mybatis-plus、es、rocketmq的使用

(由于使用父子模块写法,所以子模块引入依赖的版本都在父pom.xml文件中查看,下方不再给出版本号)

redis的使用

准备工作

引入SpringBoot集成后的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置连接地址

spring:
 redis:
  host:localhost:84
#  password:   默认为空

在容器中注册bean

@Configuration
public class RedisConfig {

    private final RedisConnectionFactory redisConnectionFactory;

    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Bean
    public <T> RedisTemplate<String, T> objectRedisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, T> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());

        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public RedisTemplate<String, byte[]> bytesRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, byte[]> template = new RedisTemplate<>();
        template.setKeySerializer(RedisSerializer.string());
        template.setValueSerializer(RedisSerializer.byteArray());
        template.setHashKeySerializer(RedisSerializer.string());
        template.setHashValueSerializer(RedisSerializer.byteArray());
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setKeySerializer(RedisSerializer.string());
        template.setValueSerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        template.setHashValueSerializer(RedisSerializer.string());
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }


}

若要在项目代码基础上做修改需注意在项目代码中用的@Resource注解,而该注解默认是byName,这里的Name指的是配置类中标注了@Bean注解的方法的名字,如使用redis时键值类型都为String类型时则注入redisTemplate,分别为String和object类型时则使用objectRedisTemplate,否则项目运行不会报错但实际调用方法时会报与预期类型不匹配。

使用redis

引入依赖、编写配置文件、编写配置类后就可以在业务层注入需要的某个redisTemplate的Bean来使用了,spring-boot-starter-data-redis针对redis数据结构封装了两层,调用两层后才能针对redis中某种数据结构进行增删改查操作,以最常见的键值数据结构为例

redisTemplate.opsForValue().set(Object key,Object value);
存入一对键值对,默认永不超时
redisTemplate.opsForValue().set(Object key,Object value,long timeout,TimeUnit timeUnit);
存入一对键值对,在以传入的时间单位为单位的前提下过了timeout时间后过期
redisTemplate.opsForValue().get(Object key);
根据键名获取键值对的值

走到这一步已经可以自如使用redis了,而redis的存取方法有相当多的重载版本,可以根据需求去使用

mybatis-plus的自动填充值处理器、条件mapper的使用

引入依赖

<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>

按特定注解编写实体类

@Data
@Accessors(chain = true)
@TableName("chat_notice")
public class Chat {

    @TableField("sender_id")
    private Integer senderId;

    @TableField("content")
    private String content;

    @TableField("receiver_id")
    private Integer receiverId;

    @TableField(value = "create_time",fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    @TableId(type = IdType.AUTO)
    private Integer id;
    @TableField(value = "status",fill = FieldFill.INSERT)
    private Integer status;
}

其中@TableField是一般字段,负责将数据库表的列名与Java对象属性名映射到一起,@TableId是id字段,不同type值决定了id增长策略,默认雪花算法,当前代码是设定为自增主键。@TableName标识该实体类对应数据库中的表。

自动填充值处理器的使用

package ljl.bilibili.handler;
 
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import org.apache.ibatis.reflection.MetaObject;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.Date;
import java.util.UUID;
/**
 *自动填充处理器
 */

@Component
public class MyMetaObjectHandler implements MetaObjectHandler {
 
 
    @Override
    public void insertFill(MetaObject metaObject) {
        this.setFieldValByName("cover", "https://labilibili.com/user-cover/default.png", metaObject);
        this.setFieldValByName("nickname", "新用户"+ UUID.randomUUID().toString().substring(0,10), metaObject);
        this.setFieldValByName("createTime", LocalDateTime.now(), metaObject);
        this.setFieldValByName("danmakuCount", 0, metaObject);
        this.setFieldValByName("playCo unt",0, metaObject);
        this.setFieldValByName("likeCount",0, metaObject);
        this.setFieldValByName("collectCount",0, metaObject);
        this.setFieldValByName("commentCount",0, metaObject);
        this.setFieldValByName("status",0,metaObject);
        this.setFieldValByName("collectGroup",0,metaObject);
        this.setFieldValByName("remotelyLike",0,metaObject);
        this.setFieldValByName("fansList",0,metaObject);
        this.setFieldValByName("idolList",0,metaObject);
    }
 
    @Override
    public void updateFill(MetaObject metaObject) {
        this.setFieldValByName("updateTime", LocalDateTime.now(), metaObject);
    }
}

编写一个实现了MetaObjectHandler接口的处理器类并使用@Component注解注册到容器中,实现insertFill和updateFill实现新增和修改时自动修改值

 @TableField(value = "create_time",fill = FieldFill.INSERT)
    private LocalDateTime createTime;
this.setFieldValByName("createTime", LocalDateTime.now(), metaObject);

在需要自动更新的实体属性的@TableField注解中标注fill的类型,然后在实现的方法中将该属性字段名、自动赋予的值填进去,最后一个参数固定为传递进实现方法的参数metaObject。这里需注意自动赋予的值的类型必须和该实体属性的类型一致。

固定格式的mapper

@Mapper
public interface CommentNoticeMapper extends BaseMapper<CommentNotice> {
}

继承BaseMapper并将泛型的值限定为该Mapper对应操作的实体类的类名,在该Mapper类上标记@Mapper注解即可在业务层引入该Mapper后使用

增删改查操作的使用

前提都是在业务代码中引入了该Mapper

@Resource
CommentNoticeMapper commentNoticeMapper;

查询

创建一个查询需要用到的封装查询条件wrapper
LambdaQueryWrapper<CommentNotice> wrapper=new LambdaQueryWrapper<>();
   等价于where 列值(↑commentNotice实体对应的属性)=1
        wrapper.eq(CommentNotice::getStatus,1);
   等价于where 列值(↑commentNotice实体对应的属性)!=1
        wrapper.ne(CommentNotice::getStatus,1);
   等价于where 列值(↑commentNotice实体对应的属性)in 存id的集合
        List<Integer> ids=new ArrayList<>();
        wrapper.in(CommentNotice::getVideoId,ids);
   等价于where 列值<1
      int value=1;
        wrapper.le(CommentNotice::getStatus,value);
   等价于where 列值>0 
        wrapper.ge(CommentNotice::getStatus,0);
   等价于按status来升序排列  
        wrapper.orderByAsc(CommentNotice::getStatus);
   等价于按status来降序排列
        wrapper.orderByDesc(CommentNotice::getStatus);  
   等价于按status来分组 
        wrapper.groupBy(CommentNotice::getStatus);
   等价于按status、videoid来分组  
        wrapper.groupBy(CommentNotice::getStatus,CommentNotice::getVideoId);  
   等价于senderName值like "%man"  
        wrapper.like(CommentNotice::getSenderName,"man");
   按条件查询一个对象,注意该查询若查出记录不止一条会报错  
        CommentNotice commentNotice=commentNoticeMapper.selectOne(wrapper);
   按条件查询一个对象集合
        List<CommentNotice> commentNoticeList=commentNoticeMapper.selectList(wrapper);
   根据id集合查询对象集合 
        List<CommentNotice> commentNoticeList1=commentNoticeMapper.selectBatchIds(ids);
   根据id查询一个对象 
       CommentNotice commentNotices=commentNoticeMapper.selectById(1);  


增加

插入一个对应Mapper泛型的实体类
commentNoticeMapper.insert(new CommentNotice());
插入后该对象的id会自动赋值,有时可以利用该特性

删除

按条件删除
LambdaQueryWrapper<CommentNotice> wrapper=new LambdaQueryWrapper<>();
commentNoticeMapper.delete(wrapper);
wrapper上文已提过,此处省略
 List<Integer> ids=new ArrayList<>();
 commentNoticeMapper.deleteBatchIds(ids);
根据id集合批量删除
 commentNoticeMapper.deleteById(1);
根据实体对象中的id值删除
 commentNoticeMapper.deleteById(new CommentNotice());

修改

创建一个用于更新值的wrapper
LambdaUpdateWrapper<CommentNotice> wrapper=new LambdaUpdateWrapper<>();
更新status的值为1
wrapper.set(CommentNotice::getStatus,1);
条件更新方法同查询,eq、ne、in等,不再赘述

es的使用

引入依赖

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

配置连接地址

elasticsearch-client:
    hosts: localhost:9200

编写配置类

@Component
@ConfigurationProperties(prefix = "elasticsearch-client")
用于读取配置文件中elasticsearch-client对应的值
public class ElasticsearchClientProperties {

    private List<String> hosts;

    private String username;

    private String password;

    public List<String> getHosts() {
        return hosts;
    }

    public void setHosts(List<String> hosts) {
        this.hosts = hosts;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

@Configuration
@Slf4j
public class ElasticsearchClientConfig {
使用Properties类中读取的配置文件的属性
    private final ElasticsearchClientProperties properties;

    public ElasticsearchClientConfig(ElasticsearchClientProperties properties) {
        this.properties = properties;
    }

    @ConditionalOnMissingBean(RestHighLevelClient.class)
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        if(properties==null){
            log.error("properties==null");
        }else {
          List<String> list = properties.getHosts();
          if(list.size()==0){
              log.error("list==0");
          }
        }

        HttpHost[] hosts = properties.getHosts().stream()
                .map(HttpHost::create).toArray(HttpHost[]::new);
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(hosts)
                        .setHttpClientConfigCallback(httpClientBuilder ->
                                httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
                                        .setIoThreadCount(8).build())
                        )
                        .setRequestConfigCallback(requestConfigBuilder ->
                                requestConfigBuilder.setConnectTimeout(300000)
                                        .setSocketTimeout(300000)
                        )
        );

        log.info("Connecting Elasticsearch...");
        while (true) {
            try {
                if (client.ping(RequestOptions.DEFAULT)) {
                    break;
                }
            } catch (Exception e) {
                log.warn("Connecting Elasticsearch Failed, Retry in 10 seconds...");
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e1) {
                    log.error("", e1);
                }
            }
        }
        log.info("Elasticsearch Connected!");
        return client;
    }
}

es的使用

es与mysql可以类比理解,es的索引类比mysql的表,es的列和列数据类型类似mysql表的列和列数据类型

使用前提条件是引入了resthighlevelclient对象

@Resource
public RestHighLevelClient client;

创建索引

  try {
             填充索引列名和数据类型
            String index = esIndexRequest.getIndexName();
            Map<String, String> map = esIndexRequest.getProperties();
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.startObject(INDEX_START_OBJECT_MAPPINGS);
                {
                    builder.startObject(INDEX_START_OBJECT_PROPERTIES);
                    {
                        // 添加其他字段
                        for (Map.Entry<String, String> entry : map.entrySet()) {
                            builder.startObject(entry.getKey());
                            {
                                builder.field(INDEX_FIELD_TYPE, entry.getValue());
                            }
                            builder.endObject();
                        }
                    }
                    builder.endObject();
                }
                builder.endObject();
            }
            builder.endObject();
         创造创建索引对象
            CreateIndexRequest request = new CreateIndexRequest(index);
          将列名数据类型填充进创建索引对象
            request.source(builder);
            client.indices().create(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

删除索引

创建删除索引对象     
GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
        boolean exists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        if (exists) {
            DeleteIndexRequest request = new DeleteIndexRequest(indexName);
            client.indices().delete(request, RequestOptions.DEFAULT);
            System.out.println("索引删除了");
        } else {
            System.out.println("索引不存在");
        }
        return true;
    }

查询文档

创建查询对象
SearchRequest searchRequest=new SearchRequest("video");
创建搜索源
SearchSourceBuilder sourceBuilder=new SearchSourceBuilder();
创建查询条件对象,例如匹配所有和精确匹配某些字段
MatchAllQueryBuilder matchAllQueryBuilder=new MatchAllQueryBuilder();
MultiMatchQueryBuilder multiMatchQueryBuilder=new MultiMatchQueryBuilder();
sourceBuilder.query(multiMatchQueryBuilder);
sourceBuilder.query(matchAllQueryBuilder);
执行查询
client.search(searchRequest,RequestOptions.DEFAULT);

创建文档

绑定索引
IndexRequest indexRequest = new IndexRequest("video");
对应请求中的map
Map map;
添加新增源
indexRequest.source(map, XContentType.JSON);
client.index(indexRequest, RequestOptions.DEFAULT);

删除文档

创建删除文档对象
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(“video”);
绑定id到要被删除文档,id类型为String
deleteRequest.id(id);
client.delete(deleteRequest, RequestOptions.DEFAULT);

更新文档

绑定索引
UpdateRequest updateRequest= new IndexRequest().index("video").id("3");
对应请求中的值的map
Map map;
将要被更新的值绑定进去
updateRequest.doc(map);
添加修改源
updateRequest.source(map, XContentType.JSON);
client.index(updateRequest, RequestOptions.DEFAULT);
map可以理解为一个对象,修改文档值就类似把这个传入的map也就是传入的对象的值一一复刻到文档中去

批量操作文档

IndexRequest indexRequest=new IndexRequest("video");
indexRequest.source(map);
UpdateRequest updateRequest=new UpdateRequest().index("video").id("3");
 updateRequest.doc(map);
创建批量操作对象
BulkRequest bulkRequest=new BulkRequest();
添加操作,可在一个批量操作对象中添加多个不同类型的操作
bulkRequest.add(indexRequest);
bulkRequest.add(updateRequest);
执行批量操作
 client.bulk(bulkRequest, RequestOptions.DEFAULT);

注意,indexRequest其实也可以用作更新,无id时创建新的文档,有id时更新现有文档,和updateRequest的区别在于indexRequest会覆盖原有值为空,比如indexRequest要操作的文档有3个列name,id,cover,若只有name和id有值那么更新原有文档时会将原文档的cover值覆盖为null,但使用updateRequest便不会出现这种情况。

rocketmq的使用

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

配置地址

rocketmq:
  name-server: localhost:9876
  producer:
    group: test

rocketmq-spring-boot-starter已经封装好了核心类RocketmqTemplate,因此无需编写配置类即可使用该类。

发送消息

主题名称
String topic="video-encode";
使用Springboot自带Jackson将对象json化
ObjectMapper objectMapper=new ObjectMapper();
String jsonMessage=objectMapper.writeValueAsString(uploadVideo);
异步发送,可以利用到回调函数检测发送情况
rocketMQTemplate.asyncSend(topic, jsonMessage, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("Rocket发射成功");
    }

    @Override
    public void onException(Throwable throwable) {
        throwable.getMessage();
    log.error("坠机:");
    }
});

消费消息

@Service
@RocketMQMessageListener(
主题名
        topic = "comment",
消费所在组
        consumerGroup = "comment-group",
消费模式,有并发型和按消息发送顺序传递型
        consumeMode = ConsumeMode.CONCURRENTLY
)
public class CommentConsumer implements RocketMQListener<MessageExt> {
    @Resource
    CommentNoticeMapper commentNoticeMapper;
    @Resource
    ObjectMapper objectMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
发送的json消息经过rocketmq内部流通变成MessageExt,再转成json对象
        String json = new String(messageExt.getBody(), StandardCharsets.UTF_8);
使用Jackson反序列化成自定义对象
 LikeNotice likeNotice = objectMapper.readValue(json, LikeNoticeAddOrDelete.class);

到此,redis、mybatis-plus、es、rocketmq的常见使用文档就都已完成♪(・ω・)ノ

#25届暑期实习##25届秋招提前批##美团##腾讯##最后再改一次简历#

该专栏存放前后端分离仿b站微服务项目相关教程

全部评论
1
1 回复 分享
发布于 07-14 20:09 湖南
牛逼 佬 弟弟佩服
点赞 回复 分享
发布于 11-09 16:42 河北

相关推荐

我朋友的华子2012,HR已经开始问意向地区了,好急
不讲武德的黑眼圈很能干:急得不行 也不说评级 不知道报的多少啊😡
点赞 评论 收藏
分享
10-09 22:05
666 C++
找到工作就狠狠玩CSGO:报联合国演讲,报电子烟设计与制造
点赞 评论 收藏
分享
2 12 评论
分享
牛客网
牛客企业服务