课程说明

  • 圈子推荐功能说明
  • 圈子推荐功能流程
  • 圈子推荐功能的实现
  • 小视频推荐功能的实现

1、圈子推荐

1.1、功能说明

在圈子功能中,针对于用户发布的动态信息,系统可以根据用户的发布、浏览、点赞等操作,对动态信息做计算,然后对每个用户进行不同的推荐。

1.2、流程说明

day07-圈子推荐功能实现 - 图2

流程说明:

  • 用户对圈子的动态操作,如:发布、浏览、点赞、喜欢等,就会给RocketMQ进行发送消息;
  • 推荐系统接收消息,并且处理消息数据,处理之后将结果数据写入到MongoDB中;
  • Spark系统拉取数据,然后进行推荐计算;
  • 计算之后的结果数据写入到Redis中,为每个用户都进行个性化推荐;
  • 如果有用户没有数据的,查询MongoDB中的默认数据;

1.3、动态增加自增id

由于我们使用的推荐模型中,动态id需要是Long类型的,而我们之前使用的ObjectId类型的,所以需要增加Long类型的id。

1.3.1、修改Publish对象

  1. package com.tanhua.dubbo.server.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.bson.types.ObjectId;
  6. import org.springframework.data.mongodb.core.mapping.Document;
  7. import java.util.Date;
  8. import java.util.List;
  9. /**
  10. * 发布表,动态内容
  11. */
  12. @Data
  13. @NoArgsConstructor
  14. @AllArgsConstructor
  15. @Document(collection = "quanzi_publish")
  16. public class Publish implements java.io.Serializable {
  17. private static final long serialVersionUID = 8732308321082804771L;
  18. private ObjectId id; //主键id
  19. private Long pid; //Long类型的id,用于推荐引擎使用
  20. private Long userId;
  21. private String text; //文字
  22. private List<String> medias; //媒体数据,图片或小视频 url
  23. private Integer seeType; // 谁可以看,1-公开,2-私密,3-部分可见,4-不给谁看
  24. private List<Long> seeList; //部分可见的列表
  25. private List<Long> notSeeList; //不给谁看的列表
  26. private String longitude; //经度
  27. private String latitude; //纬度
  28. private String locationName; //位置名称
  29. private Long created; //发布时间
  30. }

1.3.2、修改发布逻辑

  1. @Override
  2. public String savePublish(Publish publish) {
  3. // 校验
  4. if (publish.getUserId() == null) {
  5. return null;
  6. }
  7. try {
  8. publish.setCreated(System.currentTimeMillis()); //设置创建时间
  9. publish.setId(ObjectId.get()); //设置id
  10. publish.setPid(this.idService.createId("publish", publish.getId().toHexString()));
  11. this.mongoTemplate.save(publish); //保存发布
  12. Album album = new Album(); // 构建相册对象
  13. album.setPublishId(publish.getId());
  14. album.setCreated(System.currentTimeMillis());
  15. album.setId(ObjectId.get());
  16. this.mongoTemplate.save(album, "quanzi_album_" + publish.getUserId());
  17. //写入好友的时间线中
  18. Criteria criteria = Criteria.where("userId").is(publish.getUserId());
  19. List<Users> users = this.mongoTemplate.find(Query.query(criteria), Users.class);
  20. for (Users user : users) {
  21. TimeLine timeLine = new TimeLine();
  22. timeLine.setId(ObjectId.get());
  23. timeLine.setPublishId(publish.getId());
  24. timeLine.setUserId(user.getUserId());
  25. timeLine.setDate(System.currentTimeMillis());
  26. this.mongoTemplate.save(timeLine, "quanzi_time_line_" + user.getFriendId());
  27. }
  28. return publish.getId().toHexString();
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. //TODO 出错的事务回滚
  32. }
  33. return null;
  34. }
  1. package com.tanhua.dubbo.server.service;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.data.redis.core.RedisTemplate;
  5. import org.springframework.stereotype.Service;
  6. //生成自增长的id,原理:使用redis的自增长值
  7. @Service
  8. public class IdService {
  9. @Autowired
  10. private RedisTemplate<String, String> redisTemplate;
  11. public Long createId(String type, String strId) {
  12. type = StringUtils.upperCase(type);
  13. String idHashKey = "TANHUA_ID_HASH_" + type;
  14. if (this.redisTemplate.opsForHash().hasKey(idHashKey, strId)) {
  15. return Long.valueOf(this.redisTemplate.opsForHash().get(idHashKey, strId).toString());
  16. }
  17. String idKey = "TANHUA_ID_" + type;
  18. Long id = this.redisTemplate.opsForValue().increment(idKey);
  19. this.redisTemplate.opsForHash().put(idHashKey, strId, id.toString());
  20. return id;
  21. }
  22. }

itcast-tanhua-dubbo-service需要增加Redis依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>

1.4、动态计分规则

  • 浏览 +1
  • 点赞 +5
  • 喜欢 +8
  • 评论 + 10
  • 文字长度:50以内1分,50~100之间2分,100以上3分
  • 图片个数:每个图片一分

1.5、发送消息

1.5.1、QuanziMQService

itcast-tanhua-server增加依赖:

  1. <!--RocketMQ相关-->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-spring-boot-starter</artifactId>
  5. <version>2.0.3</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.rocketmq</groupId>
  9. <artifactId>rocketmq-client</artifactId>
  10. <version>4.6.0</version>
  11. </dependency>

