数据获取
package com.jdxia.ack;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
 * Spout that emits one random UUID string per {@link #nextTuple()} call.
 *
 * <p>Each tuple is emitted with a message id (a {@link Values} holding the same
 * UUID), so {@link #ack(Object)} and {@link #fail(Object)} receive that id back.
 * A failed message is re-emitted with the same values and the same id.
 */
public class AckSpout extends BaseRichSpout {
    /** Pause after every emitted tuple, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 10 * 1000L;

    private SpoutOutputCollector collector;

    /**
     * Initialisation hook: keeps the collector used later to emit tuples.
     *
     * @param conf      topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used by {@link #nextTuple()} and {@link #fail(Object)}
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called repeatedly; every call emits exactly one message and then pauses.
     */
    @Override
    public void nextTuple() {
        // Randomly generated payload. UUID text only contains hex digits and '-',
        // so no further character replacement is needed.
        String uuid = UUID.randomUUID().toString();

        // The second argument is the message id handed back to ack()/fail().
        // It carries the payload itself so fail() can re-emit without extra state.
        collector.emit(new Values(uuid), new Values(uuid));

        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it, so the
            // thread running this spout can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single output field, {@code "uuid"}.
     *
     * @param declarer receiver of the output field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }

    /**
     * Success callback: prints the id of the message that was processed.
     *
     * @param msgId the message id passed to {@code emit} in {@link #nextTuple()}
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功" + msgId);
    }

    /**
     * Failure callback: prints the id and sends the message again.
     *
     * <p>There is no retry limit: a message that keeps failing is re-emitted
     * every time this method is called for it.
     *
     * @param msgId the message id passed to {@code emit}; this spout always uses a
     *              {@link Values} (a list) as the id, which is why it can be cast
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败" + msgId);

        // The id doubles as the tuple values (see nextTuple), so the cast is safe
        // for every id this spout produces.
        @SuppressWarnings("unchecked")
        List<Object> values = (List<Object>) msgId;

        // Re-send the message under the same id.
        collector.emit(values, msgId);
    }
}
数据处理
package com.jdxia.ack;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
 * Bolt that forwards the first field of each incoming tuple as {@code "uuid"},
 * anchored to the input tuple, and then reports the input as failed.
 */
public class Bolt1 extends BaseRichBolt {
    private OutputCollector out;

    /**
     * Initialisation hook, called only once: keeps the collector for {@link #execute(Tuple)}.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        out = collector;
    }

    /**
     * Processing method, called repeatedly, once per incoming tuple.
     *
     * <p>Example of an incoming tuple as printed:
     * <pre>
     * source: mySpout:5, stream: default,
     * id: {347024301319508839=6813457638891944298},
     * [d679ad9f-2ab1-4ed0-bd34-a87a5ec00bdd]
     * </pre>
     * for which {@code input.getString(0)} is
     * {@code d679ad9f-2ab1-4ed0-bd34-a87a5ec00bdd}.
     */
    @Override
    public void execute(Tuple input) {
        final String uuid = input.getString(0);

        // Emit anchored to the input tuple.
        out.emit(input, new Values(uuid));
        System.out.println("bolt1的execute方法被调用一次" + uuid);

        // Tell the spout that processing failed. Calling ack(input) here instead
        // would tell the spout that processing succeeded.
        out.fail(input);
    }

    /** Declares the single output field, {@code "uuid"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
package com.jdxia.ack;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
 * Bolt that forwards the first field of each incoming tuple as {@code "uuid"},
 * anchored to the input tuple, and then acknowledges the input.
 */
public class Bolt2 extends BaseRichBolt {
    private OutputCollector out;

    /**
     * Initialisation hook, called only once: keeps the collector for {@link #execute(Tuple)}.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        out = collector;
    }

    /**
     * Processing method, called repeatedly, once per incoming tuple.
     */
    @Override
    public void execute(Tuple input) {
        final String uuid = input.getString(0);

        // Emit anchored to the input tuple.
        out.emit(input, new Values(uuid));
        System.out.println("bolt2的execute方法被调用一次" + uuid);

        // Tell the spout that processing succeeded.
        out.ack(input);
    }

    /** Declares the single output field, {@code "uuid"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }
}
任务编排
package com.jdxia.ack;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
/**
 * Entry point that wires {@code AckSpout -> Bolt1 -> Bolt2} and submits the
 * topology under the name {@code "ack"} to a {@link LocalCluster}.
 */
public class AckTopologyDriver {

    /**
     * Builds the topology and submits it in local mode.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // 1. Prepare the job description.
        StormTopology topology = buildTopology();

        // 2. Submit the job in local mode.
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("ack", conf, topology);
    }

    /** Wires one spout and two bolts, each with a parallelism hint of 1, using shuffle grouping. */
    private static StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", new AckSpout(), 1);
        builder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("mySpout");
        builder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1");
        return builder.createTopology();
    }
}