问答社区-系统通知(kafka)

1.发送系统通知

需求
触发事件:

  • 评论后,发布通知
  • 点赞后,发布通知
  • 关注后,发布通知

处理事件:

  • 封装事件对象
  • 开发事件的生产者
  • 开发事件的消费者
    图片说明

1、新建事件对象

每个 setter 都返回当前对象(this),这样就可以链式地连续调用 set 方法了

/**
 * An event describing a user action on some target entity.
 *
 * <p>Every mutator returns {@code this}, so a caller can chain the calls:
 * {@code new Event().setTopic(t).setUserId(id)...}.
 */
public class Event {

    /** Topic of the event. */
    private String topic;

    /** Id of the user who triggered the event. */
    private int userId;

    /** Type of the target entity. */
    private int entityType;

    /** Id of the target entity. */
    private int entityId;

    /** Id of the user associated with the target entity. */
    private int entityUserId;

    /** Additional key/value pairs carried along with the event. */
    private Map<String, Object> data = new HashMap<>();

    // ---------------------------------------------------------------- readers

    /** @return the topic of the event */
    public String getTopic() {
        return topic;
    }

    /** @return the id of the user who triggered the event */
    public int getUserId() {
        return userId;
    }

    /** @return the type of the target entity */
    public int getEntityType() {
        return entityType;
    }

    /** @return the id of the target entity */
    public int getEntityId() {
        return entityId;
    }

    /** @return the id of the user associated with the target entity */
    public int getEntityUserId() {
        return entityUserId;
    }

    /** @return the live map of additional data (not a copy) */
    public Map<String, Object> getData() {
        return data;
    }

    // --------------------------------------------------------- fluent writers

    /**
     * @param topic the topic of the event
     * @return this event, for chaining
     */
    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    /**
     * @param userId the id of the user who triggered the event
     * @return this event, for chaining
     */
    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    /**
     * @param entityType the type of the target entity
     * @return this event, for chaining
     */
    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    /**
     * @param entityId the id of the target entity
     * @return this event, for chaining
     */
    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    /**
     * @param entityUserId the id of the user associated with the target entity
     * @return this event, for chaining
     */
    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    /**
     * Adds (or overwrites) a single entry in the additional data map.
     *
     * @param key   entry key
     * @param value entry value
     * @return this event, for chaining
     */
    public Event setData(String key, Object value) {
        data.put(key, value);
        return this;
    }
}

2、新建事件生产者

使用kafkatemplate,将对应的事件发布到指定的topic

/**
 * Sends {@link Event} objects to Kafka.
 */
@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * Serializes the event to a JSON string and sends it to the topic named by the event.
     *
     * @param event the event to send; its topic selects the destination
     */
    public void fireEvent(Event event) {
        String topic = event.getTopic();
        String payload = JSONObject.toJSONString(event);
        kafkaTemplate.send(topic, payload);
    }
}

3、新建事件消费者

每当事件发生,将对应的数据新增到message表中

/**
 * Listens on the comment, like and follow topics and stores one {@code Message}
 * per received event through {@code MessageService}.
 */
@Component
public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;

    /**
     * Handles a record from any of the three listened topics.
     *
     * <p>Records without a value, or whose value does not parse into an {@link Event},
     * are logged and dropped.
     *
     * @param record the consumed Kafka record; its value is parsed as the JSON form of an event
     */
    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record) {
        boolean hasPayload = record != null && record.value() != null;
        if (!hasPayload) {
            logger.error("消息的内容为空");
            return;
        }

        String json = record.value().toString();
        Event event = JSONObject.parseObject(json, Event.class);
        if (event == null) {
            logger.error("消息格式错误");
            return;
        }

        // Build the in-site notice: sent from the system user to the event's entity user,
        // using the topic as the conversation id.
        Message notice = new Message();
        notice.setFromId(SYSTEM_USER_ID);
        notice.setToId(event.getEntityUserId());
        notice.setConversationId(event.getTopic());
        notice.setCreateTime(new Date());
        notice.setContent(JSONObject.toJSONString(buildContent(event)));

        messageService.addMessage(notice);
    }

    /** Collects the event's fixed fields plus all of its extra data into one map. */
    private static Map<String, Object> buildContent(Event event) {
        Map<String, Object> content = new HashMap<>();
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());
        // Extra entries are copied last, so they overwrite a fixed key of the same name.
        content.putAll(event.getData());
        return content;
    }
}

