问答社区-系统通知(kafka)
1.发送系统通知
需求
触发事件:
- 评论后,发布通知
- 点赞后,发布通知
- 关注后,发布通知
处理事件:
- 封装事件对象
- 开发事件的生产者
- 开发事件的消费者
1、新建事件对象
每个 setter 都返回当前对象(this),这样就可以链式地连续调用 set 方法了
/**
 * An event raised by a user action and handed to the event producer.
 *
 * <p>Every setter returns {@code this}, so an event can be built with a
 * single chained expression.
 */
public class Event {

    /** Topic of the event. */
    private String topic;

    /** Id of the user who triggered the event. */
    private int userId;

    /** Type of the target entity. */
    private int entityType;

    /** Id of the target entity. */
    private int entityId;

    /** Id of the user associated with the target entity. */
    private int entityUserId;

    /** Additional key/value payload attached to the event. */
    private Map<String, Object> data = new HashMap<>();

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    /**
     * Returns the live payload map (not a copy).
     */
    public Map<String, Object> getData() {
        return data;
    }

    /**
     * Adds a single entry to the payload map.
     *
     * @param key   payload key
     * @param value payload value
     * @return this event, for chaining
     */
    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;
    }
}
2、新建事件生产者
使用 KafkaTemplate,将对应的事件发布到指定的 topic
@Component public class EventProducer { @Autowired private KafkaTemplate kafkaTemplate; //处理事件 public void fireEvent(Event event){ //将事件发布到指定的主题 kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event)); } }
3、新建事件消费者
每当事件发生,将对应的数据新增到message表中
@Component public class EventConsumer implements CommunityConstant { private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); @Autowired private MessageService messageService; @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW}) public void handleCommentMessage(ConsumerRecord record) { if (record == null || record.value() == null) { logger.error("消息的内容为空"); return; } Event event = JSONObject.parseObject(record.value().toString(), Event.class); if (event == null) { logger.error("消息格式错误"); return; } //发送站内通知 Message message = new Message(); message.setFromId(SYSTEM_USER_ID); message.setToId(event.getEntityUserId()); message.setConversationId(event.getTopic()); message.setCreateTime(new Date()); Map<String, Object> content = new HashMap<>(); content.put("userId", event.getUserId()); content.put("entityType", event.getEntityType()); content.put("entityId", event.getEntityId()); if (!event.getData().isEmpty()) { for (Map.Entry<String, Object> entry : event.getData().entrySet()) { content.put(entry.getKey(), entry.getValue()); } } message.setContent(JSONObject.toJSONString(content)); messageService.addMessage(message); } }
4、找到评论、点赞、关注的controller,新增事件触发
CommentController
@RequestMapping(value = "/add/{discussPostId}", method = RequestMethod.POST) public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) { comment.setUserId(hostHolder.getUser().getId()); comment.setStatus(0); comment.setCreateTime(new Date()); commentService.addComment(comment); //触发评论事件 Event event = new Event().setTopic(TOPIC_COMMENT). setUserId(hostHolder.getUser().getId()). setEntityType(comment.getEntityType()). setEntityId(comment.getEntityId()). setData("postId", discussPostId); if (comment.getEntityType() == ENTITY_TYPE_POST) { int userId = discussPostService.findDiscussPostById(comment.getEntityId()).getUserId(); event.setEntityUserId(userId); }else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){ Comment target = commentService.findCommentById(comment.getEntityId()); event.setEntityUserId(target.getUserId()); } eventProducer.fireEvent(event); return "redirect:/discuss/detail/" + discussPostId; }
LikeController
@RequestMapping(value = "/like", method = RequestMethod.POST) @ResponseBody public String like(int entityType, int entityId,int entityUserId,int postId) { User user = hostHolder.getUser(); //点赞 likeService.like(user.getId(), entityType, entityId,entityUserId); //数量 long entityLikeCount = likeService.findEntityLikeCount(entityType, entityId); //状态 int entityLikeStatus = user == null ? 0 : likeService.findEntityLikeStatus(user.getId(), entityType, entityId); Map<String, Object> map = new HashMap<>(); map.put("likeCount", entityLikeCount); map.put("likeStatus", entityLikeStatus); if(entityLikeStatus == 1){ Event event = new Event().setTopic(TOPIC_LIKE) .setUserId(hostHolder.getUser().getId()) .setEntityType(entityType) .setEntityId(entityId) .setEntityUserId(entityUserId) .setData("postId",postId); eventProducer.fireEvent(event); } return CommunityUtil.getJSONString(0, null, map); }
FollowController
@LoginRequired @RequestMapping(value = "/follow", method = RequestMethod.POST) @ResponseBody public String follow(int entityType, int entityId) { User user = hostHolder.getUser(); followService.follow(user.getId(), entityType, entityId); //触发事件 Event event = new Event().setTopic(TOPIC_FOLLOW) .setUserId(user.getId()) .setEntityType(entityType) .setEntityId(entityId) .setEntityUserId(entityId); eventProducer.fireEvent(event); return CommunityUtil.getJSONString(0, "已关注!"); }
2.显示系统通知
需求:
通知列表
- 显示评论、点赞、关注三种类型的通知
通知详情
- 分页显示某一主题所包含的通知
未读消息
- 在页面头部显示所有的未读消息数量
1.开发持久层
这里面会显示评论、点赞、关注的通知
根据需求,需要新增加四个查询
1)查询未读消息数量
2)查询最新一条消息
3)查询其中某个主题的通知数量
4)查询某个主题包含的通知列表
这里贴上代码,要注意的是:
查询未读消息数量有两种,一种是某个主题的未读消息,另一种是整个通知的未读消息数量
这里就可以加一个 `<if>` 标签,动态判断参数是否传递,做到复用
<!-- Latest notice (row with the highest id) of one topic for a user.
     NOTE(review): from_id = 1 presumably is the system user and status 2 presumably
     means "deleted" - confirm against the message table conventions. -->
