本项目主要用于互联网电商企业中,使用Spark技术开发的大数据统计分析平台,对电商网站的各种用户行为(访问行为、购物行为、广告点击行为等)进行复杂的分析。用统计分析出来的数据,辅助公司中的PM(产品经理)、数据分析师以及管理人员分析现有产品的情况,并根据用户行为分析结果持续改进产品的设计,以及调整公司的战略和业务。最终达到用大数据技术来帮助提升公司的业绩、营业额以及市场占有率的目标。

本项目使用了Spark技术生态栈中最常用的三个技术框架,Spark Core、Spark SQL和Spark Streaming,进行离线计算和实时计算业务模块的开发。实现了包括用户访问session分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计4个业务模块。

在项目中,重点介绍符合实际企业项目中的性能调优、troubleshooting以及数据倾斜等知识和技术,同时以企业级大数据项目开发流程贯穿每个业务模块的讲解,涵盖了项目开发全流程,包括需求分析、方案设计、数据设计、编码实现、测试以及性能调优等环节,全面还原真实大数据项目的开发流程。

模块介绍

用户访问session分析

该模块主要是对用户访问session进行统计分析,包括session的聚合指标计算、按时间比例随机抽取session、获取每天点击、下单和购买排名前10的品类、并获取top10品类的点击量排名前10的session。该模块可以让产品经理、数据分析师以及企业管理层形象地看到各种条件下的具体用户行为以及统计指标,从而对公司的产品设计以及业务发展战略做出调整。主要使用Spark Core实现。

页面单跳转化率统计

该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。该模块可以让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率,从而对网页布局,进行更好的优化设计。主要使用Spark Core实现。

热门商品离线统计

该模块主要实现每天统计出各个区域的top3热门商品。然后使用Oozie进行离线统计任务的定时调度;使用Zeppeline进行数据可视化的报表展示。该模块可以让企业管理层看到公司售卖的商品的整体情况,从而对公司的商品相关的战略进行调整。主要使用Spark SQL实现。

参考: https://github.com/Erik-ly/SprakProject
参考:https://blog.csdn.net/u012318074/category_6744423.html

用户访问session分析spark作业介绍

接收用户创建的分析任务,用户可能指定的条件如下:
1. 时间范围:起始日期-结束日期
2. 性别:男或女
3. 年龄范围
4. 职业:多选
5. 城市:多选
6. 搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件
7. 点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件
Spark作业工作过程
J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param字段中,,接着J2EE平台执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本,spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数传递给spark作业的main函数,参数就封装在main函数得到args数组中, 这是spark本事提供的特性。

  1. package com.erik.sparkproject.spark;
  2. import java.util.Iterator;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.SparkContext;
  5. import org.apache.spark.api.java.JavaPairRDD;
  6. import org.apache.spark.api.java.JavaRDD;
  7. import org.apache.spark.api.java.JavaSparkContext;
  8. import org.apache.spark.api.java.function.PairFunction;
  9. import org.apache.spark.sql.DataFrame;
  10. import org.apache.spark.sql.Row;
  11. import org.apache.spark.sql.SQLContext;
  12. import org.apache.spark.sql.hive.HiveContext;
  13. import com.alibaba.fastjson.JSONObject;
  14. import com.erik.sparkproject.conf.ConfigurationManager;
  15. import com.erik.sparkproject.constant.Constants;
  16. import com.erik.sparkproject.dao.ITaskDAO;
  17. import com.erik.sparkproject.domain.Task;
  18. import com.erik.sparkproject.impl.DAOFactory;
  19. import com.erik.sparkproject.test.MockData;
  20. import com.erik.sparkproject.util.*;
  21. import scala.Tuple2;
  22. /**
  23. * @author Erik
  24. *
  25. */
  26. public class UserVisitSessionAnalyzeSpark {
  27. public static void main(String[] args) {
  28. //构建spark上下文
  29. //首先在Constants.java中设置spark作业相关的常量
  30. //String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";
  31. //保存Constants.java配置
  32. SparkConf conf = new SparkConf()
  33. .setAppName(Constants.SPARK_APP_NAME)
  34. .setMaster("local");
  35. JavaSparkContext sc = new JavaSparkContext(conf);
  36. SQLContext sqlContext = getSQLContext(sc.sc());
  37. //生成模拟测试数据
  38. mockData(sc, sqlContext);
  39. //创建需要使用的DAO组件
  40. ITaskDAO taskDAO = DAOFactory.getTaskDAO();
  41. //那么就首先得查询出来指定的任务,并获取任务的查询参数
  42. long taskid = ParamUtils.getTaskIdFromArgs(args);
  43. Task task = taskDAO.findById(taskid);
  44. JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
  45. //如果要进行session粒度的数据聚合,
  46. //首先要从user_visit_action表中,查询出来指定日期范围内的数据
  47. JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);
  48. //聚合
  49. //首先,可以将行为数据按照session_id进行groupByKey分组
  50. //此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join
  51. //然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息
  52. JavaPairRDD<String, String> sessionid2AggrInfoRDD =
  53. aggregateBySession(sqlContext, actionRDD);
  54. //关闭spark上下文
  55. sc.close();
  56. }
  57. /**
  58. * 获取SQLContext
  59. * 如果在本地测试环境的话,那么久生成SQLC哦那text对象
  60. *如果在生产环境运行的话,那么就生成HiveContext对象
  61. * @param sc SparkContext
  62. * @return SQLContext
  63. */
  64. private static SQLContext getSQLContext(SparkContext sc) {
  65. //在my.properties中配置
  66. //spark.local=true(打包之前改为flase)
  67. //在ConfigurationManager.java中添加
  68. //public static Boolean getBoolean(String key) {
  69. // String value = getProperty(key);
  70. // try {
  71. // return Boolean.valueOf(value);
  72. // } catch (Exception e) {
  73. // e.printStackTrace();
  74. // }
  75. // return false;
  76. //}
  77. //在Contants.java中添加
  78. //String SPARK_LOCAL = "spark.local";
  79. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  80. if(local) {
  81. return new SQLContext(sc);
  82. }else {
  83. return new HiveContext(sc);
  84. }
  85. }
  86. /**
  87. * 生成模拟数据
  88. * 只有是本地模式,才会生成模拟数据
  89. * @param sc
  90. * @param sqlContext
  91. */
  92. private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {
  93. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  94. if(local) {
  95. MockData.mock(sc, sqlContext);
  96. }
  97. }
  98. /**
  99. * 获取指定日期范围内的用户访问行为数据
  100. * @param sqlContext SQLContext
  101. * @param taskParam 任务参数
  102. * @return 行为数据RDD
  103. */
  104. private static JavaRDD<Row> getActionRDDByDateRange(
  105. SQLContext sqlContext, JSONObject taskParam) {
  106. //先在Constants.java中添加任务相关的常量
  107. //String PARAM_START_DATE = "startDate";
  108. //String PARAM_END_DATE = "endDate";
  109. String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
  110. String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);
  111. String sql = "select * "
  112. + "from user_visit_action"
  113. + "where date>='" + startDate + "'"
  114. + "and date<='" + endDate + "'";
  115. DataFrame actionDF = sqlContext.sql(sql);
  116. return actionDF.javaRDD();
  117. }
  118. /**
  119. * 对行为数据按sesssion粒度进行聚合
  120. * @param actionRDD 行为数据RDD
  121. * @return session粒度聚合数据
  122. */
  123. private static JavaPairRDD<String, String> aggregateBySession(
  124. SQLContext sqlContext, JavaRDD<Row> actionRDD) {
  125. //现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索
  126. //现在需要将这个Row映射成<sessionid,Row>的格式
  127. JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(
  128. /**
  129. * PairFunction
  130. * 第一个参数,相当于是函数的输入
  131. * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值
  132. */
  133. new PairFunction<Row, String, Row>() {
  134. private static final long serialVersionUID = 1L;
  135. public Tuple2<String, Row> call(Row row) throws Exception {
  136. //按照MockData.java中字段顺序获取
  137. //此时需要拿到session_id,序号是2
  138. return new Tuple2<String, Row>(row.getString(2), row);
  139. }
  140. });
  141. //对行为数据按照session粒度进行分组
  142. JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
  143. sessionid2ActionRDD.groupByKey();
  144. //对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
  145. //到此为止,获取的数据格式如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
  146. JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(
  147. new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
  148. private static final long serialVersionUID = 1L;
  149. public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
  150. throws Exception {
  151. String sessionid = tuple._1;
  152. Iterator<Row> iterator = tuple._2.iterator();
  153. StringBuffer searchKeywordsBuffer = new StringBuffer("");
  154. StringBuffer clickCategoryIdsBuffer = new StringBuffer("");
  155. Long userid = null;
  156. //遍历session所有的访问行为
  157. while(iterator.hasNext()) {
  158. //提取每个 访问行为的搜索词字段和点击品类字段
  159. Row row = iterator.next();
  160. if(userid == null) {
  161. userid = row.getLong(1);
  162. }
  163. String searchKeyword = row.getString(5);
  164. long clickCategoryId = row.getLong(6);
  165. //实际上这里要对数据说明一下
  166. //并不是每一行访问行为都有searchKeyword和clickCategoryId两个字段的
  167. //其实,只有搜索行为是有searchKeyword字段的
  168. //只有点击品类的行为是有clickCaregoryId字段的
  169. //所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的
  170. //所以是否将搜索词点击品类id拼接到字符串中去
  171. //首先要满足不能是null值
  172. //其次,之前的字符串中还没有搜索词或者点击品类id
  173. if(StringUtils.isNotEmpty(searchKeyword)) {
  174. if(!searchKeywordsBuffer.toString().comains(searchKeyword)) {
  175. searchKeywordsBuffer.append(searchKeyword + ",");
  176. }
  177. }
  178. if(!clickCategoryIdsBuffer.toString().contains(
  179. String.valueOf(clickCategoryId))) {
  180. clickCategoryIdsBuffer.append(clickCategoryId + ",");
  181. }
  182. }
  183. //StringUtils引入的包是import com.erik.sparkproject.util.trimComma;
  184. String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
  185. String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());
  186. //返回的数据即是<sessionid, partAggrInfo>
  187. //但是,这一步聚合后,其实还需要将每一行数据,根对应的用户信息进行聚合
  188. //问题来了,如果是跟用户信息进行聚合的话,那么key就不应该是sessionid,而应该是userid
  189. //才能够跟<userid, Row>格式的用户信息进行聚合
  190. //如果我们这里直接返回<sessionid, partAggrInfo>,还得再做一次mapToPair算子
  191. //将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举
  192. //所以,我们这里其实可以直接返回数据格式就是<userid,partAggrInfo>
  193. //然后在直接将返回的Tuple的key设置成sessionid
  194. //最后的数据格式,还是<sessionid,fullAggrInfo>
  195. //聚合数据,用什么样的格式进行拼接?
  196. //我们这里统一定义,使用key=value|key=vale
  197. //在Constants.java中定义spark作业相关的常量
  198. //String FIELD_SESSION_ID = "sessionid";
  199. //String FIELD_SEARCH_KEYWORDS = "searchKeywords";
  200. //String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";
  201. String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
  202. + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
  203. + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds;
  204. return new Tuple2<Long, String>(userid, partAggrInfo);
  205. }
  206. });
  207. //查询所有用户数据
  208. String sql = "select * from user_info";
  209. JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();
  210. JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(
  211. new PairFunction<Row, Long, Row>(){
  212. private static final long serialVersionUID = 1L;
  213. public Tuple2<Long, Row> call(Row row) throws Exception {
  214. return new Tuple2<Long, Row>(row.getLong(0), row);
  215. }
  216. });
  217. //将session粒度聚合数据,与用户信息进行join
  218. JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
  219. userid2PartAggrInfoRDD.join(userid2InfoRDD);
  220. //对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
  221. JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(
  222. new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {
  223. private static final long serialVersionUID = 1L;
  224. public Tuple2<String, String> call(
  225. Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception {
  226. String partAggrInfo = tuple._2._1;
  227. Row userInfoRow = tuple._2._2;
  228. String sessionid = StringUtils.getFieldFromConcatString(
  229. partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
  230. int age = userInfoRow.getInt(3);
  231. String professional = userInfoRow.getString(4);
  232. String city = userInfoRow.getString(5);
  233. String sex = userInfoRow.getString(6);
  234. //在Constants.java中添加以下常量
  235. //String FIELD_AGE = "age";
  236. //String FIELD_PROFESSIONAL = "professional";
  237. //String FIELD_CITY = "city";
  238. //String FIELD_SEX = "sex";
  239. String fullAggrInfo = partAggrInfo + "|"
  240. + Constants.FIELD_AGE + "=" + age + "|"
  241. + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
  242. + Constants.FIELD_CITY + "=" + city + "|"
  243. + Constants.FIELD_SEX + "=" + sex ;
  244. return new Tuple2<String, String>(sessionid, fullAggrInfo);
  245. }
  246. });
  247. return sessionid2FullAggrInfoRDD;
  248. }
  249. }

