1、阻塞队列image.png

测试代码:

  1. public class BlockingQueueTests {
  2. public static void main(String[] args) {
  3. BlockingQueue queue = new ArrayBlockingQueue(10);
  4. new Thread(new Producer(queue)).start();
  5. new Thread(new Consumer(queue)).start();
  6. new Thread(new Consumer(queue)).start();
  7. new Thread(new Consumer(queue)).start();
  8. }
  9. }
  10. class Producer implements Runnable {
  11. private BlockingQueue<Integer> queue;
  12. public Producer(BlockingQueue<Integer> queue) {
  13. this.queue = queue;
  14. }
  15. @Override
  16. public void run() {
  17. try {
  18. for (int i = 0; i < 100; i++) {
  19. Thread.sleep(20);
  20. queue.put(i);
  21. System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
  22. }
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. class Consumer implements Runnable {
  29. private BlockingQueue<Integer> queue;
  30. public Consumer(BlockingQueue<Integer> queue) {
  31. this.queue = queue;
  32. }
  33. @Override
  34. public void run() {
  35. try {
  36. while (true) {
  37. Thread.sleep(new Random().nextInt(1000));
  38. queue.take();
  39. System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
  40. }
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }

2、Kafka

1、下载官网:kafka.apache.org
2、解压缩后到指定目录下d:/AAcommunity/kafka
修改配置config中zookeeper.properties的dataDir改变存放位置d:/AAjava/work/data/zookeeper
修改配置config中server.properties的log.dirs改变存放位置d:/AAjava/work/data/kafka-logs
image.png
测试:
1、新窗口启动启动zookeeper
进入到kafka安装目录下,执行bin\windows\zookeeper-server-start.bat config\zookeeper.properties指定配置文件
2、新窗口启动启动kafka
进入到kafka安装目录下,执行bin\windows\kafka-server-start.bat config\server.properties指定配置文件
3、新窗口启动创建主题test
进入到windows目录下 ==>kafka-topics.bat —create —bootstrap-server localhost:9092 —replication-factor 1 —partitions 1 —topic test 没有提示即创建成功
4、查看是否创建成功
kafka-topics.bat —list —bootstrap-server localhost:9092
5、生产者发送消息
kafka-console-producer.bat —broker-list localhost:9092 —topic test
回车发送消息内容
hello
world
6、新窗口启动消费者读取消息
进入到windows目录下
kafka-console-consumer.bat —bootstrap-server localhost:9092 —topic test —from-beginning

3、spring整合kafka

1、导入依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2、配置kafka

  1. #配置KafkaProperties
  2. spring.kafka.bootstrap-servers=localhost:9092
  3. spring.kafka.consumer.group-id=test-consumer-group
  4. spring.kafka.consumer.enable-auto-commit=true
  5. spring.kafka.consumer.auto-commit-interval=3000

3、测试代码
Producer

  1. @Component
  2. public class Producer{
  3. @Autowired
  4. private KafkaTemplate kafkaTemplate;
  5. public void sendMessage(String title,String content){
  6. kafkaTemplate.send(title,content);
  7. }
  8. }

Consumer

  1. @Component
  2. public class Consumer{
  3. @KafkaListener(topics = {"test"})
  4. public void readMessage(ConsumerRecord record){
  5. System.out.println(record.value());
  6. }
  7. }

KafkaTset

  1. public class KafkaTest {
  2. @Resource
  3. private Producer producer;
  4. @Test
  5. public void testKafka(){
  6. producer.sendMessage("test","kafka");
  7. producer.sendMessage("test","test");
  8. try {
  9. Thread.sleep(10*900);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }

4、发送系统通知

1、Event

  1. public class Event {
  2. private String topic;//主题
  3. private int userId;//当前用户id
  4. private int entityType;//实体类型
  5. private int entityId;//实体id
  6. private int entityUserId;//实体用户id
  7. private Map<String, Object> data = new HashMap<>();
  8. public String getTopic() {
  9. return topic;
  10. }
  11. public Event setTopic(String topic) {
  12. this.topic = topic;
  13. return this;
  14. }
  15. public int getUserId() {
  16. return userId;
  17. }
  18. public Event setUserId(int userId) {
  19. this.userId = userId;
  20. return this;
  21. }
  22. public int getEntityType() {
  23. return entityType;
  24. }
  25. public Event setEntityType(int entityType) {
  26. this.entityType = entityType;
  27. return this;
  28. }
  29. public int getEntityId() {
  30. return entityId;
  31. }
  32. public Event setEntityId(int entityId) {
  33. this.entityId = entityId;
  34. return this;
  35. }
  36. public int getEntityUserId() {
  37. return entityUserId;
  38. }
  39. public Event setEntityUserId(int entityUserId) {
  40. this.entityUserId = entityUserId;
  41. return this;
  42. }
  43. public Map<String, Object> getData() {
  44. return data;
  45. }
  46. public Event setData(String key, Object value) {
  47. this.data.put(key, value);
  48. return this;
  49. }
  50. }

2、CommunityConstant添加四个常量

  1. /**
  2. * 主题:评论
  3. */
  4. String TOPIC_COMMENT = "comment";
  5. /**
  6. * 主题:点赞
  7. */
  8. String TOPIC_LIKE = "like";
  9. /**
  10. * 主题:关注
  11. */
  12. String TOPIC_FOLLOW = "follow";
  13. /**
  14. * 系统用户id
  15. */
  16. int SYSTEM_USER_ID = 1;

3、EventProducer

  1. @Component
  2. public class EventProducer {
  3. @Autowired
  4. private KafkaTemplate template;
  5. //将整个message传入,后面页面再按需选择需要的信息
  6. public void fireEvent(Event event){
  7. template.send(event.getTopic(), JSONObject.toJSONString(event));
  8. }
  9. }

4、EventConsumer

  1. @Component
  2. public class EventConsumer implements CommunityConstant {
  3. private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
  4. @Autowired
  5. private MessageService messageService;
  6. @Autowired
  7. private DiscussPostService discussPostService;
  8. @Autowired
  9. private ElasticsearchService elasticsearchService;
  10. @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
  11. public void handleCommentMessagee(ConsumerRecord record) {
  12. if (record == null || record.value() == null) {
  13. logger.error("消息的内容为空!");
  14. return;
  15. }
  16. Event event = JSONObject.parseObject(record.value().toString(), Event.class);
  17. if (event == null) {
  18. logger.error("消息格式错误");
  19. return;
  20. }
  21. //发送站内消息
  22. Message message = new Message();
  23. message.setFromId(SYSTEM_USER_ID);
  24. message.setToId(event.getEntityUserId());
  25. message.setConversationId(event.getTopic());//存储主题
  26. message.setCreateTime(new Date());
  27. Map<String, Object> content = new HashMap<>();
  28. content.put("userId", event.getUserId());
  29. content.put("entityType", event.getEntityType());
  30. content.put("entityId", event.getEntityId());
  31. //将map的数据存进content
  32. if (!event.getData().isEmpty()) {
  33. for (Map.Entry<String, Object> entry : event.getData().entrySet()) {
  34. content.put(entry.getKey(), entry.getValue());
  35. }
  36. }
  37. //最后将content存进message的Content
  38. message.setContent(JSONObject.toJSONString(content));
  39. messageService.addMessage(message);
  40. }
  41. }

5、CommentMapper添加根据id查询帖子

  1. Comment findCommentById(int id);

6、comment-mapper.xml

  1. <select id="findCommentById" resultType="Comment">
  2. select <include refid="selectFields"></include>
  3. from comment
  4. where id=#{id}
  5. </select>

7、CommentService

  1. public Comment findCommentById(int entityId) {
  2. return commentMapper.findCommentById(entityId);
  3. }

8、CommentController

  1. // 触发评论事件
  2. Event event = new Event()
  3. .setTopic(TOPIC_COMMENT)
  4. .setUserId(hostHolder.getUser().getId())
  5. .setEntityType(comment.getEntityType())
  6. .setEntityId(comment.getEntityId())
  7. .setData("postId", discussPostId);
  8. if (comment.getEntityType() == ENTITY_TYPE_POST) {
  9. DiscussPost target = discussPostService.selectDiscussPostById(comment.getEntityId());
  10. event.setEntityUserId(target.getUserId());
  11. } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
  12. Comment target = commentService.findCommentById(comment.getEntityId());
  13. event.setEntityUserId(target.getUserId());
  14. }
  15. eventProducer.fireEvent(event);

9、重构LikeController:添加参数postId

  1. // 触发点赞事件
  2. if (likeStatus == 1) {
  3. Event event = new Event()
  4. .setTopic(TOPIC_LIKE)
  5. .setUserId(hostHolder.getUser().getId())
  6. .setEntityType(entityType)
  7. .setEntityId(entityId)
  8. .setEntityUserId(entityUserId)
  9. .setData("postId", postId);
  10. eventProducer.fireEvent(event);
  11. }

10、FollowController

  1. // 触发关注事件
  2. Event event = new Event()
  3. .setTopic(TOPIC_FOLLOW)
  4. .setUserId(hostHolder.getUser().getId())
  5. .setEntityType(entityType)
  6. .setEntityId(entityId)
  7. .setEntityUserId(entityId);
  8. eventProducer.fireEvent(event);

11、页面(略)
discuss-detail.html
discuss.js

12、空指针异常
ServiceLogAspect.java

  1. if(attributes==null){
  2. return;
  3. }

5、显示系统通知

• 通知列表

  • 显示评论、点赞、关注三种类型的通知
    • 通知详情
  • 分页显示某一类主题所包含的通知
    • 未读消息
  • 在页面头部显示所有的未读消息数量

1、MessageMapper

  1. /**
  2. * 查询某个主题下最新的通知
  3. * @param userId
  4. * @param topic
  5. * @return
  6. */
  7. Message selectLatestNotice(int userId, String topic);
  8. /**
  9. * 查询某个主题所包含的通知数量
  10. * @param userId
  11. * @param topic
  12. * @return
  13. */
  14. int selectNoticeCount(int userId, String topic);
  15. /**
  16. * 查询未读的通知的数量
  17. * @param userId
  18. * @param topic
  19. * @return
  20. */
  21. int selectNoticeUnreadCount(int userId, String topic);
  22. /**
  23. * 查询某个主题所包含的通知列表
  24. * @param userId
  25. * @param topic
  26. * @param offset
  27. * @param limit
  28. * @return
  29. */
  30. List<Message> selectNotices(int userId, String topic, int offset, int limit);

2、message-mapper.xml

  1. <select id="selectLatestNotice" resultType="Message">
  2. select <include refid="selectFields"></include>
  3. from message
  4. where id in (
  5. select max(id) from message
  6. where status != 2
  7. and from_id = 1
  8. and to_id = #{userId}
  9. and conversation_id = #{topic}
  10. )
  11. </select>
  12. <select id="selectNoticeCount" resultType="int">
  13. select count(id) from message
  14. where status != 2
  15. and from_id = 1
  16. and to_id = #{userId}
  17. and conversation_id = #{topic}
  18. </select>
  19. <select id="selectNoticeUnreadCount" resultType="int">
  20. select count(id) from message
  21. where status = 0
  22. and from_id = 1
  23. and to_id = #{userId}
  24. <if test="topic!=null">
  25. and conversation_id = #{topic}
  26. </if>
  27. </select>
  28. <select id="selectNotices" resultType="Message">
  29. select <include refid="selectFields"></include>
  30. from message
  31. where status != 2
  32. and from_id = 1
  33. and to_id = #{userId}
  34. and conversation_id = #{topic}
  35. order by create_time desc
  36. limit #{offset}, #{limit}
  37. </select>

3、MessageService

  1. public Message findLatestNotice(int userId, String topic) {
  2. return messageMapper.selectLatestNotice(userId, topic);
  3. }
  4. public int findNoticeCount(int userId, String topic) {
  5. return messageMapper.selectNoticeCount(userId, topic);
  6. }
  7. public int findNoticeUnreadCount(int userId, String topic) {
  8. return messageMapper.selectNoticeUnreadCount(userId, topic);
  9. }
  10. public List<Message> findNotices(int userId, String topic, int offset, int limit) {
  11. return messageMapper.selectNotices(userId, topic, offset, limit);
  12. }

4、MessageController
注意再私信接口中也要添加上查询未读通知数量:
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
model.addAttribute(“noticeUnreadCount”, noticeUnreadCount);

  1. /**
  2. * 通知接口
  3. * @param model
  4. * @return
  5. */
  6. @GetMapping("/notice/list")
  7. public String getNoticeList(Model model) {
  8. User user = hostHolder.getUser();
  9. // 查询评论类通知
  10. Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT);
  11. Map<String, Object> messageVO = new HashMap<>();
  12. if (message != null) {
  13. messageVO.put("message", message);
  14. String content = HtmlUtils.htmlUnescape(message.getContent());
  15. Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
  16. messageVO.put("user", userService.selectUserById((Integer) data.get("userId")));
  17. messageVO.put("entityType", data.get("entityType"));
  18. messageVO.put("entityId", data.get("entityId"));
  19. messageVO.put("postId", data.get("postId"));
  20. int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT);
  21. messageVO.put("count", count);
  22. int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT);
  23. messageVO.put("unread", unread);
  24. }
  25. model.addAttribute("commentNotice", messageVO);
  26. // 查询点赞类通知
  27. message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);
  28. messageVO = new HashMap<>();
  29. if (message != null) {
  30. messageVO.put("message", message);
  31. String content = HtmlUtils.htmlUnescape(message.getContent());
  32. Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
  33. messageVO.put("user", userService.selectUserById((Integer) data.get("userId")));
  34. messageVO.put("entityType", data.get("entityType"));
  35. messageVO.put("entityId", data.get("entityId"));
  36. messageVO.put("postId", data.get("postId"));
  37. int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);
  38. messageVO.put("count", count);
  39. int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);
  40. messageVO.put("unread", unread);
  41. }
  42. model.addAttribute("likeNotice", messageVO);
  43. // 查询关注类通知
  44. message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW);
  45. messageVO = new HashMap<>();
  46. if (message != null) {
  47. messageVO.put("message", message);
  48. String content = HtmlUtils.htmlUnescape(message.getContent());
  49. Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
  50. messageVO.put("user", userService.selectUserById((Integer) data.get("userId")));
  51. messageVO.put("entityType", data.get("entityType"));
  52. messageVO.put("entityId", data.get("entityId"));
  53. int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW);
  54. messageVO.put("count", count);
  55. int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW);
  56. messageVO.put("unread", unread);
  57. }
  58. model.addAttribute("followNotice", messageVO);
  59. // 查询未读消息数量
  60. int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
  61. model.addAttribute("letterUnreadCount", letterUnreadCount);
  62. int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
  63. model.addAttribute("noticeUnreadCount", noticeUnreadCount);
  64. return "/site/notice";
  65. }
  66. /**
  67. * 通知详情接口
  68. * @param topic
  69. * @param page
  70. * @param model
  71. * @return
  72. */
  73. @GetMapping("/notice/detail/{topic}")
  74. public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) {
  75. User user = hostHolder.getUser();
  76. page.setLimit(5);
  77. page.setPath("/notice/detail/" + topic);
  78. page.setRows(messageService.findNoticeCount(user.getId(), topic));
  79. List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());
  80. List<Map<String, Object>> noticeVoList = new ArrayList<>();
  81. if (noticeList != null) {
  82. for (Message notice : noticeList) {
  83. Map<String, Object> map = new HashMap<>();
  84. // 通知
  85. map.put("notice", notice);
  86. // 内容
  87. String content = HtmlUtils.htmlUnescape(notice.getContent());
  88. Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
  89. map.put("user", userService.selectUserById((Integer) data.get("userId")));
  90. map.put("entityType", data.get("entityType"));
  91. map.put("entityId", data.get("entityId"));
  92. map.put("postId", data.get("postId"));
  93. // 通知作者
  94. map.put("fromUser", userService.selectUserById(notice.getFromId()));
  95. noticeVoList.add(map);
  96. }
  97. }
  98. model.addAttribute("notices", noticeVoList);
  99. // 设置已读
  100. List<Integer> ids = getLetterIds(noticeList);
  101. if (!ids.isEmpty()) {
  102. messageService.readMessage(ids);
  103. }
  104. return "/site/notice-detail";
  105. }

