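Word Count
Requirement:
Read the data in a specified directory and count how many times each word occurs.
Implementation plan:
- A Spout reads the files in the specified directory and emits their lines as the input for the downstream Bolts
- One Bolt splits each input line into words; here we split on commas
- A second Bolt performs the final per-word count
- Output the result
The Spout works with files, so to keep things simple we use commons-io: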
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.4</version>
</dependency>
Create the file test.txt in E:\stormtest to serve as our test data,
with the following content:
a,b,c,d
a,b,c
a,c
a,d
f,b
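As a cross-check for the topology's final output, the expected counts for this sample data can be computed directly in plain Java. The sketch below is not part of the topology; the class name ExpectedCounts and its count method are made up for illustration, and it assumes Java 8 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the topology: computes the counts
// the topology should eventually report for the sample test.txt.
class ExpectedCounts {

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                // split every line on commas, the same rule the split step uses
                .flatMap(line -> Arrays.stream(line.split(",")))
                // group identical words; TreeMap keeps the keys sorted
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("a,b,c,d", "a,b,c", "a,c", "a,d", "f,b");
        System.out.println(count(sample)); // prints {a=4, b=3, c=3, d=2, f=1}
    }
}
```

Once the topology has consumed the whole file, the last map printed by the counting step should contain the same five totals.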
Topology design: DataSourceSpout -> SplitBolt -> CountBolt
1. Define the data source: DataSourceSpout
Create DataSourceSpout by extending BaseRichSpout and overriding open, nextTuple, and declareOutputFields.
public static class DataSourceSpout extends BaseRichSpout {
    private SpoutOutputCollector spoutOutputCollector;
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Keep the collector so that nextTuple() can emit through it
        this.spoutOutputCollector = spoutOutputCollector;
    }
    @Override
    public void nextTuple() {
        // Recursively collect every .txt file under the data directory
        Collection<File> files = FileUtils.listFiles(new File("E:\\stormtest"), new String[]{"txt"}, true);
        for (File file : files) {
            try {
                // Emit each line of the file as one tuple (assumes the file is UTF-8)
                List<String> lines = FileUtils.readLines(file, "UTF-8");
                for (String line : lines) {
                    spoutOutputCollector.emit(new Values(line));
                }
                // nextTuple() is called over and over, so rename the processed file;
                // with the timestamp suffix it no longer matches the "txt" extension
                FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple has a single field named "lines"
        outputFieldsDeclarer.declare(new Fields("lines"));
    }
}
2. Define SplitBolt
Create SplitBolt by extending BaseRichBolt and overriding prepare, execute, and declareOutputFields.
public static class SplitBolt extends BaseRichBolt {
    private OutputCollector outputCollector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        // Keep the collector so that execute() can emit through it
        this.outputCollector = outputCollector;
    }
    @Override
    public void execute(Tuple tuple) {
        // Read the line emitted by DataSourceSpout and split it on commas
        String line = tuple.getStringByField("lines");
        String[] words = line.split(",");
        for (String word : words) {
            // Emit one tuple per word
            outputCollector.emit(new Values(word));
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple has a single field named "word"
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
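Splitting with String.split(",") works for the sample data, but real input may contain spaces around the commas or empty fields. The plain-Java sketch below shows one way to handle that; the class SplitSketch and its words method are hypothetical names, not part of the original topology.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating a more tolerant split rule.
class SplitSketch {

    // Split on commas, trim each token, and drop tokens that end up empty.
    static List<String> words(String line) {
        List<String> result = new ArrayList<>();
        for (String token : line.split(",")) {
            String word = token.trim();
            if (!word.isEmpty()) {
                result.add(word);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String line = "a, b,,c,";
        // Plain split keeps the inner empty token and the leading space of " b";
        // only trailing empty tokens are removed.
        System.out.println(line.split(",").length); // prints 4
        System.out.println(words(line));            // prints [a, b, c]
    }
}
```

If the input is known to be clean, as in test.txt, the simple split is sufficient.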
3. Define CountBolt
Create CountBolt by extending BaseRichBolt and overriding prepare, execute, and declareOutputFields.
public static class CountBolt extends BaseRichBolt {
    // Running total for every word seen so far
    private Map<String, Integer> map = new HashMap<>();
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        // This bolt emits nothing, so the collector is not kept
    }
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        // A word seen for the first time starts from 0
        Integer count = map.get(word);
        if (null == count) {
            count = 0;
        }
        count++;
        map.put(word, count);
        // Print the full table of counts after every tuple
        System.out.println("=====");
        Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
        for (Map.Entry<String, Integer> entry : entrySet) {
            System.out.println(entry);
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Last bolt in the topology: no output fields to declare
    }
}
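The get / null-check / put sequence in execute can also be written with Map.merge, which is available from Java 8 onward. The following is a minimal plain-Java sketch of that idiom, outside of Storm; the class name MergeCounter and its methods are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone counter showing the Map.merge idiom.
class MergeCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Add one occurrence of the word and return its new total.
    // merge stores 1 if the key is absent, otherwise old value + 1.
    int add(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    // Current total for the word, or 0 if it has never been seen.
    int get(String word) {
        return counts.getOrDefault(word, 0);
    }

    public static void main(String[] args) {
        MergeCounter counter = new MergeCounter();
        // First two lines of the sample data
        for (String line : new String[]{"a,b,c,d", "a,b,c"}) {
            for (String word : line.split(",")) {
                counter.add(word);
            }
        }
        System.out.println(counter.get("a")); // prints 2
        System.out.println(counter.get("d")); // prints 1
    }
}
```

Both forms produce the same counts; merge simply removes the explicit null handling.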
4. Build and submit the Topology
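// Wire the components together: DataSourceSpout -> SplitBolt -> CountBolt
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("DataSourceSpout", new DataSourceSpout());
topologyBuilder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
// shuffleGrouping is fine while CountBolt runs as a single task (the default);
// with higher parallelism use fieldsGrouping("SplitBolt", new Fields("word")) instead
topologyBuilder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
// Run the topology in an in-process local cluster
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("LocalWordCountStormTopology", new Config(), topologyBuilder.createTopology());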
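CountBolt is wired with shuffleGrouping, which gives correct totals only while it runs as a single task, because each task keeps its own in-memory map. The plain-Java simulation below illustrates why a fields grouping on "word" would be needed with more tasks. It is only a model: round-robin stands in for shuffle grouping and a hash of the word stands in for fields grouping, neither is Storm's actual routing code, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of how tuples spread over several counter tasks.
class GroupingSimulation {

    static List<Map<String, Integer>> emptyTasks(int tasks) {
        List<Map<String, Integer>> maps = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            maps.add(new HashMap<>());
        }
        return maps;
    }

    // Stand-in for shuffle grouping: the i-th word goes to task i % tasks.
    static List<Map<String, Integer>> roundRobin(List<String> words, int tasks) {
        List<Map<String, Integer>> maps = emptyTasks(tasks);
        for (int i = 0; i < words.size(); i++) {
            maps.get(i % tasks).merge(words.get(i), 1, Integer::sum);
        }
        return maps;
    }

    // Stand-in for fields grouping: equal words always go to the same task.
    static List<Map<String, Integer>> byWord(List<String> words, int tasks) {
        List<Map<String, Integer>> maps = emptyTasks(tasks);
        for (String word : words) {
            maps.get(Math.floorMod(word.hashCode(), tasks)).merge(word, 1, Integer::sum);
        }
        return maps;
    }

    // Number of tasks whose local map holds a partial count for the word.
    static long tasksHolding(List<Map<String, Integer>> maps, String word) {
        return maps.stream().filter(m -> m.containsKey(word)).count();
    }

    public static void main(String[] args) {
        // The words of test.txt in emission order
        List<String> words = Arrays.asList(
                "a", "b", "c", "d", "a", "b", "c", "a", "c", "a", "d", "f", "b");
        System.out.println(tasksHolding(roundRobin(words, 2), "a")); // prints 2
        System.out.println(tasksHolding(byWord(words, 2), "a"));     // prints 1
    }
}
```

In the round-robin case the count for "a" is split across both tasks, so neither task ever prints the full total of 4.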
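5. Note
In this example every file must be renamed once it has been emitted, because Storm calls nextTuple() over and over. Without the rename, the same file would be read and emitted again on the next call and the counts would be wrong.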
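The renaming step can be tried in isolation, without Storm or commons-io. The sketch below uses java.nio.file and a temporary directory instead of E:\stormtest; the class RenameSketch and its methods are hypothetical, and unlike the Spout it only looks at files directly inside the directory, not in subdirectories.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-alone demo of the "rename after processing" step.
class RenameSketch {

    // Regular files directly under dir whose name ends with ".txt".
    static List<Path> pendingTxtFiles(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".txt"))
                    .collect(Collectors.toList());
        }
    }

    // Append a timestamp to the name so it no longer ends with ".txt".
    static Path markProcessed(Path file) throws IOException {
        String newName = file.getFileName().toString() + System.currentTimeMillis();
        return Files.move(file, file.resolveSibling(newName));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("stormtest");
        Path file = Files.write(dir.resolve("test.txt"), Arrays.asList("a,b,c,d", "a,b,c"));

        System.out.println(pendingTxtFiles(dir).size()); // prints 1
        markProcessed(file);
        // A second scan finds nothing, so the file is not processed twice
        System.out.println(pendingTxtFiles(dir).size()); // prints 0
    }
}
```

To rerun the topology on the same data, the renamed file has to be restored to a name ending in .txt, or a fresh copy placed in the directory.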