配置文件:

  1. # RocketMQ相关配置
  2. rocketmq.name-server=172.16.55.155:9876
  3. rocketmq.producer.group=tanhua
  1. package com.tanhua.server.service;
  2. import com.alibaba.dubbo.config.annotation.Reference;
  3. import com.tanhua.dubbo.server.api.QuanZiApi;
  4. import com.tanhua.dubbo.server.pojo.Publish;
  5. import com.tanhua.server.pojo.User;
  6. import com.tanhua.server.utils.UserThreadLocal;
  7. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Service;
  12. import java.util.Date;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. @Service
  16. public class QuanziMQService {
  17. private static final Logger LOGGER = LoggerFactory.getLogger(QuanziMQService.class);
  18. @Autowired
  19. private RocketMQTemplate rocketMQTemplate;
  20. @Reference(version = "1.0.0")
  21. private QuanZiApi quanZiApi;
  22. /**
  23. * 发布动态消息
  24. *
  25. * @param publishId
  26. * @return
  27. */
  28. public Boolean publishMsg(String publishId) {
  29. return this.sendMsg(publishId, 1);
  30. }
  31. /**
  32. * 浏览动态消息
  33. *
  34. * @param publishId
  35. * @return
  36. */
  37. public Boolean queryPublishMsg(String publishId) {
  38. return this.sendMsg(publishId, 2);
  39. }
  40. /**
  41. * 点赞动态消息
  42. *
  43. * @param publishId
  44. * @return
  45. */
  46. public Boolean likePublishMsg(String publishId) {
  47. return this.sendMsg(publishId, 3);
  48. }
  49. /**
  50. * 取消点赞动态消息
  51. *
  52. * @param publishId
  53. * @return
  54. */
  55. public Boolean disLikePublishMsg(String publishId) {
  56. return this.sendMsg(publishId, 6);
  57. }
  58. /**
  59. * 喜欢动态消息
  60. *
  61. * @param publishId
  62. * @return
  63. */
  64. public Boolean lovePublishMsg(String publishId) {
  65. return this.sendMsg(publishId, 4);
  66. }
  67. /**
  68. * 取消喜欢动态消息
  69. *
  70. * @param publishId
  71. * @return
  72. */
  73. public Boolean disLovePublishMsg(String publishId) {
  74. return this.sendMsg(publishId, 7);
  75. }
  76. /**
  77. * 评论动态消息
  78. *
  79. * @param publishId
  80. * @return
  81. */
  82. public Boolean commentPublishMsg(String publishId) {
  83. return this.sendMsg(publishId, 5);
  84. }
  85. /**
  86. * 发送圈子操作相关的消息
  87. *
  88. * @param publishId
  89. * @param type 1-发动态,2-浏览动态, 3-点赞, 4-喜欢, 5-评论,6-取消点赞,7-取消喜欢
  90. * @return
  91. */
  92. private Boolean sendMsg(String publishId, Integer type) {
  93. try {
  94. User user = UserThreadLocal.get();
  95. Publish publish = this.quanZiApi.queryPublishById(publishId);
  96. //构建消息
  97. Map<String, Object> msg = new HashMap<>();
  98. msg.put("userId", user.getId());
  99. msg.put("date", System.currentTimeMillis());
  100. msg.put("publishId", publishId);
  101. msg.put("pid", publish.getPid());
  102. msg.put("type", type);
  103. this.rocketMQTemplate.convertAndSend("tanhua-quanzi", msg);
  104. } catch (Exception e) {
  105. LOGGER.error("发送消息失败! publishId = " + publishId + ", type = " + type, e);
  106. return false;
  107. }
  108. return true;
  109. }
  110. }

