1.storm优点

    2.核心概念

    3.Storm架构

    4.storm工作流程

    编程流程:

    Spout
    那么首先开始编写Spout类。一般是实现 IRichSpout 或继承BaseRichSpout该类,然后实现该方法。
    这里我们继承BaseRichSpout这个类,该类需要实现这几个主要的方法:

    1. 代码示例:

    @Override
    public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
    System.out.println(“open:”+map.get(“test”));
    this.collector = collector;
    }

    1. 二、nextTuple
    2. nextTuple()方法是Spout实现的核心。
    3. 也就是主要执行方法,用于输出信息,通过collector.emit方法发射。
    4. @Override
    5. public void nextTuple() {
    6. if(count<=2){
    7. System.out.println("第"+count+"次开始发送数据...");
    8. this.collector.emit(new Values(message));
    9. }
    10. count++;
    11. }
    12. 三、declareOutputFields
    13. declareOutputFields是在IComponent接口中定义,用于声明数据格式。
    14. 即输出的一个Tuple中,包含几个字段
    15. @Override
    16. public void declareOutputFields(OutputFieldsDeclarer declarer) {
    17. System.out.println("定义格式...");
    18. declarer.declare(new Fields(field));
    19. }
    20. 四、ack
    21. ack是在ISpout接口中定义,用于表示Tuple处理成功。
    22. 五、fail
    23. fail是在ISpout接口中定义,用于表示Tuple处理失败。
    24. 六、close
    25. close是在ISpout接口中定义,用于表示Topology停止。
    26. 代码示例:
    27. @Override
    28. public void close() {
    29. System.out.println("关闭...");
    30. }
    31. Bolt
    32. Bolt是用于处理数据的组件,主要是由execute方法来进行实现。
    33. 一般来说需要实现 IRichBolt 或继承BaseRichBolt该类,然后实现其方法。
    34. 需要实现方法如下:
    35. 一、prepare
    36. Bolt启动前执行,提供Bolt启动环境配置的入口。
    37. 参数基本和Sqout一样。
    38. 一般对于不可序列化的对象进行实例化。
    39. 这里的我们就简单的打印下
    40. @Override
    41. public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
    42. System.out.println("prepare:"+map.get("test"));
    43. this.collector=collector;
    44. }
    45. 注:如果是可以序列化的对象,那么最好是使用构造函数。
    46. 二、execute
    47. execute()方法是Bolt实现的核心。
    48. 也就是执行方法,每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
    49. tuple中获取消息可以使用 tuple.getString()和tuple.getStringByField();这两个方法。
    50. 个人推荐第二种,可以通过field来指定接收的消息。
    51. 注:如果继承的是IRichBolt,则需要手动ack。这里就不用了,BaseRichBolt会自动帮我们应答
    52. 代码示例:
    53. @Override
    54. public void execute(Tuple tuple) {
    55. //String msg=tuple.getString(0);
    56. String msg=tuple.getStringByField("test");
    57. //这里我们就不做消息的处理,只打印
    58. System.out.println("Bolt第"+count+"接受的消息:"+msg);
    59. count++;
    60. /**
    61. *
    62. * 没次调用处理一个输入的tuple,所有的tuple都必须在一定时间内应答。
    63. * 可以是ack或者fail。否则,spout就会重发tuple。
    64. */
    65. //collector.ack(tuple);
    66. }
    67. 三、declareOutputFields
    68. Spout的一样。
    69. 因为到了这里就不再输出了,所以就什么都没写。
    70. @Override
    71. public void declareOutputFields(OutputFieldsDeclarer arg0) {
    72. }
    73. cleanup
    74. cleanupIBolt接口中定义,用于释放bolt占用的资源。
    75. Storm在终止一个bolt之前会调用这个方法。
    76. 因为这里没有什么资源需要释放,所以就简单的打印一句就行了。
    77. @Override
    78. public void cleanup() {
    79. System.out.println("资源释放");
    80. }