<select id="selectLatestNotice" resultType="Message">
    select <include refid="selectFields"></include>
    from message
    where id in (
        select max(id) from message
        where status != 2
        and from_id = 1
        and to_id = #{userId}
        and conversation_id = #{topic}
    )
</select>

<!-- Number of notices of one topic for a user (rows with status != 2). -->
<select id="selectNoticeCount" resultType="int">
    select count(id)
    from message
    where status != 2
    and from_id = 1
    and to_id = #{userId}
    and conversation_id = #{topic}
</select>

<!-- Number of notices with status = 0 for a user, for one topic or for all topics. -->
<select id="selectNoticeUnreadCount" resultType="int">
    select count(id)
    from message
    where status = 0
    and from_id = 1
    and to_id = #{userId}
    <if test="topic!=null">
        <!-- Only restrict to a topic when the caller passed one.
             This must be an XML comment: a "//" remark here is sent to the database as SQL text. -->
        and conversation_id = #{topic}
    </if>
</select>

<!-- One page of notices of a topic for a user, newest first. -->
<select id="selectNotices" resultType="Message">
    select <include refid="selectFields"></include>
    from message
    where status != 2
    and from_id = 1
    and to_id = #{userId}
    and conversation_id = #{topic}
    order by create_time desc
    limit #{offset}, #{limit}
</select>
2.服务层
这一层主要是调用持久层的方法,执行SQL语句,就不贴了
3.开发控制层
当点击通知按钮,调用messageService中的查询方法,将最新一条的数据查询出来,并把相关信息传递到前台页面
//通知 @RequestMapping(path = "/notice/list", method = RequestMethod.GET) public String getNoticeList(Model model) { User user = hostHolder.getUser(); // 查询评论类通知 Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT); if (message != null) { Map<String, Object> messageVO = new HashMap<>(); messageVO.put("message", message); String content = HtmlUtils.htmlUnescape(message.getContent()); Map<String, Object> data = JSONObject.parseObject(content, HashMap.class); messageVO.put("user", userService.findUserById((Integer) data.get("userId"))); messageVO.put("entityType", data.get("entityType")); messageVO.put("entityId", data.get("entityId")); messageVO.put("postId", data.get("postId")); int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT); messageVO.put("count", count); int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT); messageVO.put("unread", unread); model.addAttribute("commentNotice", messageVO); } // 查询未读消息数量 int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null); model.addAttribute("letterUnreadCount", letterUnreadCount); int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null); model.addAttribute("noticeUnreadCount", noticeUnreadCount); return "/site/notice"; }
通知列表页面
将查询到的通知 List 中的每一个通知实体放到一个大的集合中去
其中通知实体以及其中包含的内容(一个小通知)放到一个map中
将每一个小通知放到List集合中返回给主页面
@RequestMapping(path = "/notice/detail/{topic}", method = RequestMethod.GET) public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) { User user = hostHolder.getUser(); page.setLimit(5); page.setRows(messageService.findNoticeCount(user.getId(), topic)); page.setPath("/notice/detail/" + topic); List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit()); List<Map<String, Object>> noticeVoList = new ArrayList<>(); if (noticeList != null) { Map<String, Object> map = new HashMap<>(); for (Message notice : noticeList) { //通知 map.put("notice", notice); //内容 String content = HtmlUtils.htmlUnescape(notice.getContent()); Map<String, Object> data = JSONObject.parseObject(content, HashMap.class); map.put("user", userService.findUserById((Integer) data.get("userId"))); map.put("entityType", data.get("entityType")); map.put("entityId", data.get("entityId")); map.put("postId", data.get("postId")); //通知作者 map.put("fromUser", userService.findUserById(notice.getFromId())); noticeVoList.add(map); } } model.addAttribute("notices", noticeVoList); List<Integer> ids = getLetterIds(noticeList); if (!ids.isEmpty()) { messageService.readMessage(ids); } return "/site/notice-detail"; }