按筛选参数对session粒度聚合数据进行过滤

  1. package com.erik.sparkproject.spark;
  2. import java.util.Iterator;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.SparkContext;
  5. import org.apache.spark.api.java.JavaPairRDD;
  6. import org.apache.spark.api.java.JavaRDD;
  7. import org.apache.spark.api.java.JavaSparkContext;
  8. import org.apache.spark.api.java.function.Function;
  9. import org.apache.spark.api.java.function.PairFunction;
  10. import org.apache.spark.sql.DataFrame;
  11. import org.apache.spark.sql.Row;
  12. import org.apache.spark.sql.SQLContext;
  13. import org.apache.spark.sql.hive.HiveContext;
  14. import com.alibaba.fastjson.JSONObject;
  15. import com.erik.sparkproject.conf.ConfigurationManager;
  16. import com.erik.sparkproject.constant.Constants;
  17. import com.erik.sparkproject.dao.ITaskDAO;
  18. import com.erik.sparkproject.domain.Task;
  19. import com.erik.sparkproject.impl.DAOFactory;
  20. import com.erik.sparkproject.test.MockData;
  21. import com.erik.sparkproject.util.*;
  22. import scala.Tuple2;
  23. /**
  24. *
  25. * @author Erik
  26. *
  27. */
  28. public class UserVisitSessionAnalyzeSpark {
  29. public static void main(String[] args) {
  30. args = new String[]{"2"};
  31. //构建spark上下文
  32. //首先在Constants.java中设置spark作业相关的常量
  33. //String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";
  34. //保存Constants.java配置
  35. SparkConf conf = new SparkConf()
  36. .setAppName(Constants.SPARK_APP_NAME)
  37. .setMaster("local");
  38. JavaSparkContext sc = new JavaSparkContext(conf);
  39. SQLContext sqlContext = getSQLContext(sc.sc());
  40. //生成模拟测试数据
  41. mockData(sc, sqlContext);
  42. //创建需要使用的DAO组件
  43. ITaskDAO taskDAO = DAOFactory.getTaskDAO();
  44. //那么就首先得查询出来指定的任务,并获取任务的查询参数
  45. long taskid = ParamUtils.getTaskIdFromArgs(args);
  46. Task task = taskDAO.findById(taskid);
  47. JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
  48. //如果要进行session粒度的数据聚合,
  49. //首先要从user_visit_action表中,查询出来指定日期范围内的数据
  50. JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);
  51. //聚合
  52. //首先,可以将行为数据按照session_id进行groupByKey分组
  53. //此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join
  54. //然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息
  55. //到这里为止,获取的数据是<sessionid,(sessionid,searchKeywords,
  56. //clickCategoryIds,age,professional,city,sex)>
  57. JavaPairRDD<String, String> sessionid2AggrInfoRDD =
  58. aggregateBySession(sqlContext, actionRDD);
  59. //接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
  60. //相当于我们自己编写的算子,是要访问外面的任务参数对象的
  61. //匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的
  62. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD =
  63. filterSession(sessionid2AggrInfoRDD, taskParam);
  64. //关闭spark上下文
  65. sc.close();
  66. }
  67. /**
  68. * 获取SQLContext
  69. * 如果在本地测试环境的话,那么久生成SQLC哦那text对象
  70. *如果在生产环境运行的话,那么就生成HiveContext对象
  71. * @param sc SparkContext
  72. * @return SQLContext
  73. */
  74. private static SQLContext getSQLContext(SparkContext sc) {
  75. //在my.properties中配置
  76. //spark.local=true(打包之前改为flase)
  77. //在ConfigurationManager.java中添加
  78. //public static Boolean getBoolean(String key) {
  79. // String value = getProperty(key);
  80. // try {
  81. // return Boolean.valueOf(value);
  82. // } catch (Exception e) {
  83. // e.printStackTrace();
  84. // }
  85. // return false;
  86. //}
  87. //在Contants.java中添加
  88. //String SPARK_LOCAL = "spark.local";
  89. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  90. if(local) {
  91. return new SQLContext(sc);
  92. }else {
  93. return new HiveContext(sc);
  94. }
  95. }
  96. /**
  97. * 生成模拟数据
  98. * 只有是本地模式,才会生成模拟数据
  99. * @param sc
  100. * @param sqlContext
  101. */
  102. private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {
  103. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  104. if(local) {
  105. MockData.mock(sc, sqlContext);
  106. }
  107. }
  108. /**
  109. * 获取指定日期范围内的用户访问行为数据
  110. * @param sqlContext SQLContext
  111. * @param taskParam 任务参数
  112. * @return 行为数据RDD
  113. */
  114. private static JavaRDD<Row> getActionRDDByDateRange(
  115. SQLContext sqlContext, JSONObject taskParam) {
  116. //先在Constants.java中添加任务相关的常量
  117. //String PARAM_START_DATE = "startDate";
  118. //String PARAM_END_DATE = "endDate";
  119. String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
  120. String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);
  121. String sql = "select * "
  122. + "from user_visit_action"
  123. + "where date>='" + startDate + "'"
  124. + "and date<='" + endDate + "'";
  125. DataFrame actionDF = sqlContext.sql(sql);
  126. return actionDF.javaRDD();
  127. }
  128. /**
  129. * 对行为数据按sesssion粒度进行聚合
  130. * @param actionRDD 行为数据RDD
  131. * @return session粒度聚合数据
  132. */
  133. private static JavaPairRDD<String, String> aggregateBySession(
  134. SQLContext sqlContext, JavaRDD<Row> actionRDD) {
  135. //现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索
  136. //现在需要将这个Row映射成<sessionid,Row>的格式
  137. JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(
  138. /**
  139. * PairFunction
  140. * 第一个参数,相当于是函数的输入
  141. * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值
  142. */
  143. new PairFunction<Row, String, Row>() {
  144. private static final long serialVersionUID = 1L;
  145. public Tuple2<String, Row> call(Row row) throws Exception {
  146. //按照MockData.java中字段顺序获取
  147. //此时需要拿到session_id,序号是2
  148. return new Tuple2<String, Row>(row.getString(2), row);
  149. }
  150. });
  151. //对行为数据按照session粒度进行分组
  152. JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
  153. sessionid2ActionRDD.groupByKey();
  154. //对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
  155. //到此为止,获取的数据格式如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
  156. JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(
  157. new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
  158. private static final long serialVersionUID = 1L;
  159. public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
  160. throws Exception {
  161. String sessionid = tuple._1;
  162. Iterator<Row> iterator = tuple._2.iterator();
  163. StringBuffer searchKeywordsBuffer = new StringBuffer("");
  164. StringBuffer clickCategoryIdsBuffer = new StringBuffer("");
  165. Long userid = null;
  166. //遍历session所有的访问行为
  167. while(iterator.hasNext()) {
  168. //提取每个 访问行为的搜索词字段和点击品类字段
  169. Row row = iterator.next();
  170. if(userid == null) {
  171. userid = row.getLong(1);
  172. }
  173. String searchKeyword = row.getString(5);
  174. Long clickCategoryId = row.getLong(6);
  175. //实际上这里要对数据说明一下
  176. //并不是每一行访问行为都有searchKeyword和clickCategoryId两个字段的
  177. //其实,只有搜索行为是有searchKeyword字段的
  178. //只有点击品类的行为是有clickCaregoryId字段的
  179. //所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的
  180. //所以是否将搜索词点击品类id拼接到字符串中去
  181. //首先要满足不能是null值
  182. //其次,之前的字符串中还没有搜索词或者点击品类id
  183. if(StringUtils.isNotEmpty(searchKeyword)) {
  184. if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
  185. searchKeywordsBuffer.append(searchKeyword + ",");
  186. }
  187. }
  188. if(clickCategoryId != null) {
  189. if(!clickCategoryIdsBuffer.toString().contains(
  190. String.valueOf(clickCategoryId))) {
  191. clickCategoryIdsBuffer.append(clickCategoryId + ",");
  192. }
  193. } }
  194. //StringUtils引入的包是import com.erik.sparkproject.util.trimComma;
  195. String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
  196. String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());
  197. //返回的数据即是<sessionid, partAggrInfo>
  198. //但是,这一步聚合后,其实还需要将每一行数据,根对应的用户信息进行聚合
  199. //问题来了,如果是跟用户信息进行聚合的话,那么key就不应该是sessionid,而应该是userid
  200. //才能够跟<userid, Row>格式的用户信息进行聚合
  201. //如果我们这里直接返回<sessionid, partAggrInfo>,还得再做一次mapToPair算子
  202. //将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举
  203. //所以,我们这里其实可以直接返回数据格式就是<userid,partAggrInfo>
  204. //然后在直接将返回的Tuple的key设置成sessionid
  205. //最后的数据格式,还是<sessionid,fullAggrInfo>
  206. //聚合数据,用什么样的格式进行拼接?
  207. //我们这里统一定义,使用key=value|key=vale
  208. //在Constants.java中定义spark作业相关的常量
  209. //String FIELD_SESSION_ID = "sessionid";
  210. //String FIELD_SEARCH_KEYWORDS = "searchKeywords";
  211. //String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";
  212. String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
  213. + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
  214. + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds;
  215. return new Tuple2<Long, String>(userid, partAggrInfo);
  216. }
  217. });
  218. //查询所有用户数据
  219. String sql = "select * from user_info";
  220. JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();
  221. JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(
  222. new PairFunction<Row, Long, Row>(){
  223. private static final long serialVersionUID = 1L;
  224. public Tuple2<Long, Row> call(Row row) throws Exception {
  225. return new Tuple2<Long, Row>(row.getLong(0), row);
  226. }
  227. });
  228. //将session粒度聚合数据,与用户信息进行join
  229. JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
  230. userid2PartAggrInfoRDD.join(userid2InfoRDD);
  231. //对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
  232. JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(
  233. new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {
  234. private static final long serialVersionUID = 1L;
  235. public Tuple2<String, String> call(
  236. Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception {
  237. String partAggrInfo = tuple._2._1;
  238. Row userInfoRow = tuple._2._2;
  239. String sessionid = StringUtils.getFieldFromConcatString(
  240. partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
  241. int age = userInfoRow.getInt(3);
  242. String professional = userInfoRow.getString(4);
  243. String city = userInfoRow.getString(5);
  244. String sex = userInfoRow.getString(6);
  245. //在Constants.java中添加以下常量
  246. //String FIELD_AGE = "age";
  247. //String FIELD_PROFESSIONAL = "professional";
  248. //String FIELD_CITY = "city";
  249. //String FIELD_SEX = "sex";
  250. String fullAggrInfo = partAggrInfo + "|"
  251. + Constants.FIELD_AGE + "=" + age + "|"
  252. + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
  253. + Constants.FIELD_CITY + "=" + city + "|"
  254. + Constants.FIELD_SEX + "=" + sex ;
  255. return new Tuple2<String, String>(sessionid, fullAggrInfo);
  256. }
  257. });
  258. return sessionid2FullAggrInfoRDD;
  259. }
  260. /**
  261. * 过滤session数据
  262. * @param sessionid2AggrInfoRDD
  263. * @return
  264. */
  265. private static JavaPairRDD<String, String> filterSession(
  266. JavaPairRDD<String, String> sessionid2AggrInfoRDD,
  267. final JSONObject taskParam) {
  268. //为了使用后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串
  269. String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  270. String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  271. String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);
  272. String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);
  273. String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);
  274. String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
  275. String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);
  276. String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
  277. + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
  278. + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
  279. + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
  280. + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
  281. + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
  282. + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : "");
  283. if (_parameter.endsWith("\\|")) {
  284. _parameter = _parameter.substring(0, _parameter.length() - 1);
  285. }
  286. final String parameter = _parameter;
  287. //根据筛选参数进行过滤
  288. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(
  289. new Function<Tuple2<String, String>, Boolean>() {
  290. private static final long serialVersionUID = 1L;
  291. public Boolean call(Tuple2<String, String> tuple) throws Exception {
  292. //首先,从tuple中,获取聚合数据
  293. String aggrInfo = tuple._2;
  294. //接着,依次按照筛选条件进行过滤
  295. //按照年龄范围进行过滤(startAge、endAge)
  296. //先在Constants.java中添加常量
  297. //String PARAM_START_AGE = "startAge";
  298. //String PARAM_END_AGE = "endage";
  299. //String PARAM_PROFESSIONALS = "professionals";
  300. //String PARAM_CITIES = "cities";
  301. //String PARAM_SEX = "sex";
  302. //String PARAM_KEYWORDS = "keywords";
  303. //String PARAM_CATEGORY_IDS = "categoryIds";
  304. if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,
  305. parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
  306. return false;
  307. }
  308. //按照职业范围进行过滤(professionals)
  309. if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,
  310. parameter, Constants.PARAM_PROFESSIONALS)) {
  311. return false;
  312. }
  313. //按照城市范围进行过滤(cities)
  314. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,
  315. parameter, Constants.PARAM_CATEGORY_IDS)) {
  316. return false;
  317. }
  318. //按照性别过滤
  319. if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,
  320. parameter, Constants.PARAM_SEX)) {
  321. return false;
  322. }
  323. //按照搜索词过滤
  324. if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,
  325. parameter, Constants.PARAM_KEYWORDS)) {
  326. return false;
  327. }
  328. //按照点击品类id进行搜索
  329. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,
  330. parameter, Constants.PARAM_CATEGORY_IDS)) {
  331. return false;
  332. }
  333. return true;
  334. }
  335. });
  336. return null;
  337. }
  338. }

session聚合统计之自定义Accumulator

之前已经得到通过条件过滤后的session,现在需要统计出来访问时长在0s~3s的session的数量,占总session数量的比例;4s~6s的session的数量,占总session数量的比例;……
并且统计出访问步长在1~3的session的数量,占总session数量的比例;4~6的session的数量,占总session数量的比例;……

这是就需要Accumulator,传统的Accumulator实现方式为:
Accumulator 1s_3s = sc.accumulator(0L);
十几个Accumulator

可以对过滤以后的session,调用foreach也可以,遍历所有session;计算每个session的访问时长和访问步长;
访问时长:把session的最后一个action的时间,减去第一个action的时间。
访问步长:session的action数量。
计算出访问时长和访问步长以后,根据对应的区间,找到对应的Accumulator,1s_3s.add(1L),
同时每遍历一个session,就可以给总session数量对应的Accumulator加1,最后用各个区间的session数量,除以总session数量,就可以计算出各个区间的占比了。

这种传统的实现方式,有什么不好呢???

最大的不好,就是Accumulator太多了,不便于维护。
首先第一,很有可能,在写后面的累加代码的时候,比如找到了一个4s~6s的区间的session,但是却代码里面不小心,累加到7s~9s里面去了;
第二,当后期,项目如果要出现一些逻辑上的变更,比如说,session数量的计算逻辑,要改变,就得更改所有Accumulator对应的代码;或者说,又要增加几个范围,那么又要增加多个Accumulator,并且修改对应的累加代码;维护成本,相当之高(甚至可能,修改一个小功能,或者增加一个小功能,耗费的时间,比做一个新项目还要多;甚至于,还修改出了bug,那就耗费更多的时间)。

所以,我们这里的设计,不打算采用传统的方式,用十几个,甚至二十个Accumulator,因为维护成本太高。
这里的实现思路是,我们自己自定义一个Accumulator,实现较为复杂的计算逻辑,一个Accumulator维护了所有范围区间的数量的统计逻辑。
低耦合,如果说,session数量计算逻辑要改变,那么不用变更session遍历的相关的代码;只要维护一个Accumulator里面的代码即可;如果计算逻辑后期变更,或者加了几个范围,那么也很方便,不用多加好几个Accumulator,去修改大量的代码;只要维护一个Accumulator里面的代码即可;维护成本,大大降低

自定义Accumulator,也是Spark Core中,属于比较高端的一个技术使用自定义Accumulator,大家就可以任意的实现自己的复杂分布式计算的逻辑。如果说,你的task,分布式,进行复杂计算逻辑,那么是很难实现的(借助于redis,维护中间状态,借助于zookeeper去实现分布式锁)。但是,使用自定义Accumulator,可以更方便进行中间状态的维护,而且不用担心并发和锁的问题。

  1. package com.erik.sparkproject.spark;
  2. import org.apache.spark.AccumulatorParam;
  3. import com.erik.sparkproject.constant.Constants;
  4. import com.erik.sparkproject.util.StringUtils;
  5. /**
  6. * session聚合统计Accumulator
  7. * 使用自己定义的一些数据格式,比如String,甚至说,我们可以自己定义model,自己定义的类(必须可序列化)
  8. * 然后可以基于这种特殊的数据格式实现复杂的分布式计算逻辑
  9. * 各个task分布式在运行,可以根据自己的需求,task给Accumulator传入不同的值
  10. * 根据不同的值,去做复杂的逻辑
  11. *
  12. * Spark Core里很使用的高端技术
  13. * @author Erik
  14. *
  15. */
  16. public class SesssionAggrStatAccumulator implements AccumulatorParam<String> {
  17. private static final long serialVersionUID = -2113961376143864034L;
  18. //zero方法,主要用于数据初始化
  19. //这里就返回一个值,就是初始化中,所有范围区间的数值都是0
  20. //各个范围区间的统计数量的拼接,还是采用key=value|key=value的连接串格式
  21. //在Constants.java中添加
  22. //String SESSION_COUNT = "session_count";
  23. //String TIME_PERIOD_1s_3s = "1s_3s";
  24. //String TIME_PERIOD_4s_6s = "4s_6s";
  25. //String TIME_PERIOD_7s_9s = "7s_9s";
  26. //String TIME_PERIOD_10s_30s = "10s_30s";
  27. //String TIME_PERIOD_30s_60s = "30s_60s";
  28. //String TIME_PERIOD_1m_3m = "1m_3m";
  29. //String TIME_PERIOD_3m_10m = "3m_10m";
  30. //String TIME_PERIOD_10m_30m = "10m_30m";
  31. //String TIME_PERIOD_30m = "30m";
  32. //String STEP_PERIOD_1_3 = "1_3";
  33. //String STEP_PERIOD_4_6 = "4_6";
  34. //String STEP_PERIOD_7_9 = "7_9";
  35. //String STEP_PERIOD_10_30 = "10_30";
  36. //String STEP_PERIOD_30_60 = "30_60";
  37. //String STEP_PERIOD_60 = "60";
  38. public String zero(String v) {
  39. return Constants.SESSION_COUNT + "=0|"
  40. + Constants.TIME_PERIOD_1s_3s + "=0|"
  41. + Constants.TIME_PERIOD_4s_6s + "=0|"
  42. + Constants.TIME_PERIOD_7s_9s + "=0|"
  43. + Constants.TIME_PERIOD_10s_30s + "=0|"
  44. + Constants.TIME_PERIOD_30s_60s + "=0|"
  45. + Constants.TIME_PERIOD_1m_3m + "=0|"
  46. + Constants.TIME_PERIOD_3m_10m + "=0|"
  47. + Constants.TIME_PERIOD_10m_30m + "=0|"
  48. + Constants.TIME_PERIOD_30m + "=0|"
  49. + Constants.STEP_PERIOD_1_3 + "=0|"
  50. + Constants.STEP_PERIOD_4_6 + "=0|"
  51. + Constants.STEP_PERIOD_7_9 + "=0|"
  52. + Constants.STEP_PERIOD_10_30 + "=0|"
  53. + Constants.STEP_PERIOD_30_60 + "=0|"
  54. + Constants.STEP_PERIOD_60 + "=0";
  55. }
  56. //addInPlace和addAccumulator可以理解为是一样的
  57. //这两个方法,主要实现,v1可能就是我们初始化的那个连接串
  58. //v2就是在遍历session的时候,判断出某个session对应的区间,然后用Constants.TIME_PERIOD_1S_3S
  59. //所以,我们要做的事情就是在v1中,找到对应的v2对应的value,累加1,然后再更新到连接串里面去
  60. public String addInPlace(String v1, String v2) {
  61. return null;
  62. }
  63. public String addAccumulator(String v1, String v2) {
  64. return null;
  65. }
  66. /**
  67. * session统计计算逻辑
  68. * @param v1连接串
  69. * @param v2 范围区间
  70. * @return更新以后的连接串
  71. */
  72. private String add(String v1, String v2) {
  73. //校验:v1为空的时候,直接返回v2
  74. if(StringUtils.isEmpty(v1)) {
  75. return v2;
  76. }
  77. //使用StringUtils工具类,从v1中提取v2对应的值,并累加1
  78. String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);
  79. if(oldValue != null) {
  80. //将范围区间原有的值累加1
  81. int newValue = Integer.valueOf(oldValue) + 1;
  82. //使用StringUtils工具类,将v1中,v2对应的值设置成心的累加胡的值
  83. return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue));
  84. }
  85. return v1;
  86. }
  87. }

session聚合统计之重构实现思路与重构session聚合

接下来需要进行session聚合统计,统计出访问时长和访问步长,各个区间的session数量占总session数量的比例。

如果不进行重构,直接来实现,思路如下:
1. sessionRDD,映射成的格式;
2. 按session聚合,计算出每个session的访问时长和访问步长,生成一个新的RDD;
3. 遍历新生成的RDD,将每个session的访问时长和访问步长,去更新自定义Accumulator中对应的值;
4. 使用自定义Accumulator中的统计值,去计算各个区间的比例;
5. 将最后计算出来的结果,写入MySQL对应的表中。

普通实现思路的问题:
1. 为什么还要用actionRDD去映射?其实之前在session聚合的时候已经做过映射了,多次一举;
2. 是不是一定要为了session的聚合这个功能单独去遍历一遍session?其实没必要,已经有session数据,之前过滤session的时候,其实相当于是在遍历session了,那么这里就没必要再过滤一遍了。

重构实现思路
不要去生成任何新的RDD(处理上亿的数据);
不要去单独遍历一遍session(处理上千万的数据);
可以在进行session聚合的时候就直接计算出来每个session的访问时长和访问步长;
在进行过滤的时候,本来就要遍历所有的聚合session信息,此时就可以在某个session通过筛选条件后 将访问时长和访问步长累加到自定义的Acccumulator上面;
就是两种截然不同的思考方式和实现方式,在面对上亿,上千万数据的时候,甚至可以节省时间长达半小时 或者数个小时。
开发Spark大型复杂项目的一些经验准则
尽量少生成RDD;
尽量少对RDD进行算子操作,如果有可能,尽量在一个算子里实现多个需要的功能;
尽量少对进行RDD进行shuffle算子操作,比如groupByKey,reduceByKey,sortByKey等shuffle操作,会导致大量的磁盘读写,严重降低性能。有shuffle的算子和没有shuffle的算子,性能甚至会有长达几十分钟甚至数个小时的差别,有shuffle的算子,很容易导致数据倾斜,一旦数据倾斜,简直就是性能杀手(后续会有完整解决方案)
无论做什么功能,性能第一。
在传统的J2EE或者.NET或者PHP,软件/系统/网站开发中,架构和可维护性,可扩展性的重要程度远远高于性能,大量的分布式的架构,设计模式,代码的划分,类的划分(高并发网站除外)在大数据项目中,比如MapReduce,Hive,Spark,Storm中,性能的重要程度远远大于一次额代码的规范和设计模式,代码的划分,类的划分;大数据最重要的是性能。

主要就是因为大数据以及大数据项目的特点决定了大数据的程度和项目的速度都比较慢,如果不优先考虑性能的话,会导致一个大数据处理程序运行时间长达数小时,甚至数十个小时,所以,推荐大数据项目,在开发和代码的架构中,优先考虑性能,其次考虑功能代码的划分、解耦合。

