指标 - Metrics

Flink公开了一个指标系统,可以收集和暴露指标给外部系统.

Registering metrics

你可以调用 getRuntimeContext().getMetricGroup()方法来访问任何继承自RichFunction函数的用户函数的指标系统.这个方法返回一个MetricGroup对象,通过这个对象可以创建和注册新的指标.

Metric types

Flink支持的指标类型:Counters,Gauges,HistogramsMeters.

Counter

Counter用作某方面计数,通过调用inc()/inc(long n)或者 dec()/dec(long n)方法来使当前的值增加或者减少. 通过调用MetricGroupcounter(String name)方法可以创建和注册一个Counter.

  1. public class MyMapper extends RichMapFunction<String, Integer> {
  2. private Counter counter;
  3. @Override
  4. public void open(Configuration config) {
  5. this.counter = getRuntimeContext()
  6. .getMetricGroup()
  7. .counter("myCounter");
  8. }
  9. @public Integer map(String value) throws Exception {
  10. this.counter.inc();
  11. }
  12. }

或者你也可以使用自己实现的Counter

  1. public class MyMapper extends RichMapFunction<String, Integer> {
  2. private Counter counter;
  3. @Override
  4. public void open(Configuration config) {
  5. this.counter = getRuntimeContext()
  6. .getMetricGroup()
  7. .counter("myCustomCounter", new CustomCounter());
  8. }
  9. }

Gauge

Gauge按需提供任意类型的值,要使用Gauge,你必须首先创建一个类并实现org.apache.flink.metrics.Gauge接口。 这里对返回值的类型没有限制。 可以调用MetricGroupgauge(String name,Gauge gauge)方法来注册一个gauge。

  1. public class MyMapper extends RichMapFunction<String, Integer> {
  2. private int valueToExpose;
  3. @Override
  4. public void open(Configuration config) {
  5. getRuntimeContext()
  6. .getMetricGroup()
  7. .gauge("MyGauge", new Gauge<Integer>() {
  8. @Override
  9. public Integer getValue() {
  10. return valueToExpose;
  11. }
  12. });
  13. }
  14. }
  1. public class MyMapper extends RichMapFunction[String,Int] {
  2. val valueToExpose = 5
  3. override def open(parameters: Configuration): Unit = {
  4. getRuntimeContext()
  5. .getMetricGroup()
  6. .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  7. }
  8. ...
  9. }

请注意reporters会将暴露的对象转化成String型,这意味着需要去实现一个有意义的toString()方法

Histogram

Histogram用于度量长值分布情况, 你可以通过调用MetricGrouphistogram(String name, Histogram histogram)方法来注册一个Histogram

  1. public class MyMapper extends RichMapFunction<Long, Integer> {
  2. private Histogram histogram;
  3. @Override
  4. public void open(Configuration config) {
  5. this.histogram = getRuntimeContext()
  6. .getMetricGroup()
  7. .histogram("myHistogram", new MyHistogram());
  8. }
  9. @public Integer map(Long value) throws Exception {
  10. this.histogram.update(value);
  11. }
  12. }

Flink没有提供一个默认的Histogram实现。但是提供了一个{% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java “Wrapper” %}来允许使用 Codahale/DropWizard 直方图。 如需使用此包装器,请在您的pom.xml中添加以下依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-metrics-dropwizard</artifactId>
  4. <version>{{site.version}}</version>
  5. </dependency>

你可以注册一个Codahale/DropWizard 直方图类似于:

  1. public class MyMapper extends RichMapFunction<Long, Integer> {
  2. private Histogram histogram;
  3. @Override
  4. public void open(Configuration config) {
  5. com.codahale.metrics.Histogram histogram =
  6. new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
  7. this.histogram = getRuntimeContext()
  8. .getMetricGroup()
  9. .histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
  10. }
  11. }

Meter

Meter用于度量平均吞吐量,使用markEvent()方法可以注册一个发生的事件.同时发生的多个事件可以使用markEvent(long n)方法来进行注册。 通过调用MetricGroupmeter(String name, Meter meter)方法来注册一个meter

  1. public class MyMapper extends RichMapFunction<Long, Integer> {
  2. private Meter meter;
  3. @Override
  4. public void open(Configuration config) {
  5. this.meter = getRuntimeContext()
  6. .getMetricGroup()
  7. .meter("myMeter", new MyMeter());
  8. }
  9. @public Integer map(Long value) throws Exception {
  10. this.meter.markEvent();
  11. }
  12. }

Flink提供了一个 {% gh_link flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java “Wrapper” %}来允许使用 Codahale/DropWizard meters. 要使用此包装器,请在您的pom.xml中添加以下依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-metrics-dropwizard</artifactId>
  4. <version>{{site.version}}</version>
  5. </dependency>

