Data Acquisition

    package com.jdxia.storm;

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class MySplitBolt extends BaseBasicBolt {

        // Processing method
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            // 1. Read the data from the tuple.
            //    With Kafka upstream, the field name here is "bytes".
            byte[] juzi = (byte[]) tuple.getValueByField("bytes");
            // 2. Split the sentence on spaces.
            String[] strings = new String(juzi).split(" ");
            // 3. Emit each word with an initial count of 1.
            for (String string : strings) {
                basicOutputCollector.emit(new Values(string, 1));
            }
        }

        // Declare the output fields
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word", "num"));
        }
    }
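The field name "bytes" comes from the Kafka spout: SpoutConfig defaults to RawMultiScheme, which emits each message as a raw byte[] under that name. If you would rather receive ready-made Strings, the scheme can be swapped; a minimal sketch, assuming the storm-kafka version used here (StringScheme names its output field "str"):

    import backtype.storm.spout.SchemeAsMultiScheme;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc");
    // Deserialize each Kafka message to a String instead of raw bytes
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    // The split bolt could then read the sentence without a byte[] cast:
    // String juzi = tuple.getStringByField("str");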

Data Computation

    package com.jdxia.storm;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;
    import redis.clients.jedis.Jedis;

    import java.util.HashMap;
    import java.util.Map;

    public class MyWordCountAndPrintBolt extends BaseBasicBolt {

        private Map<String, String> wordCountMap = new HashMap<String, String>();
        private Jedis jedis;

        // Initialize the Redis connection
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            // Open the Redis connection
            jedis = new Jedis("0.0.0.0", 6379);
            jedis.auth("root");
            // Call the parent implementation
            super.prepare(stormConf, context);
        }

        // Processing method
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            // Read the "word" and "num" fields declared by the upstream bolt,
            // casting from Object
            String word = (String) tuple.getValueByField("word");
            Integer num = (Integer) tuple.getValueByField("num");
            // 1. Check whether this word already has a count
            Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));
            if (integer == 0) {
                // No entry yet: store the new count
                wordCountMap.put(word, num + "");
            } else {
                // Entry exists: add the increment to the running total
                wordCountMap.put(word, (integer + num) + "");
            }
            // Persist the counts to Redis
            // Redis key: wordCount -> Map
            jedis.hmset("wordCount", wordCountMap);
        }

        // Nothing downstream, so no output fields to declare
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }
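Since the bolt persists everything under the single Redis key wordCount with hmset, you can spot-check the results with hgetAll, which returns the whole hash. A minimal sketch, assuming the same Redis address and password as in prepare() above (the class name WordCountReader is made up for illustration):

    package com.jdxia.storm;

    import java.util.Map;
    import redis.clients.jedis.Jedis;

    public class WordCountReader {
        public static void main(String[] args) {
            // Same connection settings as the bolt; adjust to your environment
            Jedis jedis = new Jedis("0.0.0.0", 6379);
            jedis.auth("root");
            // hgetAll returns the entire hash written by hmset
            Map<String, String> counts = jedis.hgetAll("wordCount");
            for (Map.Entry<String, String> entry : counts.entrySet()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
            jedis.close();
        }
    }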

Topology Submission

    package com.jdxia.storm;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;

    public class StormTopologyDriver {

        public static void main(String[] args) {
            // 1. Describe the topology
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Component names are up to you.
            // SpoutConfig arguments: 1) the ZooKeeper ensemble the Kafka brokers
            // register with; 2) the topic; 3) zkRoot, the ZooKeeper path where
            // offsets are stored; 4) an id, a ZooKeeper subdirectory that keeps
            // this consumer's offsets from colliding with anyone else's.
            topologyBuilder.setSpout("kafkaSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc")));
            // shuffleGrouping ties a bolt to the preceding component; a bolt can
            // subscribe to several components this way.
            topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("kafkaSpout");
            topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");

            // 2. Submit the topology
            // To whom, and with what content?
            Config config = new Config(); // Config actually extends HashMap
            // How many workers (i.e. JVMs) to run on; the default is one worker.
            // config.setNumWorkers(2);
            StormTopology stormTopology = topologyBuilder.createTopology();

            // Local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordCount", config, stormTopology);
            // Cluster mode would be:
            // StormSubmitter.submitTopology("wordCount1", config, stormTopology);
        }
    }
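To run on a real cluster instead of in local mode, the commented-out StormSubmitter call replaces LocalCluster, and the job is packaged as a jar and launched with the storm jar command. A minimal sketch with the same wiring as above (the class name StormClusterDriver is made up for illustration):

    package com.jdxia.storm;

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;

    public class StormClusterDriver {
        public static void main(String[] args) throws Exception {
            // Same wiring as the local driver above
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("kafkaSpout", new KafkaSpout(
                    new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc")));
            topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("kafkaSpout");
            topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");

            Config config = new Config();
            config.setNumWorkers(2); // two worker JVMs instead of the default one
            // Launch with: storm jar <your-jar> com.jdxia.storm.StormClusterDriver
            StormSubmitter.submitTopology("wordCount", config, topologyBuilder.createTopology());
        }
    }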

Testing

Create the corresponding topic, then write data into it, with the words separated by spaces.
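The topic can be fed with the kafka-console-producer that ships with Kafka, or with a small Java producer. A minimal sketch using the Kafka producer client; the broker address master:9092, the class name, and the client version are assumptions to adjust for your setup:

    package com.jdxia.storm;

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestDataProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            // Words separated by spaces, as MySplitBolt expects
            producer.send(new ProducerRecord<String, String>("wordCount", "hello storm hello kafka"));
            producer.close();
        }
    }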