数据采集
package com.jdxia.ack;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
 * Demo spout that emits one random UUID string per {@link #nextTuple()} call and
 * pauses about ten seconds between emits.
 *
 * <p>Every tuple is emitted together with a message id, so the {@link #ack(Object)}
 * and {@link #fail(Object)} callbacks are invoked for it. The message id is a
 * {@link Values} list holding the same UUID as the tuple, which lets
 * {@link #fail(Object)} replay a tuple from its id alone.
 */
public class AckSpout extends BaseRichSpout {
    /** Pause after each emitted tuple, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 10 * 1000L;

    private SpoutOutputCollector collector;

    /**
     * Initialization hook: keeps the collector used for all later emits.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector through which tuples are emitted
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called in a loop; each call emits exactly one tuple and then sleeps for
     * {@link #EMIT_INTERVAL_MS}.
     */
    @Override
    public void nextTuple() {
        // UUID.toString() consists only of hex digits and '-', so the former
        // replace("_", " ") call never matched anything and has been dropped.
        String uuid = UUID.randomUUID().toString();

        // Second argument is the message id; it carries the payload so fail() can re-emit it.
        collector.emit(new Values(uuid), new Values(uuid));

        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the calling thread can observe the
            // interruption instead of having it silently swallowed here.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single output field, {@code "uuid"}.
     *
     * @param declarer receiver of the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }

    /**
     * Logs that the tuple identified by {@code msgId} was processed successfully.
     *
     * @param msgId message id that was passed to {@code emit} in {@link #nextTuple()}
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功" + msgId);
    }

    /**
     * Logs the failure and re-emits the tuple under the same message id.
     *
     * @param msgId message id that was passed to {@code emit}; must be the value list
     *              emitted by this spout
     * @throws ClassCastException if {@code msgId} is not a {@link List}
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败" + msgId);

        // The message id is itself the list of tuple values (see nextTuple), so it
        // can be replayed as the tuple. The cast is unchecked only in its element type.
        @SuppressWarnings("unchecked")
        List<Object> replay = (List<Object>) msgId;
        collector.emit(replay, msgId);
    }
}
数据处理
package com.jdxia.ack;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Pass-through bolt: re-emits the first field of every incoming tuple as a
 * one-field tuple named {@code "uuid"} and prints a progress line.
 */
public class Bolt1 extends BaseBasicBolt {
    /**
     * Declares the single output field, {@code "uuid"}.
     *
     * @param declarer receiver of the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputSchema = new Fields("uuid");
        declarer.declare(outputSchema);
    }

    /**
     * Forwards the string at position 0 of {@code input} downstream.
     *
     * @param input     incoming tuple; its first field is read as a string
     * @param collector collector used to emit the forwarded value
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String payload = input.getString(0);
        Values outgoing = new Values(payload);
        collector.emit(outgoing);
        System.out.println("Bolt1输出消息");
    }
}
package com.jdxia.ack;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * Terminal bolt that rejects every tuple by throwing {@link FailedException}.
 * It emits nothing and therefore declares no output fields.
 */
public class Bolt2 extends BaseBasicBolt {
    /** Intentionally empty: this bolt has no output stream fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /**
     * Always signals failure for the incoming tuple.
     *
     * <p>Throw only when the tuple should be reported as failed; a successful
     * execution must return without throwing.
     *
     * @param input     incoming tuple (not inspected)
     * @param collector output collector (unused)
     * @throws FailedException on every call
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        final String reason = "异常";
        throw new FailedException(reason);
    }
}
任务编排
package com.jdxia.ack;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
/**
 * Entry point that wires {@code AckSpout -> Bolt1 -> Bolt2} into a topology named
 * {@code "ack"} and submits it to an in-process {@link LocalCluster}.
 */
public class AckTopologyDriver {
    /**
     * Builds the topology and submits it in local mode.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Step 1: describe the topology. Each component gets a parallelism hint of 1
        // and is connected to its upstream component with shuffle grouping.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", new AckSpout(), 1);
        builder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("mySpout");
        builder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1");
        StormTopology topology = builder.createTopology();

        // Step 2: submit with a default configuration, using local mode.
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("ack", conf, topology);
    }
}