您可以注册Codahale / DropWizard meter类似于这样:

  1. public class MyMapper extends RichMapFunction<Long, Integer> {
  2. private Meter meter;
  3. @Override
  4. public void open(Configuration config) {
  5. com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();
  6. this.meter = getRuntimeContext()
  7. .getMetricGroup()
  8. .meter("myMeter", new DropwizardMeterWrapper(meter));
  9. }
  10. }

Scope

每个指标被分配一个标识符,该标识符将基于3个组件进行汇报:注册指标时用户提供的名称,可选的用户自定义域和系统提供的域。例如,如果A.B是系统域,C.D是用户域,E是名称,那么指标的标识符将是A.B.C.D.E. 你可以通过设置conf/flink-conf.yam里面的metrics.scope.delimiter参数来配置标识符的分隔符(默认:.).

User Scope

你可以通过调用MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)来定义一个用户域

  1. counter = getRuntimeContext()
  2. .getMetricGroup()
  3. .addGroup("MyMetrics")
  4. .counter("myCounter");

System Scope

系统域包含关于这个指标的上下文信息,例如其注册的任务或该任务属于哪个作业.

可以通过在conf/flink-conf.yaml中设置以下关键字来配置它的上下文信息.这些关键字的每一个都期望可以包含常量的格式字符串(例如:“taskmanager”)和将在运行时被替换的变量(例如:”<task_id>”)

  • metrics.scope.jm
    • 默认: <host>.jobmanager
    • 适用于属于一个job manager的所有指标.
  • metrics.scope.jm.job
    • 默认: <host>.jobmanager.<job_name>
    • 适用于属于一个job manager和job的所有指标.
  • metrics.scope.tm
    • 默认: <host>.taskmanager.<tm_id>
    • 适用于属于一个task manager的所有指标.
  • metrics.scope.tm.job
    • 默认: <host>.taskmanager.<tm_id>.<job_name>
    • 适用于属于一个task manager和job的所有指标.
  • metrics.scope.task
    • 默认: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
      • 适用于属于一个task的所有指标.
  • metrics.scope.operator
    • 默认: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    • 适用于属于一个operator的所有指标.

这里对变量的数量和顺序没有限制,变量区分大小写.

运算指标的默认域将导致类似的标识符:

localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric

如果你想包含任务名称,但省略task manager的信息,你可以指定以下格式:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

这可以创建标识符localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.

注意对于此格式的字符串,如果同一作业同时运行多次,则可能会发生标识符冲突,导致指标标准数据不一致.因此,建议使用格式字符串,通过包括ID(例如 <job_id>)或通过为作业和操作符分配唯一的名称来保证一定程度上的唯一性.

变量列表

  • JobManager: <host>
  • TaskManager: <host>, <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_name>, <subtask_index>

Reporter

通过在conf/flink-conf.yaml中配置一个或者一些reporters,能够让指标暴露给一个外部系统.这些reporters将在每个job和task manager启动时被实例化.

  • metrics.reporters: reporters的名称列表.
  • metrics.reporter.<name>.<config>: 给定reporter名称<name>的通用设置.
  • metrics.reporter.<name>.class: 给定reporter名称<name>的reporter类 .
  • metrics.reporter.<name>.interval: 给定reporter名称<name>的reporter间隔.
  • metrics.reporter.<name>.scope.delimiter: 给定reporter名称<name>所使用的分割符标识(默认值用:metrics.scope.delimiter)

所有的reporters必须至少具备class属性,有些允许指定一个reporting的interval,以下,我们将列举更多针对每个reporter的设置.

举例说明指定多个reporters的配置

  1. metrics.reporters: my_jmx_reporter,my_other_reporter
  2. metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
  3. metrics.reporter.my_jmx_reporter.port: 9020-9040
  4. metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
  5. metrics.reporter.my_other_reporter.host: 192.168.1.1
  6. metrics.reporter.my_other_reporter.port: 10000

重要提示:当Flink启动的时候,通过放入到/lib目录下包含reporter的jar文件必须可访问.

你可以通过实现org.apache.flink.metrics.reporter.MetricReporter接口来定义自己的Reporter, 如果这个Reporter必须定期发送报告,那你也必须同时实现Scheduled接口.

下面的章节列举了支持的reporters.

JMX (org.apache.flink.metrics.jmx.JMXReporter)

不必包含其他依赖关系,因为JMX reporter默认可用,但是并没有被激活

