1.storm优点
2.核心概念
3.Storm架构
4.storm工作流程
编程流程:
Spout
那么首先开始编写Spout类。一般是实现 IRichSpout 或继承BaseRichSpout该类,然后实现该方法。
这里我们继承BaseRichSpout这个类,该类需要实现这几个主要的方法:
代码示例:
@Override
public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
System.out.println(“open:”+map.get(“test”));
this.collector = collector;
}
二、nextTuple
nextTuple()方法是Spout实现的核心。
也就是主要执行方法,用于输出信息,通过collector.emit方法发射。
@Override
public void nextTuple() {
if(count<=2){
System.out.println("第"+count+"次开始发送数据...");
this.collector.emit(new Values(message));
}
count++;
}
三、declareOutputFields
declareOutputFields是在IComponent接口中定义,用于声明数据格式。
即输出的一个Tuple中,包含几个字段
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System.out.println("定义格式...");
declarer.declare(new Fields(field));
}
四、ack
ack是在ISpout接口中定义,用于表示Tuple处理成功。
五、fail
fail是在ISpout接口中定义,用于表示Tuple处理失败。
六、close
close是在ISpout接口中定义,用于表示Topology停止。
代码示例:
@Override
public void close() {
System.out.println("关闭...");
}
Bolt
Bolt是用于处理数据的组件,主要是由execute方法来进行实现。
一般来说需要实现 IRichBolt 或继承BaseRichBolt该类,然后实现其方法。
需要实现方法如下:
一、prepare
在Bolt启动前执行,提供Bolt启动环境配置的入口。
参数基本和Sqout一样。
一般对于不可序列化的对象进行实例化。
这里的我们就简单的打印下
@Override
public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
System.out.println("prepare:"+map.get("test"));
this.collector=collector;
}
注:如果是可以序列化的对象,那么最好是使用构造函数。
二、execute
execute()方法是Bolt实现的核心。
也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
从tuple中获取消息可以使用 tuple.getString()和tuple.getStringByField();这两个方法。
个人推荐第二种,可以通过field来指定接收的消息。
注:如果继承的是IRichBolt,则需要手动ack。这里就不用了,BaseRichBolt会自动帮我们应答
代码示例:
@Override
public void execute(Tuple tuple) {
//String msg=tuple.getString(0);
String msg=tuple.getStringByField("test");
//这里我们就不做消息的处理,只打印
System.out.println("Bolt第"+count+"接受的消息:"+msg);
count++;
/**
*
* 没次调用处理一个输入的tuple,所有的tuple都必须在一定时间内应答。
* 可以是ack或者fail。否则,spout就会重发tuple。
*/
//collector.ack(tuple);
}
三、declareOutputFields
和Spout的一样。
因为到了这里就不再输出了,所以就什么都没写。
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
cleanup
cleanup是IBolt接口中定义,用于释放bolt占用的资源。
Storm在终止一个bolt之前会调用这个方法。
因为这里没有什么资源需要释放,所以就简单的打印一句就行了。
@Override
public void cleanup() {
System.out.println("资源释放");
}