求和案例

需求:

1+2+3+4+5+6+….=???

实现方案:

Spout发送数据作为input
使用Bolt来处理业务:求和
将结果输出到控制台

拓扑设计:

DataSourceSpout —>SumBolt

1. 定义数据源代码——Spout

  1. 建立一个类继承BaseRichSpout,并重写opennextTupledeclareOutputFields方法

  2. 定义SpoutOutputCollector,因为nextTuple方法会用到。

    1. private SpoutOutputCollector spoutOutputCollector;
  1. 编写open方法
    1. this.spoutOutputCollector = spoutOutputCollector;
  1. 编写nextTuple方法,并创建一个整型number 模拟获取数据。
    1. this.spoutOutputCollector.emit(new Values(++number));
  1. 编写declareOutputFields方法声明输出字段
    1. outputFieldsDeclarer.declare(new Fields("num"));

完整代码:

  1. /**
  2. * Spout需要继承BaseRiceSpout
  3. * 数据源需要产生数据并发射
  4. */
  5. public static class DataSourceSpout extends BaseRichSpout{
  6. private SpoutOutputCollector spoutOutputCollector;
  7. /**
  8. * 初始化方法 ,是会被调用一次
  9. * @param map 配置参数
  10. * @param topologyContext 上下文
  11. * @param spoutOutputCollector
  12. */
  13. @Override
  14. public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  15. this.spoutOutputCollector = spoutOutputCollector;
  16. }
  17. int number = 0 ;
  18. /**
  19. * 会产生数据,在生产上肯定是从消息队列中获取数据
  20. * 死循环,一直不停的执行
  21. */
  22. @Override
  23. public void nextTuple() {
  24. this.spoutOutputCollector.emit(new Values(++number));
  25. System.out.println("Spout:"+number);
  26. //防止数据产生太快
  27. Utils.sleep(1000);
  28. }
  29. /**
  30. * 声明输出字段
  31. * @param outputFieldsDeclarer
  32. */
  33. @Override
  34. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  35. /**
  36. * 因为上面的this.spoutOutputCollector.emit(new Values(++number)); number是一个
  37. * 所以num也是一个
  38. * 如果是:
  39. * this.spoutOutputCollector.emit(new Values(++number,number2));
  40. * outputFieldsDeclarer.declare(new Fields("num","num2"));
  41. */
  42. outputFieldsDeclarer.declare(new Fields("num"));
  43. }
  44. }

注意:

  1. 因为上面的this.spoutOutputCollector.emit(new Values(number++)); number是一个,所以num也是一个,
  2. 如果是:
  3. this.spoutOutputCollector.emit(new Values(number++,number2));
  4. 那么:
  5. outputFieldsDeclarer.declare(new Fields("num","num2"));

2. 定义Bolt

  1. 建立一个类继承BaseRichBolt,并重写prepareexecutedeclareOutputFields方法(和Spout类似
  2. 不需要像Spout一样声明OutputCollector,因为本案例中不需要往下执行了。在一个Bolt中就可以操作了。
  3. 编辑execute方法,通过tuple.getIntegerByField(“num”)获取上一步传入的值。num与上一步定义的一样
  4. 不需要往下一个Bolt走,无需操作declareOutputFields方法

完整代码:

  1. /**
  2. * 定义Bolt
  3. */
  4. public static class SumBolt extends BaseRichBolt {
  5. /**
  6. * 初始化方法
  7. * @param map 配置
  8. * @param topologyContext 上下文
  9. * @param outputCollector 往下传需要用到
  10. */
  11. @Override
  12. public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  13. }
  14. int sum = 0 ;
  15. /**
  16. * 也是死循环 ,职责:获取spout发送过来的数据,
  17. * @param tuple
  18. */
  19. @Override
  20. public void execute(Tuple tuple) {
  21. //Bolt中获取值,可以根据index获取,也可以根据上一个环节中定义的Field获取(建议使用Field)
  22. Integer value = tuple.getIntegerByField("num");
  23. sum +=value;
  24. System.out.println("[Bolt]Sum:"+sum);
  25. }
  26. @Override
  27. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  28. }
  29. }

3.Topology提交功能

使用本地模式:

本地模式可模拟过程中的Storm集群,对于开发和测试topologies非常有用。在本地模式下运行topologies 类似于运行topologies 。(在本地运行时会发现日志打印出了关于ZooKeeper的信息,说明它在本地通过ZooKeeper模拟了Storm集群)

相关文档

http://storm.apache.org/releases/1.2.3/Local-mode.html (1.1.1文档没了,1.2.3差不多)

http://storm.apache.org/releases/1.2.3/Running-topologies-on-a-production-cluster.html

步骤:

  1. 创建一个本地Storm集群
    1. LocalCluster cluster = new LocalCluster();
  1. 关联Spout和Bolt

    • TopologyBuilder根据SpoutBolt来构建出Topology
    • Strom中任何一个作业都是通过Topology的方式提交的
    • Topology中需要指定SpoutBolt执行顺序
      1. TopologyBuilder builder = new TopologyBuilder();
      2. builder.setSpout("DataSourceSpout",new DataSourceSpout());
      3. builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");

SumBolt**DataSourceSpout**.shuffleGrouping(“DataSourceSpout”);

  1. 使用cluster.submitTopology()方法将拓扑提交到集群
    1. cluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());

完整代码:

  1. /**
  2. * Topology提交功能
  3. */
  4. public static void main(String[] args) {
  5. TopologyBuilder builder = new TopologyBuilder();
  6. builder.setSpout("DataSourceSpout",new DataSourceSpout());
  7. builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
  8. /**
  9. * 创建一个本地的storm集群,本地模式不需要搭建Storm集群
  10. */
  11. LocalCluster cluster = new LocalCluster();
  12. cluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
  13. }

运行:

  1. E:\DEVELOP\JDK8\bin\java.exe "-javaagent:E:\DEVELOP\JetBrains\IntelliJ IDEA
  2. ......
  3. ......
  4. 12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.io.tmpdir=C:\Users\love3\AppData\Local\Temp\
  5. 12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.compiler=<NA>
  6. 12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.name=Windows 10
  7. 12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.arch=amd64
  8. 12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.version=10.0
  9. 12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.name=love3
  10. 12953 [main] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Server
  11. ......
  12. ......
  13. ......
  14. 70856 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Preparing bolt __system:(-1)
  15. [Bolt]Sum:1
  16. 70859 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.d.executor - Prepared bolt __system:(-1)
  17. Spout:2
  18. [Bolt]Sum:3
  19. Spout:3
  20. [Bolt]Sum:6
  21. Spout:4
  22. [Bolt]Sum:10
  23. Spout:5
  24. [Bolt]Sum:15
  25. Spout:6
  26. [Bolt]Sum:21
  27. Spout:7
  28. [Bolt]Sum:28
  29. ......