参数:

  • port - (可选) JMX侦听连接的端口,也可以是端口范围。当指定范围时,相关job或者task manager 日志将显示实际端口。如果设置了此设置,Flink将为给定的端口/范围启动一个额外的JMX连接器。在默认本地JMX接口上指标始终可用.

示例配置:

  1. metrics.reporters: jmx
  2. metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
  3. metrics.reporter.jmx.port: 8789

通过JMX暴露出来的指标由一个域和一个键-属性列表来标识,它们一起形成对象名称。

域始终以 org.apache.flink打头,后面跟着通用指标标识。相对于常用的标识符,它不受域格式影响,不包含任何变量,并且作业之间是不变的,这样域的一个列子是:

org.apache.flink.job.task.numBytesOut.

键-属性列表包含所有变量的值,与配置的域格式无关,它们与给定的指标相关联,这样列表的列子是:

host=localhost,job_name=MyJob,task_name=MyTask.

因此域识别一个指标类,而键-属性性列表识别一个(或多个)指标实例

Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)

为了使用这个reporter,你必须将/opt/flink-metrics-ganglia-{{site.version}}.jar拷贝到Flink发行版本下的/lib文件夹中

参数:

  • host - 在gmond.conf中的udp_recv_channel.bind下配置的gmond主机地址
  • port - 在gmond.conf中的udp_recv_channel.port下配置的gmond端口
  • tmax - 旧指标能够保留软性限制的最长时间
  • dmax - 旧指标能够保留硬性限制的最长时间
  • ttl - 传输UDP包的生存时间
  • addressingMode - UDP使用的寻址模式(UNICAST/MULTICAST)

示例配置:

  1. metrics.reporters: gang
  2. metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
  3. metrics.reporter.gang.host: localhost
  4. metrics.reporter.gang.port: 8649
  5. metrics.reporter.gang.tmax: 60
  6. metrics.reporter.gang.dmax: 0
  7. metrics.reporter.gang.ttl: 1
  8. metrics.reporter.gang.addressingMode: MULTICAST

Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)

为了使用这个reporter,你必须将/opt/flink-metrics-graphite-{{site.version}}.jar拷贝到Flink发行版本下的/lib文件夹中

参数:

  • host - Graphite 服务器地址
  • port - Graphite 服务器端口
  • protocol - 使用的协议 (TCP/UDP)

示例配置:

  1. metrics.reporters: grph
  2. metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
  3. metrics.reporter.grph.host: localhost
  4. metrics.reporter.grph.port: 2003
  5. metrics.reporter.grph.protocol: TCP

StatsD (org.apache.flink.metrics.statsd.StatsDReporter)

为了使用reporter,你必须将/opt/flink-metrics-statsd-{{site.version}}.jar拷贝到Flink发行版本下/lib文件夹中

参数:

  • host - the StatsD 服务器地址
  • port - the StatsD 服务器端口

示例配置

  1. metrics.reporters: stsd
  2. metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
  3. metrics.reporter.stsd.host: localhost
  4. metrics.reporter.stsd.port: 8125

System metrics

默认情况下,Flink收集了几个能够深入了解当前状态的指标,本章节是所有这些指标的一个参考 以下表格通常有4列:

  • “Scope”列描述了用于生成系统域的域格式,例如,如果单元格包含“Operator”,则使用“metric.scope.operator”的作用域格式,如果单元格包含以斜杠分割的多个值,则会根据不同的实体报告多个指标,例如job-和taskmanagers.

  • “Infix”(可选) 列描述了哪些中缀附加到系统域中.

  • “Metrics” 列中列出了给定域和中缀的所有注册指标的名称.

  • “Description” 列提供有关给定指标度量的相关信息.

请注意,中缀/指标名称列中的所有点仍然遵循“metrics.delimiter”设置,因此,为了推断指标标识符:

  1. 根据域列选择域格式
  2. 添加这个值在中缀列,如果存在,则表示“metrics.delimiter”设置
  3. 添加指标名称

CPU:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.CPU Load JVM最近CPU使用情况.
Time JVM使用的CPU时间.

内存:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.Memory Memory.Heap.Used 当前使用的堆内存大小.
Heap.Committed 保证JVM可用的堆内存大小.
Heap.Max 可用于内存管理的堆内存最大值.
NonHeap.Used 当前使用的非堆内存大小.
NonHeap.Committed 保证JVM可用的非堆内存大小.
NonHeap.Max 可用于内存管理的非堆内存最大值.
Direct.Count 直接缓冲池中的缓冲区数量.
Direct.MemoryUsed JVM中用于直接缓冲池的内存大小.
Direct.TotalCapacity 直接缓冲池中所有缓冲区的总容量.
Mapped.Count 映射缓冲池中缓冲区的数量.
Mapped.MemoryUsed JVM中用于映射缓冲池的内存大小.
Mapped.TotalCapacity 映射缓冲池中缓冲区的数量.

