应用层
cache aside pattern
步骤
- 查找: 先查缓存,缓存没有查 db, db 查到重新设置到缓存,再响应请求
- 更新: 更新 db,删除缓存
为什么是删除缓存而不是更新缓存
- 如果缓存中的数据是那种需要计算的数据,每次数据更新就需要重新计算一次再设置到缓存中,而该数据的访问却是冷数据,那么就造成性能浪费
- 一种 懒加载 的思路
实时性数据的数据双写不一致
非实时性数据的架构
大数据
storm 的入门案例
安装
-
配置
conf/storm.yaml
- 列下主要的点
- 主要要写域名
- 一旦修改了配置,到 zk 中
rmr /storm
干掉原来的配置 ```yamlZooKeeper服务器列表
storm.zookeeper.servers:
storm使用的本地文件系统目录(必须存在并且storm进程可读写)
storm.local.dir: “D:/dev_env/apache-storm-2.1.0/data”
supervisor上能够运行workers的端口列表。每个worker占用一个端口,且每个端口只运行一个worker。
通过这项配置可以调整每台机器上运行的worker数。(调整slot数/每机)
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
指定运行大小
to nimbus
nimbus.childopts: “-Xmx1024m”
to supervisor
supervisor.childopts: “-Xmx1024m”
to worker
worker.childopts: “-Xmx768m”
<a name="MClj7"></a>
#### 运行
> storm nimbus
> storm supervisor
> storm logviewer // 注意是在 supervisor 上运行
> storm ui
打开 `localhost:8080`
<a name="XR9VB"></a>
#### 依赖
```xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.3</version>
</dependency>
<!-- 解决 本地运行时 java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter -->
<!-- <dependency>-->
<!-- <groupId>io.dropwizard.metrics</groupId>-->
<!-- <artifactId>metrics-core</artifactId>-->
<!-- <version>3.2.4</version>-->
<!-- </dependency>-->
</dependencies>
<!-- 打包方式 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>org.apache.storm:storm-core</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
主要逻辑
/**
* 随机句子拓扑
*
* @author lyl
*/
public class RandomSentenceTopology {
/**
* 生产句子的 Spout
*/
public static class SentencesSpout extends BaseRichSpout {
/**
* 数据发射器
*/
private SpoutOutputCollector collector;
private Random random;
/**
* open
* 对 spout 进行初始化,可以设置线程池、数据库连接池、httpClient 等用于连接数据源
*
* @param map
* @param topologyContext
* @param spoutOutputCollector 可以利用该对象发送数据出去
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
this.random = new Random();
}
/**
* storm 中的 task 会一直调用该方法
*/
@Override
public void nextTuple() {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 随机获取一个句子
String[] sentences = new String[]{
"hello, i am akarin",
"maybe is silly",
"but i am lucky"
};
String sentence = sentences[random.nextInt(sentences.length)];
System.err.println("spout 即将发送的句子: " + sentence);
// 发送一个 tuple 到下一个 task,无限的 task 组成一个概念 stream
this.collector.emit(new Values(sentence));
}
/**
* 定义 tuple 的 field 名称
*
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
public static class SplitBolt extends BaseRichBolt {
/**
* 数据发射器
*/
private OutputCollector collector;
/**
* 初始化
*
* @param map
* @param topologyContext
* @param outputCollector
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
/**
* storm 的 task 无限调用该方法
*
* @param tuple
*/
@Override
public void execute(Tuple tuple) {
// 通过 tuple 的 field 获取内容
String sentence = tuple.getStringByField("sentence");
// 截取句子,貌似我这里搞错了,切错了
String[] words = sentence.split(" ");
// 发送 tuple 到下一个 task
for (String word : words) {
this.collector.emit(new Values(word));
}
}
/**
* 也是定义 tuple 的 field名称
*
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}
public static class CalculateBolt extends BaseRichBolt {
/**
* 数据发射器
*/
private OutputCollector collector;
/**
* 记录词频
*/
private Map<String, Integer> countMap = new ConcurrentHashMap<>();
/**
* 初始化
*
* @param map
* @param topologyContext
* @param outputCollector
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
/**
* task 无限调用
*
* @param tuple
*/
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Integer count = countMap.get(word);
if (Objects.isNull(count)) {
count = 1;
} else {
count++;
}
countMap.put(word, count);
System.err.println("单机统计,word: " + word + " count: " + count);
this.collector.emit(new Values(word, count));
}
/**
* 命名 tuple 的各个 field
*
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) {
// 指定拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(
// spout 的名称
"randomSentenceSpout",
// spout 实现类
new RandomSentenceTopology.SentencesSpout(),
// executor 个数
2
);
builder.setBolt(
// bolt 名字
"splitSentenceBolt",
// bolt 实现类
new RandomSentenceTopology.SplitBolt(),
// executor 个数
3)
// 默认 task 和 executor 个数保持一致,可以设置
.setNumTasks(6)
// 指定数据上游 spout/bolt;选择消费策略为负载均衡
.shuffleGrouping("randomSentenceSpout");
builder.setBolt(
"calculateCountBolt",
new RandomSentenceTopology.CalculateBolt(),
3)
.setNumTasks(6)
// 指定数据上游 spout/bolt;选择消费策略为根据 tuple field 固定数据流向同一个本 task
// 避免在不同的 task 计算了相同的 word
.fieldsGrouping("splitSentenceBolt", new Fields("word"));
// storm's config
Config config = new Config();
// 命令行执行
if (args != null && args.length > 0) {
// 指定 worker 数量
config.setNumWorkers(3);
try {
StormSubmitter.submitTopology(
// 拓扑名称
args[0], config, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
// 本地执行
// 本地测试修改并行度,即分配总的 task 个数,让拓扑自己配置
config.setMaxTaskParallelism(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(
// 拓扑名称
"RandomSentenceTopology",
config, builder.createTopology());
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
cluster.shutdown();
}
}
}
提交任务
- 本地,直接运行即可
- 提交到 storm 集群
- 打包,需要使用
maven-shade-plugin
,见 打包方式 提交命令, 分别是 jar 包,入口类全类名,命名(main 函数里面指定要求给拓扑一个名称,即使用 arg[0])
storm jar testStorm-1.0-SNAPSHOT.jar storm.RandomSentenceTopology mytopology
在 storm.ui 或者
storm list
可以查看任务运行显示
本地可以看见
单机统计,word: silly count: 22
spout 即将发送的句子: maybe is silly
单机统计,word: maybe count: 22
单机统计,word: is count: 22
单机统计,word: maybe count: 23
单机统计,word: silly count: 23
单机统计,word: is count: 23
提交集群可以到时候看对应 storm ui 中对应拓扑的 worker 的 port 日志
测试项目下载
问题大全
- supervisor.log 中显示
客户端没有所需的特权
- win下用管理员权限的 cmd 运行 supervisor,如果不行,就全部都用管理员权限的 cmd
- 点击 storm ui 中 nimbus的 port 查看日志,发现显示
[ERROR] No available slots for topology
- 配置文件中设置 woker 的运行大小
worker.childopts
- 咋看日志啊
- 目录下的
/log/workers-artifacts
查看各个端口的日志 - 或者 storm ui 上点击对应的 拓扑,到下面的 worker 页面点击对应 port
WorkerResources
Port
suprvisorld
Host
3426834tb023-192.168.1.11
18a28794-37b6-4a8~
6700
- 改了配置没反应啊
- todo
- 这里直接去 zk,用
rmr /storm
干掉元数据了