vlambda博客
学习文章列表

大数据开发系列二:自定义flink Metric kafka Reporter

点击上方“IT那活儿”,关注后了解更多内容,不管IT什么活儿,干就完了!!!

我们通常在实现大数据场景的时候,需要分析任务上线后内部性能建康状况,比如cpu使用比率,内存使用率,checkpoint生成情况,任务是否异常等,像这些指标通过人肉运维方式是无法实时捕获异常信息的,所以需要一套实时监测体系来监测任务运行状态,通过实时获取相关指标数据,对数据进行规则告警,可以更好的分析定位问题。


基于这个需求,flink本身提供了很多的任务运行时刻Metrics相关指标,避免任务的运行处于黑盒状态,通过分析指标,可以快速的调整任务的资源、定位遇到的问题。目前获取 任务Metrics 有三种方式:

方式一:

通过flink  WebUI 进入Metrics 选项卡,根据不同算子,选择需要监测指标,实时查看指标数据,缺点比较明显,无法查看历史监测数据,需要一直打开,并且无法设置告警,适合开发过程时使用。

大数据开发系列二:自定义flink Metric kafka Reporter

方式二:

官方提供了一种通过 REST API获取方式指标的方式,

方式三:

flink 提供了一种MetricsReporter机制,可以将各个组件的metrics数据,通过不同的Metric Reporter插件将数据自动暴露给外部系统,这样可以充份利用使用第三方的存储和分析能力。

目前flink已经支持了很多reporter,如Graphite、JMX、InfluxDB、Prometheus等,不管用哪一处方式,都需要额外部署第三方系统来,进行接收、解析、分析metric数据。

我们本身已有了自动化运维平台,不会考虑部署像Prometheus这样的第三方平台,需要做的是如何将metric数据跟自动运维平台告警模块进行对接使用,告警模块主要是通过kafka进行对接数据,所以采用自定义kafka reporter 解决数据对接问题。

flink metrices指标项比较多,指标数据量级跟所跑的任务个数有着直接的关系,我们关注的核心指标项,对核心指标进行规则告警,接下来介绍如何基于flink 现有的reporter 代码实现kafka reporter功能点:

1. 下载对应版本flink 分支代码,如

https://github.com/apache/flink/releases/tag/release-1.13.6

2. 解压源代码,导入开发工具,查看flink-metrics模块代码

大数据开发系列二:自定义flink Metric kafka Reporter

3. 根据自己实际场景,对吐数据格式要求,参考不同自带模板代码,以flink-metrics-InfluxDB模块代码为例,新建flink-metrics-kafka

大数据开发系列二:自定义flink Metric kafka Reporter

3.1 修改flink-metrics/pom.xml 文件,新增<module>flink-metrics-kafka</module>。

3.2 新增KafkaReporterFactory主程序,需要实现MetricReporterFactory接口并重写方法。

@InterceptInstantiationViaReflection(
        reporterClassName = "org.apache.flink.metrics.kafka.KafkaReporter")
public class KafkaReporterFactory implements MetricReporterFactory {

    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new KafkaReporter();
    }
}

3.3 新增KafkaReporter实现类,需要继承AbstractReporter并实现Scheduled接口并重写方法,主要作用是收集指标数据,并推送到kafka。

@Override
public void open(MetricConfig metricConfig) {
    LOG.info("metricConfig:" + metricConfig);
    topic = metricConfig.getString("topic", "");
    if (StringUtils.isBlank(topic)) {
        LOG.error("metrics.reporter.kafka_reporter.topic is null");
    }
    String endsWithMetric = metricConfig.getString("endswith.metricname", "").trim(); //指定需要获取指标名称
    endsWithMetricList = Arrays.asList(endsWithMetric.split(","));
    String bootstrapservers = metricConfig.getString("bootstrap.servers", "");
    if (StringUtils.isBlank(bootstrapservers)) {
        LOG.error("metrics.reporter.kafka_reporter.bootstrap.servers is null");
    }
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapservers);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(
            "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProducer = new KafkaProducer<String, String>(properties);
}
  • 指标数据的类型及格式是根据它所归属metrics类型(Counters/Gauges/Histograms/Meters)有关,然后我们可以对指标数据进行格式化输出所需要格式到kafka。