4、找到评论、点赞、关注的controller,新增事件触发

CommentController

/**
 * Stores a new comment written by the current user, fires a comment event,
 * and redirects back to the post's detail page.
 *
 * @param discussPostId id of the post whose detail page the request came from
 * @param comment       the comment bound from the request
 * @return redirect to the detail page of {@code discussPostId}
 */
@RequestMapping(value = "/add/{discussPostId}", method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
        int currentUserId = hostHolder.getUser().getId();

        comment.setUserId(currentUserId);
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);

        // Describe what happened: who commented on which entity, and on which post page.
        Event event = new Event();
        event.setTopic(TOPIC_COMMENT);
        event.setUserId(currentUserId);
        event.setEntityType(comment.getEntityType());
        event.setEntityId(comment.getEntityId());
        event.setData("postId", discussPostId);

        // The entity user is the author of whatever was commented on: a post or another comment.
        int targetType = comment.getEntityType();
        if (targetType == ENTITY_TYPE_POST) {
            event.setEntityUserId(
                    discussPostService.findDiscussPostById(comment.getEntityId()).getUserId());
        } else if (targetType == ENTITY_TYPE_COMMENT) {
            event.setEntityUserId(
                    commentService.findCommentById(comment.getEntityId()).getUserId());
        }

        eventProducer.fireEvent(event);

        return "redirect:/discuss/detail/" + discussPostId;
    }

likeController

/**
 * Toggles the current user's like on an entity and returns the new like count and status as JSON.
 *
 * <p>A like event is fired only when the resulting status is 1, i.e. the status value
 * returned by {@code findEntityLikeStatus} after the toggle.
 *
 * @param entityType   type of the entity being liked
 * @param entityId     id of the entity being liked
 * @param entityUserId id of the user associated with the liked entity
 * @param postId       id of the post, forwarded in the event data as {@code postId}
 * @return JSON with code 0 and {@code likeCount}/{@code likeStatus}, or code 403 when
 *         no user is logged in
 */
@RequestMapping(value = "/like", method = RequestMethod.POST)
    @ResponseBody
    public String like(int entityType, int entityId,int entityUserId,int postId) {
        User user = hostHolder.getUser();
        // Reject anonymous requests before touching user.getId(); previously the null
        // check came after the first dereference and could never take effect.
        if (user == null) {
            return CommunityUtil.getJSONString(403, "你还没有登录哦!");
        }

        // Perform the like.
        likeService.like(user.getId(), entityType, entityId, entityUserId);
        // Current like count of the entity.
        long entityLikeCount = likeService.findEntityLikeCount(entityType, entityId);
        // Like status of the current user for the entity.
        int entityLikeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);

        Map<String, Object> map = new HashMap<>();
        map.put("likeCount", entityLikeCount);
        map.put("likeStatus", entityLikeStatus);

        // Only fire an event when the status is 1.
        if (entityLikeStatus == 1) {
            Event event = new Event().setTopic(TOPIC_LIKE)
                    .setUserId(user.getId())
                    .setEntityType(entityType)
                    .setEntityId(entityId)
                    .setEntityUserId(entityUserId)
                    .setData("postId", postId);
            eventProducer.fireEvent(event);
        }

        return CommunityUtil.getJSONString(0, null, map);
    }

followController

/**
 * Makes the current user follow an entity, fires a follow event, and returns a JSON result.
 *
 * @param entityType type of the entity to follow
 * @param entityId   id of the entity to follow
 * @return JSON with code 0 and a confirmation message
 */
