搜文章
推荐 原创 视频 Java开发 iOS开发 前端开发 JavaScript开发 Android开发 PHP开发 数据库 开发工具 Python开发 Kotlin开发 Ruby开发 .NET开发 服务器运维 开放平台 架构师 大数据 云计算 人工智能 开发语言 其它开发
Lambda在线 > 17聊技术 > flume-ng 源码分析-整体架构之一【启动篇】

flume-ng 源码分析-整体架构之一【启动篇】

17聊技术 2017-11-06

什么是flume

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。flume常用场景:log-->flume-->[hdfs,hbase,kafka],收集日志并落地到各种不同的存储,以供不同需求的计算。


主要模块介绍

flume源码结构

flume-ng-core:

flume的整个核心框架,包含了各个模块的接口以及逻辑关系实现。core下大部分代码都是source,channle,sink中。

flume-ng-channels:

里面包含了fileChannel,jdbcChannel,kafkaChannel,spillableMemoryChannel等通道实现。

flume-ng-sinks

各种sink的实现,包括但不限于:hdfsSink,hiveSink,esSink,kafkaSink。

flume-ng-sources:

各种source的实现,包括但不限于: jms,kafka,scirbe,twitter.其他source则在flume-ng-core模块中。

flume-ng-node:

实现flume的一些基本类。包括agent的main(Application)。这也是我们的分析代码的入口类。


一个agent包含三个基本组件:source、channal、sink

flume-ng 源码分析-整体架构之一【启动篇】

flume逻辑结构


flume启动脚本flume-ng分析

#####################################################################

# constants flume常量的设定,不同环境执行不同的类

#####################################################################

FLUME_AGENT_CLASS="org.apache.flume.node.Application"

FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"

FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"

FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"


#####################################################################

#真正启动flume,具体由$FLUME_APPLICATON_CLASS指定

#####################################################################

run_flume() {

    local FLUME_APPLICATION_CLASS

    if [ "$#" -gt 0 ]; then

        FLUME_APPLICATION_CLASS=$1

        shift

    else

        error "Must specify flume application class" 1

    fi

    if [ ${CLEAN_FLAG} -ne 0 ]; then

        set -x

    fi

    $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp

"$FLUME_CLASSPATH" \    -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*

}

##################################################

# main 启动过程中用到的变量,都可以在启动的时指定

# 如果不设置java堆空间大小,默认大小为20M,可以在flume.sh中进行设置

##################################################

# set default params

FLUME_CLASSPATH=""

FLUME_JAVA_LIBRARY_PATH=""

JAVA_OPTS="-Xmx20m"

LD_LIBRARY_PATH=""

opt_conf=""

opt_classpath=""

opt_plugins_dirs=""

arr_java_props=()

arr_java_props_ct=0

opt_dryrun=""

mode=$1

shift

##################################################

#最后根据不同参数启动不同的类,可以看到启动agent时,

#执行的是flume-ng-node中Applicaton.java

# finally, invoke the appropriate command

##################################################

if [ -n "$opt_agent" ] ; then

    run_flume $FLUME_AGENT_CLASS $args

elif [ -n "$opt_avro_client" ] ; then

    run_flume $FLUME_AVRO_CLIENT_CLASS $args

elif [ -n "${opt_version}" ] ; then

    run_flume $FLUME_VERSION_CLASS $args

elif [ -n "${opt_tool}" ] ; then

    run_flume $FLUME_TOOLS_CLASS $args

else

    error "This message should never appear" 1

fi


agent的启动分析Application.java

从上面的分析可以知道当我们启动一个Agent时,执行的是org.apache.flume.node.Application

看main函数的源码

flume-ng 源码分析-整体架构之一【启动篇】

主要是对命令行参数的校验和解析

在我们启动Agent时,会指定-n -f等一些参数

继续往下看

flume-ng 源码分析-整体架构之一【启动篇】

从以上代码我们可以看出,当配置文件是配置的是zk上的路径时,如果需要reload,则会启动PollingZooKeeperConfigurationProvider,该类里面会监听zk的变化,再通过guava的EventBus(类似于观察者模式,EventBus),传递消息。

注意

此时只是将PollingZooKeeperConfigurationProvider加入components中,并没有真正的启动。

PollingZooKeeperConfigurationProvider 部分关键代码

flume-ng 源码分析-整体架构之一【启动篇】

在zk node上设置listener,如果zk node有任何的变化则会触发refreshConfiguration方法

flume-ng 源码分析-整体架构之一【启动篇】

好了我们继续分析Application的代码。上面讲到了利用zk来做flume配置文件的代码。当然flume也支持本地文件的方式。代码如下:

flume-ng 源码分析-整体架构之一【启动篇】

如果-f 指定的配置文件不存在,那么将快速失败,抛出异常。

再判断配置文件发生改变时是否需要重新reload,套路和用zk保存配置文件一个道理如果需要动态加载配置文件,那么启动PollingPropertiesFileConfigurationProvider,每三十秒加载一次配置文件。

之后执行application.start()方法。让我们继续看start()方法

flume-ng 源码分析-整体架构之一【启动篇】

在start方法中遍历compents 执行supervisor.suervise()方法.

