1、阻塞队列
测试代码:
public class BlockingQueueTests {public static void main(String[] args) {BlockingQueue queue = new ArrayBlockingQueue(10);new Thread(new Producer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();}}class Producer implements Runnable {private BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {for (int i = 0; i < 100; i++) {Thread.sleep(20);queue.put(i);System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());}} catch (Exception e) {e.printStackTrace();}}}class Consumer implements Runnable {private BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {Thread.sleep(new Random().nextInt(1000));queue.take();System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());}} catch (Exception e) {e.printStackTrace();}}}
2、Kafka
1、下载官网:kafka.apache.org
2、解压缩后到指定目录下d:/AAcommunity/kafka
修改配置config中zookeeper.properties的dataDir改变存放位置d:/AAjava/work/data/zookeeper
修改配置config中server.properties的log.dirs改变存放位置d:/AAjava/work/data/kafka-logs
测试:
1、新窗口启动启动zookeeper
进入到kafka安装目录下,执行bin\windows\zookeeper-server-start.bat config\zookeeper.properties指定配置文件
2、新窗口启动启动kafka
进入到kafka安装目录下,执行bin\windows\kafka-server-start.bat config\server.properties指定配置文件
3、新窗口启动创建主题test
进入到windows目录下 ==>kafka-topics.bat —create —bootstrap-server localhost:9092 —replication-factor 1 —partitions 1 —topic test 没有提示即创建成功
4、查看是否创建成功
kafka-topics.bat —list —bootstrap-server localhost:9092
5、生产者发送消息
kafka-console-producer.bat —broker-list localhost:9092 —topic test
回车发送消息内容
hello
world
6、新窗口启动消费者读取消息
进入到windows目录下
kafka-console-consumer.bat —bootstrap-server localhost:9092 —topic test —from-beginning
3、spring整合kafka
1、导入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2、配置kafka
#配置KafkaPropertiesspring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=3000
3、测试代码
Producer
@Componentpublic class Producer{@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String title,String content){kafkaTemplate.send(title,content);}}
Consumer
@Componentpublic class Consumer{@KafkaListener(topics = {"test"})public void readMessage(ConsumerRecord record){System.out.println(record.value());}}
KafkaTset
public class KafkaTest {@Resourceprivate Producer producer;@Testpublic void testKafka(){producer.sendMessage("test","kafka");producer.sendMessage("test","test");try {Thread.sleep(10*900);} catch (InterruptedException e) {e.printStackTrace();}}}
4、发送系统通知
1、Event
public class Event {private String topic;//主题private int userId;//当前用户idprivate int entityType;//实体类型private int entityId;//实体idprivate int entityUserId;//实体用户idprivate Map<String, Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key, Object value) {this.data.put(key, value);return this;}}
2、CommunityConstant添加四个常量
/*** 主题:评论*/String TOPIC_COMMENT = "comment";/*** 主题:点赞*/String TOPIC_LIKE = "like";/*** 主题:关注*/String TOPIC_FOLLOW = "follow";/*** 系统用户id*/int SYSTEM_USER_ID = 1;
3、EventProducer
@Componentpublic class EventProducer {@Autowiredprivate KafkaTemplate template;//将整个message传入,后面页面再按需选择需要的信息public void fireEvent(Event event){template.send(event.getTopic(), JSONObject.toJSONString(event));}}
4、EventConsumer
@Componentpublic class EventConsumer implements CommunityConstant {private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})public void handleCommentMessagee(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误");return;}//发送站内消息Message message = new Message();message.setFromId(SYSTEM_USER_ID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());//存储主题message.setCreateTime(new Date());Map<String, Object> content = new HashMap<>();content.put("userId", event.getUserId());content.put("entityType", event.getEntityType());content.put("entityId", event.getEntityId());//将map的数据存进contentif (!event.getData().isEmpty()) {for (Map.Entry<String, Object> entry : event.getData().entrySet()) {content.put(entry.getKey(), entry.getValue());}}//最后将content存进message的Contentmessage.setContent(JSONObject.toJSONString(content));messageService.addMessage(message);}}
5、CommentMapper添加根据id查询帖子
Comment findCommentById(int id);
6、comment-mapper.xml
<select id="findCommentById" resultType="Comment">select <include refid="selectFields"></include>from commentwhere id=#{id}</select>
7、CommentService
public Comment findCommentById(int entityId) {return commentMapper.findCommentById(entityId);}
8、CommentController
// 触发评论事件Event event = new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId", discussPostId);if (comment.getEntityType() == ENTITY_TYPE_POST) {DiscussPost target = discussPostService.selectDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());} else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {Comment target = commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);
9、重构LikeController:添加参数postId
// 触发点赞事件if (likeStatus == 1) {Event event = new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId", postId);eventProducer.fireEvent(event);}
10、FollowController
// 触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);
11、页面(略)
discuss-detail.html
discuss.js
12、空指针异常
ServiceLogAspect.java
if(attributes==null){return;}
5、显示系统通知
• 通知列表
- 显示评论、点赞、关注三种类型的通知
• 通知详情 - 分页显示某一类主题所包含的通知
• 未读消息 - 在页面头部显示所有的未读消息数量
1、MessageMapper
/*** 查询某个主题下最新的通知* @param userId* @param topic* @return*/Message selectLatestNotice(int userId, String topic);/*** 查询某个主题所包含的通知数量* @param userId* @param topic* @return*/int selectNoticeCount(int userId, String topic);/*** 查询未读的通知的数量* @param userId* @param topic* @return*/int selectNoticeUnreadCount(int userId, String topic);/*** 查询某个主题所包含的通知列表* @param userId* @param topic* @param offset* @param limit* @return*/List<Message> selectNotices(int userId, String topic, int offset, int limit);
2、message-mapper.xml
<select id="selectLatestNotice" resultType="Message">select <include refid="selectFields"></include>from messagewhere id in (select max(id) from messagewhere status != 2and from_id = 1and to_id = #{userId}and conversation_id = #{topic})</select><select id="selectNoticeCount" resultType="int">select count(id) from messagewhere status != 2and from_id = 1and to_id = #{userId}and conversation_id = #{topic}</select><select id="selectNoticeUnreadCount" resultType="int">select count(id) from messagewhere status = 0and from_id = 1and to_id = #{userId}<if test="topic!=null">and conversation_id = #{topic}</if></select><select id="selectNotices" resultType="Message">select <include refid="selectFields"></include>from messagewhere status != 2and from_id = 1and to_id = #{userId}and conversation_id = #{topic}order by create_time desclimit #{offset}, #{limit}</select>
3、MessageService
public Message findLatestNotice(int userId, String topic) {return messageMapper.selectLatestNotice(userId, topic);}public int findNoticeCount(int userId, String topic) {return messageMapper.selectNoticeCount(userId, topic);}public int findNoticeUnreadCount(int userId, String topic) {return messageMapper.selectNoticeUnreadCount(userId, topic);}public List<Message> findNotices(int userId, String topic, int offset, int limit) {return messageMapper.selectNotices(userId, topic, offset, limit);}
4、MessageController
注意再私信接口中也要添加上查询未读通知数量:
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
model.addAttribute(“noticeUnreadCount”, noticeUnreadCount);
/*** 通知接口* @param model* @return*/@GetMapping("/notice/list")public String getNoticeList(Model model) {User user = hostHolder.getUser();// 查询评论类通知Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT);Map<String, Object> messageVO = new HashMap<>();if (message != null) {messageVO.put("message", message);String content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.selectUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));messageVO.put("postId", data.get("postId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT);messageVO.put("unread", unread);}model.addAttribute("commentNotice", messageVO);// 查询点赞类通知message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);messageVO = new HashMap<>();if (message != null) {messageVO.put("message", message);String content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.selectUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));messageVO.put("postId", data.get("postId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);messageVO.put("unread", unread);}model.addAttribute("likeNotice", messageVO);// 查询关注类通知message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW);messageVO = new HashMap<>();if (message != null) {messageVO.put("message", message);String content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.selectUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW);messageVO.put("unread", unread);}model.addAttribute("followNotice", messageVO);// 查询未读消息数量int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);model.addAttribute("letterUnreadCount", letterUnreadCount);int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);model.addAttribute("noticeUnreadCount", noticeUnreadCount);return "/site/notice";}/*** 通知详情接口* @param topic* @param page* @param model* @return*/@GetMapping("/notice/detail/{topic}")public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) {User user = hostHolder.getUser();page.setLimit(5);page.setPath("/notice/detail/" + topic);page.setRows(messageService.findNoticeCount(user.getId(), topic));List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());List<Map<String, Object>> noticeVoList = new ArrayList<>();if (noticeList != null) {for (Message notice : noticeList) {Map<String, Object> map = new HashMap<>();// 通知map.put("notice", notice);// 内容String content = HtmlUtils.htmlUnescape(notice.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);map.put("user", userService.selectUserById((Integer) data.get("userId")));map.put("entityType", data.get("entityType"));map.put("entityId", data.get("entityId"));map.put("postId", data.get("postId"));// 通知作者map.put("fromUser", userService.selectUserById(notice.getFromId()));noticeVoList.add(map);}}model.addAttribute("notices", noticeVoList);// 设置已读List<Integer> ids = getLetterIds(noticeList);if (!ids.isEmpty()) {messageService.readMessage(ids);}return "/site/notice-detail";}
5、页面(略)
letter.html 修改通知的路径
notice.html
notice-detail.html
index.html 修改数量
6、添加拦截器
MessageInterceptor
@Componentpublic class MessageInterceptor implements HandlerInterceptor {@Autowiredprivate HostHolder hostHolder;@Autowiredprivate MessageService messageService;@Overridepublic void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {User user = hostHolder.getUser();if (user != null && modelAndView != null) {int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);}}}
WebMvcConfig
@Configurationpublic class WebMvcConfig implements WebMvcConfigurer {@Autowiredprivate LoginTicketInterceptor loginTicketInterceptor;@Autowiredprivate LoginRequiredInterceptor loginRequiredInterceptor;@Autowiredprivate MessageInterceptor messageInterceptor;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(loginTicketInterceptor).excludePathPatterns("/**/*.css","/**/*.js","/**/*.png","/**/*.jpg","/**/*.jpeg");registry.addInterceptor(loginRequiredInterceptor).excludePathPatterns("/**/*.css","/**/*.js","/**/*.png","/**/*.jpg","/**/*.jpeg");registry.addInterceptor(messageInterceptor).excludePathPatterns("/**/*.css","/**/*.js","/**/*.png","/**/*.jpg","/**/*.jpeg");}}
