课程说明

  • 了解推荐系统
  • 实现好友的推荐
  • 圈子推荐功能说明
  • 圈子推荐功能流程
  • 圈子推荐功能的实现
  • 小视频推荐功能的实现

    1、了解推荐系统

    1.1、什么是推荐系统?

    为了解决信息过载和用户无明确需求的问题,找到用户感兴趣的物品,才有了个性化推荐系统。
    其实,解决信息过载的问题,代表性的解决方案是分类目录和搜索引擎,如hao123,电商首页的分类目录以及百度,360搜索等。
    不过分类目录和搜索引擎只能解决用户主动查找信息的需求,即用户知道自己想要什么,并不能解决用户没用明确需求很随便的问题。
    经典语录是:你想吃什么,随便!面对这种很随便又得罪不起的用户(女友和上帝),只能通过分析用户的历史行为给用户的兴趣建模,从而主动给用户推荐能够满足他们兴趣和需求的信息。比如问问女友的闺蜜,她一般什么时候喜欢吃什么。

    1.2、电商是推荐系统的先行者

  • 电子商务网站是个性化推荐系统重要地应用的领域之一,亚马逊就是个性化推荐系统的积极应用者和推广者,亚马逊的推荐系统深入到网站的各类商品,为亚马逊带来了至少30%的销售额。

  • 不光是电商类,推荐系统无处不在。QQ,微信的好友推荐;新浪微博的你可能感兴趣的人;优酷,土豆的电影推荐;豆瓣的图书推荐;大从点评的餐饮推荐;脉脉的同事推荐等。
  • 推荐引擎的鼻祖思想源泉:http://portal.acm.org/citation.cfm?id=1070751
  • 亚马逊最早提出基亍物品的协同过滤推荐算法:http://portal.acm.org/citation.cfm?id=372071

京东的推荐: day10-实现推荐功能 - 图1

1.3、推荐系统业务流程

day10-实现推荐功能 - 图2
推荐系统广泛存在于各类网站中,作为一个应用为用户提供个性化的推荐。它需要一些用户的历史数据,一般由三个部分组成:基础数据、推荐算法系统、前台展示。

  • 基础数据包括很多维度,包括用户的访问、浏览、下单、收藏,用户的历史订单信息,评价信息等很多信息;
  • 推荐算法系统主要是根据不同的推荐诉求由多个算法组成的推荐模型;
  • 前台展示主要是对客户端系统进行响应,返回相关的推荐信息以供展示。

    1.4、协同过滤推荐算法

    迄今为止,在个性化推荐系统中,协同过滤技术是应用最成功的技术。目前国内外有许多大型网站应用这项技术为用户更加智能(个性化、千人千面)的推荐内容。

    核心思想: 协同过滤一般是在海量的用户中发掘出一小部分和你品位比较类似的,在协同过滤中,这些用户成为邻居,然后根据他们喜欢的其他东西组织成一个排序的目彔作为推荐给你。

1.4.1、基于用户的推荐 UserCF

day10-实现推荐功能 - 图3
day10-实现推荐功能 - 图4
对于用户A,根据用户的历史偏好,这里只计算得到一个邻居–用户C,然后将用户C 喜欢的物品D 推荐给用户A。
基于用户的协同过滤算法先计算的是用户与用户的相似度(兴趣相投,物以类聚人以群分),然后将相似度比较接近的用户A购买的物品推荐给用户B,专业的说法是该算法用最近邻居(nearest-neighbor)算法找出一个用户的邻居集合,该集合的用户和该用户有相似的喜好,算法根据邻居的偏好对该用户进行预测。

1.4.2、基于商品的推荐 ItemCF

day10-实现推荐功能 - 图5

  • 基于ItemCF的原理和基于UserCF类似,只是在计算邻居时采用物品本身,而不是从用户的角度,即基于用户对物品的偏好找到相似的物品,然后根据用户的历史偏好,推荐相似的物品给他。
  • 从计算的角度看,就是将所有用户对某个物品的偏好作为一个向量来计算物品之间的相似度,得到物品的相似物品后,根据用户历史的偏好预测当前用户还没有表示偏好的物品,计算得到一个排序的物品列表作为推荐。
  • 解释:对于物品A,根据所有用户的历史偏好,喜欢物品A 的用户都喜欢物品C,得出物品A 和物品C 比较相似,而用户C 喜欢物品A,那么可以推断出用户C 可能也喜欢物品C。

    1.5、ALS算法

    ALS 是交替最小二乘 (alternating least squares)的简称。在机器学习的上下文中,ALS 特指使用交替最小二乘求解的一个协同推荐算法。它通过观察到的所有用户给产品的打分,来推断每个用户的喜好并向用户推荐适合的产品。从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User和Item两个方面。
    用户和商品的关系,可以抽象为如下的三元组:。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。如下:
    1. 196 242 3 881250949
    2. 186 302 3 891717742
    3. 22 377 1 878887116
    4. 244 51 2 880606923
    5. 166 346 1 886397596
    6. 298 474 4 884182806
    7. 115 265 2 881171488
    8. 253 465 5 891628467
    9. 305 451 3 886324817
    10. 6 86 3 883603013
    11. 62 257 2 879372434
    12. 286 1014 5 879781125
    13. 200 222 5 876042340
    14. 210 40 3 891035994
    15. ................

    2、好友推荐

    对于好友的推荐,需要找出每个用户之间的相似性,具体规则如下:
字段 权重分
年龄差 0-2岁 30分 3-5 20分 5-10岁 10分 10岁以上 0分
性别 异性 30分 同性 0分
位置 同城 20分 不同 0分
学历 相同 20分 不同 0分

2.1、流程

day10-实现推荐功能 - 图6