在继续分析之前我们先看一下LifecycleSupervisorPollingPropertiesFileConfigurationProvider 的类结构。

flume-ng 源码分析-整体架构之一【启动篇】

从以上两图中可以看出它们都实现了LifecycleAware接口。这个接口定义了flume组件的生命周期。

LifecycleSupervisor提供了实现。

LifecycleAware.java

flume-ng 源码分析-整体架构之一【启动篇】

让我们继续分析LifecycleSupervisor.supervise()方法

flume-ng 源码分析-整体架构之一【启动篇】

在上面的代码中创建了一个MonitorRunnable对象,通过jdk的scheduleWithFixedDelay进行定时调用,每次执行完成延迟3秒调度。

再看monitorRunable中的内容。

run 方法中部分内容。

flume-ng 源码分析-整体架构之一【启动篇】

flume-ng 源码分析-整体架构之一【启动篇】

首先因为monitorRunnbale对象是重复调用的,所以在run方法中作了一个状态判断,当该组件的状态不等于期望的状态时继续往下执行,否则什么都不做。这样避免重复启动。当组件第一次被启动的时候,组件本身的状态是IDEL,而desired state 是START,此时就会执行组件的start方法。

总结一下启动的时序图

flume-ng 源码分析-整体架构之一【启动篇】


比如启动PollingPropertiesFileConfigurationProvider组件,这个组件的作用就是定时去获取flume的配置。那么会调用PollingPropertiesFileConfigurationProvider的start方法。

下面以PollingPropertiesFileConfigurationProvider为例,分析flume的配置是如何动态载入的。


配置载入分析

从上面分析得知,

启动PollingPropertiesFileConfigurationProvider ,则执行该组件的start方法。

查看start方法如下

flume-ng 源码分析-整体架构之一【启动篇】

在start方法中单独启动一个线程,执行FileWatcherRunnable,并设置状态为START

继续看fileWatcher

flume-ng 源码分析-整体架构之一【启动篇】

在fileWatcher中通过对文件修改时间来判断配置文件是否发生变化。如果配置文件发生变化调用eventBus.post(getConfiguration()); 将配置文件的内容发布。

在Application.java 中有如下代码

flume-ng 源码分析-整体架构之一【启动篇】

此方法订阅了eventBus的消息。当一有消息将会触发该方法,此方法的功能相当于重启flume组件。还记得上面分析的代码吗?要是用户配置no-reload-conf 那么将会直接调用该方法。

那么getConfiguration()方法是如何实现的呢?

flume-ng 源码分析-整体架构之一【启动篇】

getConfiguration()中调用了getFlumeConfiguration()方法;getFlumeConfiguration() 是一个抽象方法,以PollingPropertiesFileConfigurationProvider 实现为例。该实现在父类中。

flume-ng 源码分析-整体架构之一【启动篇】

该方法通过基本的流加载方法返回FlumeConfigruation对象。该对象封装一个Map对象

。在FlumeConfigruation的构造函数中将会遍历这个Map对象,调用addRawProperty方法

该方法首先会进行一些合法性的检查,并且该方法会创建一个AgentConfiguration对象的aoconf。

该方法最后调用aconf.addProperty 方法。

在aconf.addProperty方法中会区分source,channel,sink ,sinkgroup。将对应的配置信息放在sourceContextMap,channelContextMap,sinkContextMap,sinkGroupContextMap。这些信息封装在AgentConfiguration,AgentConfiguration封装在FlumeConfiguration中,key是agentName。使用时可以通过getConfigurationFor(String hostname) 来获取。

flume如何获自定义的key

在上面的分析中addProperty方法中,调用了parseConfigKey方法。

cnck = parseConfigKey(key,BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX);

具体实现如下:

上面代码中prefix为定义的常量如下:

● 比如我们配置的格式是agent1.sources.source1.type=avro(注意在后面parse时,agent1.已经被截取掉)

● 在上面的parseKey方法中首先会判断prefix的后面有多少个字符

● 解析出name 。source1就是name

● 解析出configKey 。type就是configKey

● 封装为ComponentNameAndConfigKey

● 然后有上面的分析把sources、channel、sink配置信息,分别存放到sourceContextMap、

channelConfigMap、sinkConfigMap三个HashMap,这些信息封装AgentConfiguration,AgentConfiguration封装在FlumeConfiguration中,key是agentName。使用时可以通过getConfigurationFor(String hostname) 来获取


总结

以上分析了flume启动agent的流程。部分源码没有贴出来,可以自行阅读;以及flume中如何解析。

用户自定义的source,channel,sink;以及flume如何用zk listener和fileWatcher实现配置文件的动态加载。下篇主要讲解flume整体架构--常用架构篇。


版权声明:本站内容全部来自于腾讯微信公众号,属第三方自助推荐收录。《flume-ng 源码分析-整体架构之一【启动篇】》的版权归原作者「17聊技术」所有,文章言论观点不代表Lambda在线的观点, Lambda在线不承担任何法律责任。如需删除可联系QQ:516101458

文章来源: 阅读原文

相关阅读

关注17聊技术微信公众号

17聊技术微信公众号:gh_0c70060310cf

17聊技术

手机扫描上方二维码即可关注17聊技术微信公众号

17聊技术最新文章

精品公众号随机推荐