Processor API Guide

The processor has two responsibilities: choosing which aggregator to choose for a metric instrument and store the last record for each metric ready to be exported.

有两个职责:为指标度量工具选择哪一个聚合器,和存储每一个导出的指标的最后一条记录。

为指标选择特定的聚合器

有时,您可能希望对您的指标之一使用特定的聚合器,导出最后 X 值的平均值,而不是仅导出最后一个值。

下面是一个聚合器的使用样例:

  1. import { Aggregator } from '@opentelemetry/sdk-metrics-base';
  2. import { hrTime } from '@opentelemetry/core';
  3. export class AverageAggregator implements Aggregator {
  4. private _values: number[] = [];
  5. private _limit: number;
  6. constructor (limit?: number) {
  7. this._limit = limit ?? 10;
  8. }
  9. update (value: number) {
  10. this._values.push(value);
  11. if (this._values.length >= this._limit) {
  12. this._values = this._values.slice(0, 10);
  13. }
  14. }
  15. toPoint() {
  16. const sum =this._values.reduce((agg, value) => {
  17. agg += value;
  18. return agg;
  19. }, 0);
  20. return {
  21. value: this._values.length > 0 ? sum / this._values.length : 0,
  22. timestamp: hrTime(),
  23. }
  24. }
  25. }

现在我们需要实现我们自己的处理器来配置 sdk 以使用我们的新聚合器。 为了进一步简化,我们将只扩展 UngroupedProcessor(这是默认设置)以避免重新实现整个 Aggregator 接口。

结果如下:

  1. import {
  2. UngroupedProcessor,
  3. MetricDescriptor,
  4. CounterSumAggregator,
  5. ObserverAggregator,
  6. MeasureExactAggregator,
  7. } from '@opentelemetry/sdk-metrics-base';
  8. export class CustomProcessor extends UngroupedProcessor {
  9. aggregatorFor (metricDescriptor: MetricDescriptor) {
  10. if (metricDescriptor.name === 'requests') {
  11. return new AverageAggregator(10);
  12. }
  13. // this is exactly what the "UngroupedProcessor" does, we will re-use it
  14. // to fallback on the default behavior
  15. switch (metricDescriptor.metricKind) {
  16. case MetricKind.COUNTER:
  17. return new CounterSumAggregator();
  18. case MetricKind.OBSERVER:
  19. return new ObserverAggregator();
  20. default:
  21. return new MeasureExactAggregator();
  22. }
  23. }
  24. }

最后,我们需要指定MeterProvider在创建新仪表时使用我们的CustomProcessor

  1. import {
  2. UngroupedProcessor,
  3. MetricDescriptor,
  4. CounterSumAggregator,
  5. ObserverAggregator,
  6. MeasureExactAggregator,
  7. MeterProvider,
  8. Aggregator,
  9. } from '@opentelemetry/sdk-metrics-base';
  10. import { hrTime } from '@opentelemetry/core';
  11. export class AverageAggregator implements Aggregator {
  12. private _values: number[] = [];
  13. private _limit: number;
  14. constructor (limit?: number) {
  15. this._limit = limit ?? 10;
  16. }
  17. update (value: number) {
  18. this._values.push(value);
  19. if (this._values.length >= this._limit) {
  20. this._values = this._values.slice(0, 10);
  21. }
  22. }
  23. toPoint() {
  24. const sum =this._values.reduce((agg, value) => {
  25. agg += value;
  26. return agg;
  27. }, 0);
  28. return {
  29. value: this._values.length > 0 ? sum / this._values.length : 0,
  30. timestamp: hrTime(),
  31. }
  32. }
  33. }
  34. export class CustomProcessor extends UngroupedProcessor {
  35. aggregatorFor (metricDescriptor: MetricDescriptor) {
  36. if (metricDescriptor.name === 'requests') {
  37. return new AverageAggregator(10);
  38. }
  39. // this is exactly what the "UngroupedProcessor" does, we will re-use it
  40. // to fallback on the default behavior
  41. switch (metricDescriptor.metricKind) {
  42. case MetricKind.COUNTER:
  43. return new CounterSumAggregator();
  44. case MetricKind.OBSERVER:
  45. return new ObserverAggregator();
  46. default:
  47. return new MeasureExactAggregator();
  48. }
  49. }
  50. }
  51. const meter = new MeterProvider({
  52. processor: new CustomProcessor(),
  53. interval: 1000,
  54. }).getMeter('example-custom-processor');
  55. const requestsLatency = meter.createValueRecorder('requests', {
  56. monotonic: true,
  57. description: 'Average latency'
  58. });