我们如果采用第一种实现方案,那么其实就是代码划分(解耦合、可维护)优先,设计优先如果采用第二种方案,那么其实就是性能优先。

  1. package com.erik.sparkproject.spark;
  2. import java.text.ParseException;
  3. import java.util.Date;
  4. import java.util.Iterator;
  5. import org.apache.spark.SparkConf;
  6. import org.apache.spark.SparkContext;
  7. import org.apache.spark.api.java.JavaPairRDD;
  8. import org.apache.spark.api.java.JavaRDD;
  9. import org.apache.spark.api.java.JavaSparkContext;
  10. import org.apache.spark.api.java.function.Function;
  11. import org.apache.spark.api.java.function.PairFunction;
  12. import org.apache.spark.sql.DataFrame;
  13. import org.apache.spark.sql.Row;
  14. import org.apache.spark.sql.SQLContext;
  15. import org.apache.spark.sql.hive.HiveContext;
  16. import com.alibaba.fastjson.JSONObject;
  17. import com.erik.sparkproject.conf.ConfigurationManager;
  18. import com.erik.sparkproject.constant.Constants;
  19. import com.erik.sparkproject.dao.ITaskDAO;
  20. import com.erik.sparkproject.domain.Task;
  21. import com.erik.sparkproject.impl.DAOFactory;
  22. import com.erik.sparkproject.test.MockData;
  23. import com.erik.sparkproject.util.*;
  24. import scala.Tuple2;
  25. /**
  26. * 用户访问session分析spark作业
  27. *
  28. * 接收用户创建的分析任务,用户可能指定的条件如下:
  29. * 1.时间范围:起始日期-结束日期
  30. * 2.性别:男或女
  31. * 3.年龄范围
  32. * 4.职业:多选
  33. * 5.城市:多选
  34. * 6.搜索词:多个搜索词,只要某个session中的任何一个
  35. * action搜索过指定的关键词,那么session就符合条件
  36. * 7.点击品类:多个品类,只要某个session中的任何一个
  37. * action点击过某个品类,那么session就符合条件
  38. *
  39. * 我们的Spark作业如何接受用户创建的任务呢?
  40. * J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,
  41. * 任务参数以JSON格式封装在task_param字段中
  42. * 接着J2EE平台执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本
  43. * spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数传递给spark作业的main函数
  44. * 参数就封装在main函数得到args数组中
  45. *
  46. * 这是spark本事提供的特性
  47. *
  48. *
  49. * @author Erik
  50. *
  51. */
  52. public class UserVisitSessionAnalyzeSpark {
  53. public static void main(String[] args) {
  54. args = new String[]{"2"};
  55. //构建spark上下文
  56. //首先在Constants.java中设置spark作业相关的常量
  57. //String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";
  58. //保存Constants.java配置
  59. SparkConf conf = new SparkConf()
  60. .setAppName(Constants.SPARK_APP_NAME)
  61. .setMaster("local");
  62. JavaSparkContext sc = new JavaSparkContext(conf);
  63. SQLContext sqlContext = getSQLContext(sc.sc());
  64. //生成模拟测试数据
  65. mockData(sc, sqlContext);
  66. //创建需要使用的DAO组件
  67. ITaskDAO taskDAO = DAOFactory.getTaskDAO();
  68. //那么就首先得查询出来指定的任务,并获取任务的查询参数
  69. long taskid = ParamUtils.getTaskIdFromArgs(args);
  70. Task task = taskDAO.findById(taskid);
  71. JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
  72. //如果要进行session粒度的数据聚合,
  73. //首先要从user_visit_action表中,查询出来指定日期范围内的数据
  74. JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);
  75. //聚合
  76. //首先,可以将行为数据按照session_id进行groupByKey分组
  77. //此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join
  78. //然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息
  79. //到这里为止,获取的数据是<sessionid,(sessionid,searchKeywords,
  80. //clickCategoryIds,age,professional,city,sex)>
  81. JavaPairRDD<String, String> sessionid2AggrInfoRDD =
  82. aggregateBySession(sqlContext, actionRDD);
  83. //接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
  84. //相当于我们自己编写的算子,是要访问外面的任务参数对象的
  85. //匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的
  86. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD =
  87. filterSession(sessionid2AggrInfoRDD, taskParam);
  88. /**
  89. * session聚合统计(统计出访问时长和访问步长,各个区间的session数量占总session数量的比例)
  90. */
  91. //关闭spark上下文
  92. sc.close();
  93. }
  94. /**
  95. * 获取SQLContext
  96. * 如果在本地测试环境的话,那么久生成SQLC哦那text对象
  97. *如果在生产环境运行的话,那么就生成HiveContext对象
  98. * @param sc SparkContext
  99. * @return SQLContext
  100. */
  101. private static SQLContext getSQLContext(SparkContext sc) {
  102. //在my.properties中配置
  103. //spark.local=true(打包之前改为flase)
  104. //在ConfigurationManager.java中添加
  105. //public static Boolean getBoolean(String key) {
  106. // String value = getProperty(key);
  107. // try {
  108. // return Boolean.valueOf(value);
  109. // } catch (Exception e) {
  110. // e.printStackTrace();
  111. // }
  112. // return false;
  113. //}
  114. //在Contants.java中添加
  115. //String SPARK_LOCAL = "spark.local";
  116. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  117. if(local) {
  118. return new SQLContext(sc);
  119. }else {
  120. return new HiveContext(sc);
  121. }
  122. }
  123. /**
  124. * 生成模拟数据
  125. * 只有是本地模式,才会生成模拟数据
  126. * @param sc
  127. * @param sqlContext
  128. */
  129. private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {
  130. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  131. if(local) {
  132. MockData.mock(sc, sqlContext);
  133. }
  134. }
  135. /**
  136. * 获取指定日期范围内的用户访问行为数据
  137. * @param sqlContext SQLContext
  138. * @param taskParam 任务参数
  139. * @return 行为数据RDD
  140. */
  141. private static JavaRDD<Row> getActionRDDByDateRange(
  142. SQLContext sqlContext, JSONObject taskParam) {
  143. //先在Constants.java中添加任务相关的常量
  144. //String PARAM_START_DATE = "startDate";
  145. //String PARAM_END_DATE = "endDate";
  146. String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
  147. String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);
  148. String sql = "select * "
  149. + "from user_visit_action"
  150. + "where date>='" + startDate + "'"
  151. + "and date<='" + endDate + "'";
  152. DataFrame actionDF = sqlContext.sql(sql);
  153. return actionDF.javaRDD();
  154. }
  155. /**
  156. * 对行为数据按sesssion粒度进行聚合
  157. * @param actionRDD 行为数据RDD
  158. * @return session粒度聚合数据
  159. */
  160. private static JavaPairRDD<String, String> aggregateBySession(
  161. SQLContext sqlContext, JavaRDD<Row> actionRDD) {
  162. //现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索
  163. //现在需要将这个Row映射成<sessionid,Row>的格式
  164. JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(
  165. /**
  166. * PairFunction
  167. * 第一个参数,相当于是函数的输入
  168. * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值
  169. */
  170. new PairFunction<Row, String, Row>() {
  171. private static final long serialVersionUID = 1L;
  172. public Tuple2<String, Row> call(Row row) throws Exception {
  173. //按照MockData.java中字段顺序获取
  174. //此时需要拿到session_id,序号是2
  175. return new Tuple2<String, Row>(row.getString(2), row);
  176. }
  177. });
  178. //对行为数据按照session粒度进行分组
  179. JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
  180. sessionid2ActionRDD.groupByKey();
  181. //对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
  182. //到此为止,获取的数据格式如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
  183. JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(
  184. new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
  185. private static final long serialVersionUID = 1L;
  186. public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
  187. throws Exception {
  188. String sessionid = tuple._1;
  189. Iterator<Row> iterator = tuple._2.iterator();
  190. StringBuffer searchKeywordsBuffer = new StringBuffer("");
  191. StringBuffer clickCategoryIdsBuffer = new StringBuffer("");
  192. Long userid = null;
  193. //session的起始和结束时间
  194. Date startTime = null;
  195. Date endTime = null;
  196. //session的访问步长
  197. int stepLength = 0;
  198. //遍历session所有的访问行为
  199. while(iterator.hasNext()) {
  200. //提取每个 访问行为的搜索词字段和点击品类字段
  201. Row row = iterator.next();
  202. if(userid == null) {
  203. userid = row.getLong(1);
  204. }
  205. String searchKeyword = row.getString(5);
  206. Long clickCategoryId = row.getLong(6);
  207. //实际上这里要对数据说明一下
  208. //并不是每一行访问行为都有searchKeyword和clickCategoryId两个字段的
  209. //其实,只有搜索行为是有searchKeyword字段的
  210. //只有点击品类的行为是有clickCaregoryId字段的
  211. //所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的
  212. //所以是否将搜索词点击品类id拼接到字符串中去
  213. //首先要满足不能是null值
  214. //其次,之前的字符串中还没有搜索词或者点击品类id
  215. if(StringUtils.isNotEmpty(searchKeyword)) {
  216. if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
  217. searchKeywordsBuffer.append(searchKeyword + ",");
  218. }
  219. }
  220. if(clickCategoryId != null) {
  221. if(!clickCategoryIdsBuffer.toString().contains(
  222. String.valueOf(clickCategoryId))) {
  223. clickCategoryIdsBuffer.append(clickCategoryId + ",");
  224. }
  225. }
  226. //计算session开始和结束时间
  227. Date actionTime = DateUtils.parseTime(row.getString(4));
  228. if(startTime == null) {
  229. startTime = actionTime;
  230. }
  231. if(endTime == null) {
  232. endTime = actionTime;
  233. }
  234. if(actionTime.before(startTime)) {
  235. startTime = actionTime;
  236. }
  237. if(actionTime.after(endTime)) {
  238. endTime = actionTime;
  239. }
  240. //计算session访问步长
  241. stepLength ++;
  242. }
  243. //计算session开始和结束时间
  244. //现在DateUtils.java中添加方法
  245. //public static Date parseTime(String time) {
  246. // try {
  247. // return TIME_FORMAT.parse(time);
  248. // } catch (ParseException e) {
  249. // e.printStackTrace();
  250. // }
  251. // return null;
  252. //}
  253. //StringUtils引入的包是import com.erik.sparkproject.util.trimComma;
  254. String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
  255. String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());
  256. //计算session访问时长(秒)
  257. long visitLength = (endTime.getTime() - startTime.getTime()) / 1000;
  258. //返回的数据即是<sessionid, partAggrInfo>
  259. //但是,这一步聚合后,其实还需要将每一行数据,根对应的用户信息进行聚合
  260. //问题来了,如果是跟用户信息进行聚合的话,那么key就不应该是sessionid,而应该是userid
  261. //才能够跟<userid, Row>格式的用户信息进行聚合
  262. //如果我们这里直接返回<sessionid, partAggrInfo>,还得再做一次mapToPair算子
  263. //将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举
  264. //所以,我们这里其实可以直接返回数据格式就是<userid,partAggrInfo>
  265. //然后在直接将返回的Tuple的key设置成sessionid
  266. //最后的数据格式,还是<sessionid,fullAggrInfo>
  267. //聚合数据,用什么样的格式进行拼接?
  268. //我们这里统一定义,使用key=value|key=vale
  269. //在Constants.java中定义spark作业相关的常量
  270. //String FIELD_SESSION_ID = "sessionid";
  271. //String FIELD_SEARCH_KEYWORDS = "searchKeywords";
  272. //String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";
  273. //String FIELD_VISIT_LENGTH = "visitLength";
  274. //String FIELD_STEP_LENGTH = "stepLength";
  275. String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
  276. + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
  277. + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"
  278. + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"
  279. + Constants.FIELD_STEP_LENGTH + "=" + stepLength;
  280. return new Tuple2<Long, String>(userid, partAggrInfo);
  281. }
  282. });
  283. //查询所有用户数据
  284. String sql = "select * from user_info";
  285. JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();
  286. JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(
  287. new PairFunction<Row, Long, Row>(){
  288. private static final long serialVersionUID = 1L;
  289. public Tuple2<Long, Row> call(Row row) throws Exception {
  290. return new Tuple2<Long, Row>(row.getLong(0), row);
  291. }
  292. });
  293. //将session粒度聚合数据,与用户信息进行join
  294. JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
  295. userid2PartAggrInfoRDD.join(userid2InfoRDD);
  296. //对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
  297. JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(
  298. new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {
  299. private static final long serialVersionUID = 1L;
  300. public Tuple2<String, String> call(
  301. Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception {
  302. String partAggrInfo = tuple._2._1;
  303. Row userInfoRow = tuple._2._2;
  304. String sessionid = StringUtils.getFieldFromConcatString(
  305. partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
  306. int age = userInfoRow.getInt(3);
  307. String professional = userInfoRow.getString(4);
  308. String city = userInfoRow.getString(5);
  309. String sex = userInfoRow.getString(6);
  310. //在Constants.java中添加以下常量
  311. //String FIELD_AGE = "age";
  312. //String FIELD_PROFESSIONAL = "professional";
  313. //String FIELD_CITY = "city";
  314. //String FIELD_SEX = "sex";
  315. String fullAggrInfo = partAggrInfo + "|"
  316. + Constants.FIELD_AGE + "=" + age + "|"
  317. + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
  318. + Constants.FIELD_CITY + "=" + city + "|"
  319. + Constants.FIELD_SEX + "=" + sex ;
  320. return new Tuple2<String, String>(sessionid, fullAggrInfo);
  321. }
  322. });
  323. return sessionid2FullAggrInfoRDD;
  324. }
  325. /**
  326. * 过滤session数据
  327. * @param sessionid2AggrInfoRDD
  328. * @return
  329. */
  330. private static JavaPairRDD<String, String> filterSession(
  331. JavaPairRDD<String, String> sessionid2AggrInfoRDD,
  332. final JSONObject taskParam) {
  333. //为了使用后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串
  334. String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  335. String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  336. String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);
  337. String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);
  338. String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);
  339. String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
  340. String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);
  341. String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
  342. + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
  343. + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
  344. + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
  345. + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
  346. + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
  347. + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : "");
  348. if (_parameter.endsWith("\\|")) {
  349. _parameter = _parameter.substring(0, _parameter.length() - 1);
  350. }
  351. final String parameter = _parameter;
  352. //根据筛选参数进行过滤
  353. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(
  354. new Function<Tuple2<String, String>, Boolean>() {
  355. private static final long serialVersionUID = 1L;
  356. public Boolean call(Tuple2<String, String> tuple) throws Exception {
  357. //首先,从tuple中,获取聚合数据
  358. String aggrInfo = tuple._2;
  359. //接着,依次按照筛选条件进行过滤
  360. //按照年龄范围进行过滤(startAge、endAge)
  361. //先在Constants.java中添加常量
  362. //String PARAM_START_AGE = "startAge";
  363. //String PARAM_END_AGE = "endage";
  364. //String PARAM_PROFESSIONALS = "professionals";
  365. //String PARAM_CITIES = "cities";
  366. //String PARAM_SEX = "sex";
  367. //String PARAM_KEYWORDS = "keywords";
  368. //String PARAM_CATEGORY_IDS = "categoryIds";
  369. if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,
  370. parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
  371. return false;
  372. }
  373. //按照职业范围进行过滤(professionals)
  374. if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,
  375. parameter, Constants.PARAM_PROFESSIONALS)) {
  376. return false;
  377. }
  378. //按照城市范围进行过滤(cities)
  379. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,
  380. parameter, Constants.PARAM_CATEGORY_IDS)) {
  381. return false;
  382. }
  383. //按照性别过滤
  384. if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,
  385. parameter, Constants.PARAM_SEX)) {
  386. return false;
  387. }
  388. //按照搜索词过滤
  389. if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,
  390. parameter, Constants.PARAM_KEYWORDS)) {
  391. return false;
  392. }
  393. //按照点击品类id进行搜索
  394. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,
  395. parameter, Constants.PARAM_CATEGORY_IDS)) {
  396. return false;
  397. }
  398. return true;
  399. }
  400. });
  401. return null;
  402. }
  403. }