@LoginRequired
    @RequestMapping(value = "/follow", method = RequestMethod.POST)
    @ResponseBody
    public String follow(int entityType, int entityId) {
        User user = hostHolder.getUser();
        int followerId = user.getId();

        followService.follow(followerId, entityType, entityId);

        // The followed entity's id is also used as the entity user id of the event.
        Event followEvent = new Event();
        followEvent.setTopic(TOPIC_FOLLOW);
        followEvent.setUserId(followerId);
        followEvent.setEntityType(entityType);
        followEvent.setEntityId(entityId);
        followEvent.setEntityUserId(entityId);
        eventProducer.fireEvent(followEvent);

        return CommunityUtil.getJSONString(0, "已关注!");
    }

2.显示系统通知

需求:
通知列表

  • 显示评论、点赞、关注三种类型的通知

通知详情

  • 分页显示某一主题所包含的通知

未读消息

  • 在页面头部显示所有的未读消息数量

1.开发持久层

这里面会显示评论、点赞、关注的通知
根据需求,需要新增四个查询
1)查询未读消息数量
2)查询最新一条消息
3)查询其中某个主题的通知数量
4)查询某个主题包含的通知列表
这里贴上代码,要注意的是:
查询未读消息数量有两种,一种是某个主题的未读消息,另一种是整个通知的未读消息数量
这里就可以加一个 <if> 标签,动态判断参数(topic)是否传入,做到一条 SQL 复用

    <!--
        Returns the notice with the greatest id for one recipient and topic.
        Rows with status 2 are excluded; from_id = 1 is presumably the system user id
        (TODO confirm against SYSTEM_USER_ID).
    -->
    <select id="selectLatestNotice" resultType="Message">
        SELECT <include refid="selectFields"/>
        FROM message
        WHERE id IN (
            SELECT max(id)
            FROM message
            WHERE status != 2
              AND from_id = 1
              AND to_id = #{userId}
              AND conversation_id = #{topic}
        )
    </select>

    <!--
        Counts the notices of one topic for one recipient.
        Rows with status 2 are excluded; from_id = 1 is presumably the system user id
        (TODO confirm against SYSTEM_USER_ID).
    -->
    <select id="selectNoticeCount" resultType="int">
        select count(id)
        from message
        where status != 2
        and from_id = 1
        and to_id = #{userId}
        and conversation_id = #{topic}
    </select>

    <!--
        Counts the notices with status 0 (the "unread" count this statement is named for)
        sent from from_id = 1 to one recipient. When topic is null the count spans all
        topics; otherwise it is limited to that topic.
    -->
    <select id="selectNoticeUnreadCount" resultType="int">
        select count(id)
        from message
        where status = 0
        and from_id = 1
        and to_id = #{userId}
        <!-- Restrict to a single topic only when the caller passed one. This note must be
             an XML comment: a "//" note here would be emitted into the SQL text. -->
        <if test="topic!=null">
            and conversation_id=#{topic}
        </if>
    </select>
    <!--
        Returns one page of notices of one topic for one recipient, newest create_time first.
        Rows with status 2 are excluded; offset/limit select the page.
    -->
    <select id="selectNotices" resultType="Message">
        SELECT <include refid="selectFields"/>
        FROM message
        WHERE status != 2
          AND from_id = 1
          AND to_id = #{userId}
          AND conversation_id = #{topic}
        ORDER BY create_time DESC
        LIMIT #{offset}, #{limit}
    </select>

2.服务层

这一层主要是调用持久层的方法,执行SQL语句,就不贴了

3.开发控制层

当点击通知按钮,调用messageService中的查询方法,将最新一条的数据查询出来,并把相关信息传递到前台页面