2.2、部署好友推荐服务

  1. #拉取镜像
  2. docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-recommend-user:1.0.1
  3. #创建容器
  4. docker create --name tanhua-spark-recommend-user \
  5. --env MONGODB_HOST=192.168.31.81 \
  6. --env MONGODB_PORT=27017 \
  7. --env MONGODB_USERNAME=tanhua \
  8. --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
  9. --env MONGODB_DATABASE=tanhua \
  10. --env MONGODB_COLLECTION=recommend_user \
  11. --env JDBC_URL="jdbc:mysql://192.168.31.81:3306/mytanhua?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true&useSSL=false" \
  12. --env JDBC_DRIVER=com.mysql.jdbc.Driver \
  13. --env JDBC_USER=root \
  14. --env JDBC_PASSWORD=root \
  15. --env JDBC_TABLE=tb_user_info \
  16. --env SCHEDULE_PERIOD=30 \
  17. registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-recommend-user:1.0.1
  18. #参数说明
  19. #MONGODB_HOST mongodb服务的地址
  20. #MONGODB_PORT mongodb服务的端口
  21. #MONGODB_USERNAME mongodb服务的认证用户名
  22. #MONGODB_PASSWORD mongodb服务的认证密码
  23. #MONGODB_DATABASE mongodb连接的数据库
  24. #MONGODB_COLLECTION 操作表
  25. #JDBC_URL mysql数据库连接地址
  26. #JDBC_DRIVER jdbc驱动
  27. #JDBC_USER 数据库连接用户名
  28. #JDBC_PASSWORD 数据库连接密码
  29. #JDBC_TABLE 数据库表名
  30. #SCHEDULE_PERIOD 下次执行时间间隔,但是为分,默认为10分钟
  31. #启动服务
  32. docker start tanhua-spark-recommend-user
  33. #查看日志
  34. docker logs -f tanhua-spark-recommend-user

执行完成后,可以看到MongoDB中的recommend_user表中数据已经重新生成了。

3、圈子推荐

3.1、功能说明

在圈子功能中,针对于用户发布的动态信息,系统可以根据用户的发布、浏览、点赞等操作,对动态信息做计算,然后对每个用户进行不同的推荐。

3.2、流程说明

day10-实现推荐功能 - 图7
流程说明:

  • 用户对圈子的动态操作,如:发布、浏览、点赞、喜欢等,就会给RocketMQ进行发送消息;
  • 推荐系统接收消息,并且处理消息数据,处理之后将结果数据写入到MongoDB中;
  • Spark系统拉取数据,然后进行推荐计算;
  • 计算之后的结果数据写入到Redis中,为每个用户都进行个性化推荐;

    3.3、动态计分规则

  • 浏览 +1

  • 点赞 +5
  • 喜欢 +8
  • 评论 + 10
  • 发布动态
    • 文字长度:50以内1分,50~100之间2分,100以上3分
    • 图片个数:每个图片一分

