Sum Example
Requirement:
Compute the running sum 1+2+3+4+5+6+... = ?
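The input is the endless stream 1, 2, 3, ..., so the running total after the n-th number is the n-th triangular number. This gives the values the program should print:

```latex
S_n = \sum_{i=1}^{n} i = \frac{n(n+1)}{2}, \qquad S_1 = 1,\; S_2 = 3,\; S_3 = 6,\; S_4 = 10,\; \ldots,\; S_7 = 28
```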
Approach:
- A Spout emits the data as input
- A Bolt handles the business logic: summing
- The result is printed to the console
Topology design:
DataSourceSpout -> SumBolt
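1. Define the data source: the Spout
Create a class that extends BaseRichSpout and override the open, nextTuple and declareOutputFields methods.
Declare a SpoutOutputCollector field, because the nextTuple method needs it.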
private SpoutOutputCollector spoutOutputCollector;
- Implement the open method
this.spoutOutputCollector = spoutOutputCollector;
- Implement the nextTuple method, using an integer field number to simulate fetching data.
this.spoutOutputCollector.emit(new Values(++number));
- Implement the declareOutputFields method to declare the output field
outputFieldsDeclarer.declare(new Fields("num"));
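Full code:
// Imports for the enclosing source file (Storm 1.x package names)
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * A Spout extends BaseRichSpout.
 * The data source produces data and emits it.
 */
public static class DataSourceSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;

    /**
     * Initialization method; called only once.
     * @param map configuration parameters
     * @param topologyContext context
     * @param spoutOutputCollector collector used later in nextTuple to emit tuples
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    int number = 0;

    /**
     * Produces data. In production the data would normally come from a message queue.
     * Storm calls this method over and over in a loop for as long as the topology runs.
     */
    @Override
    public void nextTuple() {
        this.spoutOutputCollector.emit(new Values(++number));
        System.out.println("Spout:" + number);
        // Slow down so data is not produced too fast
        Utils.sleep(1000);
    }

    /**
     * Declares the output fields.
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        /*
         * emit(new Values(++number)) sends one value, so one field ("num") is declared.
         * If two values were emitted, e.g.
         *   this.spoutOutputCollector.emit(new Values(++number, number2));
         * two fields would be declared:
         *   outputFieldsDeclarer.declare(new Fields("num", "num2"));
         */
        outputFieldsDeclarer.declare(new Fields("num"));
    }
}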
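Storm, not the Spout, does the looping: it calls open once and then calls nextTuple again and again. The plain-Java toy model below (no Storm dependency) sketches that lifecycle under simplifying assumptions; ToyCollector, ToySpout and the run driver are hypothetical stand-ins, not Storm classes. It also shows why ++number is used: the first emitted value is 1, whereas number++ would emit 0 first.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the Spout lifecycle; ToyCollector and ToySpout are hypothetical, not Storm APIs.
public class SpoutLifecycleSketch {

    // Stand-in for SpoutOutputCollector: just records every emitted value.
    static class ToyCollector {
        final List<Integer> emitted = new ArrayList<>();
        void emit(int value) { emitted.add(value); }
    }

    // Mirrors DataSourceSpout: keep the collector from open(), use it in nextTuple().
    static class ToySpout {
        private ToyCollector collector;
        private int number = 0;
        private final boolean preIncrement;

        ToySpout(boolean preIncrement) { this.preIncrement = preIncrement; }

        void open(ToyCollector collector) { this.collector = collector; }

        void nextTuple() {
            // ++number emits 1, 2, 3, ...; number++ would emit 0, 1, 2, ...
            collector.emit(preIncrement ? ++number : number++);
        }
    }

    // Simplified driver: open() once, then nextTuple() a fixed number of times
    // (a real topology keeps looping until it is killed).
    static List<Integer> run(boolean preIncrement, int calls) {
        ToyCollector collector = new ToyCollector();
        ToySpout spout = new ToySpout(preIncrement);
        spout.open(collector);
        for (int i = 0; i < calls; i++) {
            spout.nextTuple();
        }
        return collector.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(true, 5));   // [1, 2, 3, 4, 5]
        System.out.println(run(false, 5));  // [0, 1, 2, 3, 4]
    }
}
```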
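Note:
Because the Spout emits a single value in this.spoutOutputCollector.emit(new Values(++number)), it declares a single field, "num". If it emitted two values, for example this.spoutOutputCollector.emit(new Values(++number, number2)), it would have to declare two fields: outputFieldsDeclarer.declare(new Fields("num", "num2"));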
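The declared Fields and the emitted Values are matched by position: the first declared name labels the first value, and so on. That is what lets a downstream Bolt call getIntegerByField("num") instead of getInteger(0). The class below is a simplified plain-Java sketch of that mapping; ToyTuple is hypothetical and is not Storm's Tuple implementation.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of how declared field names map to emitted values; ToyTuple is hypothetical, not Storm's Tuple.
public class FieldLookupSketch {

    static class ToyTuple {
        private final List<String> fields;  // what declareOutputFields declared, e.g. ("num", "num2")
        private final List<Object> values;  // what emit(new Values(...)) sent, in the same order

        ToyTuple(List<String> fields, List<Object> values) {
            // The toy rejects a mismatch between declared fields and emitted values
            if (fields.size() != values.size()) {
                throw new IllegalArgumentException("declared " + fields.size()
                        + " fields but emitted " + values.size() + " values");
            }
            this.fields = fields;
            this.values = values;
        }

        // Lookup by position, like tuple.getInteger(0)
        Integer getInteger(int index) {
            return (Integer) values.get(index);
        }

        // Lookup by name: find the field's position, then read the value at that position
        Integer getIntegerByField(String field) {
            int index = fields.indexOf(field);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            return getInteger(index);
        }
    }

    public static void main(String[] args) {
        ToyTuple one = new ToyTuple(Arrays.asList("num"), Arrays.<Object>asList(7));
        ToyTuple two = new ToyTuple(Arrays.asList("num", "num2"), Arrays.<Object>asList(7, 42));
        System.out.println(one.getIntegerByField("num"));   // 7
        System.out.println(two.getIntegerByField("num2"));  // 42
    }
}
```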
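2. Define the Bolt
- Create a class that extends BaseRichBolt and override the prepare, execute and declareOutputFields methods (similar to the Spout).
- Unlike the Spout, there is no need to keep an OutputCollector field, because in this example nothing is passed further downstream; all the work happens inside this one Bolt.
- Implement the execute method and read the value sent by the previous step with tuple.getIntegerByField("num"). The name "num" must match the field declared in the Spout.
- Since nothing goes on to another Bolt, declareOutputFields can be left empty.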
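Full code:
// Imports for the enclosing source file (Storm 1.x package names)
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Defines the Bolt.
 */
public static class SumBolt extends BaseRichBolt {

    /**
     * Initialization method.
     * @param map configuration
     * @param topologyContext context
     * @param outputCollector needed only when passing tuples downstream
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    int sum = 0;

    /**
     * Called once for every tuple received; its job is to process the data sent by the Spout.
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple) {
        // A Bolt can read a value by index or by the Field name declared upstream (Field is recommended)
        Integer value = tuple.getIntegerByField("num");
        sum += value;
        System.out.println("[Bolt]Sum:" + sum);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Nothing is emitted downstream, so no fields are declared
    }
}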
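3. Submitting the Topology
Use local mode:
Local mode simulates a Storm cluster in process and is very useful for developing and testing topologies. Running topologies in local mode is similar to running topologies on a cluster. (When you run it locally, the log prints ZooKeeper information, which shows that Storm simulates the cluster locally with an in-process ZooKeeper.)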
Related documentation:
http://storm.apache.org/releases/1.2.3/Local-mode.html (the 1.1.1 documentation is no longer available; 1.2.3 is nearly the same)
http://storm.apache.org/releases/1.2.3/Running-topologies-on-a-production-cluster.html
Steps:
- Create a local Storm cluster
LocalCluster cluster = new LocalCluster();
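Wire up the Spout and the Bolt
- TopologyBuilder builds the Topology from the Spout and the Bolt
- Every job in Storm is submitted as a Topology
- The Topology must specify the order in which data flows from the Spout to the Bolt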
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");
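Here .shuffleGrouping("DataSourceSpout") subscribes SumBolt to the tuples emitted by the component whose id is "DataSourceSpout", so that string must match the id passed to setSpout.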
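Because sum is an instance field, every SumBolt task keeps its own total. With the default parallelism of one task the printed value is the full running sum; if the Bolt were given more tasks (for example builder.setBolt("SumBolt", new SumBolt(), 2)), shuffle grouping would spread the tuples across the tasks and each would print only a partial sum. The plain-Java sketch below models this; ToySumBolt and partialSums are hypothetical names, and picking a task at random is a simplification (Storm's shuffle grouping also aims to give each task an equal share).

```java
import java.util.Random;

// Toy model of shuffleGrouping feeding SumBolt tasks; not Storm code.
public class ShuffleSumSketch {

    // Each task is a separate bolt instance, so each has its own "sum" field.
    static class ToySumBolt {
        int sum = 0;
        void execute(int value) { sum += value; }
    }

    // Emits 1..count (like DataSourceSpout) and routes each value to a randomly chosen task.
    static int[] partialSums(int count, int tasks, long seed) {
        ToySumBolt[] bolts = new ToySumBolt[tasks];
        for (int i = 0; i < tasks; i++) {
            bolts[i] = new ToySumBolt();
        }
        Random random = new Random(seed);
        for (int number = 1; number <= count; number++) {
            bolts[random.nextInt(tasks)].execute(number);
        }
        int[] sums = new int[tasks];
        for (int i = 0; i < tasks; i++) {
            sums[i] = bolts[i].sum;
        }
        return sums;
    }

    public static void main(String[] args) {
        // One task: the single bolt sees every tuple, so it holds the full sum 1+...+7.
        System.out.println(partialSums(7, 1, 42L)[0]); // 28
        // Two tasks: each holds only part of the total; the parts still add up to 28.
        int[] parts = partialSums(7, 2, 42L);
        System.out.println(parts[0] + " + " + parts[1] + " = " + (parts[0] + parts[1]));
    }
}
```

If a single global total is required while scaling out, one common design is to keep the partial sums in parallel tasks and combine them in a downstream Bolt with one task.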
- Submit the topology to the cluster with cluster.submitTopology()
cluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
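Full code:
// Imports for the enclosing source file (Storm 1.x package names)
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Topology submission.
 */
public static void main(String[] args) {
    // TopologyBuilder builds the Topology from the Spout and the Bolt
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("DataSourceSpout", new DataSourceSpout());
    builder.setBolt("SumBolt", new SumBolt()).shuffleGrouping("DataSourceSpout");

    /*
     * Create a local Storm cluster; local mode does not require setting up a real Storm cluster.
     */
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("LocalSumStormTopology", new Config(), builder.createTopology());
}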
Console output when run:
E:\DEVELOP\JDK8\bin\java.exe "-javaagent:E:\DEVELOP\JetBrains\IntelliJ IDEA............
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.io.tmpdir=C:\Users\love3\AppData\Local\Temp\
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.compiler=<NA>
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.name=Windows 10
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.arch=amd64
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.version=10.0
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.name=love3
12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server..................
70856 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
[Bolt]Sum:1
70859 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
Spout:2
[Bolt]Sum:3
Spout:3
[Bolt]Sum:6
Spout:4
[Bolt]Sum:10
Spout:5
[Bolt]Sum:15
Spout:6
[Bolt]Sum:21
Spout:7
[Bolt]Sum:28
......