//通知
    /**
     * Renders the notice overview for the current user.
     *
     * <p>Puts the latest comment notice (if any) with its total and unread counts into the
     * model as {@code commentNotice}, plus the overall unread letter and notice counts.
     *
     * @param model the view model to populate
     * @return the notice list view name
     */
    @RequestMapping(path = "/notice/list", method = RequestMethod.GET)
    public String getNoticeList(Model model) {
        User user = hostHolder.getUser();
        int userId = user.getId();

        // Latest notice on the comment topic, expanded into a view object.
        Message latest = messageService.findLatestNotice(userId, TOPIC_COMMENT);
        if (message(latest)) {
            // The stored content is HTML-escaped JSON; unescape before parsing.
            String json = HtmlUtils.htmlUnescape(latest.getContent());
            Map<String, Object> payload = JSONObject.parseObject(json, HashMap.class);

            Map<String, Object> commentVo = new HashMap<>();
            commentVo.put("message", latest);
            commentVo.put("user", userService.findUserById((Integer) payload.get("userId")));
            commentVo.put("entityType", payload.get("entityType"));
            commentVo.put("entityId", payload.get("entityId"));
            commentVo.put("postId", payload.get("postId"));
            commentVo.put("count", messageService.findNoticeCount(userId, TOPIC_COMMENT));
            commentVo.put("unread", messageService.findNoticeUnreadCount(userId, TOPIC_COMMENT));

            model.addAttribute("commentNotice", commentVo);
        }

        // Unread totals across all conversations / all topics (null means "no filter").
        model.addAttribute("letterUnreadCount", messageService.findLetterUnreadCount(userId, null));
        model.addAttribute("noticeUnreadCount", messageService.findNoticeUnreadCount(userId, null));

        return "/site/notice";
    }

    /** Tells whether a notice was found. */
    private static boolean message(Object notice) {
        return notice != null;
    }

通知详情页面
将查询到的通知 List 中的每一个通知实体放到一个大的集合中去
其中通知实体以及其中包含的内容(一个小通知)放到一个map中
将每一个小通知放到List集合中返回给主页面

/**
 * Renders one page of the current user's notices for a topic.
 *
 * <p>Each notice is expanded into its own view map (notice, triggering user, entity type/id,
 * post id, sender) and the list is exposed to the view as {@code notices}. Afterwards the ids
 * obtained from {@code getLetterIds} are passed to {@code messageService.readMessage}
 * (presumably marking them as read; confirm against the service).
 *
 * @param topic the notice topic taken from the path
 * @param page  paging state; limit, row count and path are set here
 * @param model the view model to populate
 * @return the notice detail view name
 */
@RequestMapping(path = "/notice/detail/{topic}", method = RequestMethod.GET)
    public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) {
        User user = hostHolder.getUser();

        page.setLimit(5);
        page.setRows(messageService.findNoticeCount(user.getId(), topic));
        page.setPath("/notice/detail/" + topic);

        List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());
        List<Map<String, Object>> noticeVoList = new ArrayList<>();
        if (noticeList != null) {
            for (Message notice : noticeList) {
                // One fresh map per notice. Sharing a single map across iterations would make
                // every list element the same object, all showing the last notice.
                Map<String, Object> map = new HashMap<>();
                // The notice itself.
                map.put("notice", notice);
                // Its content: HTML-escaped JSON, unescaped before parsing.
                String content = HtmlUtils.htmlUnescape(notice.getContent());
                Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
                map.put("user", userService.findUserById((Integer) data.get("userId")));
                map.put("entityType", data.get("entityType"));
                map.put("entityId", data.get("entityId"));
                map.put("postId", data.get("postId"));
                // The sender of the notice.
                map.put("fromUser", userService.findUserById(notice.getFromId()));

                noticeVoList.add(map);
            }
        }
        model.addAttribute("notices", noticeVoList);

        List<Integer> ids = getLetterIds(noticeList);
        if (!ids.isEmpty()) {
            messageService.readMessage(ids);
        }

        return "/site/notice-detail";
    }
全部评论

相关推荐

object3:开始给部分🌸孝子上人生第一课了
点赞 评论 收藏
分享
专心打鱼:互联网搬运工,贴子都要偷
点赞 评论 收藏
分享
评论
1
收藏
分享
牛客网
牛客企业服务