数据获取

  1. package com.jdxia.ack;
  2. import backtype.storm.spout.SpoutOutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichSpout;
  6. import backtype.storm.tuple.Fields;
  7. import backtype.storm.tuple.Values;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.UUID;
  11. public class AckSpout extends BaseRichSpout {
  12. private SpoutOutputCollector collector;
  13. //初始化方法
  14. @Override
  15. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  16. this.collector = collector;
  17. }
  18. //循环调用,每调用一次就发送一条消息
  19. @Override
  20. public void nextTuple() {
  21. //随机生产一条数据
  22. String uuid = UUID.randomUUID().toString().replace("_", " ");
  23. collector.emit(new Values(uuid), new Values(uuid));
  24. try {
  25. Thread.sleep(10 * 1000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. //定义发送的字段
  31. @Override
  32. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  33. declarer.declare(new Fields("uuid"));
  34. }
  35. @Override
  36. public void ack(Object msgId) {
  37. System.out.println("消息处理成功" + msgId);
  38. }
  39. @Override
  40. public void fail(Object msgId) {
  41. System.out.println("消息处理失败" + msgId);
  42. //重新发送消息
  43. collector.emit((List) msgId, msgId);
  44. }
  45. }

数据处理


  1. package com.jdxia.ack;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichBolt;
  6. import backtype.storm.tuple.Fields;
  7. import backtype.storm.tuple.Tuple;
  8. import backtype.storm.tuple.Values;
  9. import java.util.Map;
  10. public class Bolt1 extends BaseRichBolt {
  11. private OutputCollector collecter;
  12. //初始化方法只调用一次
  13. @Override
  14. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  15. this.collecter = collector;
  16. }
  17. //被循环调用的处理方法
  18. @Override
  19. public void execute(Tuple input) {
  20. /**
  21. * input
  22. * source: mySpout:5, stream: default, id: {347024301319508839=6813457638891944298}, [d679ad9f-2ab1-4ed0-bd34-a87a5ec00bdd]
  23. *
  24. * input.getString(0)
  25. * d679ad9f-2ab1-4ed0-bd34-a87a5ec00bdd
  26. */
  27. collecter.emit(input, new Values(input.getString(0)));
  28. System.out.println("bolt1的execute方法被调用一次" + input.getString(0));
  29. //告诉spout处理成功了
  30. // collecter.ack(input);
  31. //告诉spout处理失败了
  32. collecter.fail(input);
  33. }
  34. @Override
  35. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  36. declarer.declare(new Fields("uuid"));
  37. }
  38. }

  1. package com.jdxia.ack;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichBolt;
  6. import backtype.storm.tuple.Fields;
  7. import backtype.storm.tuple.Tuple;
  8. import backtype.storm.tuple.Values;
  9. import java.util.Map;
  10. public class Bolt2 extends BaseRichBolt {
  11. private OutputCollector collecter;
  12. //初始化方法只调用一次
  13. @Override
  14. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  15. this.collecter = collector;
  16. }
  17. //被循环调用的处理方法
  18. @Override
  19. public void execute(Tuple input) {
  20. collecter.emit(input, new Values(input.getString(0)));
  21. System.out.println("bolt2的execute方法被调用一次" + input.getString(0));
  22. //告诉spout处理成功了
  23. collecter.ack(input);
  24. }
  25. @Override
  26. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  27. declarer.declare(new Fields("uuid"));
  28. }
  29. }

任务编排

  1. package com.jdxia.ack;
  2. import backtype.storm.Config;
  3. import backtype.storm.LocalCluster;
  4. import backtype.storm.generated.StormTopology;
  5. import backtype.storm.topology.TopologyBuilder;
  6. public class AckTopologyDriver {
  7. public static void main(String[] args) {
  8. //1. 准备任务信息
  9. TopologyBuilder topologyBuilder = new TopologyBuilder();
  10. topologyBuilder.setSpout("mySpout", new AckSpout(), 1);
  11. topologyBuilder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("mySpout");
  12. topologyBuilder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1");
  13. //2. 任务提交
  14. Config config = new Config();
  15. StormTopology stormTopology = topologyBuilder.createTopology();
  16. //本地模式
  17. LocalCluster localCluster = new LocalCluster();
  18. localCluster.submitTopology("ack", config, stormTopology);
  19. }
  20. }