1.5.2、修改MovementsController

  1. package com.tanhua.server.controller;
  2. import com.tanhua.server.service.MovementsService;
  3. import com.tanhua.server.service.QuanziMQService;
  4. import com.tanhua.server.vo.Movements;
  5. import com.tanhua.server.vo.PageResult;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.http.HttpStatus;
  9. import org.springframework.http.ResponseEntity;
  10. import org.springframework.web.bind.annotation.*;
  11. import org.springframework.web.multipart.MultipartFile;
  12. @RestController
  13. @RequestMapping("movements")
  14. public class MovementsController {
  15. @Autowired
  16. private MovementsService movementsService;
  17. @Autowired
  18. private QuanziMQService quanziMQService;
  19. /**
  20. * 发送动态
  21. *
  22. * @param textContent
  23. * @param location
  24. * @param multipartFile
  25. * @return
  26. */
  27. @PostMapping()
  28. public ResponseEntity<Void> savePublish(@RequestParam("textContent") String textContent,
  29. @RequestParam("location") String location,
  30. @RequestParam("longitude") String longitude,
  31. @RequestParam("latitude") String latitude,
  32. @RequestParam(value = "imageContent", required = false) MultipartFile[] multipartFile) {
  33. try {
  34. String publishId = this.movementsService.savePublish(textContent, location, longitude, latitude, multipartFile);
  35. if (StringUtils.isNotEmpty(publishId)) {
  36. // 发送消息
  37. this.quanziMQService.publishMsg(publishId);
  38. return ResponseEntity.ok(null);
  39. }
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  44. }
  45. /**
  46. * 查询好友动态
  47. *
  48. * @param page
  49. * @param pageSize
  50. * @return
  51. */
  52. @GetMapping
  53. public PageResult queryPublishList(@RequestParam(value = "page", defaultValue = "1") Integer page,
  54. @RequestParam(value = "pagesize", defaultValue = "10") Integer pageSize) {
  55. return this.movementsService.queryPublishList(page, pageSize, false);
  56. }
  57. /**
  58. * 查询推荐动态
  59. *
  60. * @param page
  61. * @param pageSize
  62. * @return
  63. */
  64. @GetMapping("recommend")
  65. public PageResult queryRecommendPublishList(@RequestParam(value = "page", defaultValue = "1") Integer page,
  66. @RequestParam(value = "pagesize", defaultValue = "10") Integer pageSize) {
  67. return this.movementsService.queryPublishList(page, pageSize, true);
  68. }
  69. /**
  70. * 点赞
  71. *
  72. * @param publishId
  73. * @return
  74. */
  75. @GetMapping("/{id}/like")
  76. public ResponseEntity<Long> likeComment(@PathVariable("id") String publishId) {
  77. try {
  78. Long likeCount = this.movementsService.likeComment(publishId);
  79. if (likeCount != null) {
  80. //发送点赞消息
  81. this.quanziMQService.likePublishMsg(publishId);
  82. return ResponseEntity.ok(likeCount);
  83. }
  84. } catch (Exception e) {
  85. e.printStackTrace();
  86. }
  87. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  88. }
  89. /**
  90. * 取消点赞
  91. *
  92. * @param publishId
  93. * @return
  94. */
  95. @GetMapping("/{id}/dislike")
  96. public ResponseEntity<Long> disLikeComment(@PathVariable("id") String publishId) {
  97. try {
  98. Long likeCount = this.movementsService.cancelLikeComment(publishId);
  99. if (null != likeCount) {
  100. //发送取消点赞消息
  101. this.quanziMQService.disLikePublishMsg(publishId);
  102. return ResponseEntity.ok(likeCount);
  103. }
  104. } catch (Exception e) {
  105. e.printStackTrace();
  106. }
  107. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  108. }
  109. /**
  110. * 喜欢
  111. *
  112. * @param publishId
  113. * @return
  114. */
  115. @GetMapping("/{id}/love")
  116. public ResponseEntity<Long> loveComment(@PathVariable("id") String publishId) {
  117. try {
  118. Long loveCount = this.movementsService.loveComment(publishId);
  119. if (null != loveCount) {
  120. //发送喜欢消息
  121. this.quanziMQService.lovePublishMsg(publishId);
  122. return ResponseEntity.ok(loveCount);
  123. }
  124. } catch (Exception e) {
  125. e.printStackTrace();
  126. }
  127. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  128. }
  129. /**
  130. * 取消喜欢
  131. *
  132. * @param publishId
  133. * @return
  134. */
  135. @GetMapping("/{id}/unlove")
  136. public ResponseEntity<Long> disLoveComment(@PathVariable("id") String publishId) {
  137. try {
  138. Long loveCount = this.movementsService.cancelLoveComment(publishId);
  139. if (null != loveCount) {
  140. //发送取消喜欢消息
  141. this.quanziMQService.disLovePublishMsg(publishId);
  142. return ResponseEntity.ok(loveCount);
  143. }
  144. } catch (Exception e) {
  145. e.printStackTrace();
  146. }
  147. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  148. }
  149. /**
  150. * 查询单条动态信息
  151. *
  152. * @param publishId
  153. * @return
  154. */
  155. @GetMapping("/{id}")
  156. public ResponseEntity<Movements> queryById(@PathVariable("id") String publishId) {
  157. try {
  158. Movements movements = this.movementsService.queryById(publishId);
  159. if (null != movements) {
  160. //发送消息
  161. this.quanziMQService.queryPublishMsg(publishId);
  162. return ResponseEntity.ok(movements);
  163. }
  164. } catch (Exception e) {
  165. e.printStackTrace();
  166. }
  167. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  168. }
  169. }

CommentsController:

  1. /**
  2. * 发表评论
  3. *
  4. * @param param
  5. * @return
  6. */
  7. @PostMapping
  8. public ResponseEntity<Void> saveComments(@RequestBody Map<String, String> param) {
  9. try {
  10. String publishId = param.get("movementId");
  11. String content = param.get("comment");
  12. Boolean bool = this.commentsService.saveComments(publishId, content);
  13. if (bool) {
  14. //发送消息
  15. this.quanziMQService.sendCommentPublishMsg(publishId);
  16. return ResponseEntity.ok(null);
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  22. }

1.6、接收消息

1.6.1、创建itcast-tanhua-recommend工程

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>itcast-tanhua</artifactId>
  7. <groupId>cn.itcast.tanhua</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>itcast-tanhua-recommend</artifactId>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-web</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.boot</groupId>
  19. <artifactId>spring-boot-starter-test</artifactId>
  20. <scope>test</scope>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-data-redis</artifactId>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-data-mongodb</artifactId>
  29. </dependency>
  30. <!--其他工具包依赖-->
  31. <dependency>
  32. <groupId>org.apache.commons</groupId>
  33. <artifactId>commons-lang3</artifactId>
  34. </dependency>
  35. <dependency>
  36. <groupId>commons-io</groupId>
  37. <artifactId>commons-io</artifactId>
  38. <version>2.6</version>
  39. </dependency>
  40. <!--RocketMQ相关-->
  41. <dependency>
  42. <groupId>org.apache.rocketmq</groupId>
  43. <artifactId>rocketmq-spring-boot-starter</artifactId>
  44. <version>2.0.3</version>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.apache.rocketmq</groupId>
  48. <artifactId>rocketmq-client</artifactId>
  49. <version>4.6.0</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.projectlombok</groupId>
  53. <artifactId>lombok</artifactId>
  54. </dependency>
  55. <dependency>
  56. <groupId>joda-time</groupId>
  57. <artifactId>joda-time</artifactId>
  58. </dependency>
  59. </dependencies>
  60. </project>

1.6.2、配置文件

application.properties

  1. spring.application.name = itcast-rocketmq
  2. server.port = 18082
  3. # RocketMQ相关配置
  4. rocketmq.name-server=172.16.55.155:9876
  5. rocketmq.producer.group=tanhua
  6. # mongodb相关配置
  7. #spring.data.mongodb.uri=mongodb://192.168.31.81:27017/tanhua
  8. #设置了密码的mongodb配置方式
  9. spring.data.mongodb.username=tanhua
  10. spring.data.mongodb.password=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV
  11. spring.data.mongodb.authentication-database=admin
  12. spring.data.mongodb.database=tanhua
  13. spring.data.mongodb.port=27017
  14. spring.data.mongodb.host=192.168.31.81

log4j.properties

  1. log4j.rootLogger=DEBUG,A1
  2. log4j.appender.A1=org.apache.log4j.ConsoleAppender
  3. log4j.appender.A1.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.A1.layout.ConversionPattern=[%t] [%c]-[%p] %m%n

1.6.3、RecommendQuanZi

存储到MongoDB的中的实体结构。

  1. package com.tanhua.recommend.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.bson.types.ObjectId;
  6. @Data
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class RecommendQuanZi {
  10. private ObjectId id;
  11. private Long userId;// 用户id
  12. private Long publishId; //动态id,需要转化为Long类型
  13. private Double score; //得分
  14. private Long date; //时间戳
  15. }

1.6.4、QuanZiMsgConsumer

  1. package com.tanhua.recommend.msg;
  2. import com.fasterxml.jackson.databind.JsonNode;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.tanhua.dubbo.server.pojo.Publish;
  5. import com.tanhua.recommend.pojo.RecommendQuanZi;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  8. import org.apache.rocketmq.spring.core.RocketMQListener;
  9. import org.bson.types.ObjectId;
  10. import org.joda.time.DateTime;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.data.mongodb.core.MongoTemplate;
  15. import org.springframework.stereotype.Component;
  16. import org.springframework.util.CollectionUtils;
  17. @Component
  18. @RocketMQMessageListener(topic = "tanhua-quanzi",
  19. consumerGroup = "tanhua-quanzi-consumer")
  20. public class QuanZiMsgConsumer implements RocketMQListener<String> {
  21. private static final ObjectMapper MAPPER = new ObjectMapper();
  22. private static final Logger LOGGER = LoggerFactory.getLogger(QuanZiMsgConsumer.class);
  23. @Autowired
  24. private MongoTemplate mongoTemplate;
  25. @Override
  26. public void onMessage(String msg) {
  27. try {
  28. JsonNode jsonNode = MAPPER.readTree(msg);
  29. Long userId = jsonNode.get("userId").asLong();
  30. Long pid = jsonNode.get("pid").asLong();
  31. String publishId = jsonNode.get("publishId").asText();
  32. Integer type = jsonNode.get("type").asInt();
  33. //1-发动态,2-浏览动态, 3-点赞, 4-喜欢, 5-评论,6-取消点赞,7-取消喜欢
  34. RecommendQuanZi recommendQuanZi = new RecommendQuanZi();
  35. recommendQuanZi.setUserId(userId);
  36. recommendQuanZi.setId(ObjectId.get());
  37. recommendQuanZi.setDate(System.currentTimeMillis());
  38. recommendQuanZi.setPublishId(pid);
  39. switch (type) {
  40. case 1: {
  41. int score = 0;
  42. Publish publish = this.mongoTemplate.findById(new ObjectId(publishId), Publish.class);
  43. int count = StringUtils.length(publish.getText());
  44. if (count >= 0 && count <= 50) {
  45. score += 1;
  46. } else if (count <= 100) {
  47. score += 2;
  48. } else {
  49. score += 3;
  50. }
  51. if (!CollectionUtils.isEmpty(publish.getMedias())) {
  52. score += publish.getMedias().size();
  53. }
  54. recommendQuanZi.setScore(Double.valueOf(score));
  55. break;
  56. }
  57. case 2: {
  58. recommendQuanZi.setScore(1d);
  59. break;
  60. }
  61. case 3: {
  62. recommendQuanZi.setScore(5d);
  63. break;
  64. }
  65. case 4: {
  66. recommendQuanZi.setScore(8d);
  67. break;
  68. }
  69. case 5: {
  70. recommendQuanZi.setScore(10d);
  71. break;
  72. }
  73. case 6: {
  74. recommendQuanZi.setScore(-5d);
  75. break;
  76. }
  77. case 7: {
  78. recommendQuanZi.setScore(-8d);
  79. break;
  80. }
  81. default: {
  82. recommendQuanZi.setScore(0d);
  83. break;
  84. }
  85. }
  86. // String collectionName = "recommend_quanzi_" + new DateTime().toString("yyyyMMdd");
  87. //为了方便测试,将数据写到一张表
  88. String collectionName = "recommend_quanzi";
  89. this.mongoTemplate.save(recommendQuanZi, collectionName);
  90. } catch (Exception e) {
  91. LOGGER.error("处理消息失败~" + msg, e);
  92. }
  93. }
  94. }

1.7、测试

1.7.1、发布动态

day07-圈子推荐功能实现 - 图3

发布4张图片:

day07-圈子推荐功能实现 - 图4

数据:

day07-圈子推荐功能实现 - 图5

消息处理:

day07-圈子推荐功能实现 - 图6

1.7.2、浏览动态

day07-圈子推荐功能实现 - 图7

消息处理:

day07-圈子推荐功能实现 - 图8

1.7.3、点赞

day07-圈子推荐功能实现 - 图9

消息处理:

day07-圈子推荐功能实现 - 图10

1.7.4、取消点赞

day07-圈子推荐功能实现 - 图11

消息处理:

day07-圈子推荐功能实现 - 图12

1.7.5、喜欢

day07-圈子推荐功能实现 - 图13

消息处理:

day07-圈子推荐功能实现 - 图14

1.7.6、取消喜欢

day07-圈子推荐功能实现 - 图15

消息处理:

day07-圈子推荐功能实现 - 图16

1.7.7、评论

day07-圈子推荐功能实现 - 图17

消息处理:

day07-圈子推荐功能实现 - 图18

2、推荐系统

在推荐系统中,我们将基于前面写入到推荐表中的数据通过Spark进行计算,在Spark计算完成后将结果写入到Redis中,以供在业务系统中进行查询。

2.1、导入数据

使用资料中提供的centos7镜像或将资料中的mongodb数据导入到现有的mongodb服务中。

导入说明:

  • 将资料目录中的mongodb.tar.gz上传至mongodb所在服务器
  • 将其解压到/var/lib/docker/volumes目录下,原有mongodb目录要删除
  • 重启mongodb服务

    • docker restart mongodb
  • 使用客户端连接到mongodb服务进行查看
  • 数据中所关联的用户数据在tanhua.sql文件中,将其替换原有的数据

mongodb:

day07-圈子推荐功能实现 - 图19

mysql数据:

day07-圈子推荐功能实现 - 图20

2.2、部署圈子推荐服务

推荐服务我们将基于docker的形式进行部署:

  1. #拉取镜像
  2. docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0
  3. #创建容器
  4. docker create --name tanhua-spark-quanzi --restart=always \
  5. --env MONGODB_HOST=192.168.31.81 \
  6. --env MONGODB_PORT=27017 \
  7. --env MONGODB_USERNAME=tanhua \
  8. --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
  9. --env MONGODB_DATABASE=tanhua \
  10. --env MONGODB_COLLECTION=recommend_quanzi \
  11. --env SCHEDULE_PERIOD=3 \
  12. --env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
  13. registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0
  14. #参数说明
  15. #MONGODB_HOST mongodb服务的地址
  16. #MONGODB_PORT mongodb服务的端口
  17. #MONGODB_USERNAME mongodb服务的认证用户名
  18. #MONGODB_PASSWORD mongodb服务的认证密码
  19. #MONGODB_DATABASE mongodb连接的数据库
  20. #MONGODB_COLLECTION 操作表
  21. #SCHEDULE_PERIOD 下次执行时间间隔,但是为分,默认为10分钟
  22. #REDIS_NODES redis集群地址,也可以使用单节点
  23. #mongodb开启认证服务
  24. #docker create --name mongodb --restart=always -p 27017:27017 -v mongodb:/data/db mongo:4.0.3 --auth
  25. #启动服务,启动之后就会进行执行,在SCHEDULE_PERIOD时间后再次执行
  26. docker start tanhua-spark-quanzi
  27. #查看日志
  28. docker logs -f tanhua-spark-quanzi
  29. #执行完成后会将数据写入到redis中

2.3、测试

进入redis查看是否已经有数据:

day07-圈子推荐功能实现 - 图21

3、修改查询逻辑

之前是通过MongoDB直接查询,而现在需要先从Redis进行命中,如果未命中则需要进行MongoDB查询。

修改server工程中的MovementsService类型:

  1. /**
  2. * 查询动态
  3. *
  4. * @param page
  5. * @param pageSize
  6. * @return
  7. */
  8. public PageResult queryPublishList(Integer page, Integer pageSize, boolean isRecommend) {
  9. PageResult pageResult = new PageResult();
  10. //获取当前的登录信息
  11. User user = UserThreadLocal.get();
  12. PageInfo<Publish> pageInfo = null;
  13. if (isRecommend) { //推荐动态逻辑处理
  14. // 查询Redis
  15. String value = this.redisTemplate.opsForValue().get("QUANZI_PUBLISH_RECOMMEND_" + user.getId());
  16. if (StringUtils.isNotEmpty(value)) {
  17. String[] pids = StringUtils.split(value, ',');
  18. int startIndex = (page - 1) * pageSize;
  19. if(startIndex < pids.length){
  20. int endIndex = startIndex + pageSize - 1;
  21. if (endIndex >= pids.length) {
  22. endIndex = pids.length - 1;
  23. }
  24. List<Long> pidList = new ArrayList<>();
  25. for (int i = startIndex; i <= endIndex; i++) {
  26. pidList.add(Long.valueOf(pids[i]));
  27. }
  28. List<Publish> publishList = this.quanZiApi.queryPublishByPids(pidList);
  29. pageInfo = new PageInfo<>();
  30. pageInfo.setRecords(publishList);
  31. }
  32. }
  33. }
  34. if (null == pageInfo) {
  35. Long userId = isRecommend ? null : user.getId();
  36. pageInfo = this.quanZiApi.queryPublishList(userId, page, pageSize);
  37. }
  38. pageResult.setPagesize(pageSize);
  39. pageResult.setPage(page);
  40. pageResult.setCounts(0);
  41. pageResult.setPages(0);
  42. List<Publish> records = pageInfo.getRecords();
  43. if (CollectionUtils.isEmpty(records)) {
  44. //没有动态信息
  45. return pageResult;
  46. }
  47. List<Movements> movementsList = new ArrayList<>();
  48. for (Publish record : records) {
  49. Movements movements = new Movements();
  50. movements.setId(record.getId().toHexString());
  51. movements.setImageContent(record.getMedias().toArray(new String[]{}));
  52. movements.setTextContent(record.getText());
  53. movements.setUserId(record.getUserId());
  54. movements.setCreateDate(RelativeDateFormat.format(new Date(record.getCreated())));
  55. movementsList.add(movements);
  56. }
  57. List<Long> userIds = new ArrayList<>();
  58. for (Movements movements : movementsList) {
  59. if (!userIds.contains(movements.getUserId())) {
  60. userIds.add(movements.getUserId());
  61. }
  62. }
  63. QueryWrapper<UserInfo> queryWrapper = new QueryWrapper<>();
  64. queryWrapper.in("user_id", userIds);
  65. List<UserInfo> userInfos = this.userInfoService.queryList(queryWrapper);
  66. for (Movements movements : movementsList) {
  67. for (UserInfo userInfo : userInfos) {
  68. if (movements.getUserId().longValue() == userInfo.getUserId().longValue()) {
  69. this.fillValueToMovements(movements, userInfo);
  70. break;
  71. }
  72. }
  73. }
  74. pageResult.setItems(movementsList);
  75. return pageResult;
  76. }

测试: day07-圈子推荐功能实现 - 图22

day07-圈子推荐功能实现 - 图23

可以看到,已经查询到了动态数据。

4、小视频推荐

小视频的推荐和动态推荐的实现逻辑非常的类似。

4.1、增加自增id

  1. package com.tanhua.dubbo.server.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.bson.types.ObjectId;
  6. import org.springframework.data.mongodb.core.mapping.Document;
  7. import java.util.List;
  8. @Data
  9. @NoArgsConstructor
  10. @AllArgsConstructor
  11. @Document(collection = "video")
  12. public class Video implements java.io.Serializable {
  13. private static final long serialVersionUID = -3136732836884933873L;
  14. private ObjectId id; //主键id
  15. private Long vid;
  16. private Long userId;
  17. private String text; //文字
  18. private String picUrl; //视频封面文件
  19. private String videoUrl; //视频文件
  20. private Long created; //创建时间
  21. private Integer seeType; // 谁可以看,1-公开,2-私密,3-部分可见,4-不给谁看
  22. private List<Long> seeList; //部分可见的列表
  23. private List<Long> notSeeList; //不给谁看的列表
  24. private String longitude; //经度
  25. private String latitude; //纬度
  26. private String locationName; //位置名称
  27. }

修改VideoApiImpl逻辑:

  1. @Override
  2. public Boolean saveVideo(Video video) {
  3. if (video.getUserId() == null) {
  4. return false;
  5. }
  6. video.setId(ObjectId.get());
  7. video.setCreated(System.currentTimeMillis());
  8. //生成vid
  9. video.setVid(this.idService.createId("video", video.getId().toHexString()));
  10. this.mongoTemplate.save(video);
  11. return true;
  12. }

4.2、动态计分规则

  • 发布+2
  • 点赞 +5
  • 评论 + 10

4.3、发送消息

4.3.1、VideoMQService

  1. package com.tanhua.server.service;
  2. import com.alibaba.dubbo.config.annotation.Reference;
  3. import com.tanhua.dubbo.server.api.QuanZiApi;
  4. import com.tanhua.dubbo.server.api.VideoApi;
  5. import com.tanhua.dubbo.server.pojo.Publish;
  6. import com.tanhua.dubbo.server.pojo.Video;
  7. import com.tanhua.server.pojo.User;
  8. import com.tanhua.server.utils.UserThreadLocal;
  9. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.stereotype.Service;
  14. import java.util.HashMap;
  15. import java.util.Map;
  16. @Service
  17. public class VideoMQService {
  18. private static final Logger LOGGER = LoggerFactory.getLogger(VideoMQService.class);
  19. @Autowired
  20. private RocketMQTemplate rocketMQTemplate;
  21. @Reference(version = "1.0.0")
  22. private VideoApi videoApi;
  23. /**
  24. * 发布小视频消息
  25. *
  26. * @return
  27. */
  28. public Boolean videoMsg(String videoId) {
  29. return this.sendMsg(videoId, 1);
  30. }
  31. /**
  32. * 点赞小视频
  33. *
  34. * @return
  35. */
  36. public Boolean likeVideoMsg(String videoId) {
  37. return this.sendMsg(videoId, 2);
  38. }
  39. /**
  40. * 取消点赞小视频
  41. *
  42. * @return
  43. */
  44. public Boolean disLikeVideoMsg(String videoId) {
  45. return this.sendMsg(videoId, 3);
  46. }
  47. /**
  48. * 评论小视频
  49. *
  50. * @return
  51. */
  52. public Boolean commentVideoMsg(String videoId) {
  53. return this.sendMsg(videoId, 4);
  54. }
  55. /**
  56. * 发送小视频操作相关的消息
  57. *
  58. * @param videoId
  59. * @param type 1-发动态,2-点赞, 3-取消点赞,4-评论
  60. * @return
  61. */
  62. private Boolean sendMsg(String videoId, Integer type) {
  63. try {
  64. User user = UserThreadLocal.get();
  65. Video video = this.videoApi.queryVideoById(videoId);
  66. //构建消息
  67. Map<String, Object> msg = new HashMap<>();
  68. msg.put("userId", user.getId());
  69. msg.put("date", System.currentTimeMillis());
  70. msg.put("videoId", videoId);
  71. msg.put("vid", video.getVid());
  72. msg.put("type", type);
  73. this.rocketMQTemplate.convertAndSend("tanhua-video", msg);
  74. } catch (Exception e) {
  75. LOGGER.error("发送消息失败! videoId = " + videoId + ", type = " + type, e);
  76. return false;
  77. }
  78. return true;
  79. }
  80. }

4.3.2、VideoController

  1. package com.tanhua.server.controller;
  2. import com.tanhua.server.service.MovementsService;
  3. import com.tanhua.server.service.VideoMQService;
  4. import com.tanhua.server.service.VideoService;
  5. import com.tanhua.server.vo.PageResult;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.http.HttpStatus;
  9. import org.springframework.http.ResponseEntity;
  10. import org.springframework.web.bind.annotation.*;
  11. import org.springframework.web.multipart.MultipartFile;
  12. import java.util.Map;
  13. @RestController
  14. @RequestMapping("smallVideos")
  15. public class VideoController {
  16. @Autowired
  17. private VideoService videoService;
  18. @Autowired
  19. private MovementsService movementsService;
  20. @Autowired
  21. private CommentsController commentsController;
  22. @Autowired
  23. private VideoMQService videoMQService;
  24. /**
  25. * 发布小视频
  26. *
  27. * @param picFile
  28. * @param videoFile
  29. * @return
  30. */
  31. @PostMapping
  32. public ResponseEntity<Void> saveVideo(@RequestParam(value = "videoThumbnail", required = false) MultipartFile picFile,
  33. @RequestParam(value = "videoFile", required = false) MultipartFile videoFile) {
  34. try {
  35. String id = this.videoService.saveVideo(picFile, videoFile);
  36. if (StringUtils.isNotEmpty(id)) {
  37. //发送消息
  38. this.videoMQService.videoMsg(id);
  39. return ResponseEntity.ok(null);
  40. }
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  45. }
  46. /**
  47. * 查询小视频列表
  48. *
  49. * @param page
  50. * @param pageSize
  51. * @return
  52. */
  53. @GetMapping
  54. public ResponseEntity<PageResult> queryVideoList(@RequestParam(value = "page", defaultValue = "1") Integer page,
  55. @RequestParam(value = "pagesize", defaultValue = "10") Integer pageSize) {
  56. try {
  57. if (page <= 0) {
  58. page = 1;
  59. }
  60. PageResult pageResult = this.videoService.queryVideoList(page, pageSize);
  61. if (null != pageResult) {
  62. return ResponseEntity.ok(pageResult);
  63. }
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  68. }
  69. /**
  70. * 视频点赞
  71. *
  72. * @param videoId 视频id
  73. * @return
  74. */
  75. @PostMapping("/{id}/like")
  76. public ResponseEntity<Long> likeComment(@PathVariable("id") String videoId) {
  77. try {
  78. Long likeCount = this.movementsService.likeComment(videoId);
  79. if (likeCount != null) {
  80. this.videoMQService.likeVideoMsg(videoId);
  81. return ResponseEntity.ok(likeCount);
  82. }
  83. } catch (Exception e) {
  84. e.printStackTrace();
  85. }
  86. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  87. }
  88. /**
  89. * 取消点赞
  90. *
  91. * @param videoId
  92. * @return
  93. */
  94. @PostMapping("/{id}/dislike")
  95. public ResponseEntity<Long> disLikeComment(@PathVariable("id") String videoId) {
  96. try {
  97. Long likeCount = this.movementsService.cancelLikeComment(videoId);
  98. if (null != likeCount) {
  99. this.videoMQService.disLikeVideoMsg(videoId);
  100. return ResponseEntity.ok(likeCount);
  101. }
  102. } catch (Exception e) {
  103. e.printStackTrace();
  104. }
  105. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  106. }
  107. /**
  108. * 评论点赞
  109. *
  110. * @param publishId
  111. * @return
  112. */
  113. @PostMapping("/comments/{id}/like")
  114. public ResponseEntity<Long> commentsLikeComment(@PathVariable("id") String publishId) {
  115. try {
  116. Long likeCount = this.movementsService.likeComment(publishId);
  117. if (likeCount != null) {
  118. return ResponseEntity.ok(likeCount);
  119. }
  120. } catch (Exception e) {
  121. e.printStackTrace();
  122. }
  123. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  124. }
  125. /**
  126. * 评论取消点赞
  127. *
  128. * @param publishId
  129. * @return
  130. */
  131. @PostMapping("/comments/{id}/dislike")
  132. public ResponseEntity<Long> disCommentsLikeComment(@PathVariable("id") String publishId) {
  133. try {
  134. Long likeCount = this.movementsService.cancelLikeComment(publishId);
  135. if (null != likeCount) {
  136. return ResponseEntity.ok(likeCount);
  137. }
  138. } catch (Exception e) {
  139. e.printStackTrace();
  140. }
  141. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  142. }
  143. /**
  144. * 提交评论
  145. *
  146. * @param param
  147. * @param videoId
  148. * @return
  149. */
  150. @PostMapping("/{id}/comments")
  151. public ResponseEntity<Void> saveComments(@RequestBody Map<String, String> param,
  152. @PathVariable("id") String videoId) {
  153. param.put("movementId", videoId);
  154. ResponseEntity<Void> entity = this.commentsController.saveComments(param);
  155. if (entity.getStatusCode().is2xxSuccessful()) {
  156. //发送消息
  157. this.videoMQService.commentVideoMsg(videoId);
  158. }
  159. return entity;
  160. }
  161. /**
  162. * 评论列表
  163. */
  164. @GetMapping("/{id}/comments")
  165. public ResponseEntity<PageResult> queryCommentsList(@PathVariable("id") String videoId,
  166. @RequestParam(value = "page", defaultValue = "1") Integer page,
  167. @RequestParam(value = "pagesize", defaultValue = "10") Integer pagesize) {
  168. return this.commentsController.queryCommentsList(videoId, page, pagesize);
  169. }
  170. /**
  171. * 视频用户关注
  172. */
  173. @PostMapping("/{id}/userFocus")
  174. public ResponseEntity<Void> saveUserFocusComments(@PathVariable("id") Long userId) {
  175. try {
  176. Boolean bool = this.videoService.followUser(userId);
  177. if (bool) {
  178. return ResponseEntity.ok(null);
  179. }
  180. } catch (Exception e) {
  181. e.printStackTrace();
  182. }
  183. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  184. }
  185. /**
  186. * 视频用户关注
  187. */
  188. @PostMapping("/{id}/userUnFocus")
  189. public ResponseEntity<Void> saveUserUnFocusComments(@PathVariable("id") Long userId) {
  190. try {
  191. Boolean bool = this.videoService.disFollowUser(userId);
  192. if (bool) {
  193. return ResponseEntity.ok(null);
  194. }
  195. } catch (Exception e) {
  196. e.printStackTrace();
  197. }
  198. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
  199. }
  200. }

4.4、接收消息

4.4.1、RecommendVideo

  1. package com.tanhua.recommend.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.bson.types.ObjectId;
  6. @Data
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class RecommendVideo {
  10. private ObjectId id;
  11. private Long userId;// 用户id
  12. private Long videoId; //视频id,需要转化为Long类型
  13. private Double score; //得分
  14. private Long date; //时间戳
  15. }

4.4.2、VideoMsgConsumer

  1. package com.tanhua.recommend.msg;
  2. import com.fasterxml.jackson.databind.JsonNode;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.tanhua.recommend.pojo.Publish;
  5. import com.tanhua.recommend.pojo.RecommendQuanZi;
  6. import com.tanhua.recommend.pojo.RecommendVideo;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  9. import org.apache.rocketmq.spring.core.RocketMQListener;
  10. import org.bson.types.ObjectId;
  11. import org.joda.time.DateTime;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.data.mongodb.core.MongoTemplate;
  16. import org.springframework.stereotype.Component;
  17. import org.springframework.util.CollectionUtils;
  18. @Component
  19. @RocketMQMessageListener(topic = "tanhua-video",
  20. consumerGroup = "tanhua-video-consumer")
  21. public class VideoMsgConsumer implements RocketMQListener<String> {
  22. private static final ObjectMapper MAPPER = new ObjectMapper();
  23. private static final Logger LOGGER = LoggerFactory.getLogger(VideoMsgConsumer.class);
  24. @Autowired
  25. private MongoTemplate mongoTemplate;
  26. @Override
  27. public void onMessage(String msg) {
  28. try {
  29. JsonNode jsonNode = MAPPER.readTree(msg);
  30. Long userId = jsonNode.get("userId").asLong();
  31. Long vid = jsonNode.get("vid").asLong();
  32. Integer type = jsonNode.get("type").asInt();
  33. //1-发动态,2-点赞, 3-取消点赞,4-评论
  34. RecommendVideo recommendVideo = new RecommendVideo();
  35. recommendVideo.setUserId(userId);
  36. recommendVideo.setId(ObjectId.get());
  37. recommendVideo.setDate(System.currentTimeMillis());
  38. recommendVideo.setVideoId(vid);
  39. switch (type) {
  40. case 1: {
  41. recommendVideo.setScore(2d);
  42. break;
  43. }
  44. case 2: {
  45. recommendVideo.setScore(5d);
  46. break;
  47. }
  48. case 3: {
  49. recommendVideo.setScore(-5d);
  50. break;
  51. }
  52. case 4: {
  53. recommendVideo.setScore(10d);
  54. break;
  55. }
  56. default: {
  57. recommendVideo.setScore(0d);
  58. break;
  59. }
  60. }
  61. String collectionName = "recommend_video_" + new DateTime().toString("yyyyMMdd");
  62. this.mongoTemplate.save(recommendVideo, collectionName);
  63. } catch (Exception e) {
  64. LOGGER.error("处理小视频消息失败~" + msg, e);
  65. }
  66. }
  67. }

4.4.3、测试

day07-圈子推荐功能实现 - 图24

4.5、部署推荐服务

  1. #拉取镜像
  2. docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0
  3. #创建容器
  4. docker create --name tanhua-spark-video --restart=always \
  5. --env MONGODB_HOST=192.168.31.81 \
  6. --env MONGODB_PORT=27017 \
  7. --env MONGODB_USERNAME=tanhua \
  8. --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
  9. --env MONGODB_DATABASE=tanhua \
  10. --env MONGODB_COLLECTION=recommend_video \
  11. --env SCHEDULE_PERIOD=3 \
  12. --env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
  13. registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0
  14. #启动服务
  15. docker start tanhua-spark-video
  16. #查看日志
  17. docker logs -f tanhua-spark-video

测试:

day07-圈子推荐功能实现 - 图25

4.6、修改查询逻辑

修改VideoService的实现:

  1. public PageResult queryVideoList(Integer page, Integer pageSize) {
  2. User user = UserThreadLocal.get();
  3. PageResult pageResult = new PageResult();
  4. pageResult.setPage(page);
  5. pageResult.setPagesize(pageSize);
  6. pageResult.setPages(0);
  7. pageResult.setCounts(0);
  8. PageInfo<Video> pageInfo = null;
  9. //先从Redis进行命中,如果命中则返回推荐列表,如果未命中查询默认列表
  10. String redisValue = this.redisTemplate.opsForValue().get("QUANZI_VIDEO_RECOMMEND_" + user.getId());
  11. if (StringUtils.isNotEmpty(redisValue)) {
  12. String[] pids = StringUtils.split(redisValue, ',');
  13. int startIndex = (page - 1) * pageSize;
  14. if (startIndex < pids.length) {
  15. int endIndex = startIndex + pageSize - 1;
  16. if (endIndex >= pids.length) {
  17. endIndex = pids.length - 1;
  18. }
  19. List<Long> vidList = new ArrayList<>();
  20. for (int i = startIndex; i <= endIndex; i++) {
  21. vidList.add(Long.valueOf(pids[i]));
  22. }
  23. List<Video> videoList = this.videoApi.queryVideoListByPids(vidList);
  24. pageInfo = new PageInfo<>();
  25. pageInfo.setRecords(videoList);
  26. }
  27. }
  28. if(null == pageInfo){
  29. pageInfo = this.videoApi.queryVideoList(page, pageSize);
  30. }
  31. List<Video> records = pageInfo.getRecords();
  32. List<VideoVo> videoVoList = new ArrayList<>();
  33. List<Long> userIds = new ArrayList<>();
  34. for (Video record : records) {
  35. VideoVo videoVo = new VideoVo();
  36. videoVo.setUserId(record.getUserId());
  37. videoVo.setCover(record.getPicUrl());
  38. videoVo.setVideoUrl(record.getVideoUrl());
  39. videoVo.setId(record.getId().toHexString());
  40. videoVo.setSignature("我就是我~");
  41. Long commentCount = this.quanZiApi.queryCommentCount(videoVo.getId(), 2);
  42. videoVo.setCommentCount(commentCount == null ? 0 : commentCount.intValue()); // 评论数
  43. String followUserKey = "VIDEO_FOLLOW_USER_" + user.getId() + "_" + videoVo.getUserId();
  44. videoVo.setHasFocus(this.redisTemplate.hasKey(followUserKey) ? 1 : 0); //是否关注
  45. String userKey = "QUANZI_COMMENT_LIKE_USER_" + user.getId() + "_" + videoVo.getId();
  46. videoVo.setHasLiked(this.redisTemplate.hasKey(userKey) ? 1 : 0); //是否点赞(1是,0否)
  47. String key = "QUANZI_COMMENT_LIKE_" + videoVo.getId();
  48. String value = this.redisTemplate.opsForValue().get(key);
  49. if (StringUtils.isNotEmpty(value)) {
  50. videoVo.setLikeCount(Integer.valueOf(value)); //点赞数
  51. } else {
  52. videoVo.setLikeCount(0);
  53. }
  54. if (!userIds.contains(record.getUserId())) {
  55. userIds.add(record.getUserId());
  56. }
  57. videoVoList.add(videoVo);
  58. }
  59. QueryWrapper<UserInfo> queryWrapper = new QueryWrapper();
  60. queryWrapper.in("user_id", userIds);
  61. List<UserInfo> userInfos = this.userInfoService.queryList(queryWrapper);
  62. for (VideoVo videoVo : videoVoList) {
  63. for (UserInfo userInfo : userInfos) {
  64. if (videoVo.getUserId().longValue() == userInfo.getUserId().longValue()) {
  65. videoVo.setNickname(userInfo.getNickName());
  66. videoVo.setAvatar(userInfo.getLogo());
  67. break;
  68. }
  69. }
  70. }
  71. pageResult.setItems(videoVoList);
  72. return pageResult;
  73. }