session聚合统计之重构过滤进行统计

  1. package com.erik.sparkproject.spark;
  2. import java.text.ParseException;
  3. import java.util.Date;
  4. import java.util.Iterator;
  5. import org.apache.spark.Accumulator;
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.SparkContext;
  8. import org.apache.spark.api.java.JavaPairRDD;
  9. import org.apache.spark.api.java.JavaRDD;
  10. import org.apache.spark.api.java.JavaSparkContext;
  11. import org.apache.spark.api.java.function.Function;
  12. import org.apache.spark.api.java.function.PairFunction;
  13. import org.apache.spark.sql.DataFrame;
  14. import org.apache.spark.sql.Row;
  15. import org.apache.spark.sql.SQLContext;
  16. import org.apache.spark.sql.hive.HiveContext;
  17. import com.alibaba.fastjson.JSONObject;
  18. import com.erik.sparkproject.conf.ConfigurationManager;
  19. import com.erik.sparkproject.constant.Constants;
  20. import com.erik.sparkproject.dao.ITaskDAO;
  21. import com.erik.sparkproject.domain.Task;
  22. import com.erik.sparkproject.impl.DAOFactory;
  23. import com.erik.sparkproject.test.MockData;
  24. import com.erik.sparkproject.util.*;
  25. import scala.Tuple2;
  26. /**
  27. * 这是spark本事提供的特性
  28. *
  29. * @author Erik
  30. */
  31. public class UserVisitSessionAnalyzeSpark {
  32. public static void main(String[] args) {
  33. args = new String[]{"2"};
  34. //构建spark上下文
  35. //首先在Constants.java中设置spark作业相关的常量
  36. //String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";
  37. //保存Constants.java配置
  38. SparkConf conf = new SparkConf()
  39. .setAppName(Constants.SPARK_APP_NAME)
  40. .setMaster("local");
  41. JavaSparkContext sc = new JavaSparkContext(conf);
  42. SQLContext sqlContext = getSQLContext(sc.sc());
  43. //生成模拟测试数据
  44. mockData(sc, sqlContext);
  45. //创建需要使用的DAO组件
  46. ITaskDAO taskDAO = DAOFactory.getTaskDAO();
  47. //那么就首先得查询出来指定的任务,并获取任务的查询参数
  48. long taskid = ParamUtils.getTaskIdFromArgs(args);
  49. Task task = taskDAO.findById(taskid);
  50. JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
  51. //如果要进行session粒度的数据聚合,
  52. //首先要从user_visit_action表中,查询出来指定日期范围内的数据
  53. JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);
  54. //聚合
  55. //首先,可以将行为数据按照session_id进行groupByKey分组
  56. //此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join
  57. //然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息
  58. //到这里为止,获取的数据是<sessionid,(sessionid,searchKeywords,
  59. //clickCategoryIds,age,professional,city,sex)>
  60. JavaPairRDD<String, String> sessionid2AggrInfoRDD =
  61. aggregateBySession(sqlContext, actionRDD);
  62. //接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
  63. //相当于我们自己编写的算子,是要访问外面的任务参数对象的
  64. //匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的
  65. //重构,同时进行过滤和统计
  66. Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
  67. "", new SesssionAggrStatAccumulator());
  68. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(
  69. sessionid2AggrInfoRDD, taskParam, sessionAggrStatAccumulator);
  70. //关闭spark上下文
  71. sc.close();
  72. }
  73. /**
  74. * 获取SQLContext
  75. * 如果在本地测试环境的话,那么久生成SQLC哦那text对象
  76. *如果在生产环境运行的话,那么就生成HiveContext对象
  77. * @param sc SparkContext
  78. * @return SQLContext
  79. */
  80. private static SQLContext getSQLContext(SparkContext sc) {
  81. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  82. if(local) {
  83. return new SQLContext(sc);
  84. }else {
  85. return new HiveContext(sc);
  86. }
  87. }
  88. /**
  89. * 生成模拟数据
  90. * 只有是本地模式,才会生成模拟数据
  91. * @param sc
  92. * @param sqlContext
  93. */
  94. private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {
  95. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  96. if(local) {
  97. MockData.mock(sc, sqlContext);
  98. }
  99. }
  100. /**
  101. * 获取指定日期范围内的用户访问行为数据
  102. * @param sqlContext SQLContext
  103. * @param taskParam 任务参数
  104. * @return 行为数据RDD
  105. */
  106. private static JavaRDD<Row> getActionRDDByDateRange(
  107. SQLContext sqlContext, JSONObject taskParam) {
  108. String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
  109. String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);
  110. String sql = "select * "
  111. + "from user_visit_action"
  112. + "where date>='" + startDate + "'"
  113. + "and date<='" + endDate + "'";
  114. DataFrame actionDF = sqlContext.sql(sql);
  115. return actionDF.javaRDD();
  116. }
  117. /**
  118. * 对行为数据按sesssion粒度进行聚合
  119. * @param actionRDD 行为数据RDD
  120. * @return session粒度聚合数据
  121. */
  122. private static JavaPairRDD<String, String> aggregateBySession(
  123. SQLContext sqlContext, JavaRDD<Row> actionRDD) {
  124. //现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索
  125. //现在需要将这个Row映射成<sessionid,Row>的格式
  126. JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(
  127. /**
  128. * PairFunction
  129. * 第一个参数,相当于是函数的输入
  130. * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值
  131. */
  132. new PairFunction<Row, String, Row>() {
  133. private static final long serialVersionUID = 1L;
  134. public Tuple2<String, Row> call(Row row) throws Exception {
  135. //按照MockData.java中字段顺序获取
  136. //此时需要拿到session_id,序号是2
  137. return new Tuple2<String, Row>(row.getString(2), row);
  138. }
  139. });
  140. //对行为数据按照session粒度进行分组
  141. JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
  142. sessionid2ActionRDD.groupByKey();
  143. //对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
  144. //到此为止,获取的数据格式如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
  145. JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(
  146. new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
  147. private static final long serialVersionUID = 1L;
  148. public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
  149. throws Exception {
  150. String sessionid = tuple._1;
  151. Iterator<Row> iterator = tuple._2.iterator();
  152. StringBuffer searchKeywordsBuffer = new StringBuffer("");
  153. StringBuffer clickCategoryIdsBuffer = new StringBuffer("");
  154. Long userid = null;
  155. //session的起始和结束时间
  156. Date startTime = null;
  157. Date endTime = null;
  158. //session的访问步长
  159. int stepLength = 0;
  160. //遍历session所有的访问行为
  161. while(iterator.hasNext()) {
  162. //提取每个 访问行为的搜索词字段和点击品类字段
  163. Row row = iterator.next();
  164. if(userid == null) {
  165. userid = row.getLong(1);
  166. }
  167. String searchKeyword = row.getString(5);
  168. Long clickCategoryId = row.getLong(6);
  169. if(StringUtils.isNotEmpty(searchKeyword)) {
  170. if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
  171. searchKeywordsBuffer.append(searchKeyword + ",");
  172. }
  173. }
  174. if(clickCategoryId != null) {
  175. if(!clickCategoryIdsBuffer.toString().contains(
  176. String.valueOf(clickCategoryId))) {
  177. clickCategoryIdsBuffer.append(clickCategoryId + ",");
  178. }
  179. }
  180. //计算session开始和结束时间
  181. Date actionTime = DateUtils.parseTime(row.getString(4));
  182. if(startTime == null) {
  183. startTime = actionTime;
  184. }
  185. if(endTime == null) {
  186. endTime = actionTime;
  187. }
  188. if(actionTime.before(startTime)) {
  189. startTime = actionTime;
  190. }
  191. if(actionTime.after(endTime)) {
  192. endTime = actionTime;
  193. }
  194. //计算session访问步长
  195. stepLength ++;
  196. }
  197. String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
  198. String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());
  199. //计算session访问时长(秒)
  200. long visitLength = (endTime.getTime() - startTime.getTime()) / 1000;
  201. String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
  202. + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
  203. + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"
  204. + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"
  205. + Constants.FIELD_STEP_LENGTH + "=" + stepLength;
  206. return new Tuple2<Long, String>(userid, partAggrInfo);
  207. }
  208. });
  209. //查询所有用户数据
  210. String sql = "select * from user_info";
  211. JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();
  212. JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(
  213. new PairFunction<Row, Long, Row>(){
  214. private static final long serialVersionUID = 1L;
  215. public Tuple2<Long, Row> call(Row row) throws Exception {
  216. return new Tuple2<Long, Row>(row.getLong(0), row);
  217. }
  218. });
  219. //将session粒度聚合数据,与用户信息进行join
  220. JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
  221. userid2PartAggrInfoRDD.join(userid2InfoRDD);
  222. //对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
  223. JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(
  224. new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {
  225. private static final long serialVersionUID = 1L;
  226. public Tuple2<String, String> call(
  227. Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception {
  228. String partAggrInfo = tuple._2._1;
  229. Row userInfoRow = tuple._2._2;
  230. String sessionid = StringUtils.getFieldFromConcatString(
  231. partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
  232. int age = userInfoRow.getInt(3);
  233. String professional = userInfoRow.getString(4);
  234. String city = userInfoRow.getString(5);
  235. String sex = userInfoRow.getString(6);
  236. String fullAggrInfo = partAggrInfo + "|"
  237. + Constants.FIELD_AGE + "=" + age + "|"
  238. + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
  239. + Constants.FIELD_CITY + "=" + city + "|"
  240. + Constants.FIELD_SEX + "=" + sex ;
  241. return new Tuple2<String, String>(sessionid, fullAggrInfo);
  242. }
  243. });
  244. return sessionid2FullAggrInfoRDD;
  245. }
  246. /**
  247. * 过滤session数据,并进行聚合统计
  248. * @param sessionid2AggrInfoRDD
  249. * @return
  250. */
  251. private static JavaPairRDD<String, String> filterSessionAndAggrStat(
  252. JavaPairRDD<String, String> sessionid2AggrInfoRDD,
  253. final JSONObject taskParam,
  254. final Accumulator<String> sessionAggrAccumulator) {
  255. //为了使用后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串
  256. String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  257. String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  258. String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);
  259. String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);
  260. String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);
  261. String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
  262. String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);
  263. String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
  264. + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
  265. + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
  266. + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
  267. + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
  268. + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
  269. + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : "");
  270. if (_parameter.endsWith("\\|")) {
  271. _parameter = _parameter.substring(0, _parameter.length() - 1);
  272. }
  273. final String parameter = _parameter;
  274. //根据筛选参数进行过滤
  275. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(
  276. new Function<Tuple2<String, String>, Boolean>() {
  277. private static final long serialVersionUID = 1L;
  278. public Boolean call(Tuple2<String, String> tuple) throws Exception {
  279. //首先,从tuple中,获取聚合数据
  280. String aggrInfo = tuple._2;
  281. if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,
  282. parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
  283. return false;
  284. }
  285. //按照职业范围进行过滤(professionals)
  286. if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,
  287. parameter, Constants.PARAM_PROFESSIONALS)) {
  288. return false;
  289. }
  290. //按照城市范围进行过滤(cities)
  291. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,
  292. parameter, Constants.PARAM_CATEGORY_IDS)) {
  293. return false;
  294. }
  295. //按照性别过滤
  296. if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,
  297. parameter, Constants.PARAM_SEX)) {
  298. return false;
  299. }
  300. //按照搜索词过滤
  301. if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,
  302. parameter, Constants.PARAM_KEYWORDS)) {
  303. return false;
  304. }
  305. //按照点击品类id进行搜索
  306. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,
  307. parameter, Constants.PARAM_CATEGORY_IDS)) {
  308. return false;
  309. }
  310. //如果经过了之前的多个过滤条件之后,程序能够走到这里
  311. //那么说明该session是通过了用户指定的筛选条件的,也就是需要保留的session
  312. //那么就要对session的访问时长和访问步长进行统计,
  313. //根据session对应的范围进行相应的累加计数
  314. //只要走到这一步,那么就是需要计数的session
  315. sessionAggrAccumulator.add(Constants.SESSION_COUNT);
  316. //计算出session的访问时长和访问步长的范围,并进行相应的累加
  317. long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(
  318. aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH));
  319. long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(
  320. aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH));
  321. calculateVisitLength(visitLength);
  322. calculateStepLength(stepLength);
  323. return true;
  324. }
  325. /**
  326. * 计算访问时长范围
  327. * @param visitLength
  328. */
  329. private void calculateVisitLength(long visitLength) {
  330. if(visitLength >= 1 && visitLength <= 3) {
  331. sessionAggrAccumulator.add(Constants.TIME_PERIOD_1s_3s);
  332. }else if(visitLength >= 4 && visitLength <= 6) {
  333. sessionAggrAccumulator.add(Constants.TIME_PERIOD_4s_6s);
  334. }else if(visitLength >= 7 && visitLength <= 9) {
  335. sessionAggrAccumulator.add(Constants.TIME_PERIOD_7s_9s);
  336. }else if(visitLength >= 10 && visitLength <= 30) {
  337. sessionAggrAccumulator.add(Constants.TIME_PERIOD_10s_30s);
  338. }else if(visitLength > 30 && visitLength <= 60) {
  339. sessionAggrAccumulator.add(Constants.TIME_PERIOD_30s_60s);
  340. }else if(visitLength > 60 && visitLength <= 180) {
  341. sessionAggrAccumulator.add(Constants.TIME_PERIOD_1m_3m);
  342. }else if(visitLength > 180 && visitLength <= 600) {
  343. sessionAggrAccumulator.add(Constants.TIME_PERIOD_3m_10m);
  344. }else if(visitLength > 600 && visitLength <= 1800) {
  345. sessionAggrAccumulator.add(Constants.TIME_PERIOD_10m_30m);
  346. }else if(visitLength > 1800) {
  347. sessionAggrAccumulator.add(Constants.TIME_PERIOD_30m);
  348. }
  349. }
  350. /**
  351. * 计算访问步长范围
  352. * @param stepLength
  353. */
  354. private void calculateStepLength(long stepLength) {
  355. if(stepLength >= 1 && stepLength <= 3) {
  356. sessionAggrAccumulator.add(Constants.STEP_PERIOD_1_3);
  357. }else if(stepLength >= 4 && stepLength <= 6) {
  358. sessionAggrAccumulator.add(Constants.STEP_PERIOD_4_6);
  359. }else if(stepLength >= 7 && stepLength <= 9) {
  360. sessionAggrAccumulator.add(Constants.STEP_PERIOD_7_9);
  361. }else if(stepLength >= 10 && stepLength <= 30) {
  362. sessionAggrAccumulator.add(Constants.STEP_PERIOD_10_30);
  363. }else if(stepLength > 30 && stepLength <= 60) {
  364. sessionAggrAccumulator.add(Constants.STEP_PERIOD_30_60);
  365. }else if(stepLength > 60) {
  366. sessionAggrAccumulator.add(Constants.STEP_PERIOD_60);
  367. }
  368. }
  369. });
  370. return null;
  371. }
  372. }

性能调优之调节并行度

Spark并行度原理

以 Spark 中的 WordCount 作业为例,每个Spark作业其实都是一个 Application,每个 Application 对应多个 Jobs,一个 action 操作(比如 collect)触发一个 job,在WordCount程序中有一个 job,每个 job 拆成多个 stage(发生 shuffle 的时候回拆分出一个 stage),reduceByKey 处会发生 shuffle。reduceByKey 这里,相当于 stage0 的task在最后执行到 reduceByKey 的时候会为每个 stage1 的task都创建一份文件(也可能是合并在少量的文件里面),每个 stage1 的 task会去各个节点上的各个 task 创建的属于自己的那一份文件里面拉取数据,每个 stage1 的 task 拉取到的数据一定是相同 key 对应的数据。对相同的key,对应的values,才能去执行我们自定义的function操作( + )。

task会找到 HDFS 上属于自己的对应数据,每个 task 处理一小块数据 HDFS block,然后依次去执行算子操作。task 默认会创建三份文件,每一个文件里面,一定是存放相同的key对应的values;但是一个文件里面可能有多个key,以及其对应的values;相同key的values一定是进入同一个文件。

下一个 stage 的每个 task 都会去上一个 stage 的 task 里拉去属于自己的那份文件。

并行度设置
并行度其实就是指的是Spark作业中各个stage的task数量,也就代表了Spark作业的在各个阶段(stage)的并行度。

如果不调节并行度,导致并行度过低会怎么样?
假设现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如有50个 executor,每个executor 有10G内存,每个 executor 有3个cpu core,基本已经达到了集群或者yarn队列的资源上限。

如果 task 没有设置,或者设置的很少,比如就设置了100个 task。现在50个 executor,每个executor 有3个cpu core,也就是说,你的Application任何一个 stage 运行的时候都有总数在150个 cpu core,可以并行运行。但是你现在只有100个task,平均分配一下,每个executor 分配到2个task,那么同时在运行的task只有100个,每个executor只会并行运行2个task,每个executor剩下的一个 cpu core 就浪费掉了。

你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。

合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源。比如上面的例子,总共集群有150个cpu core,可以并行运行150个task。那么就应该将你的Application 的并行度至少设置成150才能完全有效的利用你的集群资源,让150个task并行执行,而且task增加到150个以后,既可以同时并行运行,还可以让每个task要处理的数据量变少。比如总共150G的数据要处理,如果是100个task,每个task计算1.5G的数据,现在增加到150个task可以并行运行,而且每个task主要处理1G的数据就可以。

很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。

task数量,至少设置成与Spark application的总cpu core数量相同(最理想情况是:比如总共150个cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)。

官方推荐task数量设置成spark application总cpu core数量的2~3倍,比如150个cpu core基本要设置task数量为300~500。实际情况与理想情况不同的,有些task会运行的快一点,比如50s就完了,有些task可能会慢一点,比如要1分半才运行完,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费,因为,比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能。

如何设置一个Spark Application的并行度?
可以使用参数spark.default.parallelism设置,比如
SparkConf conf = new SparkConf()
.set(“spark.default.parallelism”, “500”)
总结
分配更多资源属于“重剑无锋”类型:真正有分量的一些技术和点,其实都是看起来比较平凡,看起来没有那么“炫酷”,但是其实是你每次写完一个spark作业,进入性能调优阶段的时候,应该优先调节的事情,就是这些(大部分时候,可能资源和并行度到位了,spark作业就很快了,几分钟就跑完了)
后续会进行一些“炫酷”的调优方式,比如数据倾斜(100个spark作业,最多10个会出现真正严重的数据倾斜问题)JVM调优等等。

性能调优之广播大变量

代码优化
以代码中的 session 随机抽取功能为例,因为 session 随机抽取功能使用到了随机抽取索引map这一比较大的变量,之前是直接在算子里使用了这个map,每个task都会拷贝一份map副本,比较消耗内存和网络传输性能,现在将其改为广播变量。
首先传入一个变量sc,将代码

private static void randomExtractSession(
final long taskid,
JavaPairRDD sessionid2AggrInfoRDD,
JavaPairRDD sessionid2actionRDD)
变为

private static void randomExtractSession(
JavaSparkContext sc,
final long taskid,
JavaPairRDD sessionid2AggrInfoRDD,
JavaPairRDD sessionid2actionRDD)
然后在“使用按是按比例随机抽取算法,计算出每小时要抽取session的索引”方法后设置广播变量(原先map之前的final去掉)

final Broadcast>>> dateHourExtractMapBroadcast =
sc.broadcast(dateHourExtractMap);
接着就是使用广播变量, 直接调用广播变量(Broadcast类型)的value()或getvalue()函数就可以获取到之前封装的广播变量,在代码

List extractIndexList = dateHourExtractMap.get(date).get(hour);
前加上

Map>> dateHourExtractMap =
dateHourExtractMapBroadcast.value();
最后不要忘了在randomExtractSession处添加sc参数,即由

randomExtractSession(task.getTaskid(),filteredSessionid2AggrInfoRDD, sessionid2actionRDD);

变为

randomExtractSession(sc, task.getTaskid(),filteredSessionid2AggrInfoRDD, sessionid2actionRDD);

此时广播变量方法就完成了。

性能调优之Kryo序列化

Kryo 序列化原因
在广播大变量进行优化后,还可以进一步优化,即优化这个序列化格式。
默认情况下,Spark内部是使用Java的序列化机制ObjectOutputStream / ObjectInputStream这种对象输入输出流机制来进行序列化。
这种默认序列化机制的好处在于:处理起来比较方便,也不需要我们手动去做什么事情,只是在算子里面使用的变量必须是实现Serializable接口的,可序列化即可。
但是缺点在于:默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。

可以手动进行序列化格式的优化,Spark支持使用Kryo序列化机制。Kryo序列化机制比默认的Java序列化机制速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。

Kryo序列化生效位置
Kryo序列化机制一旦启用以后,会在以下几个地方生效:
1. 算子函数中使用到的外部变量,会序列化,这时可以是用Kryo序列化机制;
2. 持久化 RDD 时进行序列化,比如StorageLevel.MEMORY_ONLY_SER,可以使用 Kryo 进一步优化序列化的效率和性能;
3、进行shuffle时,比如在进行stage间的task的shuffle操作时,节点与节点之间的task会互相大量通过网络拉取和传输文件,此时,这些数据既然通过网络传输,也是可能要序列化的,就会使用Kryo。

Kryo序列化优点
其实之前在“Kryo序列化生效位置”处已经提到了Kryo序列化的优点,这里总结一下,大致为:
1. 算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗;
2. 持久化RDD,优化内存的占用和消耗,持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC;
3、shuffle:可以优化网络传输的性能。

实现Kryo序列化步骤
第一步,在 SparkConf中设置一个属性spark.serializer,使用org.apache.spark.serializer.KryoSerializer类。
Kryo 之所以没有被作为默认的序列化类库的原因,主要是因为 Kryo 要求,如果要达到它的最佳性能的话,那么就一定要注册你自定义的类(比如,你的算子函数中使用到了外部自定义类型的对象变量,这时就要求必须注册你的类,否则 Kryo 达不到最佳性能)。
第二步,注册你使用到的需要通过 Kryo 序列化的一些自定义类。

代码实现
在项目中,以CategorySortKey这个方法为例,在获取top10热门品类功能中,在进行二次排序时自定义了一个key,这个key是需要在进行 shuffle 的时候进行网络传输的,因此也是要求实现序列化的,启用 Kryo 序列化机制后就会用 Kryo 去序列化和反序列化 CategorySortKey,按上面讲的序列化步骤具体实现代码为:

  1. SparkConf conf = new SparkConf()<br /> .setAppName(Constants.SPARK_APP_NAME_SESSION)<br /> .setMaster("local")<br /> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")<br /> .registerKryoClasses(new Class[]{<br /> CategorySortKey.class});

性能调优之使用 fastutil 优化数据格式

fastutil 介绍
fastutil 是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue。

fastutil 能够提供更小的内存占用,更快的存取速度。我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,好处在于:fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度。

