应用层

cache aside pattern

步骤

  1. 查找: 先查缓存,缓存没有查 db, db 查到重新设置到缓存,再响应请求
  2. 更新: 更新 db,删除缓存

为什么是删除缓存而不是更新缓存

  • 如果缓存中的数据是那种需要计算的数据,每次数据更新就需要重新计算一次再设置到缓存中,而该数据的访问却是冷数据,那么就造成性能浪费
  • 一种 懒加载 的思路

实时性数据的数据双写不一致

缓存系统 - 图3

非实时性数据的架构

缓存系统 - 图4

大数据

缓存系统 - 图5

storm 的入门案例

安装

  • 需要 jdk、zk、python2 环境

    配置

  • conf/storm.yaml

  • 列下主要的点
  • 主要要写域名
  • 一旦修改了配置,到 zk 中 rmr /storm 干掉原来的配置 ```yaml

    ZooKeeper服务器列表

    storm.zookeeper.servers:
    • “localhost”

      nimbus 运行位置

      nimbus.seeds: [“localhost”]

storm使用的本地文件系统目录(必须存在并且storm进程可读写)

storm.local.dir: “D:/dev_env/apache-storm-2.1.0/data”

supervisor上能够运行workers的端口列表。每个worker占用一个端口,且每个端口只运行一个worker。

通过这项配置可以调整每台机器上运行的worker数。(调整slot数/每机)

supervisor.slots.ports:

  • 6700
  • 6701
  • 6702
  • 6703

指定运行大小

to nimbus

nimbus.childopts: “-Xmx1024m”

to supervisor

supervisor.childopts: “-Xmx1024m”

to worker

worker.childopts: “-Xmx768m”

  1. <a name="MClj7"></a>
  2. #### 运行
  3. > storm nimbus
  4. > storm supervisor
  5. > storm logviewer // 注意是在 supervisor 上运行
  6. > storm ui
  7. 打开 `localhost:8080`
  8. <a name="XR9VB"></a>
  9. #### 依赖
  10. ```xml
  11. <dependencies>
  12. <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
  13. <dependency>
  14. <groupId>org.apache.storm</groupId>
  15. <artifactId>storm-core</artifactId>
  16. <version>1.2.3</version>
  17. </dependency>
  18. <!-- 解决 本地运行时 java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter -->
  19. <!-- <dependency>-->
  20. <!-- <groupId>io.dropwizard.metrics</groupId>-->
  21. <!-- <artifactId>metrics-core</artifactId>-->
  22. <!-- <version>3.2.4</version>-->
  23. <!-- </dependency>-->
  24. </dependencies>
  25. <!-- 打包方式 -->
  26. <build>
  27. <plugins>
  28. <plugin>
  29. <groupId>org.apache.maven.plugins</groupId>
  30. <artifactId>maven-shade-plugin</artifactId>
  31. <configuration>
  32. <createDependencyReducedPom>true</createDependencyReducedPom>
  33. <filters>
  34. <filter>
  35. <artifact>*:*</artifact>
  36. <excludes>
  37. <exclude>META-INF/*.SF</exclude>
  38. <exclude>META-INF/*.sf</exclude>
  39. <exclude>META-INF/*.DSA</exclude>
  40. <exclude>META-INF/*.dsa</exclude>
  41. <exclude>META-INF/*.RSA</exclude>
  42. <exclude>META-INF/*.rsa</exclude>
  43. <exclude>META-INF/*.EC</exclude>
  44. <exclude>META-INF/*.ec</exclude>
  45. <exclude>META-INF/MSFTSIG.SF</exclude>
  46. <exclude>META-INF/MSFTSIG.RSA</exclude>
  47. </excludes>
  48. </filter>
  49. </filters>
  50. <artifactSet>
  51. <excludes>
  52. <exclude>org.apache.storm:storm-core</exclude>
  53. </excludes>
  54. </artifactSet>
  55. </configuration>
  56. <executions>
  57. <execution>
  58. <phase>package</phase>
  59. <goals>
  60. <goal>shade</goal>
  61. </goals>
  62. <configuration>
  63. <transformers>
  64. <transformer
  65. implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  66. <transformer
  67. implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  68. </transformer>
  69. </transformers>
  70. </configuration>
  71. </execution>
  72. </executions>
  73. </plugin>
  74. </plugins>
  75. </build>

主要逻辑

  1. /**
  2. * 随机句子拓扑
  3. *
  4. * @author lyl
  5. */
  6. public class RandomSentenceTopology {
  7. /**
  8. * 生产句子的 Spout
  9. */
  10. public static class SentencesSpout extends BaseRichSpout {
  11. /**
  12. * 数据发射器
  13. */
  14. private SpoutOutputCollector collector;
  15. private Random random;
  16. /**
  17. * open
  18. * 对 spout 进行初始化,可以设置线程池、数据库连接池、httpClient 等用于连接数据源
  19. *
  20. * @param map
  21. * @param topologyContext
  22. * @param spoutOutputCollector 可以利用该对象发送数据出去
  23. */
  24. @Override
  25. public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  26. this.collector = spoutOutputCollector;
  27. this.random = new Random();
  28. }
  29. /**
  30. * storm 中的 task 会一直调用该方法
  31. */
  32. @Override
  33. public void nextTuple() {
  34. try {
  35. TimeUnit.MILLISECONDS.sleep(200);
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. // 随机获取一个句子
  40. String[] sentences = new String[]{
  41. "hello, i am akarin",
  42. "maybe is silly",
  43. "but i am lucky"
  44. };
  45. String sentence = sentences[random.nextInt(sentences.length)];
  46. System.err.println("spout 即将发送的句子: " + sentence);
  47. // 发送一个 tuple 到下一个 task,无限的 task 组成一个概念 stream
  48. this.collector.emit(new Values(sentence));
  49. }
  50. /**
  51. * 定义 tuple 的 field 名称
  52. *
  53. * @param outputFieldsDeclarer
  54. */
  55. @Override
  56. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  57. outputFieldsDeclarer.declare(new Fields("sentence"));
  58. }
  59. }
  60. public static class SplitBolt extends BaseRichBolt {
  61. /**
  62. * 数据发射器
  63. */
  64. private OutputCollector collector;
  65. /**
  66. * 初始化
  67. *
  68. * @param map
  69. * @param topologyContext
  70. * @param outputCollector
  71. */
  72. @Override
  73. public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  74. this.collector = outputCollector;
  75. }
  76. /**
  77. * storm 的 task 无限调用该方法
  78. *
  79. * @param tuple
  80. */
  81. @Override
  82. public void execute(Tuple tuple) {
  83. // 通过 tuple 的 field 获取内容
  84. String sentence = tuple.getStringByField("sentence");
  85. // 截取句子,貌似我这里搞错了,切错了
  86. String[] words = sentence.split(" ");
  87. // 发送 tuple 到下一个 task
  88. for (String word : words) {
  89. this.collector.emit(new Values(word));
  90. }
  91. }
  92. /**
  93. * 也是定义 tuple 的 field名称
  94. *
  95. * @param outputFieldsDeclarer
  96. */
  97. @Override
  98. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  99. outputFieldsDeclarer.declare(new Fields("word"));
  100. }
  101. }
  102. public static class CalculateBolt extends BaseRichBolt {
  103. /**
  104. * 数据发射器
  105. */
  106. private OutputCollector collector;
  107. /**
  108. * 记录词频
  109. */
  110. private Map<String, Integer> countMap = new ConcurrentHashMap<>();
  111. /**
  112. * 初始化
  113. *
  114. * @param map
  115. * @param topologyContext
  116. * @param outputCollector
  117. */
  118. @Override
  119. public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  120. this.collector = outputCollector;
  121. }
  122. /**
  123. * task 无限调用
  124. *
  125. * @param tuple
  126. */
  127. @Override
  128. public void execute(Tuple tuple) {
  129. String word = tuple.getStringByField("word");
  130. Integer count = countMap.get(word);
  131. if (Objects.isNull(count)) {
  132. count = 1;
  133. } else {
  134. count++;
  135. }
  136. countMap.put(word, count);
  137. System.err.println("单机统计,word: " + word + " count: " + count);
  138. this.collector.emit(new Values(word, count));
  139. }
  140. /**
  141. * 命名 tuple 的各个 field
  142. *
  143. * @param outputFieldsDeclarer
  144. */
  145. @Override
  146. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  147. outputFieldsDeclarer.declare(new Fields("word", "count"));
  148. }
  149. }
  150. public static void main(String[] args) {
  151. // 指定拓扑
  152. TopologyBuilder builder = new TopologyBuilder();
  153. builder.setSpout(
  154. // spout 的名称
  155. "randomSentenceSpout",
  156. // spout 实现类
  157. new RandomSentenceTopology.SentencesSpout(),
  158. // executor 个数
  159. 2
  160. );
  161. builder.setBolt(
  162. // bolt 名字
  163. "splitSentenceBolt",
  164. // bolt 实现类
  165. new RandomSentenceTopology.SplitBolt(),
  166. // executor 个数
  167. 3)
  168. // 默认 task 和 executor 个数保持一致,可以设置
  169. .setNumTasks(6)
  170. // 指定数据上游 spout/bolt;选择消费策略为负载均衡
  171. .shuffleGrouping("randomSentenceSpout");
  172. builder.setBolt(
  173. "calculateCountBolt",
  174. new RandomSentenceTopology.CalculateBolt(),
  175. 3)
  176. .setNumTasks(6)
  177. // 指定数据上游 spout/bolt;选择消费策略为根据 tuple field 固定数据流向同一个本 task
  178. // 避免在不同的 task 计算了相同的 word
  179. .fieldsGrouping("splitSentenceBolt", new Fields("word"));
  180. // storm's config
  181. Config config = new Config();
  182. // 命令行执行
  183. if (args != null && args.length > 0) {
  184. // 指定 worker 数量
  185. config.setNumWorkers(3);
  186. try {
  187. StormSubmitter.submitTopology(
  188. // 拓扑名称
  189. args[0], config, builder.createTopology());
  190. } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
  191. e.printStackTrace();
  192. }
  193. } else {
  194. // 本地执行
  195. // 本地测试修改并行度,即分配总的 task 个数,让拓扑自己配置
  196. config.setMaxTaskParallelism(20);
  197. LocalCluster cluster = new LocalCluster();
  198. cluster.submitTopology(
  199. // 拓扑名称
  200. "RandomSentenceTopology",
  201. config, builder.createTopology());
  202. try {
  203. TimeUnit.SECONDS.sleep(60);
  204. } catch (InterruptedException e) {
  205. e.printStackTrace();
  206. }
  207. cluster.shutdown();
  208. }
  209. }
  210. }

