数据样本
i am jdxia
i am xjd
i am jdxia
i am jelly
jar包
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<!--<scope>provided</scope>-->
<version>0.9.5</version>
</dependency>
安装log4j
数据获取
BaseRichSpout类是ISpout接口和IComponent接口的一个简便的实现
open方法中接收三个参数
- conf包含了storm配置信息的map.
- TopologyContext对象提供了topology中组件的信息
- SpoutOutputCollector对象提供了发射tuple的方法
package com.learnstorm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.commons.lang.StringUtils;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
//数据获取
public class MyLocalFileSpout extends BaseRichSpout {
//控制数据输出
private SpoutOutputCollector collector;
//读取数据的
private BufferedReader bufferedReader;
//初始化方法
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
try {
//定义这个去读取数据
this.bufferedReader = new BufferedReader(new FileReader(new File("/Users/jdxia/Desktop/MyFile/i.txt")));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
//storm流式计算的特征就是数据一条一条的处理
// while(true) {
// this.nextTuple();
// }
//这个方法会被循环调用
@Override
public void nextTuple() {
//每被调用一次就会发送一条数据出去
try {
//读取一行
String line = bufferedReader.readLine();
//如果不是空的话
if (StringUtils.isNotBlank(line)) {
List<Object> arrayList = new ArrayList<Object>();
//把数据放到ArrayList中
arrayList.add(line);
//把数据发出去
collector.emit(arrayList);
}
} catch (IOException e) {
e.printStackTrace();
}
}
//定义下我的输出
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("juzi"));
}
}
数据截取
BaseBasicBolt是IComponent和IBolt接口的一个简便实现
BaseBasicBolt中还有个prepare()方法,是bolt初始化的时候调用的
package com.learnstorm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
//相当于map-->world,1
//业务逻辑
//对句子进行切割
public class MySplitBolt extends BaseBasicBolt {
//处理函数
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//1.数据如何获取,用tuple获取,tuple是List数据结构,消息传输的基本单元
//强转为string,juzi是上一步定义的
String juzi = (String) tuple.getValueByField("juzi");
//2.进行切割
String[] strings = juzi.split(" ");
//3.发送数据
for (String word : strings) {
//我们之前用ArrayList存储,这边怎么变为Values
//可以看下Values的源码,他是继承了ArrayList,他存的时候用了一个循环
//values对象帮我们生成个list
basicOutputCollector.emit(new Values(word, 1));
}
}
//定义下我的输出
//单词world和他的次数
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "num"));
}
}
单词统计
这里面用到了HashMap.这是可序列化的
如果spout或者bolt在序列化之前,(比如在构造函数中生成)实例化了任何无法序列化的实例变量,在进行序列化的时候会抛出NotSerialiableException
当topology发布时,所有的bolt和spout组件首先进行序列化,然后发布到网络中.
通常情况下最好在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化,在prepare()方法中对不可序列化的对象进行实例化
bolt中可以加入cleanup()方法,这个方法在IBolt中定义.storm在终止一个Bolt之前会调用这个方法.通常情况下cleanup会用来释放bolt占用的资源
当在集群中运行的时候,cleanup是不可靠的,不能保证会执行
package com.learnstorm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
//打印
public class MyWordCountAndPrintBolt extends BaseBasicBolt {
private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
//处理函数
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//根据之前定义的word和num
//强转为string
String word = (String) tuple.getValueByField("word");
Integer num = (Integer) tuple.getValueByField("num");
//1.查看单词对应的value是否存在
Integer integer = wordCountMap.get(word);
if (integer == null || integer.intValue() == 0) {
//如果不存在就直接放入新的
wordCountMap.put(word, num);
} else {
//如果之前已经有了,就把对应统计加上
wordCountMap.put(word, integer.intValue() + num);
}
System.out.println(wordCountMap);
}
//不需要定义输出字段了
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
//终止bolt会调用这个方法
@Override
public void cleanup() {
}
}
任务描述
这边写的是本地提交到集群
package com.learnstorm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
public class StormTopologyDriver {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//1. 描述任务
TopologyBuilder topologyBuilder = new TopologyBuilder();
//任务的名字自己定义
topologyBuilder.setSpout("mySpout", new MyLocalFileSpout());
//shuffleGrouping和前一个任务关联.shuffleGrouping可以连接多个任务
topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("mySpout");
topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");
//2. 任务提交
//提交给谁?提交什么内容?
Config config = new Config(); //Config类实际上是继承HashMap
//设置在几个work上运行,也就是在几个jvm中运行,如果不指定,默认是在一个work中
// config.setNumWorkers(2);
StormTopology stormTopology = topologyBuilder.createTopology();
//本地模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordCount", config, stormTopology);
//这种是集群模式
// StormSubmitter.submitTopology("worldCount1", config, stormTopology);
}
}
提交到集群
如果是提交到集群上面,那么storm的storm-core的作用域就要改下
<scope>provided</scope>
表示集群上提供了这个jar包,
然后maven对项目打包,上传到服务器上,执行
storm jar 上传的jar包 主类名称
然后我们在ui界面上看
点进去看
统计信息打印在bolt2上
查看下bolt2在那台机器上
hadoop03的6700端口上
到这台机器上的storm的logs目录
目录下有个worker-6700.log查看下这个日志