1. Flink-Metrics监控

1.1 什么是Metrics?

1.1.1 Metrics介绍

由于集群运行后很难发现内部的实际状况,跑得慢或快,是否异常等,开发人员无法实时查看所有的Task 日志,比如作业很大或者有很多作业的情况下,该如何处理?此时 Metrics 可以很好的帮助开发人员了解作业的当前状况。
Flink 提供的 Metrics 可以在 Flink 内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。
image.png

1.1.2 Metric Types

Metrics 的类型如下:
1,常用的如 Counter,写过 mapreduce 作业的开发人员就应该很熟悉 Counter,其实含义都是一样的,就是对一个计数器进行累加,即对于多条数据和多兆数据一直往上加的过程。
2,Gauge,Gauge 是最简单的 Metrics,它反映一个值。比如要看现在 Java heap 内存用了多少,就可以每次实时的暴露一个 Gauge,Gauge 当前的值就是heap使用的量。
3,Meter,Meter 是指统计吞吐量和单位时间内发生“事件”的次数。它相当于求一种速率,即事件次数除以使用的时间。
4,Histogram,Histogram 比较复杂,也并不常用,Histogram 用于统计一些数据的分布,比如说 Quantile、Mean、StdDev、Max、Min 等。
Metric 在 Flink 内部有多层结构,以 Group 的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name 是 Metrics 的唯一标识。

1.1 WebUI监控

在flink的UI的界面上点击任务详情,然后点击Task Metrics会弹出如下的界面,在 add metic按钮上可以添加我需要的监控指标。
l 自定义监控指标
○ 案例:在map算子内计算输入的总数据
○ 设置MetricGroup为:flink_test_metric
○ 指标变量为:mapDataNub
○ 参考代码
○ 程序启动之后就可以在任务的ui界面上查看

  1. package cn.itcast.hello;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.RichMapFunction;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.configuration.Configuration;
  7. import org.apache.flink.metrics.Counter;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.datastream.KeyedStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.util.Collector;
  12. /**
  13. * Author itcast
  14. * Desc
  15. */
  16. public class WordCount5_Metrics {
  17. public static void main(String[] args) throws Exception {
  18. //1.准备环境-env
  19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  21. //2.准备数据-source
  22. //2.source
  23. DataStream<String> linesDS = env.socketTextStream("node1", 9999);
  24. //3.处理数据-transformation
  25. DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
  26. @Override
  27. public void flatMap(String value, Collector<String> out) throws Exception {
  28. //value就是一行行的数据
  29. String[] words = value.split(" ");
  30. for (String word : words) {
  31. out.collect(word);//将切割处理的一个个的单词收集起来并返回
  32. }
  33. }
  34. });
  35. //3.2对集合中的每个单词记为1
  36. DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new RichMapFunction<String, Tuple2<String, Integer>>() {
  37. Counter myCounter;
  38. @Override
  39. public void open(Configuration parameters) throws Exception {
  40. myCounter= getRuntimeContext().getMetricGroup().addGroup("myGroup").counter("myCounter");
  41. }
  42. @Override
  43. public Tuple2<String, Integer> map(String value) throws Exception {
  44. myCounter.inc();
  45. //value就是进来一个个的单词
  46. return Tuple2.of(value, 1);
  47. }
  48. });
  49. //3.3对数据按照单词(key)进行分组
  50. KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
  51. //3.4对各个组内的数据按照数量(value)进行聚合就是求sum
  52. DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
  53. //4.输出结果-sink
  54. result.print().name("mySink");
  55. //5.触发执行-execute
  56. env.execute();
  57. }
  58. }
  59. // /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
  60. // /export/server/flink/bin/flink run --class cn.itcast.hello.WordCount5_Metrics /root/metrics.jar
  61. // 查看WebUI

image.png

Prometheus+grafana集成Flink进行监控