求和案例
需求:
1+2+3+4+5+6+….=???
实现方案:
Spout发送数据作为input
使用Bolt来处理业务:求和
将结果输出到控制台
拓扑设计:
DataSourceSpout —>SumBolt
1. 定义数据源代码——Spout
建立一个类继承BaseRichSpout,并重写open、nextTuple、declareOutputFields方法
定义SpoutOutputCollector,因为nextTuple方法会用到。
private SpoutOutputCollector spoutOutputCollector;
- 编写open方法
this.spoutOutputCollector = spoutOutputCollector;
- 编写nextTuple方法,并创建一个整型number 模拟获取数据。
this.spoutOutputCollector.emit(new Values(++number));
- 编写declareOutputFields方法声明输出字段
outputFieldsDeclarer.declare(new Fields("num"));
完整代码:
/**
* Spout需要继承BaseRiceSpout
* 数据源需要产生数据并发射
*/
public static class DataSourceSpout extends BaseRichSpout{
private SpoutOutputCollector spoutOutputCollector;
/**
* 初始化方法 ,是会被调用一次
* @param map 配置参数
* @param topologyContext 上下文
* @param spoutOutputCollector
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
int number = 0 ;
/**
* 会产生数据,在生产上肯定是从消息队列中获取数据
* 死循环,一直不停的执行
*/
@Override
public void nextTuple() {
this.spoutOutputCollector.emit(new Values(++number));
System.out.println("Spout:"+number);
//防止数据产生太快
Utils.sleep(1000);
}
/**
* 声明输出字段
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
/**
* 因为上面的this.spoutOutputCollector.emit(new Values(++number)); number是一个
* 所以num也是一个
* 如果是:
* this.spoutOutputCollector.emit(new Values(++number,number2));
* outputFieldsDeclarer.declare(new Fields("num","num2"));
*/
outputFieldsDeclarer.declare(new Fields("num"));
}
}
注意:
因为上面的this.spoutOutputCollector.emit(new Values(number++)); number是一个,所以num也是一个,
如果是:
this.spoutOutputCollector.emit(new Values(number++,number2));
那么:
outputFieldsDeclarer.declare(new Fields("num","num2"));
2. 定义Bolt
- 建立一个类继承BaseRichBolt,并重写prepare、execute、declareOutputFields方法(和Spout类似)
- 不需要像Spout一样声明OutputCollector,因为本案例中不需要往下执行了。在一个Bolt中就可以操作了。
- 编辑execute方法,通过tuple.getIntegerByField(“num”)获取上一步传入的值。num与上一步定义的一样
- 不需要往下一个Bolt走,无需操作declareOutputFields方法
完整代码:
/**
* 定义Bolt
*/
public static class SumBolt extends BaseRichBolt {
/**
* 初始化方法
* @param map 配置
* @param topologyContext 上下文
* @param outputCollector 往下传需要用到
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
}
int sum = 0 ;
/**
* 也是死循环 ,职责:获取spout发送过来的数据,
* @param tuple
*/
@Override
public void execute(Tuple tuple) {
//Bolt中获取值,可以根据index获取,也可以根据上一个环节中定义的Field获取(建议使用Field)
Integer value = tuple.getIntegerByField("num");
sum +=value;
System.out.println("[Bolt]Sum:"+sum);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
3.Topology提交功能
使用本地模式:
本地模式可模拟过程中的Storm集群,对于开发和测试topologies非常有用。在本地模式下运行topologies 类似于运行topologies 。(在本地运行时会发现日志打印出了关于ZooKeeper的信息,说明它在本地通过ZooKeeper模拟了Storm集群)
相关文档
http://storm.apache.org/releases/1.2.3/Local-mode.html (1.1.1文档没了,1.2.3差不多)
http://storm.apache.org/releases/1.2.3/Running-topologies-on-a-production-cluster.html
步骤:
- 创建一个本地Storm集群
LocalCluster cluster = new LocalCluster();
关联Spout和Bolt
- TopologyBuilder根据Spout和Bolt来构建出Topology
- Strom中任何一个作业都是通过Topology的方式提交的
- Topology中需要指定Spout和Bolt的执行顺序
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout());
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
SumBolt**DataSourceSpout**.shuffleGrouping(“DataSourceSpout”);
- 使用cluster.submitTopology()方法将拓扑提交到集群
cluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
完整代码:
/**
* Topology提交功能
*/
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout());
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
/**
* 创建一个本地的storm集群,本地模式不需要搭建Storm集群
*/
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
}
运行:
E:\DEVELOP\JDK8\bin\java.exe "-javaagent:E:\DEVELOP\JetBrains\IntelliJ IDEA
......
......
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.io.tmpdir=C:\Users\love3\AppData\Local\Temp\
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.compiler=<NA>
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.name=Windows 10
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.arch=amd64
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.version=10.0
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.name=love3
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server
......
......
......
70856 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
[Bolt]Sum:1
70859 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
Spout:2
[Bolt]Sum:3
Spout:3
[Bolt]Sum:6
Spout:4
[Bolt]Sum:10
Spout:5
[Bolt]Sum:15
Spout:6
[Bolt]Sum:21
Spout:7
[Bolt]Sum:28
......