Flink Metric介绍
Flink提供的Metrics可以在Flink内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。由于集群运行后很难发现内部的实际状况,跑得慢或快,是否异常等,开发人员无法实时查看所有的Task日志,比如作业很大或者有很多作业的情况下,该如何处理?此时Metrics可以很好的帮助开发人员了解作业当前状况。对于很多大中型企业来讲,我们对进群的作业进行管理时,更多的是关心作业精细化实时运行状态。例如,实时吞吐量的同比环比、整个集群的任务运行概览、集群水位,或者监控利用 Flink 实现的 ETL 框架的运行情况等,这时候就需要设计专门的监控系统来监控集群的任务作业情况。
Flink Metric架构
Flink Metrics 是Flink实现的一套运行信息收集库,我们不但可以收集Flink本身提供的系统指标,比如CPU、内存、线程使用情况、JVM垃圾收集情况、网络和IO等,还可以通过继承和实现指定的类或者接口打点收集用户自定义的指标。
通过使用Flink Metrics我们可以轻松地做到:
- 实时采集Flink中的Metrics信息或者自定义用户需要的指标信息并进行展示;
- 通过Flink提供的Rest API收集这些信息,并且接入第三方系统进行展示。
Flink Metric分类
Flink提供了四种类型的监控指标,分别是:Counter、Gauge、Histogram、Meter。
1. Counter
Counter 称为计数器,一般用来统计其中一个指标的总量,比如统计数据的输入、输出总量。
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("MyCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
2. Gauge
Gauge被用来统计某一个指标的瞬时值。举个例子,我们在监控Flink中某一个节点的内存使用情况或者某个map算子的输出值数量。
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueNumber = 0L;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Long>() {
@Override
public Long getValue() {
return valueNumber;
}
});
}
@Override
public String map(String value) throws Exception {
valueNumber++;
return value;
}
}
3. Histogram
Histogram(直方图),Flink中属于直方图的指标非常少,它通常被用来计算指标的最大值、最小值、中位数等。
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@public Integer map(Long value) throws Exception {
this.histogram.update(value);
}
}
4. Meter
Meter被用来计算一个指标的平均值。
public class MyMapper extends RichMapFunction<Long, Integer> {
private Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@public Integer map(Long value) throws Exception {
this.meter.markEvent();
}
}
Metric Group
Flink中的Metrics是一个多层的结构,以Group的方式存在,我们用来定位唯一的一个Metrics是通过Metric Group + Metric Name的方式。
层级树如下:
- TaskManager
- TaskManagerJob
- Task
- TaskIO
- Operator
- User defined Group/Metrics
- OperatorIO
- Task
- TaskManagerJob
- JobManager
- JobManagerJob
Flink Metric类图
Flink Metrics相关的实现都是通过org.apache.flink.metrics.Metric这个类实现的,整体的类图如下所示:
为了方便对Metrics进行管理和分类,Flink提供了对Metrics进行分组的功能,这个功能是通过下图中MetricGroup实现的,在图中可以看到MetricGroup相关的子类的继承关系。
此外,Flink还提供了向外披露Metric的监测结果的接口,该接口是org.apache.flink.metrics.reporter.MetricReporter。这个接口的实现类通过Metrics的类型进行注册和移除。public abstract class AbstractReporter implements MetricReporter, CharacterFilter {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
protected final Map<Gauge<?>, String> gauges = new HashMap();
protected final Map<Counter, String> counters = new HashMap();
protected final Map<Histogram, String> histograms = new HashMap();
protected final Map<Meter, String> meters = new HashMap();
public AbstractReporter() {
}
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
String name = group.getMetricIdentifier(metricName, this);
synchronized(this) {
if(metric instanceof Counter) {
this.counters.put((Counter)metric, name);
} else if(metric instanceof Gauge) {
this.gauges.put((Gauge)metric, name);
} else if(metric instanceof Histogram) {
this.histograms.put((Histogram)metric, name);
} else if(metric instanceof Meter) {
this.meters.put((Meter)metric, name);
} else {
this.log.warn("Cannot add unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
}
}
}
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
synchronized(this) {
if(metric instanceof Counter) {
this.counters.remove(metric);
} else if(metric instanceof Gauge) {
this.gauges.remove(metric);
} else if(metric instanceof Histogram) {
this.histograms.remove(metric);
} else if(metric instanceof Meter) {
this.meters.remove(metric);
} else {
this.log.warn("Cannot remove unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
}
}
}
}
获取Metrics
获取Metrics的方法有多种,首先我们可以通过Flink的后台管理页面看到部分指标;其次可以通过Flink提供的Http 接口查询Flink任务的状态信息,因为Flink Http接口返回的都是Json信息,我们可以很方便地将Json进行解析;最后一种方法是,我们可以通过Metric Reporter获取。下面分别对这两者进行详细讲解。Flink Rest API
Flink提供了丰富的接口来协助我们查询Flink中任务运行的状态,所有的请求都可以通过访问http://${hostname}:8081/加指定的URI方式查询,Flink支持的所有HTTP接口你都可以点击这里查询到。
Flink支持的接口包括:
- JobManagerJob
URL | Verb | Describe |
---|---|---|
/config | GET | 返回WebUI配置信息 |
/overview | GET | 返回Flink集群概览 |
/jobs | GET | 返回所有作业的概览及其当前状态 |
/jobs/overview | GET | 返回所有作业的概览 |
/jobs/ |
GET | 返回指定作业详情 |
/jobs/ |
GET | 返回指定作业配置信息 |
/jobs/ |
GET | 返回指定作业异常信息 |
/jobs/ |
GET | 返回指定作业累加器,从各自子任务聚集 |
/jobs/ |
GET | 返回指定作业详情,并为每个子任务总结 |
/jobs/ |
GET | |
/jobs/ |
GET | |
/jobs/ |
GET | |
/jobs/ |
GET | |
/jobs/ |
GET | |
/jobs/ |
GET | |
/jobs/ |
GET | |
/jobs/ |
GET | |
/jobs/ |
GET | 返回指定作业执行计划 |
/jobs/ |
GET | 返回指定作业的检查点 |
/jobs/ |
GET | 取消指定作业 |
/jars | GET | 返回上传的jar列表 |
/jars/upload | POST | 上传jar到集群 |
/jars/:jarid | DELETE | 按照指定jar删除作业 |
/jars/:jarid/plan | GET | 返回指定jar对应作业的执行计划 |
/jars/:jarid/run | POST | 执行指定jar作业,可使用JSON格式参数或者查询参数 |
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/monitoring/rest_api.html
Flink Reporter
Flink还提供了很多内置的Reporter,这些Reporter在Flink的官网中可以查询到。例如,Flink提供了Graphite、InfluxDB、Prometheus等内置的Reporter,我们可以方便地对这些外部系统进行集成。关于它们的详细配置也可以在Flink官网的详情页面中看到。
自定义Metrics
不是线程安全
继承RichFunction
getRuntimeContext().geMetircGroup().addGroup()
getRuntimeContext().geMetircGroup().count/gauge/meter/histogram()
Flink Metric配置
参考:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
Flink监控指标
注:上图为Flink1.9默认监控指标(https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/monitoring/metrics.html)。