5、页面(略)
letter.html 修改通知的路径
notice.html
notice-detail.html
index.html 修改数量

6、添加拦截器
MessageInterceptor

  1. @Component
  2. public class MessageInterceptor implements HandlerInterceptor {
  3. @Autowired
  4. private HostHolder hostHolder;
  5. @Autowired
  6. private MessageService messageService;
  7. @Override
  8. public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
  9. User user = hostHolder.getUser();
  10. if (user != null && modelAndView != null) {
  11. int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
  12. int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
  13. modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);
  14. }
  15. }
  16. }

WebMvcConfig

  1. @Configuration
  2. public class WebMvcConfig implements WebMvcConfigurer {
  3. @Autowired
  4. private LoginTicketInterceptor loginTicketInterceptor;
  5. @Autowired
  6. private LoginRequiredInterceptor loginRequiredInterceptor;
  7. @Autowired
  8. private MessageInterceptor messageInterceptor;
  9. @Override
  10. public void addInterceptors(InterceptorRegistry registry) {
  11. registry.addInterceptor(loginTicketInterceptor)
  12. .excludePathPatterns("/**/*.css","/**/*.js","/**/*.png","/**/*.jpg","/**/*.jpeg");
  13. registry.addInterceptor(loginRequiredInterceptor)
  14. .excludePathPatterns("/**/*.css","/**/*.js","/**/*.png","/**/*.jpg","/**/*.jpeg");
  15. registry.addInterceptor(messageInterceptor)
  16. .excludePathPatterns("/**/*.css","/**/*.js","/**/*.png","/**/*.jpg","/**/*.jpeg");
  17. }
  18. }