① Counters: 统计的是一个累加值,用与存储数值类型指标数据。

② Gauges:用来存储任何类型指标数据。

③ Histograms:度量值的统计结果,如平均值、最大值等。

④ Meters:用来计算平均速率,平均吞吐量等。

@Override
public void report() 
{
    tryReport();
}
private final ObjectMapper mapper = new ObjectMapper();
private void tryReport() {
    Instant timestamp = Instant.now();
    try {
        String job_id = "";
        String job_name = "";
        List<MeasurementInfo> metriclist = new ArrayList<>();
        metriclist.addAll(gauges.values()); //获取gauges类型指标集
        metriclist.addAll(counters.values());//获取gauges类型指标集
        metriclist.addAll(histograms.values());//获取histograms类型指标集
        metriclist.addAll(meters.values());//获取meters类型指标集
//每个指标数据里面都加上对应的job_id, job_name
        for (MeasurementInfo info : metriclist) {
            if (info.getName().startsWith("jobmanager_job_")
                    || info.getName().startsWith("taskmanager_job_")) {
                job_id = info.getTags().getOrDefault(JOB_ID, "");
                job_name = info.getTags().getOrDefault(JOB_NAME, "");
                if (StringUtils.isBlank(job_id) || StringUtils.isBlank(job_name)) {
                    LOG.error("do not get job_id or job name:{}", info);
                }
                break;
            }
        }
        List<Map> list = new ArrayList<>();
//根据不同metrices类型遍历指标数据,只获取指定的指标项数据
        for (Map.Entry<Gauge<?>, MeasurementInfo> entry : gauges.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        for (Map.Entry<Counter, MeasurementInfo> entry : counters.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        for (Map.Entry<Histogram, MeasurementInfo> entry : histograms.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        for (Map.Entry<Meter, MeasurementInfo> entry : meters.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        if (list.size() > 0) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(
                            topic, null, mapper.writeValueAsString(list));
            kafkaProducer.send(record); //发送组装好指标数据到kafka
        }

    } catch (ConcurrentModificationException
            | NoSuchElementException
            | JsonProcessingException e) {
               LOG.error(e.getMessage());
        return;
    }
}

新增flink-metrics-kafka项目有两种打包方式:

  • 基于完整的flink源码项目,进行全量打包。

  • 保留flink maven父子结构,flink parent pom.xml 只留<module>snc-flink-metrics</module>编译成功后,将snc-flink-metrics.jar 放入flink/lib 目录下。

3.4 修改flink配置文件conf/flink-conf.yaml,主要包括Reporter全类名,上报周期,指定所需的指标名。

  • metrics.reporterskafka_reporter

  • metrics.reporter.kafka_reporter.factory.classorg.apache.flink.metrics.kafka.KafkaReporterFactory

  • metrics.reporter.kafka_reporter.interval60 SECONDS

  • metrics.reporter.kafka_reporter.bootstrap.servers:XXX.XXX.XXX.10:9090

#kafkatopic

  • metrics.reporter.kafka_reporter.topic:kafka_topic

#指标名称按后缀进行过滤,注释则不过滤

  • metrics.reporter.kafka_reporter.endswith.metricname:job_numRestarts,job_restartingTime,job_uptime,currentOutputWatermark,Status_JVM_CPU_Load,Status_JVM_Memory_Heap_Used

3.5 提交任务,消费kafka 可以获取对应的数据。

[
  {
    "name": "jobmanager.uptime",
    "time": 1647314569119,
    "fields": {
      "value": 1703478823
    },
    "tags": {
      "host": "bigdata-03",
      "job_id": "dc7a58b3f202059cd72c3467ecedb4b7",
      "job_name": "amp_zabbix_pre"
    }
  },
  {
    "name": "jobmanager.Status.JVM.Memory.Heap.Max",
    "time": 1647314569119,
    "fields": {
      "value": 468713472
    },
    "tags": {
      "host": "bigdata-03",
      "job_id": "",
      "job_name": ""
    }
  },
  ...
]



大数据开发系列二:自定义flink Metric kafka Reporter


本文作者:暨景书 vs 郭 宪

本文来源:IT那活儿(上海新炬王翦团队)