1.storm优点
2.核心概念
3.Storm架构
4.storm工作流程
编程流程:
Spout
那么首先开始编写Spout类。一般是实现 IRichSpout 或继承BaseRichSpout该类,然后实现该方法。
这里我们继承BaseRichSpout这个类,该类需要实现这几个主要的方法:
代码示例:
@Override 
public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
System.out.println(“open:”+map.get(“test”));
this.collector = collector;
}
二、nextTuplenextTuple()方法是Spout实现的核心。也就是主要执行方法,用于输出信息,通过collector.emit方法发射。@Overridepublic void nextTuple() {if(count<=2){System.out.println("第"+count+"次开始发送数据...");this.collector.emit(new Values(message));}count++;}三、declareOutputFieldsdeclareOutputFields是在IComponent接口中定义,用于声明数据格式。即输出的一个Tuple中,包含几个字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {System.out.println("定义格式...");declarer.declare(new Fields(field));}四、ackack是在ISpout接口中定义,用于表示Tuple处理成功。五、failfail是在ISpout接口中定义,用于表示Tuple处理失败。六、closeclose是在ISpout接口中定义,用于表示Topology停止。代码示例:@Overridepublic void close() {System.out.println("关闭...");}BoltBolt是用于处理数据的组件,主要是由execute方法来进行实现。一般来说需要实现 IRichBolt 或继承BaseRichBolt该类,然后实现其方法。需要实现方法如下:一、prepare在Bolt启动前执行,提供Bolt启动环境配置的入口。参数基本和Sqout一样。一般对于不可序列化的对象进行实例化。这里的我们就简单的打印下@Overridepublic void prepare(Map map, TopologyContext arg1, OutputCollector collector) {System.out.println("prepare:"+map.get("test"));this.collector=collector;}注:如果是可以序列化的对象,那么最好是使用构造函数。二、executeexecute()方法是Bolt实现的核心。也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。从tuple中获取消息可以使用 tuple.getString()和tuple.getStringByField();这两个方法。个人推荐第二种,可以通过field来指定接收的消息。注:如果继承的是IRichBolt,则需要手动ack。这里就不用了,BaseRichBolt会自动帮我们应答代码示例:@Overridepublic void execute(Tuple tuple) {//String msg=tuple.getString(0);String msg=tuple.getStringByField("test");//这里我们就不做消息的处理,只打印System.out.println("Bolt第"+count+"接受的消息:"+msg);count++;/**** 没次调用处理一个输入的tuple,所有的tuple都必须在一定时间内应答。* 可以是ack或者fail。否则,spout就会重发tuple。*///collector.ack(tuple);}三、declareOutputFields和Spout的一样。因为到了这里就不再输出了,所以就什么都没写。@Overridepublic void declareOutputFields(OutputFieldsDeclarer arg0) {}cleanupcleanup是IBolt接口中定义,用于释放bolt占用的资源。Storm在终止一个bolt之前会调用这个方法。因为这里没有什么资源需要释放,所以就简单的打印一句就行了。@Overridepublic void cleanup() {System.out.println("资源释放");}