核心推荐逻辑:

  • 推荐模型:用户 | 动态 | 评分
  • 其中,评分是用户对动态操作的得分合计
  • 为什么自己发布动态还要计分? 是因为,自己发布就相当于自己对此动态也感兴趣,这样就可以在相似的人之间进行推荐了。

    3.4、发送消息

    3.4.1、QuanziMQService

    my-tanhua-server增加依赖:

    1. <!--RocketMQ相关-->
    2. <dependency>
    3. <groupId>org.apache.rocketmq</groupId>
    4. <artifactId>rocketmq-spring-boot-starter</artifactId>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.apache.rocketmq</groupId>
    8. <artifactId>rocketmq-client</artifactId>
    9. </dependency>

    配置文件:

    1. # RocketMQ相关配置
    2. rocketmq.name-server=192.168.31.81:9876
    3. rocketmq.producer.group=tanhua
    1. package com.tanhua.server.service;
    2. import com.alibaba.dubbo.config.annotation.Reference;
    3. import com.tanhua.common.pojo.User;
    4. import com.tanhua.common.utils.UserThreadLocal;
    5. import com.tanhua.dubbo.server.api.QuanZiApi;
    6. import com.tanhua.dubbo.server.pojo.Publish;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.apache.rocketmq.spring.core.RocketMQTemplate;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.stereotype.Service;
    11. import java.util.HashMap;
    12. import java.util.Map;
    13. @Service
    14. @Slf4j
    15. public class QuanziMQService {
    16. @Autowired
    17. private RocketMQTemplate rocketMQTemplate;
    18. @Reference(version = "1.0.0")
    19. private QuanZiApi quanZiApi;
    20. /**
    21. * 发布动态消息
    22. *
    23. * @param publishId
    24. * @return
    25. */
    26. public Boolean publishMsg(String publishId) {
    27. return this.sendMsg(publishId, 1);
    28. }
    29. /**
    30. * 浏览动态消息
    31. *
    32. * @param publishId
    33. * @return
    34. */
    35. public Boolean queryPublishMsg(String publishId) {
    36. return this.sendMsg(publishId, 2);
    37. }
    38. /**
    39. * 点赞动态消息
    40. *
    41. * @param publishId
    42. * @return
    43. */
    44. public Boolean likePublishMsg(String publishId) {
    45. return this.sendMsg(publishId, 3);
    46. }
    47. /**
    48. * 取消点赞动态消息
    49. *
    50. * @param publishId
    51. * @return
    52. */
    53. public Boolean disLikePublishMsg(String publishId) {
    54. return this.sendMsg(publishId, 6);
    55. }
    56. /**
    57. * 喜欢动态消息
    58. *
    59. * @param publishId
    60. * @return
    61. */
    62. public Boolean lovePublishMsg(String publishId) {
    63. return this.sendMsg(publishId, 4);
    64. }
    65. /**
    66. * 取消喜欢动态消息
    67. *
    68. * @param publishId
    69. * @return
    70. */
    71. public Boolean disLovePublishMsg(String publishId) {
    72. return this.sendMsg(publishId, 7);
    73. }
    74. /**
    75. * 评论动态消息
    76. *
    77. * @param publishId
    78. * @return
    79. */
    80. public Boolean commentPublishMsg(String publishId) {
    81. return this.sendMsg(publishId, 5);
    82. }
    83. /**
    84. * 发送圈子操作相关的消息
    85. *
    86. * @param publishId
    87. * @param type 1-发动态,2-浏览动态, 3-点赞, 4-喜欢, 5-评论,6-取消点赞,7-取消喜欢
    88. * @return
    89. */
    90. private Boolean sendMsg(String publishId, Integer type) {
    91. try {
    92. User user = UserThreadLocal.get();
    93. Publish publish = this.quanZiApi.queryPublishById(publishId);
    94. //构建消息
    95. Map<String, Object> msg = new HashMap<>();
    96. msg.put("userId", user.getId());
    97. msg.put("date", System.currentTimeMillis());
    98. msg.put("publishId", publishId);
    99. msg.put("pid", publish.getPid());
    100. msg.put("type", type);
    101. this.rocketMQTemplate.convertAndSend("tanhua-quanzi", msg);
    102. } catch (Exception e) {
    103. log.error("发送消息失败! publishId = " + publishId + ", type = " + type, e);
    104. return false;
    105. }
    106. return true;
    107. }
    108. }

    3.4.2、修改QuanZiService

    在QuanZiService完成发送消息方法调用。

    1. package com.tanhua.server.service;
    2. import cn.hutool.core.bean.BeanUtil;
    3. import cn.hutool.core.collection.CollUtil;
    4. import cn.hutool.core.convert.Convert;
    5. import cn.hutool.core.date.DateUtil;
    6. import cn.hutool.core.util.ObjectUtil;
    7. import cn.hutool.core.util.StrUtil;
    8. import com.alibaba.dubbo.config.annotation.Reference;
    9. import com.tanhua.common.pojo.User;
    10. import com.tanhua.common.pojo.UserInfo;
    11. import com.tanhua.common.service.PicUploadService;
    12. import com.tanhua.common.utils.RelativeDateFormat;
    13. import com.tanhua.common.utils.UserThreadLocal;
    14. import com.tanhua.common.vo.PicUploadResult;
    15. import com.tanhua.dubbo.server.api.QuanZiApi;
    16. import com.tanhua.dubbo.server.api.VisitorsApi;
    17. import com.tanhua.dubbo.server.pojo.Comment;
    18. import com.tanhua.dubbo.server.pojo.Publish;
    19. import com.tanhua.dubbo.server.pojo.Visitors;
    20. import com.tanhua.dubbo.server.vo.PageInfo;
    21. import com.tanhua.server.vo.CommentVo;
    22. import com.tanhua.server.vo.PageResult;
    23. import com.tanhua.server.vo.QuanZiVo;
    24. import com.tanhua.server.vo.VisitorsVo;
    25. import org.apache.commons.lang3.StringUtils;
    26. import org.springframework.beans.factory.annotation.Autowired;
    27. import org.springframework.stereotype.Service;
    28. import org.springframework.web.multipart.MultipartFile;
    29. import java.util.*;
    30. @Service
    31. public class QuanZiService {
    32. @Reference(version = "1.0.0")
    33. private QuanZiApi quanZiApi;
    34. @Reference(version = "1.0.0")
    35. private VisitorsApi visitorsApi;
    36. @Autowired
    37. private UserService userService;
    38. @Autowired
    39. private UserInfoService userInfoService;
    40. @Autowired
    41. private PicUploadService picUploadService;
    42. @Autowired
    43. private QuanziMQService quanziMQService;
    44. public PageResult queryPublishList(Integer page, Integer pageSize) {
    45. //分析:通过dubbo中的服务查询用户的好友动态
    46. //通过mysql查询用户的信息,回写到结果对象中(QuanZiVo)
    47. PageResult pageResult = new PageResult();
    48. pageResult.setPage(page);
    49. pageResult.setPagesize(pageSize);
    50. //直接从ThreadLocal中获取对象
    51. User user = UserThreadLocal.get();
    52. //通过dubbo查询数据
    53. PageInfo<Publish> pageInfo = this.quanZiApi.queryPublishList(user.getId(), page, pageSize);
    54. List<Publish> records = pageInfo.getRecords();
    55. if (CollUtil.isEmpty(records)) {
    56. return pageResult;
    57. }
    58. pageResult.setItems(this.fillQuanZiVo(records));
    59. return pageResult;
    60. }
    61. /**
    62. * 填充用户信息
    63. *
    64. * @param userInfo
    65. * @param quanZiVo
    66. */
    67. private void fillUserInfoToQuanZiVo(UserInfo userInfo, QuanZiVo quanZiVo) {
    68. BeanUtil.copyProperties(userInfo, quanZiVo, "id");
    69. quanZiVo.setGender(userInfo.getSex().name().toLowerCase());
    70. quanZiVo.setTags(StringUtils.split(userInfo.getTags(), ','));
    71. //当前用户
    72. User user = UserThreadLocal.get();
    73. quanZiVo.setCommentCount(0); //TODO 评论数
    74. quanZiVo.setDistance("1.2公里"); //TODO 距离
    75. quanZiVo.setHasLiked(this.quanZiApi.queryUserIsLike(user.getId(), quanZiVo.getId()) ? 1 : 0); //是否点赞(1是,0否)
    76. quanZiVo.setLikeCount(Convert.toInt(this.quanZiApi.queryLikeCount(quanZiVo.getId()))); //点赞数
    77. quanZiVo.setHasLoved(this.quanZiApi.queryUserIsLove(user.getId(), quanZiVo.getId()) ? 1 : 0); //是否喜欢(1是,0否)
    78. quanZiVo.setLoveCount(Convert.toInt(this.quanZiApi.queryLoveCount(quanZiVo.getId()))); //喜欢数
    79. }
    80. /**
    81. * 根据查询到的publish集合填充QuanZiVo对象
    82. *
    83. * @param records
    84. * @return
    85. */
    86. private List<QuanZiVo> fillQuanZiVo(List<Publish> records) {
    87. List<QuanZiVo> quanZiVoList = new ArrayList<>();
    88. records.forEach(publish -> {
    89. QuanZiVo quanZiVo = new QuanZiVo();
    90. quanZiVo.setId(publish.getId().toHexString());
    91. quanZiVo.setTextContent(publish.getText());
    92. quanZiVo.setImageContent(publish.getMedias().toArray(new String[]{}));
    93. quanZiVo.setUserId(publish.getUserId());
    94. quanZiVo.setCreateDate(RelativeDateFormat.format(new Date(publish.getCreated())));
    95. quanZiVoList.add(quanZiVo);
    96. });
    97. //查询用户信息
    98. List<Object> userIds = CollUtil.getFieldValues(records, "userId");
    99. List<UserInfo> userInfoList = this.userInfoService.queryUserInfoByUserIdList(userIds);
    100. for (QuanZiVo quanZiVo : quanZiVoList) {
    101. //找到对应的用户信息
    102. for (UserInfo userInfo : userInfoList) {
    103. if (quanZiVo.getUserId().longValue() == userInfo.getUserId().longValue()) {
    104. this.fillUserInfoToQuanZiVo(userInfo, quanZiVo);
    105. break;
    106. }
    107. }
    108. }
    109. return quanZiVoList;
    110. }
    111. /**
    112. * 发布动态
    113. *
    114. * @param textContent
    115. * @param location
    116. * @param latitude
    117. * @param longitude
    118. * @param multipartFile
    119. * @return
    120. */
    121. public String savePublish(String textContent,
    122. String location,
    123. String latitude,
    124. String longitude,
    125. MultipartFile[] multipartFile) {
    126. //查询当前的登录信息
    127. User user = UserThreadLocal.get();
    128. Publish publish = new Publish();
    129. publish.setUserId(user.getId());
    130. publish.setText(textContent);
    131. publish.setLocationName(location);
    132. publish.setLatitude(latitude);
    133. publish.setLongitude(longitude);
    134. publish.setSeeType(1);
    135. List<String> picUrls = new ArrayList<>();
    136. //图片上传
    137. for (MultipartFile file : multipartFile) {
    138. PicUploadResult picUploadResult = this.picUploadService.upload(file);
    139. picUrls.add(picUploadResult.getName());
    140. }
    141. publish.setMedias(picUrls);
    142. String publishId = this.quanZiApi.savePublish(publish);
    143. if(StrUtil.isNotEmpty(publishId)){
    144. //发送消息
    145. this.quanziMQService.publishMsg(publishId);
    146. }
    147. return publishId;
    148. }
    149. public PageResult queryRecommendPublishList(Integer page, Integer pageSize) {
    150. //分析:通过dubbo中的服务查询系统推荐动态
    151. //通过mysql查询用户的信息,回写到结果对象中(QuanZiVo)
    152. PageResult pageResult = new PageResult();
    153. pageResult.setPage(page);
    154. pageResult.setPagesize(pageSize);
    155. //直接从ThreadLocal中获取对象
    156. User user = UserThreadLocal.get();
    157. //通过dubbo查询数据
    158. PageInfo<Publish> pageInfo = this.quanZiApi.queryRecommendPublishList(user.getId(), page, pageSize);
    159. List<Publish> records = pageInfo.getRecords();
    160. if (CollUtil.isEmpty(records)) {
    161. return pageResult;
    162. }
    163. pageResult.setItems(this.fillQuanZiVo(records));
    164. return pageResult;
    165. }
    166. /**
    167. * 动态点赞
    168. * @param publishId
    169. * @return
    170. */
    171. public Long likeComment(String publishId) {
    172. User user = UserThreadLocal.get();
    173. Boolean result = this.quanZiApi.likeComment(user.getId(), publishId);
    174. if (result) {
    175. //发消息
    176. this.quanziMQService.likePublishMsg(publishId);
    177. //查询点赞数
    178. return this.quanZiApi.queryLikeCount(publishId);
    179. }
    180. return null;
    181. }
    182. /**
    183. * 动态的取消点赞
    184. *
    185. * @param publishId
    186. * @return
    187. */
    188. public Long disLikeComment(String publishId) {
    189. User user = UserThreadLocal.get();
    190. Boolean result = this.quanZiApi.disLikeComment(user.getId(), publishId);
    191. if (result) {
    192. //发消息
    193. this.quanziMQService.disLikePublishMsg(publishId);
    194. //查询点赞数
    195. return this.quanZiApi.queryLikeCount(publishId);
    196. }
    197. return null;
    198. }
    199. public Long loveComment(String publishId) {
    200. User user = UserThreadLocal.get();
    201. //喜欢
    202. Boolean result = this.quanZiApi.loveComment(user.getId(), publishId);
    203. if(result){
    204. //发消息
    205. this.quanziMQService.lovePublishMsg(publishId);
    206. //查询喜欢数
    207. return this.quanZiApi.queryLoveCount(publishId);
    208. }
    209. return null;
    210. }
    211. public Long disLoveComment(String publishId) {
    212. User user = UserThreadLocal.get();
    213. //取消喜欢
    214. Boolean result = this.quanZiApi.disLoveComment(user.getId(), publishId);
    215. if(result){
    216. //发消息
    217. this.quanziMQService.disLovePublishMsg(publishId);
    218. //查询喜欢数
    219. return this.quanZiApi.queryLoveCount(publishId);
    220. }
    221. return null;
    222. }
    223. public QuanZiVo queryById(String publishId) {
    224. Publish publish = this.quanZiApi.queryPublishById(publishId);
    225. if (publish == null) {
    226. return null;
    227. }
    228. //发消息
    229. this.quanziMQService.queryPublishMsg(publishId);
    230. return this.fillQuanZiVo(Arrays.asList(publish)).get(0);
    231. }
    232. /**
    233. * 查询评论列表
    234. *
    235. * @param publishId
    236. * @param page
    237. * @param pageSize
    238. * @return
    239. */
    240. public PageResult queryCommentList(String publishId, Integer page, Integer pageSize) {
    241. PageResult pageResult = new PageResult();
    242. pageResult.setPage(page);
    243. pageResult.setPagesize(pageSize);
    244. User user = UserThreadLocal.get();
    245. //查询评论列表数据
    246. PageInfo<Comment> pageInfo = this.quanZiApi.queryCommentList(publishId, page, pageSize);
    247. List<Comment> records = pageInfo.getRecords();
    248. if(CollUtil.isEmpty(records)){
    249. return pageResult;
    250. }
    251. //查询用户信息
    252. List<Object> userIdList = CollUtil.getFieldValues(records, "userId");
    253. List<UserInfo> userInfoList = this.userInfoService.queryUserInfoByUserIdList(userIdList);
    254. List<CommentVo> result = new ArrayList<>();
    255. for (Comment record : records) {
    256. CommentVo commentVo = new CommentVo();
    257. commentVo.setContent(record.getContent());
    258. commentVo.setId(record.getId().toHexString());
    259. commentVo.setCreateDate(DateUtil.format(new Date(record.getCreated()), "HH:mm"));
    260. //是否点赞
    261. commentVo.setHasLiked(this.quanZiApi.queryUserIsLike(user.getId(), commentVo.getId()) ? 1 : 0);
    262. //点赞数
    263. commentVo.setLikeCount(Convert.toInt(this.quanZiApi.queryLikeCount(commentVo.getId())));
    264. for (UserInfo userInfo : userInfoList) {
    265. if(ObjectUtil.equals(record.getUserId(), userInfo.getUserId())){
    266. commentVo.setAvatar(userInfo.getLogo());
    267. commentVo.setNickname(userInfo.getNickName());
    268. break;
    269. }
    270. }
    271. result.add(commentVo);
    272. }
    273. pageResult.setItems(result);
    274. return pageResult;
    275. }
    276. /**
    277. * 发表评论
    278. * @param publishId
    279. * @param content
    280. * @return
    281. */
    282. public Boolean saveComments(String publishId, String content) {
    283. User user = UserThreadLocal.get();
    284. Boolean result = this.quanZiApi.saveComment(user.getId(), publishId, content);
    285. if(result){
    286. //发消息
    287. this.quanziMQService.commentPublishMsg(publishId);
    288. }
    289. return result;
    290. }
    291. public PageResult queryAlbumList(Long userId, Integer page, Integer pageSize) {
    292. PageResult pageResult = new PageResult();
    293. pageResult.setPage(page);
    294. pageResult.setPagesize(pageSize);
    295. //查询数据
    296. PageInfo<Publish> pageInfo = this.quanZiApi.queryAlbumList(userId, page, pageSize);
    297. if(CollUtil.isEmpty(pageInfo.getRecords())){
    298. return pageResult;
    299. }
    300. //填充数据
    301. pageResult.setItems(this.fillQuanZiVo(pageInfo.getRecords()));
    302. return pageResult;
    303. }
    304. public List<VisitorsVo> queryVisitorsList() {
    305. User user = UserThreadLocal.get();
    306. List<Visitors> visitorsList = this.visitorsApi.queryMyVisitor(user.getId());
    307. if (CollUtil.isEmpty(visitorsList)) {
    308. return Collections.emptyList();
    309. }
    310. List<Object> userIds = CollUtil.getFieldValues(visitorsList, "visitorUserId");
    311. List<UserInfo> userInfoList = this.userInfoService.queryUserInfoByUserIdList(userIds);
    312. List<VisitorsVo> visitorsVoList = new ArrayList<>();
    313. for (Visitors visitor : visitorsList) {
    314. for (UserInfo userInfo : userInfoList) {
    315. if (ObjectUtil.equals(visitor.getVisitorUserId(), userInfo.getUserId())) {
    316. VisitorsVo visitorsVo = new VisitorsVo();
    317. visitorsVo.setAge(userInfo.getAge());
    318. visitorsVo.setAvatar(userInfo.getLogo());
    319. visitorsVo.setGender(userInfo.getSex().name().toLowerCase());
    320. visitorsVo.setId(userInfo.getUserId());
    321. visitorsVo.setNickname(userInfo.getNickName());
    322. visitorsVo.setTags(StringUtils.split(userInfo.getTags(), ','));
    323. visitorsVo.setFateValue(visitor.getScore().intValue());
    324. visitorsVoList.add(visitorsVo);
    325. break;
    326. }
    327. }
    328. }
    329. return visitorsVoList;
    330. }
    331. }

    3.5、接收消息

    接收消息的工作需要新创建my-tanhua-recommend工程,在此工程中完成相关的操作。

    3.5.1、创建my-tanhua-recommend工程

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>my-tanhua</artifactId>
    7. <groupId>cn.itcast.tanhua</groupId>
    8. <version>1.0-SNAPSHOT</version>
    9. </parent>
    10. <modelVersion>4.0.0</modelVersion>
    11. <artifactId>my-tanhua-recommend</artifactId>
    12. <dependencies>
    13. <!--引入interface依赖-->
    14. <dependency>
    15. <groupId>cn.itcast.tanhua</groupId>
    16. <artifactId>my-tanhua-dubbo-interface</artifactId>
    17. <version>1.0-SNAPSHOT</version>
    18. </dependency>
    19. <dependency>
    20. <groupId>org.springframework.boot</groupId>
    21. <artifactId>spring-boot-starter-web</artifactId>
    22. </dependency>
    23. <dependency>
    24. <groupId>org.springframework.boot</groupId>
    25. <artifactId>spring-boot-starter-test</artifactId>
    26. <scope>test</scope>
    27. </dependency>
    28. <dependency>
    29. <groupId>org.springframework.boot</groupId>
    30. <artifactId>spring-boot-starter-data-redis</artifactId>
    31. </dependency>
    32. <dependency>
    33. <groupId>org.springframework.boot</groupId>
    34. <artifactId>spring-boot-starter-data-mongodb</artifactId>
    35. </dependency>
    36. <!--RocketMQ相关-->
    37. <dependency>
    38. <groupId>org.apache.rocketmq</groupId>
    39. <artifactId>rocketmq-spring-boot-starter</artifactId>
    40. </dependency>
    41. <dependency>
    42. <groupId>org.apache.rocketmq</groupId>
    43. <artifactId>rocketmq-client</artifactId>
    44. </dependency>
    45. <dependency>
    46. <groupId>org.projectlombok</groupId>
    47. <artifactId>lombok</artifactId>
    48. </dependency>
    49. <dependency>
    50. <groupId>joda-time</groupId>
    51. <artifactId>joda-time</artifactId>
    52. </dependency>
    53. <dependency>
    54. <groupId>cn.hutool</groupId>
    55. <artifactId>hutool-all</artifactId>
    56. </dependency>
    57. </dependencies>
    58. </project>

    3.5.2、配置文件

    application.properties

    1. spring.application.name = itcast-rocketmq
    2. server.port = 18082
    3. # RocketMQ相关配置
    4. rocketmq.name-server=192.168.31.81:9876
    5. rocketmq.producer.group=tanhua
    6. # mongodb相关配置
    7. spring.data.mongodb.username=tanhua
    8. spring.data.mongodb.password=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV
    9. spring.data.mongodb.authentication-database=admin
    10. spring.data.mongodb.database=tanhua
    11. spring.data.mongodb.port=27017
    12. spring.data.mongodb.host=192.168.31.81

    3.5.3、启动类

    1. package com.tanhua.recommend;
    2. import org.springframework.boot.SpringApplication;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. @SpringBootApplication
    5. public class RecommendApplication {
    6. public static void main(String[] args) {
    7. SpringApplication.run(RecommendApplication.class, args);
    8. }
    9. }

    3.5.4、RecommendQuanZi

    存储到MongoDB的中的实体结构。

    1. package com.tanhua.recommend.pojo;
    2. import lombok.AllArgsConstructor;
    3. import lombok.Data;
    4. import lombok.NoArgsConstructor;
    5. import org.bson.types.ObjectId;
    6. import org.springframework.data.mongodb.core.mapping.Document;
    7. @Data
    8. @NoArgsConstructor
    9. @AllArgsConstructor
    10. @Document(collection = "recommend_quanzi")
    11. public class RecommendQuanZi {
    12. private ObjectId id;
    13. private Long userId;// 用户id
    14. private Long publishId; //动态id,需要转化为Long类型
    15. private Double score; //得分
    16. private Long date; //时间戳
    17. }

    3.5.5、QuanZiMsgConsumer

    1. package com.tanhua.recommend.msg;
    2. import cn.hutool.core.collection.CollUtil;
    3. import cn.hutool.core.util.ObjectUtil;
    4. import cn.hutool.core.util.StrUtil;
    5. import cn.hutool.json.JSONObject;
    6. import cn.hutool.json.JSONUtil;
    7. import com.tanhua.dubbo.server.pojo.Publish;
    8. import com.tanhua.recommend.pojo.RecommendQuanZi;
    9. import lombok.extern.slf4j.Slf4j;
    10. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    11. import org.apache.rocketmq.spring.core.RocketMQListener;
    12. import org.bson.types.ObjectId;
    13. import org.springframework.beans.factory.annotation.Autowired;
    14. import org.springframework.data.mongodb.core.MongoTemplate;
    15. import org.springframework.stereotype.Component;
    16. @Component
    17. @RocketMQMessageListener(topic = "tanhua-quanzi",
    18. consumerGroup = "tanhua-quanzi-consumer")
    19. @Slf4j
    20. public class QuanZiMsgConsumer implements RocketMQListener<String> {
    21. @Autowired
    22. private MongoTemplate mongoTemplate;
    23. @Override
    24. public void onMessage(String msg) {
    25. try {
    26. JSONObject jsonObject = JSONUtil.parseObj(msg);
    27. Long userId = jsonObject.getLong("userId");
    28. Long date = jsonObject.getLong("date");
    29. String publishId = jsonObject.getStr("publishId");
    30. Long pid = jsonObject.getLong("pid");
    31. Integer type = jsonObject.getInt("type");
    32. RecommendQuanZi recommendQuanZi = new RecommendQuanZi();
    33. recommendQuanZi.setUserId(userId);
    34. recommendQuanZi.setId(ObjectId.get());
    35. recommendQuanZi.setDate(date);
    36. recommendQuanZi.setPublishId(pid);
    37. //1-发动态,2-浏览动态, 3-点赞, 4-喜欢, 5-评论,6-取消点赞,7-取消喜欢
    38. switch (type) {
    39. case 1: {
    40. Publish publish = this.mongoTemplate.findById(new ObjectId(publishId), Publish.class);
    41. if (ObjectUtil.isNotEmpty(publish)) {
    42. double score = 0d;
    43. //获取图片数
    44. score += CollUtil.size(publish.getMedias());
    45. //获取文本的长度
    46. //文字长度:50以内1分,50~100之间2分,100以上3分
    47. int length = StrUtil.length(publish.getText());
    48. if (length >= 0 && length < 50) {
    49. score += 1;
    50. } else if (length < 100) {
    51. score += 2;
    52. } else {
    53. score += 3;
    54. }
    55. recommendQuanZi.setScore(score);
    56. }
    57. break;
    58. }
    59. case 2: {
    60. recommendQuanZi.setScore(1d);
    61. break;
    62. }
    63. case 3: {
    64. recommendQuanZi.setScore(5d);
    65. break;
    66. }
    67. case 4: {
    68. recommendQuanZi.setScore(8d);
    69. break;
    70. }
    71. case 5: {
    72. recommendQuanZi.setScore(10d);
    73. break;
    74. }
    75. case 6: {
    76. recommendQuanZi.setScore(-5d);
    77. break;
    78. }
    79. case 7: {
    80. recommendQuanZi.setScore(-8d);
    81. break;
    82. }
    83. default: {
    84. recommendQuanZi.setScore(0d);
    85. break;
    86. }
    87. }
    88. //数据保存到MongoDB中
    89. this.mongoTemplate.save(recommendQuanZi);
    90. } catch (Exception e) {
    91. log.error("处理消息出错!msg = " + msg, e);
    92. }
    93. }
    94. }

    1.7、测试

    测试方法:使用APP进行操作,可以看到在MongoDB中已经有数据写入。
    day10-实现推荐功能 - 图8

    4、部署推荐系统

    在推荐系统中,我们将基于前面写入到推荐表中的数据通过Spark进行计算,在Spark计算完成后将结果写入到Redis中,以供在业务系统中进行查询。
    推荐服务我们将基于docker的形式进行部署:

    1. #拉取镜像
    2. docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0
    3. #创建容器
    4. docker create --name tanhua-spark-quanzi \
    5. --env MONGODB_HOST=192.168.31.81 \
    6. --env MONGODB_PORT=27017 \
    7. --env MONGODB_USERNAME=tanhua \
    8. --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
    9. --env MONGODB_DATABASE=tanhua \
    10. --env MONGODB_COLLECTION=recommend_quanzi \
    11. --env SCHEDULE_PERIOD=10 \
    12. --env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
    13. registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0
    14. #参数说明
    15. #MONGODB_HOST mongodb服务的地址
    16. #MONGODB_PORT mongodb服务的端口
    17. #MONGODB_USERNAME mongodb服务的认证用户名
    18. #MONGODB_PASSWORD mongodb服务的认证密码
    19. #MONGODB_DATABASE mongodb连接的数据库
    20. #MONGODB_COLLECTION 操作表
    21. #SCHEDULE_PERIOD 下次执行时间间隔,但是为分,默认为10分钟
    22. #REDIS_NODES redis集群地址,也可以使用单节点
    23. #mongodb开启认证服务
    24. #docker create --name mongodb --restart=always -p 27017:27017 -v mongodb:/data/db mongo:4.0.3 --auth
    25. #启动服务,启动之后就会进行执行,在SCHEDULE_PERIOD时间后再次执行
    26. docker start tanhua-spark-quanzi
    27. #查看日志
    28. docker logs -f tanhua-spark-quanzi
    29. #执行完成后会将数据写入到redis中

    进入redis查看是否已经有数据: day10-实现推荐功能 - 图9

    5、小视频推荐

    小视频的推荐和动态推荐的实现逻辑非常的类似。

    5.1、动态计分规则

  • 发布+2

  • 点赞 +5
  • 评论 + 10

    5.2、发送消息

    5.2.1、VideoMQService

    1. package com.tanhua.server.service;
    2. import com.alibaba.dubbo.config.annotation.Reference;
    3. import com.tanhua.common.pojo.User;
    4. import com.tanhua.common.utils.UserThreadLocal;
    5. import com.tanhua.dubbo.server.api.VideoApi;
    6. import com.tanhua.dubbo.server.pojo.Video;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.apache.rocketmq.spring.core.RocketMQTemplate;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.stereotype.Service;
    11. import java.util.HashMap;
    12. import java.util.Map;
    13. @Service
    14. @Slf4j
    15. public class VideoMQService {
    16. @Autowired
    17. private RocketMQTemplate rocketMQTemplate;
    18. @Reference(version = "1.0.0")
    19. private VideoApi videoApi;
    20. /**
    21. * 发布小视频消息
    22. *
    23. * @return
    24. */
    25. public Boolean videoMsg(String videoId) {
    26. return this.sendMsg(videoId, 1);
    27. }
    28. /**
    29. * 点赞小视频
    30. *
    31. * @return
    32. */
    33. public Boolean likeVideoMsg(String videoId) {
    34. return this.sendMsg(videoId, 2);
    35. }
    36. /**
    37. * 取消点赞小视频
    38. *
    39. * @return
    40. */
    41. public Boolean disLikeVideoMsg(String videoId) {
    42. return this.sendMsg(videoId, 3);
    43. }
    44. /**
    45. * 评论小视频
    46. *
    47. * @return
    48. */
    49. public Boolean commentVideoMsg(String videoId) {
    50. return this.sendMsg(videoId, 4);
    51. }
    52. /**
    53. * 发送小视频操作相关的消息
    54. *
    55. * @param videoId
    56. * @param type 1-发动态,2-点赞, 3-取消点赞,4-评论
    57. * @return
    58. */
    59. private Boolean sendMsg(String videoId, Integer type) {
    60. try {
    61. User user = UserThreadLocal.get();
    62. Video video = this.videoApi.queryVideoById(videoId);
    63. //构建消息
    64. Map<String, Object> msg = new HashMap<>();
    65. msg.put("userId", user.getId());
    66. msg.put("date", System.currentTimeMillis());
    67. msg.put("videoId", videoId);
    68. msg.put("vid", video.getVid());
    69. msg.put("type", type);
    70. this.rocketMQTemplate.convertAndSend("tanhua-video", msg);
    71. } catch (Exception e) {
    72. log.error("发送消息失败! videoId = " + videoId + ", type = " + type, e);
    73. return false;
    74. }
    75. return true;
    76. }
    77. }

    4.3.2、VideoService

    1. package com.tanhua.server.service;
    2. import cn.hutool.core.collection.CollUtil;
    3. import cn.hutool.core.convert.Convert;
    4. import cn.hutool.core.util.ObjectUtil;
    5. import cn.hutool.core.util.StrUtil;
    6. import com.alibaba.dubbo.config.annotation.Reference;
    7. import com.github.tobato.fastdfs.domain.conn.FdfsWebServer;
    8. import com.github.tobato.fastdfs.domain.fdfs.StorePath;
    9. import com.github.tobato.fastdfs.service.FastFileStorageClient;
    10. import com.tanhua.common.pojo.User;
    11. import com.tanhua.common.pojo.UserInfo;
    12. import com.tanhua.common.service.PicUploadService;
    13. import com.tanhua.common.utils.UserThreadLocal;
    14. import com.tanhua.common.vo.PicUploadResult;
    15. import com.tanhua.dubbo.server.api.QuanZiApi;
    16. import com.tanhua.dubbo.server.api.VideoApi;
    17. import com.tanhua.dubbo.server.pojo.Video;
    18. import com.tanhua.dubbo.server.vo.PageInfo;
    19. import com.tanhua.server.vo.PageResult;
    20. import com.tanhua.server.vo.VideoVo;
    21. import lombok.extern.slf4j.Slf4j;
    22. import org.springframework.beans.factory.annotation.Autowired;
    23. import org.springframework.stereotype.Service;
    24. import org.springframework.web.multipart.MultipartFile;
    25. import java.io.IOException;
    26. import java.util.ArrayList;
    27. import java.util.List;
    28. @Service
    29. @Slf4j
    30. public class VideoService {
    31. @Reference(version = "1.0.0")
    32. private VideoApi videoApi;
    33. @Autowired
    34. private PicUploadService picUploadService;
    35. @Autowired
    36. protected FastFileStorageClient storageClient;
    37. @Autowired
    38. private FdfsWebServer fdfsWebServer;
    39. @Autowired
    40. private UserInfoService userInfoService;
    41. @Reference(version = "1.0.0")
    42. private QuanZiApi quanZiApi;
    43. @Autowired
    44. private QuanZiService quanZiService;
    45. @Autowired
    46. private VideoMQService videoMQService;
    47. /**
    48. * 小视频上传
    49. *
    50. * @param picFile 封面图片
    51. * @param videoFile 视频文件
    52. * @return
    53. */
    54. public Boolean saveVideo(MultipartFile picFile, MultipartFile videoFile) {
    55. User user = UserThreadLocal.get();
    56. Video video = new Video();
    57. video.setUserId(user.getId());
    58. video.setSeeType(1);
    59. try {
    60. //上传图片到阿里云oss
    61. PicUploadResult uploadResult = this.picUploadService.upload(picFile);
    62. video.setPicUrl(uploadResult.getName());
    63. //上传视频到FastDFS中
    64. StorePath storePath = this.storageClient.uploadFile(videoFile.getInputStream(),
    65. videoFile.getSize(),
    66. StrUtil.subAfter(videoFile.getOriginalFilename(), '.', true),
    67. null);
    68. video.setVideoUrl(fdfsWebServer.getWebServerUrl() + storePath.getFullPath());
    69. String videoId = this.videoApi.saveVideo(video);
    70. if(StrUtil.isNotEmpty(videoId)){
    71. //发送消息
    72. this.videoMQService.videoMsg(videoId);
    73. }
    74. return StrUtil.isNotEmpty(videoId);
    75. } catch (IOException e) {
    76. log.error("上传小视频出错~ userId = " + user.getId() + ", file = " + videoFile.getOriginalFilename(), e);
    77. }
    78. return null;
    79. }
    80. public PageResult queryVideoList(Integer page, Integer pageSize) {
    81. User user = UserThreadLocal.get();
    82. PageResult pageResult = new PageResult();
    83. pageResult.setPage(page);
    84. pageResult.setPagesize(pageSize);
    85. PageInfo<Video> pageInfo = this.videoApi.queryVideoList(user.getId(), page, pageSize);
    86. List<Video> records = pageInfo.getRecords();
    87. if (CollUtil.isEmpty(records)) {
    88. return pageResult;
    89. }
    90. //查询用户信息
    91. List<Object> userIds = CollUtil.getFieldValues(records, "userId");
    92. List<UserInfo> userInfoList = this.userInfoService.queryUserInfoByUserIdList(userIds);
    93. List<VideoVo> videoVoList = new ArrayList<>();
    94. for (Video record : records) {
    95. VideoVo videoVo = new VideoVo();
    96. videoVo.setUserId(record.getUserId());
    97. videoVo.setCover(record.getPicUrl());
    98. videoVo.setVideoUrl(record.getVideoUrl());
    99. videoVo.setId(record.getId().toHexString());
    100. videoVo.setSignature("我就是我~"); //TODO 签名
    101. videoVo.setCommentCount(Convert.toInt(this.quanZiApi.queryCommentCount(videoVo.getId()))); //评论数
    102. videoVo.setHasFocus(this.videoApi.isFollowUser(user.getId(), videoVo.getUserId()) ? 1 : 0); //是否关注
    103. videoVo.setHasLiked(this.quanZiApi.queryUserIsLike(user.getId(), videoVo.getId()) ? 1 : 0); //是否点赞(1是,0否)
    104. videoVo.setLikeCount(Convert.toInt(this.quanZiApi.queryLikeCount(videoVo.getId())));//点赞数
    105. //填充用户信息
    106. for (UserInfo userInfo : userInfoList) {
    107. if (ObjectUtil.equals(videoVo.getUserId(), userInfo.getUserId())) {
    108. videoVo.setNickname(userInfo.getNickName());
    109. videoVo.setAvatar(userInfo.getLogo());
    110. break;
    111. }
    112. }
    113. videoVoList.add(videoVo);
    114. }
    115. pageResult.setItems(videoVoList);
    116. return pageResult;
    117. }
    118. /**
    119. * 点赞
    120. *
    121. * @param videoId
    122. * @return
    123. */
    124. public Long likeComment(String videoId) {
    125. User user = UserThreadLocal.get();
    126. Boolean result = this.quanZiApi.likeComment(user.getId(), videoId);
    127. if (result) {
    128. //发送消息
    129. this.videoMQService.likeVideoMsg(videoId);
    130. return this.quanZiApi.queryLikeCount(videoId);
    131. }
    132. return null;
    133. }
    134. /**
    135. * 取消点赞
    136. *
    137. * @param videoId
    138. * @return
    139. */
    140. public Long disLikeComment(String videoId) {
    141. User user = UserThreadLocal.get();
    142. Boolean result = this.quanZiApi.disLikeComment(user.getId(), videoId);
    143. if (result) {
    144. //发送消息
    145. this.videoMQService.disLikeVideoMsg(videoId);
    146. return this.quanZiApi.queryLikeCount(videoId);
    147. }
    148. return null;
    149. }
    150. public Boolean saveComment(String videoId, String content) {
    151. Boolean result = this.quanZiService.saveComments(videoId, content);
    152. if(result){
    153. //发送消息
    154. this.videoMQService.commentVideoMsg(videoId);
    155. }
    156. return result;
    157. }
    158. public PageResult queryCommentList(String videoId, Integer page, Integer pageSize) {
    159. return this.quanZiService.queryCommentList(videoId, page, pageSize);
    160. }
    161. /**
    162. * 关注用户
    163. *
    164. * @param userId
    165. * @return
    166. */
    167. public Boolean followUser(Long userId) {
    168. User user = UserThreadLocal.get();
    169. return this.videoApi.followUser(user.getId(), userId);
    170. }
    171. /**
    172. * 取消关注
    173. *
    174. * @param userId
    175. * @return
    176. */
    177. public Boolean disFollowUser(Long userId) {
    178. User user = UserThreadLocal.get();
    179. return this.videoApi.disFollowUser(user.getId(), userId);
    180. }
    181. }

    5.3、接收消息

    5.3.1、RecommendVideo

    1. package com.tanhua.recommend.pojo;
    2. import lombok.AllArgsConstructor;
    3. import lombok.Data;
    4. import lombok.NoArgsConstructor;
    5. import org.bson.types.ObjectId;
    6. import org.springframework.data.mongodb.core.mapping.Document;
    7. @Data
    8. @NoArgsConstructor
    9. @AllArgsConstructor
    10. @Document(collection = "recommend_video")
    11. public class RecommendVideo {
    12. private ObjectId id;
    13. private Long userId;// 用户id
    14. private Long videoId; //视频id,需要转化为Long类型
    15. private Double score; //得分
    16. private Long date; //时间戳
    17. }

    5.3.2、VideoMsgConsumer

    1. package com.tanhua.recommend.msg;
    2. import cn.hutool.json.JSONObject;
    3. import cn.hutool.json.JSONUtil;
    4. import com.tanhua.recommend.pojo.RecommendVideo;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    7. import org.apache.rocketmq.spring.core.RocketMQListener;
    8. import org.bson.types.ObjectId;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.data.mongodb.core.MongoTemplate;
    11. import org.springframework.stereotype.Component;
    12. @Component
    13. @RocketMQMessageListener(topic = "tanhua-video",
    14. consumerGroup = "tanhua-video-consumer")
    15. @Slf4j
    16. public class VideoMsgConsumer implements RocketMQListener<String> {
    17. @Autowired
    18. private MongoTemplate mongoTemplate;
    19. @Override
    20. public void onMessage(String msg) {
    21. try {
    22. JSONObject jsonObject = JSONUtil.parseObj(msg);
    23. Long userId = jsonObject.getLong("userId");
    24. Long vid = jsonObject.getLong("vid");
    25. Integer type = jsonObject.getInt("type");
    26. //1-发动态,2-点赞, 3-取消点赞,4-评论
    27. RecommendVideo recommendVideo = new RecommendVideo();
    28. recommendVideo.setUserId(userId);
    29. recommendVideo.setId(ObjectId.get());
    30. recommendVideo.setDate(System.currentTimeMillis());
    31. recommendVideo.setVideoId(vid);
    32. switch (type) {
    33. case 1: {
    34. recommendVideo.setScore(2d);
    35. break;
    36. }
    37. case 2: {
    38. recommendVideo.setScore(5d);
    39. break;
    40. }
    41. case 3: {
    42. recommendVideo.setScore(-5d);
    43. break;
    44. }
    45. case 4: {
    46. recommendVideo.setScore(10d);
    47. break;
    48. }
    49. default: {
    50. recommendVideo.setScore(0d);
    51. break;
    52. }
    53. }
    54. this.mongoTemplate.save(recommendVideo);
    55. } catch (Exception e) {
    56. log.error("处理小视频消息失败~" + msg, e);
    57. }
    58. }
    59. }

    5.3.3、测试

    day10-实现推荐功能 - 图10
    可以看到,用户1对于视频有点赞、取消点赞、评论等操作。

    5.4、部署推荐服务

    1. #拉取镜像
    2. docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0
    3. #创建容器
    4. docker create --name tanhua-spark-video \
    5. --env MONGODB_HOST=192.168.31.81 \
    6. --env MONGODB_PORT=27017 \
    7. --env MONGODB_USERNAME=tanhua \
    8. --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
    9. --env MONGODB_DATABASE=tanhua \
    10. --env MONGODB_COLLECTION=recommend_video \
    11. --env SCHEDULE_PERIOD=10 \
    12. --env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
    13. registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0
    14. #启动服务
    15. docker start tanhua-spark-video
    16. #查看日志
    17. docker logs -f tanhua-spark-video
    测试:
    day10-实现推荐功能 - 图11