提交任务

  • 本地,直接运行即可
  • 提交到 storm 集群
  1. 打包,需要使用 maven-shade-plugin ,见 打包方式
  2. 提交命令, 分别是 jar 包,入口类全类名,命名(main 函数里面指定要求给拓扑一个名称,即使用 arg[0])

    storm jar testStorm-1.0-SNAPSHOT.jar storm.RandomSentenceTopology mytopology

  3. 在 storm.ui 或者 storm list 可以查看任务

    运行显示

  • 本地可以看见

    1. 单机统计,word: silly count: 22
    2. spout 即将发送的句子: maybe is silly
    3. 单机统计,word: maybe count: 22
    4. 单机统计,word: is count: 22
    5. 单机统计,word: maybe count: 23
    6. 单机统计,word: silly count: 23
    7. 单机统计,word: is count: 23
  • 提交集群可以到时候看对应 storm ui 中对应拓扑的 worker 的 port 日志

    测试项目下载

    testStorm.zip

问题大全

  1. supervisor.log 中显示 客户端没有所需的特权
    1. win下用管理员权限的 cmd 运行 supervisor,如果不行,就全部都用管理员权限的 cmd
  2. 点击 storm ui 中 nimbus的 port 查看日志,发现显示[ERROR] No available slots for topology

image.png

  1. 配置文件中设置 woker 的运行大小 worker.childopts
    1. 咋看日志啊
  2. 目录下的 /log/workers-artifacts 查看各个端口的日志
  3. 或者 storm ui 上点击对应的 拓扑,到下面的 worker 页面点击对应 port image.png

WorkerResources
Port
suprvisorld
Host
3426834tb023-192.168.1.11
18a28794-37b6-4a8~
6700

  1. 改了配置没反应啊
    1. todo
    2. 这里直接去 zk,用 rmr /storm 干掉元数据了

缓存冷启动和预热

缓存系统 - 图8

热数据

缓存系统 - 图9

预防缓存雪崩

缓存系统 - 图10