fastutil也提供了64位的array、set 和 list 以及高性能快速的,以及实用的IO类来处理二进制和文本类型的文件。fastutil的每一种集合类型都实现了对应的 Java 中的标准接口(比如fastutil的map实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。

fastutil 还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。
fastutil 除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,但是对引用类型是使用等于号(=)进行比较的,而不是equals()方法。fastutil尽量提供了在任何场景下都是速度最快的集合类库。

Spark中应用fastutil的场景
如果算子函数使用了外部变量,那么第一,你可以使用Broadcast广播变量优化;第二,可以使用Kryo序列化类库,提升序列化性能和效率;第三,如果外部变量是某种比较大的集合,那么可以考虑使用 fastutil 改写外部变量,首先从源头上就减少内存的占用,通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用。

在你的算子函数里,也就是task要执行的计算逻辑里面,如果有逻辑中出现要创建比较大的 Map、List 等集合,可能会占用较大的内存空间,而且可能涉及到消耗性能的遍历、存取等集合操作,那么此时可以考虑将这些集合类型使用 fastutil 类库重写,使用了 fastutil 集合类以后,就可以在一定程度上减少task创建出来的集合类型的内存占用,避免 executor 内存频繁占满,频繁唤起GC,导致性能下降。

fastutil调优说明
fastutil 其实没有你想象中的那么强大,也不会跟官网上说的效果那么一鸣惊人。之前所说的广播变量、Kryo序列化类库、fastutil,对于性能来说类似于一种调味品。分配资源、并行度、RDD架构与持久化,这三个是性能提升最明显的,而broadcast、kryo、fastutil 等类似于调料的作用,起到部分加强提升的功能。

比如说你的spark作业,经过之前一些调优以后,大概30分钟运行完,现在加上broadcast、kryo、fastutil,也许就是优化到29分钟运行完,或者更好一点,也许就是28分钟或25分钟。如果使用shuffle调优或许会降到15分钟;groupByKey用reduceByKey改写,执行本地聚合后也许就10分钟;如果跟公司申请更多的资源,比如资源更大的YARN队列,可能1分钟就可以运行完。

代码实现
第一步:在 pom.xml 中引用 fastutil 的包


fastutil
fastutil
5.0.9

(速度比较慢,可能是从国外的网去拉取jar包,可能要等待5分钟,甚至几十分钟不等)

然后修改代码,以“session随机抽取功能”中的List为例,在

final Broadcast>>> dateHourExtractMapBroadcast =
sc.broadcast(dateHourExtractMap);
之前加上

Map> fastutilDateHourExtractMap =
new HashMap>();

  1. for(Map.Entry<String, Map<String, List<Integer>>> dateHourExtractEntry : <br /> dateHourExtractMap.entrySet()) {<br /> String date = dateHourExtractEntry.getKey();<br /> Map<String, List<Integer>> hourExtractMap = dateHourExtractEntry.getValue();
  2. Map<String, IntList> fastutilHourExtractMap = new HashMap<String, IntList>();
  3. for(Map.Entry<String, List<Integer>> hourExtractEntry : hourExtractMap.entrySet()) {<br /> String hour = hourExtractEntry.getKey();<br /> List<Integer> extractList = hourExtractEntry.getValue();
  4. IntList fastutilExtractList = new IntArrayList();
  5. for(int i = 0; i < extractList.size(); i++) {<br /> fastutilExtractList.add(extractList.get(i)); <br /> }
  6. fastutilHourExtractMap.put(hour, fastutilExtractList);<br /> }
  7. fastutilDateHourExtractMap.put(date, fastutilHourExtractMap);<br /> }<br />然后将

final Broadcast>>> dateHourExtractMapBroadcast =
sc.broadcast(dateHourExtractMap);
改写为

final Broadcast>> dateHourExtractMapBroadcast =
sc.broadcast(fastutilDateHourExtractMap);
在之前改写的广播变量处将

final Broadcast>>> dateHourExtractMapBroadcast =
sc.broadcast(dateHourExtractMap);
改为

Map> dateHourExtractMap =
dateHourExtractMapBroadcast.value();
最后记得在“构建Spark上下文”处的Kryo中注册 IntList

  1. SparkConf conf = new SparkConf()<br /> .setAppName(Constants.SPARK_APP_NAME_SESSION)<br /> .setMaster("local")<br /> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")<br /> .registerKryoClasses(new Class[]{<br /> CategorySortKey.class,<br /> IntList.class}); <br />至此,使用 fastutil 优化完成。

JVM调优之原理概述以及降低cache操作的内存占比

性能调优分类
常规性能调优:分配资源、并行度等等方式。

JVM 调优(Java虚拟机):JVM相关的参数。通常情况下,如果你的硬件配置、基础的 JVM 的配置都 ok 的话,JVM 通常不会造成太严重的性能问题,反而更多的是在 troubleshooting 中 JVM 占了很重要的地位, JVM 造成线上的 spark 作业的运行报错,甚至失败(比如OOM)。

shuffle 调优(相当重要):spark 在执行 groupByKey、reduceByKey 等操作时的 shuffle 环节的调优,这个很重要。shuffle 调优其实对 spark 作业的性能的影响是相当之高!经验总结:在 spark 作业的运行过程中只要一牵扯到有 shuffle 的操作,基本上 shuffle 操作的性能消耗要占到整个 spark 作业的 50%~90%。

spark 操作调优(spark算子调优,比较重要):原来使用groupByKey进行的操作现在使用 countByKey 或 aggregateByKey 来重构实现,用foreachPartition替代foreach。有些算子的性能是比其他一些算子的性能要高的。如果一旦遇到合适的情况,效果还是不错的。

调优顺序最好依照:
分配资源、并行度、RDD架构与缓存;
shuffle调优;
spark算子调优;
JVM调优、广播大变量……
理论基础
Spark 是用 scala 开发的,大家不要以为 scala 就跟java一点关系都没有了,这是一个很常见的错误。spark 的 scala 代码调用了很多 java api,scala 也是运行在 java 虚拟机中的,spark 是运行在 java 虚拟机中的。java虚拟机可能会产生的问题就是内存不足!我们的 RDD 的缓存、task运行定义的算子函数可能会创建很多对象,都可能会占用大量内存,没搞好的话,可能导致JVM出问题。

JVM结构

JVM中的堆内存存放我们创建的一些对象,堆内存里有年轻代(yong generation)和老年代(old generation),年轻代内部又分为三部分:Eden区域和两个survivor 区域。理想情况下,老年代都是放一些声明周期很长的对象,数量应该是很少的,比如数据库连接池。

我们在 spark task 执行算子函数的时候可能会创建很多对象,这些对象都是要放入JVM 年轻代中的。

每一次放对象的时候都是放入 eden 区域和其中一个 survivor 区域,另外一个 survivor 区域是空闲的。当 eden 区域和一个 survivor 区域放满了以后(spark运行过程中,产生的对象实在太多了)就会触发 minor gc(小型垃圾回收),把不再使用的对象从内存中清空,给后面新创建的对象腾出来点儿地方。

清理掉了不再使用的对象之后,也会将存活下来的对象(还要继续使用的)放入之前空闲的那一个 survivor 区域中。这里可能会出现一个问题:默认 eden、survior1 和 survivor2 的内存占比是 8:1:1,问题是如果存活下来的对象是1.5,一个 survivor区域放不下,此时就可能通过JVM的担保机制(不同JVM版本可能对应的行为)将多余的对象直接放入老年代了。

如果你的JVM内存不够大的话,可能导致频繁的年轻代内存满溢,频繁的进行 minor gc,频繁的 minor gc 会导致短时间内有些存活的对象多次垃圾回收都没有回收掉,会导致这种短声明周期(其实不一定是要长期使用的)对象年龄过大(垃圾回收次数太多还没有回收到)跑到老年代。

老年代中可能会因为内存不足囤积非常多的短生命周期的,本来应该在年轻代中的可能马上就要被回收掉的对象。此时,可能导致老年代频繁满溢,频繁进行 full gc(全局/全面垃圾回收),full gc就会去回收老年代中的对象。full gc由于这个算法的设计针对的是老年代中的对象数量很少,满溢进行full gc的频率应该很少,因此采取了不太复杂,但是耗费性能和时间的垃圾回收算法,full gc很慢。

full gc / minor gc无论是快还是慢,都会导致 jvm 的工作线程停止工作(stop the world)。简而言之,就是说:gc 的时候 spark 停止工作了,等着垃圾回收结束。

内存不充足的时候会导致的问题:
1. 频繁minor gc 也会导致频繁spark停止工作;
2. 老年代囤积大量活跃对象(短生命周期的对象)导致频繁full gc,full gc时间很长,短则数十秒,长则数分钟,甚至数小时,可能导致 spark 长时间停止工作。
3. 严重影响咱们的 spark 的性能和运行的速度。

JVM调优
JVM调优的第一个点:降低cache操作的内存占比
spark 中堆内存又被划分成了两块儿,一块儿是专门用来给 RDD 的cache、persist 操作进行 RDD 数据缓存用的;另外一块儿是用来给 spark 算子函数的运行使用的,用来存放函数中自己创建的对象。

默认情况下,给RDD cache操作的内存占比是0.6,也就是说60%的内存都给了cache 操作了。但是问题是如果某些情况下 cache 不是那么的紧张,问题在于 task 算子函数中创建的对象过多,然后内存又不太大,导致了频繁的minor gc,甚至频繁full gc,导致spark频繁的停止工作,性能影响会很大。

针对上述这种情况,大家可以在spark ui 中yarn 的界面去查看 spark 作业的运行统计,一层一层点击进去可以看到每个stage 的运行情况,包括每个task的运行时间、gc时间等等。如果发现gc太频繁,时间太长,此时就可以适当调价这个比例。降低cache操作的内存占比,大不了用 persist 操作选择将一部分缓存的RDD数据写入磁盘或者序列化方式,配合Kryo序列化类减少RDD缓存的内存占用,降低cache操作内存占比,对应的算子函数的内存占比就提升了。这个时候,可能就可以减少 minor gc 的频率,同时减少 full gc 的频率,对性能的提升是有一定的帮助的。

最终实现的效果就是:让 task 执行算子函数时,有更多的内存可以使用。

代码实现
通过spark.storage.memoryFraction参数进行调节 cache 内存占比,默认是0.6,可以调节为 0.5 , 0.4 或 0.2,具体调节数据要根据 yarn 界面的spark运行统计而定。具体在项目中设置是在构建Spark上下文的时候传入这个参数:

  1. SparkConf conf = new SparkConf()<br /> .setAppName(Constants.SPARK_APP_NAME_SESSION)<br /> .setMaster("local")<br /> .set("spark.storage.memoryFraction", "0.5")<br /> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")<br /> .registerKryoClasses(new Class[]{<br /> CategorySortKey.class,<br /> IntList.class});

JVM调优之调节executor堆外内存与连接等待时长

executor 堆外内存
有时候,如果你的 spark 作业处理的数据量特别特别大,比如有几亿数据量,然后spark 作业一运行,时不时的报错,出现 shuffle file cannot find 诸如此类的错误,导致 executor、task lost,out of memory(内存溢出),出现这种情况可能是说 executor 的堆外内存不太够用,导致 executor 在运行的过程中可能会内存溢出,可能导致后续的 stage的task在运行的时候,可能要从一些executor中去拉取shuffle map output文件,但是 executor 可能已经挂掉了,关联的block manager也没有了,所以可能会报 shuffle output file not found;resubmitting task;executor lost 等错误,最后导致 spark 作业彻底崩溃。

上述情况下,就可以去考虑调节一下 executor 的堆外内存。也许就可以避免报错。此外,有时堆外内存调节的比较大的时候,对于性能来说也会带来一定的提升。

调节参数
在 spark-submit脚本里面用 —conf的方式添加配置spark.yarn.executor.memoryOverhead参数,一定要注意!切记不是在 spark 作业代码中用 new SparkConf().set() 这种方式去设置,这种设置是没有用的,一定要在spark-submit脚本中去设置。默认情况下,这个堆外内存上限大概是300多M,通常项目中真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行,此时就会去调节这个参数到至少1G,甚至说2G、4G,比如调节为 2G,可以在脚本中加入:

—conf spark.yarn.executor.memoryOverhead=2048
1
通常这个参数调节上去以后,就会避免掉某些 JVM OOM 的异常问题,同时会让整体spark作业的性能,得到较大的提升。

连接等待时长
executor 优先从自己本地关联的 BlockManager 中获取某份数据,如果本地 block manager 没有的话,那么会通过 TransferService 去远程连接其他节点上 executor的 block manager 去获取。如果在连接时,正好碰到那个 exeuctor 的 JVM 在垃圾回收,此时就会没有响应,无法建立网络连接,就会卡住,spark 默认的网络连接的超时时长是 60s,如果卡住 60s 都无法建立连接的话,那么就宣告失败了。

如果偶尔碰到一种没有规律情况,突然某某 file uuid(dsfsfd-2342vs–sdf–sdfsd)not found,file lost等,这种情况下,很有可能是有那份数据的executor在jvm gc,所以拉取数据的时候建立不了连接,然后超过默认 60s 以后,直接宣告失败。几次都拉取不到数据的话,可能会导致 spark 作业的崩溃,也可能会导致DAGScheduler 反复提交几次 stage,TaskScheduler 反复提交几次 task,大大延长我们的spark作业的运行时间,此时可以考虑调节连接的超时时长。
spark-submit 脚本中设置

—conf spark.core.connection.ack.wait.timeout=300
1
spark.core.connection.ack.wait.timeout是建立不上连接的时候,超时等待时长,调节这个值比较大以后,通常来说,可以避免部分的偶尔出现的某某文件拉取失败,比如某某文件lost等情况。

总结
之所以讲这两个参数,是因为在真正处理大数据(不是几千万数据量、几百万数据量,而是几亿、几十亿、几百亿的时候)的时候比较实用,很容易碰到executor堆外内存以及gc引起的连接超时的问题,出现file not found,executor lost,task lost等错误,调节上面两个参数还是很有帮助的。

spark-submit 脚本模板:

/usr/local/spark/bin/spark-submit \
—class com.erik.sparkstudy.WordCount \
—num-executors 80 \
—driver-memory 6g \
—executor-memory 6g \
—executor-cores 3 \
—master yarn-cluster \
—queue root.default \
—conf spark.yarn.executor.memoryOverhead=2048 \
—conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar \
${1}

  1. package com.erik.sparkproject.spark.session;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. import java.util.HashMap;
  5. import java.util.Iterator;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Random;
  9. import org.apache.spark.Accumulator;
  10. import org.apache.spark.SparkConf;
  11. import org.apache.spark.SparkContext;
  12. import org.apache.spark.api.java.JavaPairRDD;
  13. import org.apache.spark.api.java.JavaRDD;
  14. import org.apache.spark.api.java.JavaSparkContext;
  15. import org.apache.spark.api.java.function.Function;
  16. import org.apache.spark.api.java.function.Function2;
  17. import org.apache.spark.api.java.function.PairFlatMapFunction;
  18. import org.apache.spark.api.java.function.PairFunction;
  19. import org.apache.spark.api.java.function.VoidFunction;
  20. import org.apache.spark.sql.DataFrame;
  21. import org.apache.spark.sql.Row;
  22. import org.apache.spark.sql.SQLContext;
  23. import org.apache.spark.sql.hive.HiveContext;
  24. import com.alibaba.fastjson.JSONObject;
  25. import com.erik.sparkproject.conf.ConfigurationManager;
  26. import com.erik.sparkproject.constant.Constants;
  27. import com.erik.sparkproject.dao.ISessionAggrStatDAO;
  28. import com.erik.sparkproject.dao.ISessionDetailDAO;
  29. import com.erik.sparkproject.dao.ISessionRandomExtractDAO;
  30. import com.erik.sparkproject.dao.ITaskDAO;
  31. import com.erik.sparkproject.dao.ITop10CategoryDAO;
  32. import com.erik.sparkproject.dao.ITop10SessionDAO;
  33. import com.erik.sparkproject.dao.factory.DAOFactory;
  34. import com.erik.sparkproject.domain.SessionAggrStat;
  35. import com.erik.sparkproject.domain.SessionDetail;
  36. import com.erik.sparkproject.domain.SessionRandomExtract;
  37. import com.erik.sparkproject.domain.Task;
  38. import com.erik.sparkproject.domain.Top10Category;
  39. import com.erik.sparkproject.domain.Top10Session;
  40. import com.erik.sparkproject.test.MockData;
  41. import com.erik.sparkproject.util.*;
  42. import com.google.common.base.Optional;
  43. import scala.Tuple2;
  44. /**
  45. * 用户访问session分析spark作业
  46. *
  47. * 接收用户创建的分析任务,用户可能指定的条件如下:
  48. * 1.时间范围:起始日期-结束日期
  49. * 2.性别:男或女
  50. * 3.年龄范围
  51. * 4.职业:多选
  52. * 5.城市:多选
  53. * 6.搜索词:多个搜索词,只要某个session中的任何一个
  54. * action搜索过指定的关键词,那么session就符合条件
  55. * 7.点击品类:多个品类,只要某个session中的任何一个
  56. * action点击过某个品类,那么session就符合条件
  57. *
  58. * 我们的Spark作业如何接受用户创建的任务呢?
  59. * J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,
  60. * 任务参数以JSON格式封装在task_param字段中
  61. * 接着J2EE平台执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本
  62. * spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数传递给spark作业的main函数
  63. * 参数就封装在main函数得到args数组中
  64. *
  65. * 这是spark本事提供的特性
  66. *
  67. *
  68. * @author Erik
  69. *
  70. */
  71. public class UserVisitSessionAnalyzeSpark {
  72. public static void main(String[] args) {
  73. args = new String[]{"2"};
  74. //构建spark上下文
  75. //首先在Constants.java中设置spark作业相关的常量
  76. //String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark";
  77. //保存Constants.java配置
  78. SparkConf conf = new SparkConf()
  79. .setAppName(Constants.SPARK_APP_NAME)
  80. .setMaster("local");
  81. JavaSparkContext sc = new JavaSparkContext(conf);
  82. SQLContext sqlContext = getSQLContext(sc.sc());
  83. //生成模拟测试数据
  84. mockData(sc, sqlContext);
  85. //创建需要使用的DAO组件
  86. ITaskDAO taskDAO = DAOFactory.getTaskDAO();
  87. //那么就首先得查询出来指定的任务,并获取任务的查询参数
  88. long taskid = ParamUtils.getTaskIdFromArgs(args);
  89. Task task = taskDAO.findById(taskid);
  90. JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
  91. System.out.println(taskid);
  92. System.out.println(taskParam);
  93. //如果要进行session粒度的数据聚合,
  94. //首先要从user_visit_action表中,查询出来指定日期范围内的数据
  95. JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);
  96. JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
  97. //聚合
  98. //首先,可以将行为数据按照session_id进行groupByKey分组
  99. //此时的数据粒度就是session粒度了,然后可以将session粒度的数据与用户信息数据惊醒join
  100. //然后就可以获取到session粒度的数据,同时数据里面还包含了session对应的user信息
  101. //到这里为止,获取的数据是<sessionid,(sessionid,searchKeywords,
  102. //clickCategoryIds,age,professional,city,sex)>
  103. JavaPairRDD<String, String> sessionid2AggrInfoRDD =
  104. aggregateBySession(sqlContext, actionRDD);
  105. //接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
  106. //相当于我们自己编写的算子,是要访问外面的任务参数对象的
  107. //匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的
  108. //重构,同时进行过滤和统计
  109. Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
  110. "", new SesssionAggrStatAccumulator());
  111. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(
  112. sessionid2AggrInfoRDD, taskParam, sessionAggrStatAccumulator);
  113. //生成公共RDD:通过筛选条件的session的访问明细数据
  114. JavaPairRDD<String, Row> sessionid2detailRDD = getSessionid2detailRDD(
  115. filteredSessionid2AggrInfoRDD, sessionid2actionRDD);
  116. //对于Accumulator这种分布式累加计算的变量的使用,有一个重要的说明
  117. //从Accumulator中,获取数据,插入数据库的时候,一定要是在某一个action操作以后
  118. //如果没有action的话,那么整个程序根本不会运行
  119. //而且必须把能够触发job执行的操作,放在最终写入MySQL方法之前
  120. //计算出的结果,在J2EE中,是以两张柱状图显示的
  121. /**
  122. * 特别说明
  123. * 我们知道,要将一个功能的session聚合统计数据获取到,就必须是在一个action操作触发job之后才能
  124. * 从Accmulator中获取数据,否则是获取不到数据的,因为没有job执行,Accumulator的值为空
  125. * 所以,我们在这里,将随机抽取功能的实现代码放在session聚合统计功能的最终计算和写库之前
  126. * 因为随机抽取功能中,有一个countByKey算子,是action操作,会触发job
  127. *
  128. */
  129. randomExtractSession(task.getTaskid(),filteredSessionid2AggrInfoRDD, sessionid2actionRDD);
  130. //计算出各个范围的session占比,并写入MySQL
  131. calculateAndPersistAggrStat(sessionAggrStatAccumulator.value(), task.getTaskid());
  132. /**
  133. * session聚合统计(统计出访问时长和访问步长,各个区间的session数量占总session数量的比例)
  134. *
  135. * 如果不进行重构,直接来实现,思路;
  136. * 1.sessionRDD,映射成<sessioinid,Row>的格式
  137. * 2.按session聚合,计算出每个session的访问时长和访问步长,生成一个新的RDD
  138. * 3.遍历新生成的RDD,将每个session的访问时长和访问步长,去更新自定义Accumulator中对应的值
  139. * 4.使用自定义Accumulator中的统计值,去计算各个区间的比例
  140. * 5.将最后计算出来的结果,写入MySQL对应的表中
  141. *
  142. * 普通实现思路的问题:
  143. * 1.为什么还要用actionRDD去映射?其实之前在session聚合的时候已经做过映射了,多次一举
  144. * 2.是不是一定要为了session的聚合这个功能单独去遍历一遍session?其实没必要,已经有session数据
  145. * 之前过滤session的时候,其实相当于是在遍历session了,那么这里就没必要再过滤一遍了
  146. *
  147. * 重构实现思路:
  148. * 1.不要去生成任何新的RDD(处理上亿的数据)
  149. * 2.不要去单独遍历一遍session(处理上千万的数据)
  150. * 3.可以在进行session聚合的时候就直接计算出来每个session的访问时长和访问步长
  151. * 4.在进行过滤的时候,本来就要遍历所有的聚合session信息,此时就可以在某个session通过筛选条件后
  152. * 将访问时长和访问步长累加到自定义的Acccumulator上面
  153. * 5.就是两种截然不同的思考方式和实现方式,在面对上亿,上千万数据的时候,甚至可以节省时间长达半小时
  154. * 或者数个小时
  155. *
  156. * 开发Spark大型复杂项目的一些经验准则:
  157. * 1.尽量少生成RDD
  158. * 2.尽量少对RDD进行算子操作,如果有可能,尽量在一个算子里实现多个需要的功能
  159. * 3.尽量少对进行RDD进行shuffle算子操作,比如groupByKey,reduceByKey,sortByKey
  160. * shuffle操作,会导致大量的磁盘读写,严重降低性能
  161. * 有shuffle的算子和没有shuffle的算子,性能甚至会有长达几十分钟甚至数个小时的差别
  162. * 有shuffle的算子,很容易导致数据倾斜,一旦数据倾斜,简直就是性能杀手(后续会有完整解决方案)
  163. * 4.无论做什么功能,性能第一
  164. * 在传统的J2EE或者.NET或者PHP,软件/系统/网站开发中,架构和可维护性,可扩展性的重要程度
  165. * 远远高于性能,大量的分布式的架构,设计模式,代码的划分,类的划分(高并发网站除外)
  166. *
  167. * 在大数据项目中,比如MapReduce,Hive,Spark,Storm中,性能的重要程度远远大于一次额代码的规范
  168. * 和设计模式,代码的划分,类的划分;大数据最重要的是性能。
  169. * 主要就是因为大数据以及大数据项目的特点决定了大数据的程度和项目的速度都比较慢
  170. * 如果不优先考虑性能的话,会导致一个大数据处理程序运行时间长达数小时,甚至数十个小时
  171. *
  172. * 所以,推荐大数据项目,在开发和代码的架构中,优先考虑性能,其次考虑功能代码的划分、解耦合
  173. *
  174. * 我们如果采用第一种实现方案,那么其实就是代码划分(解耦合、可维护)优先,设计优先
  175. * 如果采用第二种方案,那么其实就是性能优先
  176. */
  177. //获取top10热门品类
  178. List<Tuple2<CategorySortKey, String>> top10CategoryList =
  179. getTop10Category(task.getTaskid(), sessionid2detailRDD);
  180. //获取top10活跃session
  181. getTop10Session(sc, task.getTaskid(), top10CategoryList, sessionid2detailRDD);
  182. //关闭spark上下文
  183. sc.close();
  184. }
  185. /**
  186. * 获取SQLContext
  187. * 如果在本地测试环境的话,那么久生成SQLContext对象
  188. *如果在生产环境运行的话,那么就生成HiveContext对象
  189. * @param sc SparkContext
  190. * @return SQLContext
  191. */
  192. private static SQLContext getSQLContext(SparkContext sc) {
  193. //在my.properties中配置
  194. //spark.local=true(打包之前改为flase)
  195. //在ConfigurationManager.java中添加
  196. //public static Boolean getBoolean(String key) {
  197. // String value = getProperty(key);
  198. // try {
  199. // return Boolean.valueOf(value);
  200. // } catch (Exception e) {
  201. // e.printStackTrace();
  202. // }
  203. // return false;
  204. //}
  205. //在Contants.java中添加
  206. //String SPARK_LOCAL = "spark.local";
  207. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  208. if(local) {
  209. return new SQLContext(sc);
  210. }else {
  211. return new HiveContext(sc);
  212. }
  213. }
  214. /**
  215. * 生成模拟数据
  216. * 只有是本地模式,才会生成模拟数据
  217. * @param sc
  218. * @param sqlContext
  219. */
  220. private static void mockData(JavaSparkContext sc, SQLContext sqlContext) {
  221. boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
  222. if(local) {
  223. MockData.mock(sc, sqlContext);
  224. }
  225. }
  226. /**
  227. * 获取指定日期范围内的用户访问行为数据
  228. * @param sqlContext SQLContext
  229. * @param taskParam 任务参数
  230. * @return 行为数据RDD
  231. */
  232. private static JavaRDD<Row> getActionRDDByDateRange(
  233. SQLContext sqlContext, JSONObject taskParam) {
  234. //先在Constants.java中添加任务相关的常量
  235. //String PARAM_START_DATE = "startDate";
  236. //String PARAM_END_DATE = "endDate";
  237. String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
  238. String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);
  239. String sql = "select * "
  240. + "from user_visit_action"
  241. + "where date>='" + startDate + "'"
  242. + "and date<='" + endDate + "'";
  243. DataFrame actionDF = sqlContext.sql(sql);
  244. return actionDF.javaRDD();
  245. }
  246. /**
  247. * 获取sessionid2到访问行为数据的映射的RDD
  248. * @param actionRDD
  249. * @return
  250. */
  251. public static JavaPairRDD<String, Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD) {
  252. return actionRDD.mapToPair(new PairFunction<Row, String, Row>(){
  253. private static final long serialVersionUID = 1L;
  254. public Tuple2<String, Row> call(Row row) throws Exception {
  255. return new Tuple2<String, Row>(row.getString(2), row);
  256. }
  257. });
  258. }
  259. /**
  260. * 对行为数据按sesssion粒度进行聚合
  261. * @param actionRDD 行为数据RDD
  262. * @return session粒度聚合数据
  263. */
  264. private static JavaPairRDD<String, String> aggregateBySession(
  265. SQLContext sqlContext, JavaRDD<Row> actionRDD) {
  266. //现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索
  267. //现在需要将这个Row映射成<sessionid,Row>的格式
  268. JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(
  269. /**
  270. * PairFunction
  271. * 第一个参数,相当于是函数的输入
  272. * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值
  273. */
  274. new PairFunction<Row, String, Row>() {
  275. private static final long serialVersionUID = 1L;
  276. public Tuple2<String, Row> call(Row row) throws Exception {
  277. //按照MockData.java中字段顺序获取
  278. //此时需要拿到session_id,序号是2
  279. return new Tuple2<String, Row>(row.getString(2), row);
  280. }
  281. });
  282. //对行为数据按照session粒度进行分组
  283. JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
  284. sessionid2ActionRDD.groupByKey();
  285. //对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
  286. //到此为止,获取的数据格式如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
  287. JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(
  288. new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
  289. private static final long serialVersionUID = 1L;
  290. public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
  291. throws Exception {
  292. String sessionid = tuple._1;
  293. Iterator<Row> iterator = tuple._2.iterator();
  294. StringBuffer searchKeywordsBuffer = new StringBuffer("");
  295. StringBuffer clickCategoryIdsBuffer = new StringBuffer("");
  296. Long userid = null;
  297. //session的起始和结束时间
  298. Date startTime = null;
  299. Date endTime = null;
  300. //session的访问步长
  301. int stepLength = 0;
  302. //遍历session所有的访问行为
  303. while(iterator.hasNext()) {
  304. //提取每个 访问行为的搜索词字段和点击品类字段
  305. Row row = iterator.next();
  306. if(userid == null) {
  307. userid = row.getLong(1);
  308. }
  309. String searchKeyword = row.getString(5);
  310. Long clickCategoryId = row.getLong(6);
  311. //实际上这里要对数据说明一下
  312. //并不是每一行访问行为都有searchKeyword和clickCategoryId两个字段的
  313. //其实,只有搜索行为是有searchKeyword字段的
  314. //只有点击品类的行为是有clickCaregoryId字段的
  315. //所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的
  316. //所以是否将搜索词点击品类id拼接到字符串中去
  317. //首先要满足不能是null值
  318. //其次,之前的字符串中还没有搜索词或者点击品类id
  319. if(StringUtils.isNotEmpty(searchKeyword)) {
  320. if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
  321. searchKeywordsBuffer.append(searchKeyword + ",");
  322. }
  323. }
  324. if(clickCategoryId != null) {
  325. if(!clickCategoryIdsBuffer.toString().contains(
  326. String.valueOf(clickCategoryId))) {
  327. clickCategoryIdsBuffer.append(clickCategoryId + ",");
  328. }
  329. }
  330. //计算session开始和结束时间
  331. Date actionTime = DateUtils.parseTime(row.getString(4));
  332. if(startTime == null) {
  333. startTime = actionTime;
  334. }
  335. if(endTime == null) {
  336. endTime = actionTime;
  337. }
  338. if(actionTime.before(startTime)) {
  339. startTime = actionTime;
  340. }
  341. if(actionTime.after(endTime)) {
  342. endTime = actionTime;
  343. }
  344. //计算session访问步长
  345. stepLength ++;
  346. }
  347. //计算session开始和结束时间
  348. //现在DateUtils.java中添加方法
  349. //public static Date parseTime(String time) {
  350. // try {
  351. // return TIME_FORMAT.parse(time);
  352. // } catch (ParseException e) {
  353. // e.printStackTrace();
  354. // }
  355. // return null;
  356. //}
  357. //StringUtils引入的包是import com.erik.sparkproject.util.trimComma;
  358. String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
  359. String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());
  360. //计算session访问时长(秒)
  361. long visitLength = (endTime.getTime() - startTime.getTime()) / 1000;
  362. //返回的数据即是<sessionid, partAggrInfo>
  363. //但是,这一步聚合后,其实还需要将每一行数据,根对应的用户信息进行聚合
  364. //问题来了,如果是跟用户信息进行聚合的话,那么key就不应该是sessionid,而应该是userid
  365. //才能够跟<userid, Row>格式的用户信息进行聚合
  366. //如果我们这里直接返回<sessionid, partAggrInfo>,还得再做一次mapToPair算子
  367. //将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举
  368. //所以,我们这里其实可以直接返回数据格式就是<userid,partAggrInfo>
  369. //然后在直接将返回的Tuple的key设置成sessionid
  370. //最后的数据格式,还是<sessionid,fullAggrInfo>
  371. //聚合数据,用什么样的格式进行拼接?
  372. //我们这里统一定义,使用key=value|key=vale
  373. //在Constants.java中定义spark作业相关的常量
  374. //String FIELD_SESSION_ID = "sessionid";
  375. //String FIELD_SEARCH_KEYWORDS = "searchKeywords";
  376. //String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";
  377. //String FIELD_VISIT_LENGTH = "visitLength";
  378. //String FIELD_STEP_LENGTH = "stepLength";
  379. //String FIELD_START_TIME = "starttime";
  380. String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
  381. + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
  382. + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"
  383. + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"
  384. + Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|"
  385. + Constants.FIELD_START_TIME + "=" + DateUtils.formatDate(startTime);
  386. return new Tuple2<Long, String>(userid, partAggrInfo);
  387. }
  388. });
  389. //查询所有用户数据,并映射成<userid,Row>的格式
  390. String sql = "select * from user_info";
  391. JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();
  392. JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(
  393. new PairFunction<Row, Long, Row>(){
  394. private static final long serialVersionUID = 1L;
  395. public Tuple2<Long, Row> call(Row row) throws Exception {
  396. return new Tuple2<Long, Row>(row.getLong(0), row);
  397. }
  398. });
  399. //将session粒度聚合数据,与用户信息进行join
  400. JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
  401. userid2PartAggrInfoRDD.join(userid2InfoRDD);
  402. //对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
  403. JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(
  404. new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {
  405. private static final long serialVersionUID = 1L;
  406. public Tuple2<String, String> call(
  407. Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception {
  408. String partAggrInfo = tuple._2._1;
  409. Row userInfoRow = tuple._2._2;
  410. String sessionid = StringUtils.getFieldFromConcatString(
  411. partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
  412. int age = userInfoRow.getInt(3);
  413. String professional = userInfoRow.getString(4);
  414. String city = userInfoRow.getString(5);
  415. String sex = userInfoRow.getString(6);
  416. //在Constants.java中添加以下常量
  417. //String FIELD_AGE = "age";
  418. //String FIELD_PROFESSIONAL = "professional";
  419. //String FIELD_CITY = "city";
  420. //String FIELD_SEX = "sex";
  421. String fullAggrInfo = partAggrInfo + "|"
  422. + Constants.FIELD_AGE + "=" + age + "|"
  423. + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
  424. + Constants.FIELD_CITY + "=" + city + "|"
  425. + Constants.FIELD_SEX + "=" + sex ;
  426. return new Tuple2<String, String>(sessionid, fullAggrInfo);
  427. }
  428. });
  429. return sessionid2FullAggrInfoRDD;
  430. }
  431. /**
  432. * 过滤session数据,并进行聚合统计
  433. * @param sessionid2AggrInfoRDD
  434. * @return
  435. */
  436. private static JavaPairRDD<String, String> filterSessionAndAggrStat(
  437. JavaPairRDD<String, String> sessionid2AggrInfoRDD,
  438. final JSONObject taskParam,
  439. final Accumulator<String> sessionAggrAccumulator) {
  440. //为了使用后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串
  441. String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  442. String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
  443. String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS);
  444. String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES);
  445. String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX);
  446. String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
  447. String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS);
  448. String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
  449. + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
  450. + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
  451. + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
  452. + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
  453. + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
  454. + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : "");
  455. if (_parameter.endsWith("\\|")) {
  456. _parameter = _parameter.substring(0, _parameter.length() - 1);
  457. }
  458. final String parameter = _parameter;
  459. //根据筛选参数进行过滤
  460. JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(
  461. new Function<Tuple2<String, String>, Boolean>() {
  462. private static final long serialVersionUID = 1L;
  463. public Boolean call(Tuple2<String, String> tuple) throws Exception {
  464. //首先,从tuple中,获取聚合数据
  465. String aggrInfo = tuple._2;
  466. //接着,依次按照筛选条件进行过滤
  467. //按照年龄范围进行过滤(startAge、endAge)
  468. //先在Constants.java中添加常量
  469. //String PARAM_START_AGE = "startAge";
  470. //String PARAM_END_AGE = "endage";
  471. //String PARAM_PROFESSIONALS = "professionals";
  472. //String PARAM_CITIES = "cities";
  473. //String PARAM_SEX = "sex";
  474. //String PARAM_KEYWORDS = "keywords";
  475. //String PARAM_CATEGORY_IDS = "categoryIds";
  476. if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,
  477. parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
  478. return false;
  479. }
  480. //按照职业范围进行过滤(professionals)
  481. if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,
  482. parameter, Constants.PARAM_PROFESSIONALS)) {
  483. return false;
  484. }
  485. //按照城市范围进行过滤(cities)
  486. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,
  487. parameter, Constants.PARAM_CATEGORY_IDS)) {
  488. return false;
  489. }
  490. //按照性别过滤
  491. if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,
  492. parameter, Constants.PARAM_SEX)) {
  493. return false;
  494. }
  495. //按照搜索词过滤
  496. if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,
  497. parameter, Constants.PARAM_KEYWORDS)) {
  498. return false;
  499. }
  500. //按照点击品类id进行搜索
  501. if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,
  502. parameter, Constants.PARAM_CATEGORY_IDS)) {
  503. return false;
  504. }
  505. //如果经过了之前的多个过滤条件之后,程序能够走到这里
  506. //那么说明该session是通过了用户指定的筛选条件的,也就是需要保留的session
  507. //那么就要对session的访问时长和访问步长进行统计,
  508. //根据session对应的范围进行相应的累加计数
  509. //只要走到这一步,那么就是需要计数的session
  510. sessionAggrAccumulator.add(Constants.SESSION_COUNT);
  511. //计算出session的访问时长和访问步长的范围,并进行相应的累加
  512. long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(
  513. aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH));
  514. long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(
  515. aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH));
  516. calculateVisitLength(visitLength);
  517. calculateStepLength(stepLength);
  518. return true;
  519. }
  520. /**
  521. * 计算访问时长范围
  522. * @param visitLength
  523. */
  524. private void calculateVisitLength(long visitLength) {
  525. if(visitLength >= 1 && visitLength <= 3) {
  526. sessionAggrAccumulator.add(Constants.TIME_PERIOD_1s_3s);
  527. }else if(visitLength >= 4 && visitLength <= 6) {
  528. sessionAggrAccumulator.add(Constants.TIME_PERIOD_4s_6s);
  529. }else if(visitLength >= 7 && visitLength <= 9) {
  530. sessionAggrAccumulator.add(Constants.TIME_PERIOD_7s_9s);
  531. }else if(visitLength >= 10 && visitLength <= 30) {
  532. sessionAggrAccumulator.add(Constants.TIME_PERIOD_10s_30s);
  533. }else if(visitLength > 30 && visitLength <= 60) {
  534. sessionAggrAccumulator.add(Constants.TIME_PERIOD_30s_60s);
  535. }else if(visitLength > 60 && visitLength <= 180) {
  536. sessionAggrAccumulator.add(Constants.TIME_PERIOD_1m_3m);
  537. }else if(visitLength > 180 && visitLength <= 600) {
  538. sessionAggrAccumulator.add(Constants.TIME_PERIOD_3m_10m);
  539. }else if(visitLength > 600 && visitLength <= 1800) {
  540. sessionAggrAccumulator.add(Constants.TIME_PERIOD_10m_30m);
  541. }else if(visitLength > 1800) {
  542. sessionAggrAccumulator.add(Constants.TIME_PERIOD_30m);
  543. }
  544. }
  545. /**
  546. * 计算访问步长范围
  547. * @param stepLength
  548. */
  549. private void calculateStepLength(long stepLength) {
  550. if(stepLength >= 1 && stepLength <= 3) {
  551. sessionAggrAccumulator.add(Constants.STEP_PERIOD_1_3);
  552. }else if(stepLength >= 4 && stepLength <= 6) {
  553. sessionAggrAccumulator.add(Constants.STEP_PERIOD_4_6);
  554. }else if(stepLength >= 7 && stepLength <= 9) {
  555. sessionAggrAccumulator.add(Constants.STEP_PERIOD_7_9);
  556. }else if(stepLength >= 10 && stepLength <= 30) {
  557. sessionAggrAccumulator.add(Constants.STEP_PERIOD_10_30);
  558. }else if(stepLength > 30 && stepLength <= 60) {
  559. sessionAggrAccumulator.add(Constants.STEP_PERIOD_30_60);
  560. }else if(stepLength > 60) {
  561. sessionAggrAccumulator.add(Constants.STEP_PERIOD_60);
  562. }
  563. }
  564. });
  565. return filteredSessionid2AggrInfoRDD;
  566. }
  567. /**
  568. * 获取通过筛选条件的session的访问明细数据RDD
  569. * @param sessionid2aggrInfoRDD
  570. * @param sessionid2actionRDD
  571. * @return
  572. */
  573. private static JavaPairRDD<String, Row> getSessionid2detailRDD(
  574. JavaPairRDD<String, String> sessionid2aggrInfoRDD,
  575. JavaPairRDD<String, Row> sessionid2actionRDD) {
  576. JavaPairRDD<String, Row> sessionid2detailRDD = sessionid2aggrInfoRDD
  577. .join(sessionid2actionRDD)
  578. .mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {
  579. private static final long serialVersionUID = 1L;
  580. public Tuple2<String, Row> call(
  581. Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
  582. return new Tuple2<String, Row>(tuple._1, tuple._2._2);
  583. }
  584. });
  585. return sessionid2detailRDD;
  586. }
  587. /**
  588. * 随机抽取session
  589. * @param sessionid2AggrInfo
  590. */
  591. private static void randomExtractSession(
  592. final long taskid,
  593. JavaPairRDD<String, String> sessionid2AggrInfoRDD,
  594. JavaPairRDD<String, Row> sessionid2actionRDD) {
  595. /**
  596. * 第一步,计算每天每小时的session数量
  597. */
  598. //获取<yyyy-mm-dd_hh,aggrinfo>格式的RDD
  599. JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair(
  600. new PairFunction<Tuple2<String, String>, String, String>(){
  601. private static final long serialVersionUID = 1L;
  602. public Tuple2<String, String> call(
  603. Tuple2<String, String> tuple) throws Exception {
  604. String aggrInfo = tuple._2;
  605. String startTime = StringUtils.getFieldFromConcatString(
  606. aggrInfo, "\\|", Constants.FIELD_START_TIME);
  607. String dateHour = DateUtils.getDateHour(startTime);
  608. return new Tuple2<String, String>(dateHour, aggrInfo);
  609. }
  610. });
  611. //得到每天每小时的session数量
  612. Map<String, Object> countMap = time2sessionidRDD.countByKey();
  613. /**
  614. * 第二步,使用按是按比例随机抽取算法,计算出每小时要抽取session的索引
  615. */
  616. //将<yyyy-mm-dd_hh,count>格式的map转换成<yyyy-mm-dd,<hh,count>>的格式,方便后面使用
  617. Map<String, Map<String, Long>> dateHourCountMap =
  618. new HashMap<String, Map<String, Long>>();
  619. for(Map.Entry<String, Object>countEntry : countMap.entrySet()) {
  620. String dateHour = countEntry.getKey();
  621. String date = dateHour.split("_")[0];
  622. String hour = dateHour.split("_")[1];
  623. long count = Long.valueOf(String.valueOf(countEntry.getValue()));
  624. Map<String, Long> hourCountMap = dateHourCountMap.get(date);
  625. if(hourCountMap == null) {
  626. hourCountMap = new HashMap<String, Long>();
  627. dateHourCountMap.put(date, hourCountMap);
  628. }
  629. hourCountMap.put(hour, count);
  630. }
  631. //开始实现按照时间比例随机抽取算法
  632. //总共抽取100个session,先按照天数平分
  633. int extractNumberPerDay = 100 / dateHourCountMap.size();
  634. //每一天每一个小时抽取session的索引,<date,<hour,(3,5,20,200)>>
  635. final Map<String, Map<String, List<Integer>>> dateHourExtractMap =
  636. new HashMap<String, Map<String, List<Integer>>>();
  637. Random random = new Random();
  638. for(Map.Entry<String, Map<String, Long>> dateHourCountEntry : dateHourCountMap.entrySet()) {
  639. String date = dateHourCountEntry.getKey();
  640. Map<String, Long> hourCountMap = dateHourCountEntry.getValue();
  641. //计算出这一天的session总数
  642. long sessionCount = 0L;
  643. for(long hourCount : hourCountMap.values()) {
  644. sessionCount += hourCount;
  645. }
  646. Map<String, List<Integer>> hourExtractMap = dateHourExtractMap.get(date);
  647. if(hourExtractMap == null) {
  648. hourExtractMap = new HashMap<String, List<Integer>>();
  649. dateHourExtractMap.put(date, hourExtractMap);
  650. }
  651. //遍历每个小时
  652. for(Map.Entry<String, Long>hourCountEntry : hourCountMap.entrySet()) {
  653. String hour = hourCountEntry.getKey();
  654. long count = hourCountEntry.getValue();
  655. //计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量
  656. //就可以算出当前小时所需抽取的session数量
  657. long hourExtractNumber = (int)(((double)count / (double)sessionCount)
  658. * extractNumberPerDay);
  659. if(hourExtractNumber > count) {
  660. hourExtractNumber = (int)count;
  661. }
  662. //先获取当前小时的存放随机数list
  663. List<Integer> extractIndexList = hourExtractMap.get(hour);
  664. if(extractIndexList == null) {
  665. extractIndexList = new ArrayList<Integer>();
  666. hourExtractMap.put(hour, extractIndexList);
  667. }
  668. //生成上面计算出来的数量的随机数
  669. for(int i = 0; i < hourExtractNumber; i++) {
  670. int extractIndex = random.nextInt((int)count);
  671. //生成不重复的索引
  672. while(extractIndexList.contains(extractIndex)) {
  673. extractIndex = random.nextInt((int)count);
  674. }
  675. extractIndexList.add(extractIndex);
  676. }
  677. }
  678. }
  679. /**
  680. * 第三步:遍历每天每小时的session,根据随机索引抽取
  681. */
  682. //执行groupByKey算子,得到<dateHour,(session aggrInfo)>
  683. JavaPairRDD<String, Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();
  684. //我们用flatMap算子遍历所有的<dateHour,(session aggrInfo)>格式的数据
  685. //然后会遍历每天每小时的session
  686. //如果发现某个session恰巧在我们指定的这天这小时的随机抽取索引上
  687. //那么抽取该session,直接写入MySQL的random_extract_session表
  688. //将抽取出来的session id返回回来,形成一个新的JavaRDD<String>
  689. //然后最后一步,用抽取出来的sessionid去join它们的访问行为明细数据写入session表
  690. JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(
  691. new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {
  692. private static final long serialVersionUID = 1L;
  693. public Iterable<Tuple2<String, String>> call(
  694. Tuple2<String, Iterable<String>> tuple)
  695. throws Exception {
  696. List<Tuple2<String, String>> extractSessionids =
  697. new ArrayList<Tuple2<String, String>>();
  698. String dateHour = tuple._1;
  699. String date = dateHour.split("_")[0];
  700. String hour = dateHour.split("_")[1];
  701. Iterator<String> iterator = tuple._2.iterator();
  702. //拿到这一天这一小时的随机索引
  703. List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour);
  704. //先建domain和DAO
  705. //先在包com.erik.sparkproject.domain中新建SessionRandomExtract.java
  706. //然后在包com.erik.sparkproject.dao中新建ISessionRandomExtractDAO.java
  707. //接着在包com.erik.sparkproject.impl中新建SessinoRandomExtractDAOImpl.java
  708. //最后在DAOFactory.java中添加
  709. //public static ISessionRandomExtractDAO getSessionRandomExtractDAO() {
  710. //return new SessinoRandomExtractDAOImpl();
  711. //}
  712. ISessionRandomExtractDAO sessionRandomExtractDAO =
  713. DAOFactory.getSessionRandomExtractDAO();
  714. int index = 0;
  715. while(iterator.hasNext()) {
  716. String sessionAggrInfo = iterator.next();
  717. if(extractIndexList.contains(index)) {
  718. String sessionid = StringUtils.getFieldFromConcatString(
  719. sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
  720. //将数据写入MySQL
  721. SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
  722. //增加参数
  723. //private static void randomExtractSession(
  724. //final long taskid,
  725. //JavaPairRDD<String, String> sessionid2AggrInfoRDD)
  726. sessionRandomExtract.setTaskid(taskid);
  727. sessionRandomExtract.setSessionid(sessionid);
  728. sessionRandomExtract.setSessionid(StringUtils.getFieldFromConcatString(
  729. sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID));
  730. sessionRandomExtract.setSearchKeywords(StringUtils.getFieldFromConcatString(
  731. sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS));
  732. sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(
  733. sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS));
  734. sessionRandomExtractDAO.insert(sessionRandomExtract);
  735. //将sessionid加入list
  736. extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));
  737. }
  738. index ++;
  739. }
  740. return extractSessionids;
  741. }
  742. });
  743. /**
  744. * 第四步:获取抽取出来的session的明细数据
  745. */
  746. JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
  747. extractSessionidsRDD.join(sessionid2actionRDD);
  748. extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Row>>>() {
  749. private static final long serialVersionUID = 1L;
  750. public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
  751. //在包com.erik.sparkproject.domain中新建SessionDetail.java
  752. //在包com.erik.sparkproject.dao中新建ISessionDetailDAO.java接口
  753. //在包com.erik.sparkproject.impl中新建SessionDetailDAOImpl.java
  754. //在DAOFactory.java中添加
  755. //public static ISessionDetailDAO getSessionDetailDAO() {
  756. //return new SessionDetailDAOImpl();
  757. //}
  758. Row row = tuple._2._2;
  759. //封装sessionDetail的domain
  760. SessionDetail sessionDetail = new SessionDetail();
  761. sessionDetail.setTaskid(taskid);
  762. sessionDetail.setUserid(row.getLong(1));
  763. sessionDetail.setSessionid(row.getString(2));
  764. sessionDetail.setPageid(row.getLong(3));
  765. sessionDetail.setActionTime(row.getString(4));
  766. sessionDetail.setSearchKeyword(row.getString(5));
  767. sessionDetail.setClickCategoryId(row.getLong(6));
  768. sessionDetail.setClickProductId(row.getLong(7));
  769. sessionDetail.setOrderCategoryIds(row.getString(8));
  770. sessionDetail.setOrderProductIds(row.getString(9));
  771. sessionDetail.setPayCategoryIds(row.getString(10));
  772. sessionDetail.setPayProductIds(row.getString(11));
  773. ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
  774. sessionDetailDAO.insert(sessionDetail);
  775. }
  776. });
  777. }
  778. /*
  779. * 计算各session范围占比,并写入MySQL
  780. */
  781. private static void calculateAndPersistAggrStat(String value, long taskid) {
  782. //从Accumulator统计串中获取值
  783. long session_count = Long.valueOf(StringUtils.getFieldFromConcatString(
  784. value, "\\|", Constants.SESSION_COUNT));
  785. long visit_length_1s_3s = Long.valueOf(StringUtils.getFieldFromConcatString(
  786. value, "\\|", Constants.TIME_PERIOD_1s_3s));
  787. long visit_length_4s_6s = Long.valueOf(StringUtils.getFieldFromConcatString(
  788. value, "\\|", Constants.TIME_PERIOD_4s_6s));
  789. long visit_length_7s_9s = Long.valueOf(StringUtils.getFieldFromConcatString(
  790. value, "\\|", Constants.TIME_PERIOD_7s_9s));
  791. long visit_length_10s_30s = Long.valueOf(StringUtils.getFieldFromConcatString(
  792. value, "\\|", Constants.TIME_PERIOD_10s_30s));
  793. long visit_length_30s_60s = Long.valueOf(StringUtils.getFieldFromConcatString(
  794. value, "\\|", Constants.TIME_PERIOD_30s_60s));
  795. long visit_length_1m_3m = Long.valueOf(StringUtils.getFieldFromConcatString(
  796. value, "\\|", Constants.TIME_PERIOD_1m_3m));
  797. long visit_length_3m_10m = Long.valueOf(StringUtils.getFieldFromConcatString(
  798. value, "\\|", Constants.TIME_PERIOD_3m_10m));
  799. long visit_length_10m_30m = Long.valueOf(StringUtils.getFieldFromConcatString(
  800. value, "\\|", Constants.TIME_PERIOD_10m_30m));
  801. long visit_length_30m = Long.valueOf(StringUtils.getFieldFromConcatString(
  802. value, "\\|", Constants.TIME_PERIOD_30m));
  803. long step_length_1_3 = Long.valueOf(StringUtils.getFieldFromConcatString(
  804. value, "\\|", Constants.STEP_PERIOD_1_3));
  805. long step_length_4_6 = Long.valueOf(StringUtils.getFieldFromConcatString(
  806. value, "\\|", Constants.STEP_PERIOD_4_6));
  807. long step_length_7_9 = Long.valueOf(StringUtils.getFieldFromConcatString(
  808. value, "\\|", Constants.STEP_PERIOD_7_9));
  809. long step_length_10_30 = Long.valueOf(StringUtils.getFieldFromConcatString(
  810. value, "\\|", Constants.STEP_PERIOD_10_30));
  811. long step_length_30_60 = Long.valueOf(StringUtils.getFieldFromConcatString(
  812. value, "\\|", Constants.STEP_PERIOD_30_60));
  813. long step_length_60 = Long.valueOf(StringUtils.getFieldFromConcatString(
  814. value, "\\|", Constants.STEP_PERIOD_60));
  815. //计算各个访问时长和访问步长的范围
  816. double visit_length_1s_3s_ratio = NumberUtils.formatDouble(
  817. (double)visit_length_1s_3s / (double)session_count, 2);
  818. double visit_length_4s_6s_ratio = NumberUtils.formatDouble(
  819. (double)visit_length_4s_6s / (double)session_count, 2);
  820. double visit_length_7s_9s_ratio = NumberUtils.formatDouble(
  821. (double)visit_length_7s_9s / (double)session_count, 2);
  822. double visit_length_10s_30s_ratio = NumberUtils.formatDouble(
  823. (double)visit_length_10s_30s / (double)session_count, 2);
  824. double visit_length_30s_60s_ratio = NumberUtils.formatDouble(
  825. (double)visit_length_30s_60s / (double)session_count, 2);
  826. double visit_length_1m_3m_ratio = NumberUtils.formatDouble(
  827. (double)visit_length_1m_3m / (double)session_count, 2);
  828. double visit_length_3m_10m_ratio = NumberUtils.formatDouble(
  829. (double)visit_length_3m_10m / (double)session_count, 2);
  830. double visit_length_10m_30m_ratio = NumberUtils.formatDouble(
  831. (double)visit_length_10m_30m / (double)session_count, 2);
  832. double visit_length_30m_ratio = NumberUtils.formatDouble(
  833. (double)visit_length_30m / (double)session_count, 2);
  834. double step_length_1_3_ratio = NumberUtils.formatDouble(
  835. (double)step_length_1_3 / (double)session_count, 2);
  836. double step_length_4_6_ratio = NumberUtils.formatDouble(
  837. (double)step_length_4_6 / (double)session_count, 2);
  838. double step_length_7_9_ratio = NumberUtils.formatDouble(
  839. (double)step_length_7_9 / (double)session_count, 2);
  840. double step_length_10_30_ratio = NumberUtils.formatDouble(
  841. (double)step_length_10_30 / (double)session_count, 2);
  842. double step_length_30_60_ratio = NumberUtils.formatDouble(
  843. (double)step_length_30_60 / (double)session_count, 2);
  844. double step_length_60_ratio = NumberUtils.formatDouble(
  845. (double)step_length_60 / (double)session_count, 2);
  846. //将访问结果封装成Domain对象
  847. SessionAggrStat sessionAggrStat = new SessionAggrStat();
  848. sessionAggrStat.setTaskid(taskid);
  849. sessionAggrStat.setSession_count(session_count);
  850. sessionAggrStat.setVisit_length_1s_3s_ratio(visit_length_1s_3s_ratio);
  851. sessionAggrStat.setVisit_length_4s_6s_ratio(visit_length_4s_6s_ratio);
  852. sessionAggrStat.setVisit_length_7s_9s_ratio(visit_length_7s_9s_ratio);
  853. sessionAggrStat.setVisit_length_10s_30s_ratio(visit_length_10s_30s_ratio);
  854. sessionAggrStat.setVisit_length_30s_60s_ratio(visit_length_30s_60s_ratio);
  855. sessionAggrStat.setVisit_length_1m_3m_ratio(visit_length_1m_3m_ratio);
  856. sessionAggrStat.setVisit_length_3m_10m_ratio(visit_length_3m_10m_ratio);
  857. sessionAggrStat.setVisit_length_10m_30m_ratio(visit_length_10m_30m_ratio);
  858. sessionAggrStat.setVisit_length_30m_ratio(visit_length_30m_ratio);
  859. sessionAggrStat.setStep_length_1_3_ratio(step_length_1_3_ratio);
  860. sessionAggrStat.setStep_length_4_6_ratio(step_length_4_6_ratio);
  861. sessionAggrStat.setStep_length_7_9_ratio(step_length_7_9_ratio);
  862. sessionAggrStat.setStep_length_10_30_ratio(step_length_10_30_ratio);
  863. sessionAggrStat.setStep_length_30_60_ratio(step_length_30_60_ratio);
  864. sessionAggrStat.setStep_length_60_ratio(step_length_60_ratio);
  865. //调用对应的DAO插入统计结果
  866. ISessionAggrStatDAO sessionAggrStatDAO = DAOFactory.getSessionAggrStatDAO();
  867. sessionAggrStatDAO.insert(sessionAggrStat);
  868. }
  869. /**
  870. * 获取Top10热门品类
  871. * @param filteredSessionid2AggrInfoRDD
  872. * @param sessionid2actionRDD
  873. */
  874. private static List<Tuple2<CategorySortKey, String>> getTop10Category(
  875. long taskid,
  876. JavaPairRDD<String, Row> sessionid2detailRDD) {
  877. /**
  878. * 第一步:获取符合条件的session访问过的所有品类
  879. */
  880. //获取session访问过的所有品类id
  881. //访问过指的是点击、下单、支付的品类
  882. JavaPairRDD<Long, Long> categoryidRDD = sessionid2detailRDD.flatMapToPair(
  883. new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {
  884. private static final long serialVersionUID = 1L;
  885. public Iterable<Tuple2<Long, Long>> call(
  886. Tuple2<String, Row> tuple) throws Exception {
  887. Row row = tuple._2;
  888. List<Tuple2<Long, Long>> list = new ArrayList<Tuple2<Long, Long>>();
  889. Long clickCategoryId = row.getLong(6);
  890. if(clickCategoryId != null) {
  891. list.add(new Tuple2<Long ,Long>(clickCategoryId, clickCategoryId));
  892. }
  893. String orderCategoryIds = row.getString(8);
  894. if(orderCategoryIds != null) {
  895. String[] orderCategoryIdsSplited = orderCategoryIds.split(",");
  896. for(String orderCategoryId : orderCategoryIdsSplited) {
  897. list.add(new Tuple2<Long, Long>(Long.valueOf(orderCategoryId),
  898. Long.valueOf(orderCategoryId)));
  899. }
  900. }
  901. String payCategoryIds = row.getString(10);
  902. if(payCategoryIds != null) {
  903. String[] payCategoryIdsSplited = payCategoryIds.split(",");
  904. for(String payCategoryId : payCategoryIdsSplited) {
  905. list.add(new Tuple2<Long, Long>(Long.valueOf(payCategoryId),
  906. Long.valueOf(payCategoryId)));
  907. }
  908. }
  909. return list;
  910. }
  911. });
  912. //必须去重
  913. //如果不去重的话,会出现重复的categoryid,排序会对重复的categoryid以及countInfo进行排序
  914. //最后很可能会拿到重复的数据
  915. categoryidRDD = categoryidRDD.distinct();
  916. /**
  917. * 第二步:计算各品类的点击、下单和支付的次数
  918. */
  919. //访问明细中,其中三种访问行为是点击、下单和支付
  920. //分别来计算各品类点击、下单和支付的次数,可以先对访问明细数据进行过滤
  921. //分别过滤点击、下单和支付行为,然后通过map、reduceByKey等算子进行计算
  922. //计算各个品类点击次数
  923. JavaPairRDD<Long, Long> clickCategoryId2CountRDD =
  924. getClickCategoryId2CountRDD(sessionid2detailRDD);
  925. //计算各个品类的下单次数
  926. JavaPairRDD<Long, Long> orderCategoryId2CountRDD =
  927. getOrderCategoryId2CountRDD(sessionid2detailRDD);
  928. //计算各个品类的支付次数
  929. JavaPairRDD<Long, Long> payCategoryId2CountRDD =
  930. getPayCategoryId2CountRDD(sessionid2detailRDD);
  931. /**
  932. * 第三步:join各品类与它的点击、下单和支付的次数
  933. *
  934. * caetoryidRDD中包含了所有符合条件的session访问过的品类id
  935. *
  936. * 上面分别计算出来的三份各品类的点击、下单和支付的次数,可能不是包含所有品类的,
  937. * 比如,有的品类就只是被点击过,但是没有人下单和支付
  938. * 所以,这里就不能使用join操作,而是要使用leftOutJoin操作,
  939. * 就是说,如果categoryidRDD不能join到自己的某个数据,比如点击、下单或支付数据,
  940. * 那么该categoryidRDD还是要保留下来的
  941. * 只不过,没有join到的那个数据就是0了
  942. *
  943. */
  944. JavaPairRDD<Long, String> categoryid2countRDD = joinCategoryAndData(
  945. categoryidRDD, clickCategoryId2CountRDD, orderCategoryId2CountRDD,
  946. payCategoryId2CountRDD);
  947. /**
  948. * 第四步:自定义二次排序key
  949. */
  950. /**
  951. * 第五步:将数据映射成<SortKey,info>格式的RDD,然后进行二次排序(降序)
  952. */
  953. JavaPairRDD<CategorySortKey, String> sortKey2countRDD = categoryid2countRDD.mapToPair(
  954. new PairFunction<Tuple2<Long, String>, CategorySortKey, String>() {
  955. private static final long serialVersionUID = 1L;
  956. public Tuple2<CategorySortKey, String> call(
  957. Tuple2<Long, String> tuple) throws Exception {
  958. String countInfo = tuple._2;
  959. long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
  960. countInfo, "\\|", Constants.FIELD_CLICK_COUNT));
  961. long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
  962. countInfo, "\\|", Constants.FIELD_ORDER_COUNT));
  963. long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
  964. countInfo, "\\|", Constants.FIELD_PAY_COUNT));
  965. CategorySortKey sortKey = new CategorySortKey(clickCount,
  966. orderCount, payCount);
  967. return new Tuple2<CategorySortKey, String>(sortKey, countInfo);
  968. }
  969. });
  970. JavaPairRDD<CategorySortKey, String> sortedCategoryCountRDD =
  971. sortKey2countRDD.sortByKey(false);
  972. /**
  973. * 第六步:用kake(10)取出top10热门品类,并写入MySQL
  974. */
  975. ITop10CategoryDAO top10CategoryDAO = DAOFactory.getTop10CategoryDAO();
  976. List<Tuple2<CategorySortKey, String>> top10CategoryList =
  977. sortedCategoryCountRDD.take(10);
  978. for(Tuple2<CategorySortKey, String> tuple : top10CategoryList) {
  979. String countInfo = tuple._2;
  980. long categoryid = Long.valueOf(StringUtils.getFieldFromConcatString(
  981. countInfo, "\\|", Constants.FIELD_CATEGORY_ID));
  982. long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
  983. countInfo, "\\|", Constants.FIELD_CLICK_COUNT));
  984. long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
  985. countInfo, "\\|", Constants.FIELD_ORDER_COUNT));
  986. long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
  987. countInfo, "\\|", Constants.FIELD_PAY_COUNT));
  988. //封装domain对象
  989. Top10Category category = new Top10Category();
  990. category.setTaskid(taskid);
  991. category.setCategoryid(categoryid);
  992. category.setClickCount(clickCount);
  993. category.setOrderCount(orderCount);
  994. category.setPayCount(payCount);
  995. top10CategoryDAO.insert(category);
  996. }
  997. return top10CategoryList;
  998. }
  999. /**
  1000. * 获取各品类点击次数RDD
  1001. */
  1002. private static JavaPairRDD<Long, Long> getClickCategoryId2CountRDD(
  1003. JavaPairRDD<String, Row> sessionid2detailRDD) {
  1004. JavaPairRDD<String, Row> clickActionRDD = sessionid2detailRDD.filter(
  1005. new Function<Tuple2<String, Row>, Boolean>() {
  1006. private static final long serialVersionUID = 1L;
  1007. public Boolean call(Tuple2<String, Row> tuple) throws Exception {
  1008. Row row = tuple._2;
  1009. return row.get(6) != null ? true : false;
  1010. }
  1011. });
  1012. JavaPairRDD<Long, Long> clickCategoryIdRDD = clickActionRDD.mapToPair(
  1013. new PairFunction<Tuple2<String, Row>, Long, Long>() {
  1014. private static final long serialVersionUID = 1L;
  1015. public Tuple2<Long, Long> call(Tuple2<String, Row> tuple)
  1016. throws Exception {
  1017. long clickCategoryId = tuple._2.getLong(6);
  1018. return new Tuple2<Long, Long>(clickCategoryId, 1L);
  1019. }
  1020. });
  1021. JavaPairRDD<Long, Long> clickCategoryId2CountRDD = clickCategoryIdRDD.reduceByKey(
  1022. new Function2<Long, Long, Long>() {
  1023. private static final long serialVersionUID = 1L;
  1024. public Long call(Long v1, Long v2) throws Exception {
  1025. return v1 + v2;
  1026. }
  1027. });
  1028. return clickCategoryId2CountRDD;
  1029. }
  1030. private static JavaPairRDD<Long, Long> getOrderCategoryId2CountRDD(
  1031. JavaPairRDD<String, Row>sessionid2detailRDD) {
  1032. JavaPairRDD<String, Row> orderActionRDD = sessionid2detailRDD.filter(
  1033. new Function<Tuple2<String, Row>, Boolean>() {
  1034. private static final long serialVersionUID = 1L;
  1035. public Boolean call(Tuple2<String, Row> tuple) throws Exception {
  1036. Row row = tuple._2;
  1037. return row.getString(8) != null ? true : false;
  1038. }
  1039. });
  1040. JavaPairRDD<Long, Long> orderCategoryIdRDD = orderActionRDD.flatMapToPair(
  1041. new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>(){
  1042. private static final long serialVersionUID = 1L;
  1043. public Iterable<Tuple2<Long, Long>> call(Tuple2<String, Row> tuple)
  1044. throws Exception {
  1045. Row row = tuple._2;
  1046. String orderCategoryIds = row.getString(8);
  1047. String[] orderCategoryIdsSplited = orderCategoryIds.split(",");
  1048. List<Tuple2<Long, Long>> list = new ArrayList<Tuple2<Long, Long>>();
  1049. for(String orderCategoryId : orderCategoryIdsSplited) {
  1050. list.add(new Tuple2<Long, Long>(Long.valueOf(orderCategoryId), 1L));
  1051. }
  1052. return list;
  1053. }
  1054. });
  1055. JavaPairRDD<Long, Long> orderCategoryId2CountRDD = orderCategoryIdRDD.reduceByKey(
  1056. new Function2<Long, Long, Long>() {
  1057. private static final long serialVersionUID = 1L;
  1058. public Long call(Long v1, Long v2) throws Exception {
  1059. return v1 + v2;
  1060. }
  1061. });
  1062. return orderCategoryId2CountRDD;
  1063. }
  1064. private static JavaPairRDD<Long, Long> getPayCategoryId2CountRDD(
  1065. JavaPairRDD<String, Row> sessionid2detailRDD) {
  1066. JavaPairRDD<String, Row> payActionRDD = sessionid2detailRDD.filter(
  1067. new Function<Tuple2<String, Row>, Boolean>() {
  1068. private static final long serialVersionUID = 1L;
  1069. public Boolean call(Tuple2<String, Row> tuple) throws Exception {
  1070. Row row = tuple._2;
  1071. return row.getString(10) != null ? true : false;
  1072. }
  1073. });
  1074. JavaPairRDD<Long, Long> payCategoryIdRDD = payActionRDD.flatMapToPair(
  1075. new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>(){
  1076. private static final long serialVersionUID = 1L;
  1077. public Iterable<Tuple2<Long, Long>> call(Tuple2<String, Row> tuple)
  1078. throws Exception {
  1079. Row row = tuple._2;
  1080. String payCategoryIds = row.getString(10);
  1081. String[] payCategoryIdsSplited = payCategoryIds.split(",");
  1082. List<Tuple2<Long, Long>> list = new ArrayList<Tuple2<Long, Long>>();
  1083. for(String payCategoryId : payCategoryIdsSplited) {
  1084. list.add(new Tuple2<Long, Long>(Long.valueOf(payCategoryId), 1L));
  1085. }
  1086. return list;
  1087. }
  1088. });
  1089. JavaPairRDD<Long, Long> payCategoryId2CountRDD = payCategoryIdRDD.reduceByKey(
  1090. new Function2<Long, Long, Long>(){
  1091. private static final long serialVersionUID = 1L;
  1092. public Long call(Long v1, Long v2) throws Exception {
  1093. return v1 + v2;
  1094. }
  1095. });
  1096. return payCategoryId2CountRDD;
  1097. }
  1098. private static JavaPairRDD<Long, String> joinCategoryAndData(
  1099. JavaPairRDD<Long, Long> categoryidRDD,
  1100. JavaPairRDD<Long, Long> clickCategoryId2CountRDD,
  1101. JavaPairRDD<Long, Long> orderCategoryId2CountRDD,
  1102. JavaPairRDD<Long, Long> payCategoryId2CountRDD) {
  1103. //如果使用leftOuterJoin就有可能出现右边那个RDD中join过来时没有值
  1104. //所以Tuple2中的第二个值用Optional<Long>类型就代表可能有值也可能没有值
  1105. JavaPairRDD<Long, Tuple2<Long, Optional<Long>>> tmpJoinRDD =
  1106. categoryidRDD.leftOuterJoin(clickCategoryId2CountRDD);
  1107. JavaPairRDD<Long, String> tmpMapRDD = tmpJoinRDD.mapToPair(
  1108. new PairFunction<Tuple2<Long,Tuple2<Long,Optional<Long>>>, Long, String>() {
  1109. private static final long serialVersionUID = 1L;
  1110. public Tuple2<Long, String> call(
  1111. Tuple2<Long, Tuple2<Long, Optional<Long>>> tuple)
  1112. throws Exception {
  1113. long categoryid = tuple._1;
  1114. Optional<Long> optional = tuple._2._2;
  1115. long clickCount = 0L;
  1116. if(optional.isPresent()) {
  1117. clickCount = optional.get();
  1118. }
  1119. String value = Constants.FIELD_CATEGORY_ID + "=" + categoryid + "|" +
  1120. Constants.FIELD_CLICK_COUNT + "=" + clickCount;
  1121. return new Tuple2<Long, String>(categoryid, value);
  1122. }
  1123. });
  1124. tmpMapRDD = tmpMapRDD.leftOuterJoin(orderCategoryId2CountRDD).mapToPair(
  1125. new PairFunction<Tuple2<Long,Tuple2<String,Optional<Long>>>, Long, String>() {
  1126. private static final long serialVersionUID = 1L;
  1127. public Tuple2<Long, String> call(
  1128. Tuple2<Long, Tuple2<String, Optional<Long>>> tuple)
  1129. throws Exception {
  1130. long categoryid = tuple._1;
  1131. String value = tuple._2._1;
  1132. Optional<Long> optional = tuple._2._2;
  1133. long orderCount = 0L;
  1134. if(optional.isPresent()) {
  1135. orderCount = optional.get();
  1136. }
  1137. value = value + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount;
  1138. return new Tuple2<Long, String>(categoryid, value);
  1139. }
  1140. });
  1141. tmpMapRDD = tmpMapRDD.leftOuterJoin(payCategoryId2CountRDD).mapToPair(
  1142. new PairFunction<Tuple2<Long,Tuple2<String,Optional<Long>>>, Long, String>() {
  1143. private static final long serialVersionUID = 1L;
  1144. public Tuple2<Long, String> call(
  1145. Tuple2<Long, Tuple2<String, Optional<Long>>> tuple)
  1146. throws Exception {
  1147. long categoryid = tuple._1;
  1148. String value = tuple._2._1;
  1149. Optional<Long> optional = tuple._2._2;
  1150. long payCount = 0L;
  1151. if(optional.isPresent()) {
  1152. payCount = optional.get();
  1153. }
  1154. value = value + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount;
  1155. return new Tuple2<Long, String>(categoryid, value);
  1156. }
  1157. });
  1158. return tmpMapRDD;
  1159. }
  1160. /**
  1161. * 获取top10活跃session
  1162. * @param taskid
  1163. * @param sessionid2detailRDD
  1164. */
  1165. private static void getTop10Session(
  1166. JavaSparkContext sc,
  1167. final long taskid,
  1168. List<Tuple2<CategorySortKey, String>> top10CategoryList,
  1169. JavaPairRDD<String, Row> sessionid2detailRDD) {
  1170. /**
  1171. * 第一步:将top10热门品类的id生成一份RDD
  1172. */
  1173. //处理List
  1174. List<Tuple2<Long, Long>> top10CategoryIdList =
  1175. new ArrayList<Tuple2<Long, Long>>();
  1176. for (Tuple2<CategorySortKey, String> category : top10CategoryList) {
  1177. long categoryid = Long.valueOf(StringUtils.getFieldFromConcatString(
  1178. category._2, "\\|", Constants.FIELD_CATEGORY_ID));
  1179. top10CategoryIdList.add(new Tuple2<Long, Long>(categoryid, categoryid));
  1180. }
  1181. JavaPairRDD<Long, Long> top10CategoryIdRDD =
  1182. sc.parallelizePairs(top10CategoryIdList);
  1183. /**
  1184. * 第二步:计算top10品类被各session点击的次数
  1185. */
  1186. JavaPairRDD<String, Iterable<Row>> sessionid2detailsRDD =
  1187. sessionid2detailRDD.groupByKey();
  1188. JavaPairRDD<Long, String> categoryid2sessionCountRDD = sessionid2detailsRDD.flatMapToPair(
  1189. new PairFlatMapFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
  1190. private static final long serialVersionUID = 1L;
  1191. public Iterable<Tuple2<Long, String>> call(
  1192. Tuple2<String, Iterable<Row>> tuple) throws Exception {
  1193. String sessionid = tuple._1;
  1194. Iterator<Row> iterator = tuple._2.iterator();
  1195. Map<Long, Long> categoryCountMap = new HashMap<Long, Long>();
  1196. //计算出该session对每个品类的点击次数
  1197. while(iterator.hasNext()) {
  1198. Row row = iterator.next();
  1199. if(row.get(6) != null) {
  1200. long categoryid = row.getLong(6);
  1201. Long count = categoryCountMap.get(categoryid);
  1202. if(count == null) {
  1203. count = 0L;
  1204. }
  1205. count ++;
  1206. categoryCountMap.put(categoryid, count);
  1207. }
  1208. }
  1209. //返回结果为<categoryid,session:count>格式
  1210. List<Tuple2<Long, String>> list = new ArrayList<Tuple2<Long, String>>();
  1211. for(Map.Entry<Long, Long> categoryCountEntry : categoryCountMap.entrySet()) {
  1212. long categoryid = categoryCountEntry.getKey();
  1213. long count = categoryCountEntry.getValue();
  1214. String value = sessionid + "," + count;
  1215. list.add(new Tuple2<Long, String>(categoryid, value));
  1216. }
  1217. return list;
  1218. }
  1219. });
  1220. //获取到top10热门品类被各个session点击的次数
  1221. JavaPairRDD<Long, String> top10CategorySessionCountRDD =
  1222. top10CategoryIdRDD.join(categoryid2sessionCountRDD)
  1223. .mapToPair(new PairFunction<Tuple2<Long, Tuple2<Long, String>>, Long, String>() {
  1224. private static final long serialVersionUID = 1L;
  1225. public Tuple2<Long, String> call(
  1226. Tuple2<Long, Tuple2<Long, String>> tuple) throws Exception {
  1227. return new Tuple2<Long, String>(tuple._1, tuple._2._2);
  1228. }
  1229. });
  1230. /**
  1231. * 第三步:分组取topN算法实现,获取每个品类的top10活跃用户
  1232. */
  1233. JavaPairRDD<Long, Iterable<String>> top10CategorySessionCountsRDD =
  1234. top10CategorySessionCountRDD.groupByKey();
  1235. JavaPairRDD<String, String> top10SessionRDD = top10CategorySessionCountsRDD.flatMapToPair(
  1236. new PairFlatMapFunction<Tuple2<Long, Iterable<String>>, String, String>() {
  1237. private static final long serialVersionUID = 1L;
  1238. public Iterable<Tuple2<String, String>> call(
  1239. Tuple2<Long, Iterable<String>> tuple) throws Exception {
  1240. long categoryid = tuple._1;
  1241. Iterator<String> iterator = tuple._2.iterator();
  1242. //定义取TopN的排序数组
  1243. String[] top10Sessions = new String[10];
  1244. while(iterator.hasNext()) {
  1245. String sessionCount = iterator.next();
  1246. long count = Long.valueOf(sessionCount.split(",")[1]);
  1247. //遍历排序数组
  1248. for(int i = 0; i < top10Sessions.length; i++) {
  1249. //如果当前位没有数据,直接将i位数据赋值为当前sessioncount
  1250. if(top10Sessions[i] == null) {
  1251. top10Sessions[i] = sessionCount;
  1252. break;
  1253. }else {
  1254. long _count = Long.valueOf(top10Sessions[i].split(",")[1]);
  1255. //如果sessionCount比i位的sessionCount大
  1256. if(count > _count) {
  1257. //从排序数组最后一位开始,到i位所有数据往后挪一位
  1258. for(int j = 9; j > i; j--) {
  1259. top10Sessions[j] = top10Sessions[j - 1];
  1260. }
  1261. //将i位赋值为sessionCount
  1262. top10Sessions[i] = sessionCount;
  1263. break;
  1264. }
  1265. //如果sessionCount比i位的sessionCount要小,继续外层for循环
  1266. }
  1267. }
  1268. }
  1269. //将数据写入MySQL表
  1270. List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
  1271. for(String sessionCount : top10Sessions) {
  1272. if(sessionCount != null) {
  1273. String sessionid = sessionCount.split(",")[0];
  1274. long count = Long.valueOf(sessionCount.split(",")[1]);
  1275. //将top10session插入MySQL表
  1276. Top10Session top10Session = new Top10Session();
  1277. top10Session.setTaskid(taskid);
  1278. top10Session.setCategoryid(categoryid);
  1279. top10Session.setSessionid(sessionid);
  1280. top10Session.setClickCount(count);
  1281. ITop10SessionDAO top10SessionDAO = DAOFactory.getTop10SessionDAO();
  1282. top10SessionDAO.insert(top10Session);
  1283. list.add(new Tuple2<String, String>(sessionid, sessionid));
  1284. }
  1285. }
  1286. return list;
  1287. }
  1288. });
  1289. /**
  1290. * 第四步:获取top10活跃session的明细数据,并写入MySQL
  1291. */
  1292. JavaPairRDD<String, Tuple2<String, Row>> sessionDetailRDD =
  1293. top10SessionRDD.join(sessionid2detailRDD);
  1294. sessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {
  1295. private static final long serialVersionUID = 1L;
  1296. public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
  1297. Row row = tuple._2._2;
  1298. SessionDetail sessionDetail = new SessionDetail();
  1299. sessionDetail.setTaskid(taskid);
  1300. sessionDetail.setUserid(row.getLong(1));
  1301. sessionDetail.setSessionid(row.getString(2));
  1302. sessionDetail.setPageid(row.getLong(3));
  1303. sessionDetail.setActionTime(row.getString(4));
  1304. sessionDetail.setSearchKeyword(row.getString(5));
  1305. sessionDetail.setClickCategoryId(row.getLong(6));
  1306. sessionDetail.setClickProductId(row.getLong(7));
  1307. sessionDetail.setOrderCategoryIds(row.getString(8));
  1308. sessionDetail.setOrderProductIds(row.getString(9));
  1309. sessionDetail.setPayCategoryIds(row.getString(10));
  1310. sessionDetail.setPayProductIds(row.getString(11));
  1311. ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
  1312. sessionDetailDAO.insert(sessionDetail);
  1313. }
  1314. });
  1315. }
  1316. }