应用层
cache aside pattern
步骤
- 查找: 先查缓存,缓存没有查 db, db 查到重新设置到缓存,再响应请求
- 更新: 更新 db,删除缓存
为什么是删除缓存而不是更新缓存
- 如果缓存中的数据是那种需要计算的数据,每次数据更新就需要重新计算一次再设置到缓存中,而该数据的访问却是冷数据,那么就造成性能浪费
- 一种 懒加载 的思路
实时性数据的数据双写不一致
非实时性数据的架构
大数据
storm 的入门案例
安装
-
配置
conf/storm.yaml- 列下主要的点
- 主要要写域名
- 一旦修改了配置,到 zk 中
rmr /storm干掉原来的配置 ```yamlZooKeeper服务器列表
storm.zookeeper.servers:
storm使用的本地文件系统目录(必须存在并且storm进程可读写)
storm.local.dir: “D:/dev_env/apache-storm-2.1.0/data”
supervisor上能够运行workers的端口列表。每个worker占用一个端口,且每个端口只运行一个worker。
通过这项配置可以调整每台机器上运行的worker数。(调整slot数/每机)
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
指定运行大小
to nimbus
nimbus.childopts: “-Xmx1024m”
to supervisor
supervisor.childopts: “-Xmx1024m”
to worker
worker.childopts: “-Xmx768m”
<a name="MClj7"></a>#### 运行> storm nimbus> storm supervisor> storm logviewer // 注意是在 supervisor 上运行> storm ui打开 `localhost:8080`<a name="XR9VB"></a>#### 依赖```xml<dependencies><!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core --><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.2.3</version></dependency><!-- 解决 本地运行时 java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter --><!-- <dependency>--><!-- <groupId>io.dropwizard.metrics</groupId>--><!-- <artifactId>metrics-core</artifactId>--><!-- <version>3.2.4</version>--><!-- </dependency>--></dependencies><!-- 打包方式 --><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><configuration><createDependencyReducedPom>true</createDependencyReducedPom><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.sf</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.dsa</exclude><exclude>META-INF/*.RSA</exclude><exclude>META-INF/*.rsa</exclude><exclude>META-INF/*.EC</exclude><exclude>META-INF/*.ec</exclude><exclude>META-INF/MSFTSIG.SF</exclude><exclude>META-INF/MSFTSIG.RSA</exclude></excludes></filter></filters><artifactSet><excludes><exclude>org.apache.storm:storm-core</exclude></excludes></artifactSet></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
主要逻辑
/*** 随机句子拓扑** @author lyl*/public class RandomSentenceTopology {/*** 生产句子的 Spout*/public static class SentencesSpout extends BaseRichSpout {/*** 数据发射器*/private SpoutOutputCollector collector;private Random random;/*** open* 对 spout 进行初始化,可以设置线程池、数据库连接池、httpClient 等用于连接数据源** @param map* @param topologyContext* @param spoutOutputCollector 可以利用该对象发送数据出去*/@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;this.random = new Random();}/*** storm 中的 task 会一直调用该方法*/@Overridepublic void nextTuple() {try {TimeUnit.MILLISECONDS.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}// 随机获取一个句子String[] sentences = new String[]{"hello, i am akarin","maybe is silly","but i am lucky"};String sentence = sentences[random.nextInt(sentences.length)];System.err.println("spout 即将发送的句子: " + sentence);// 发送一个 tuple 到下一个 task,无限的 task 组成一个概念 streamthis.collector.emit(new Values(sentence));}/*** 定义 tuple 的 field 名称** @param outputFieldsDeclarer*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("sentence"));}}public static class SplitBolt extends BaseRichBolt {/*** 数据发射器*/private OutputCollector collector;/*** 初始化** @param map* @param topologyContext* @param outputCollector*/@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}/*** storm 的 task 无限调用该方法** @param tuple*/@Overridepublic void execute(Tuple tuple) {// 通过 tuple 的 field 获取内容String sentence = tuple.getStringByField("sentence");// 截取句子,貌似我这里搞错了,切错了String[] words = sentence.split(" ");// 发送 tuple 到下一个 taskfor (String word : words) {this.collector.emit(new Values(word));}}/*** 也是定义 tuple 的 field名称** @param outputFieldsDeclarer*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word"));}}public static class CalculateBolt extends BaseRichBolt {/*** 数据发射器*/private OutputCollector collector;/*** 记录词频*/private Map<String, Integer> countMap = new ConcurrentHashMap<>();/*** 初始化** @param map* @param topologyContext* @param outputCollector*/@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}/*** task 无限调用** @param tuple*/@Overridepublic void execute(Tuple tuple) {String word = tuple.getStringByField("word");Integer count = countMap.get(word);if (Objects.isNull(count)) {count = 1;} else {count++;}countMap.put(word, count);System.err.println("单机统计,word: " + word + " count: " + count);this.collector.emit(new Values(word, count));}/*** 命名 tuple 的各个 field** @param outputFieldsDeclarer*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word", "count"));}}public static void main(String[] args) {// 指定拓扑TopologyBuilder builder = new TopologyBuilder();builder.setSpout(// spout 的名称"randomSentenceSpout",// spout 实现类new RandomSentenceTopology.SentencesSpout(),// executor 个数2);builder.setBolt(// bolt 名字"splitSentenceBolt",// bolt 实现类new RandomSentenceTopology.SplitBolt(),// executor 个数3)// 默认 task 和 executor 个数保持一致,可以设置.setNumTasks(6)// 指定数据上游 spout/bolt;选择消费策略为负载均衡.shuffleGrouping("randomSentenceSpout");builder.setBolt("calculateCountBolt",new RandomSentenceTopology.CalculateBolt(),3).setNumTasks(6)// 指定数据上游 spout/bolt;选择消费策略为根据 tuple field 固定数据流向同一个本 task// 避免在不同的 task 计算了相同的 word.fieldsGrouping("splitSentenceBolt", new Fields("word"));// storm's configConfig config = new Config();// 命令行执行if (args != null && args.length > 0) {// 指定 worker 数量config.setNumWorkers(3);try {StormSubmitter.submitTopology(// 拓扑名称args[0], config, builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {// 本地执行// 本地测试修改并行度,即分配总的 task 个数,让拓扑自己配置config.setMaxTaskParallelism(20);LocalCluster cluster = new LocalCluster();cluster.submitTopology(// 拓扑名称"RandomSentenceTopology",config, builder.createTopology());try {TimeUnit.SECONDS.sleep(60);} catch (InterruptedException e) {e.printStackTrace();}cluster.shutdown();}}}
提交任务
- 本地,直接运行即可
- 提交到 storm 集群
- 打包,需要使用
maven-shade-plugin,见 打包方式 提交命令, 分别是 jar 包,入口类全类名,命名(main 函数里面指定要求给拓扑一个名称,即使用 arg[0])
storm jar testStorm-1.0-SNAPSHOT.jar storm.RandomSentenceTopology mytopology
在 storm.ui 或者
storm list可以查看任务运行显示
本地可以看见
单机统计,word: silly count: 22spout 即将发送的句子: maybe is silly单机统计,word: maybe count: 22单机统计,word: is count: 22单机统计,word: maybe count: 23单机统计,word: silly count: 23单机统计,word: is count: 23
提交集群可以到时候看对应 storm ui 中对应拓扑的 worker 的 port 日志
测试项目下载
问题大全
- supervisor.log 中显示
客户端没有所需的特权- win下用管理员权限的 cmd 运行 supervisor,如果不行,就全部都用管理员权限的 cmd
- 点击 storm ui 中 nimbus的 port 查看日志,发现显示
[ERROR] No available slots for topology

- 配置文件中设置 woker 的运行大小
worker.childopts- 咋看日志啊
- 目录下的
/log/workers-artifacts查看各个端口的日志 - 或者 storm ui 上点击对应的 拓扑,到下面的 worker 页面点击对应 port

WorkerResources
Port
suprvisorld
Host
3426834tb023-192.168.1.11
18a28794-37b6-4a8~
6700
- 改了配置没反应啊
- todo
- 这里直接去 zk,用
rmr /storm干掉元数据了