线程:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.ClassLoader Threads.Count 存活线程总数.

垃圾回收:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 已发生的回收总数.
<GarbageCollector>.Time 执行垃圾回收花费的总时间.

类加载器:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded 自JVM启动以来加载类的总数.
ClassesUnloaded 自JVM启动以来卸载类的总数.

网络:

Scope Infix Metrics Description
TaskManager Status.Network AvailableMemorySegments 未使用的内存段数.
TotalMemorySegments 已分配的内存段数.
Task buffers inputQueueLength 队列输入缓冲区的数量.
outputQueueLength 队列输出缓冲区的数量.
inPoolUsage 输入缓冲区使用情况评估.
outPoolUsage 输出缓冲区使用情况评估.
Network.<Input|Output>.<gate>
(only available if taskmanager.net.detailed-metrics config option is set)
totalQueueLen 所有输入/输出通道中队列缓冲区的总数.
minQueueLen 所有输入/输出通道中队列缓冲区的最小数目.
maxQueueLen 所有输入/输出通道中队列缓冲区的最大数目.
avgQueueLen 所有输入/输出通道中队列缓冲区的平均数目.

集群:

Scope Metrics Description
JobManager numRegisteredTaskManagers 已注册taskmanagers的数量.
numRunningJobs 正在运行jobs的数量.
taskSlotsAvailable 可用task slots的数量
taskSlotsTotal task slots总数.

检查点:

Scope Metrics Description
Job (only available on JobManager) lastCheckpointDuration 完成上一次检测点所花费的时间.
lastCheckpointSize 上一次检测点的总大小.
lastCheckpointExternalPath 上一个检测点存储的路径.
Task checkpointAlignmentTime 最后一个障碍对齐所需的纳秒时间,或者当前对齐已经花费了多长时间.

IO:

Scope Metrics Description
Task currentLowWatermark 该任务已经获得的最低水位.
numBytesInLocal 该任务从本地源读取的字节总数.
numBytesInLocalPerSecond 该任务从本地源每秒读取的字节数.
numBytesInRemote 该任务从远端读取的字节总数.
numBytesInRemotePerSecond 该任务从远端每秒读取的字节数.
numBytesOut 该任务已发出的字节总数.
numBytesOutPerSecond 该任务每秒发出的字节数.
Task/Operator numRecordsIn 该任务/操作已收到的条目总数.
numRecordsInPerSecond 该任务/操作每秒收到的条目数.
numRecordsOut 该操作/任务已发出的条目总数.
numRecordsOutPerSecond 该操作/任务每秒发出的条目数.
Operator latency 所有输入源的延迟分布.
numSplitsProcessed 数据源已经处理的输入分片总数(如果操作是一个数据源).

延迟跟踪

Flink允许去跟踪条目在整个系统中运行的延迟,为了开启延迟跟踪,latencyTrackingInterval(毫秒)必须在ExecutionConfig中设置为一个正值. 在latencyTrackingInterval,源端将周期性的发送一个特殊条目,叫做LatencyMarker,这个标记包含一个从源端发出记录时的时间戳。延迟标记不能超过常规的用户条目,因此如果条目在一个操作的前面排队,将会通过这个标记添加延迟跟踪.

请注意延迟标记不记录用户条目在操作中所花费的时间,而是绕过它们。特别是这个标记是不用于记录在窗口缓冲区中的时间条目。只有当操作不能够接受新的条目时,它们才会排队,用这个标记测量的延迟将会反映出这一点.

所有中间操作通过保留每个源的最后n个延迟的列表,来计算一个延迟的分布。落地操作保留每个源的列表,然后每个并行源实例允许检测由单个机器所引起的延迟问题.

目前,Flink认为集群中所有机器的时钟是同步的。我们建议建立一个自动时钟同步服务(类似于NTP),以避免虚假的延迟结果.

仪表盘集成

为每个任务或者操作所收集到的指标,也可以展示在在仪表盘上。在一个作业的主页面,选择Metrics选项,在顶部图选择一个任务后,可以使用Add Metrics下拉菜单来选择要展示的指标值

  • 任务指标被列为 <subtask_index>.<metric_name>.
  • 操作指标被列为 <subtask_index>.<operator_name>.<metric_name>. 每个指标被可视化为一个单独的图表,用x轴表示时间和y轴表示测量值。所有的图表每10秒自动更新一次,并在导航到另一页时继续执行.

这里对可视化指标的数量没有限制;但是只有数值型指标可以可视化。