搜文章
推荐 原创 视频 Java开发 iOS开发 前端开发 JavaScript开发 Android开发 PHP开发 数据库 开发工具 Python开发 Kotlin开发 Ruby开发 .NET开发 服务器运维 开放平台 架构师 大数据 云计算 人工智能 开发语言 其它开发
Lambda在线 > 17聊技术 > flume-ng源码分析-核心组件分析

flume-ng源码分析-核心组件分析

17聊技术 2017-11-06

从第一篇分析可知,flume中所有的组件都会实现LifecycleAware 接口。该接口定义如下:

在组件启动的时候会调用start方法,当有异常时调用stop方法。getLifecycleState 方法返回该组件的状态。包含IDLE, START, STOP, ERROR;

当在组件开发中需要配置一些属性的时候可以实现Configurable接口

flume-ng源码分析-核心组件分析

下面开始分析Agent中各个组件的实现


source 实现

source定义

flume-ng源码分析-核心组件分析

可以看到Source 继承了LifecycleAware 接口,并且提供了ChannelProcessor的getter和setter方法,channelProcessor在常用架构篇中讲到提供了日志过滤链,和channel选择的功能。所以Source的逻辑应该都在LifecycleAware中的start,stop方法中。

source创建

启动篇中,我们讲到了flume是如何启动的。大致流程就是读取配置文件,生成flume的各种组件,执行各个组件的start()方法。在getConfiguration()方法中调用了loadSources()方法。

可以看到在loadSources方法中如何创建Source的

flume-ng源码分析-核心组件分析

flume-ng源码分析-核心组件分析

从上面的分析中可以看出,Source是后SourceFactory创建的,创建之后绑定到SourceRunner中,并且在SourceRunner中启动了Source。

SourceFactory只有一个实现DefaultSourceFactory。创建Source过程如下:

flume-ng源码分析-核心组件分析

在创建重,通过type来或者source类的class。在getClass方法中,首先会去找type对应类型的class。在SourceType中定义的。如果没有找到,则直接获得配置的类全路径。最后通过Class.forName(String)获取class对象。

source提供了两种方式类获取数据:轮训拉去和事件驱动

flume-ng源码分析-核心组件分析

PollableSource 提供的默认实现如下:

flume-ng源码分析-核心组件分析

比如KafkaSource 利用Kafka的ConsumerApi,主动去拉去数据。

EventDrivenSource 提供的默认实现如下

flume-ng源码分析-核心组件分析

如HttpSource,Net catSource就是事件驱动的,所谓事件驱动也就是被动等待。在HttpSource中内置了一个Jetty server,并且设置FlumeHTTPServlet 作为handler去处理数据。


source的启动

从上面的分析中知道,在启动flume读取配置文件时,会将所有的组件封装好,然后再启动。对于Source而言,封装成了SourceRunner,通过SourceRunner间接启动Source。


flume-ng源码分析-核心组件分析

从上面可以看出SourceRunner 默认提供两种实现,PollableSourceRunner ,EventDrivenSource分别对应PollableSource 和EventDrivenSource。


查看PollableSourceRunner是如何启动的

flume-ng源码分析-核心组件分析

在PollableSourceRunner中我们看到单独启动一个线程去执行PollingRunner,这个线程的作用就是不断的去轮询。

查看PollingRunner的实现

flume-ng源码分析-核心组件分析

比如KafkaSource ,它的逻辑就在process方法中

flume-ng源码分析-核心组件分析

EventDrivenSourceRunner

flume-ng源码分析-核心组件分析

可以看到EventDrivenSourceRunner和PollableSourceRunnner 启动流程大致相同,只是PollableSourceRunner会额外启动一个线程去轮询source。


channel的实现

source 获取到数据后,会交给channelProcessor处理,发送到channel。最后由sink消费掉。所以channel是source,sink实现异步化的关键。

channelProcessor 中两个重要的成员

    private final ChannelSelector selector;//channel选择器器

    private final InterceptorChain interceptorChain; //过滤链

InterceptorChain 是由多个Interceptor组成,并且实现了Interceptor接口

flume-ng源码分析-核心组件分析

Interceptor.java

flume-ng源码分析-核心组件分析

Interceptor定义了一些处理Event的接口,再Event处理之后都会返回改Envent

从source的分析中我们可以知道,如果是PollableSourceRunner会调用source 中的process()方法。如果是EventDrivenSourceRunner,就会用特定的方法来获取source,比如httpSource 利用FlumeHTTPServlet来接受消息。

flume-ng源码分析-核心组件分析

比如KafkaSource 是PollableSourceRunner 那么会调用KafkaSource中的process()方法。

flume-ng源码分析-核心组件分析

从以上分析不管source是轮询还是事件驱动的,都会触发ChannelProcessor中的processEvent或者ProcesEventBatch方法

flume-ng源码分析-核心组件分析

最后就是ChannelSelector ,flume默认提供两种实现多路复用和复制。多路复用选择器可以根据header中的值而选择不同的channel,复制就会把event复制到多个channel中。flume默认是复制选择器。

flume-ng源码分析-核心组件分析

同样Selector的创建也是通过ChannelSelectorFactory创建的。

flume-ng源码分析-核心组件分析

默认提供复制选择器,如果配置文件中配置了选择器那么就从配置文件中获取。

上面看到在processEventBatch 方法中调用channel的put方法。channel中提供了基本的

put和take方法来实现Event的流转。

flume-ng源码分析-核心组件分析

flume提供的默认channel如下图所示:

flume-ng源码分析-核心组件分析


sink的实现

sink的定义:

flume-ng源码分析-核心组件分析

提供了channel的setter,getter方法。process方法用来消费。并返回状态READY,BACKOFF


sink的创建

flume-ng源码分析-核心组件分析

sink的创建也是通过sinkFactory

flume-ng源码分析-核心组件分析

通过传入的type找到对应的Class 要是没有找到则直接通过Class.forNamae(String name)来创建sink还提供了分组功能。该功能由SinkGroup实现。在SinkGroup内部如何调度多个Sink,则交给SinkProcessor完成。


sink的启动

和Source一样,flume也为Sink提供了SinkRunner来流转Sink

在sinkRunner中

flume-ng源码分析-核心组件分析

sinkRunner中通过启动SinkProcessor 间接启动Sink,并且单独启动一个线程,不停地调用process()方法从channel中消费数据

在SinkProcessor中,如果是DefaultSinkProcessor 那么直接调用sink.start()方法启动sink。如果是LoadBalancingSinkProcessor,FailoverSinkProcessor由于这两种处理器中包含多个Sink,所以会依次遍历sink

调用start()方法启动

flume-ng源码分析-核心组件分析

该线程会不停的执行SinkProcessor的process()方法,而SinkProcessor的process()方法会调用对应的Sink的process()方法。然后判断处理状态如果是失败补偿,那么等待超时时间后重试


SinkGroup

flume-ng源码分析-核心组件分析

SinkGroup中包含多个Sink,并且提供一个SinkProcessor来处理SinkGroup内部调度

SinkProcessor

SinkProcessor 默认提供三种实现。

DefaultSinkProcessor,LoadBalancingSinkProcessor,FailoverSinkProcessor

flume-ng源码分析-核心组件分析

DefaultSinkProcessor:默认实现,适用于单个sink

LoadBalancingSinkProcessor:提供负载均衡

FailoverSinkProcessor:提供故障转移


DefaultSinkProcessor

flume-ng源码分析-核心组件分析

从上面可以看出DefaultSinkProcessor 只能处理一个Sink。在process方法中调用sink的方法。具体到某个具体的Sink,比如HDFSEventSink,那么就执行该sink的process方法

接下来分析SinkProcessor中负载均衡和故障转移 是如何具体实现的。


FailOverSinkProcessor 实现分析

FailOverSinkProcessor 中process()方法实现如下:


flume-ng源码分析-核心组件分析

存活队列是一个SortMap 其中key是sink的优先级。activeSink 默认取存活队列中的最后一个,存活队列是根据配置的sink优先级来排序的

失败队列是一个优先队列,按照FailSink的refresh属性进行排序

flume-ng源码分析-核心组件分析

refresh 属性,在FailSink创建时和sink 处理发生异常时 会触发调整

refresh 调整策略 如下:

flume-ng源码分析-核心组件分析

refresh 等于系统当前的毫秒加上最大等待时间(默认30s)和失败次数指数级增长值中最小的一个。

FAILURE_PENALTY等于1s;(1 << sequentialFailures) * FAILURE_PENALTY)用于实现根据失败次数等待时间指数级递增。

一个配置的failOver具体的例子:

host1.sinkgroups = group1

host1.sinkgroups.group1.sinks = sink1 sink2

host1.sinkgroups.group1.processor.type = failover

host1.sinkgroups.group1.processor.priority.sink1 = 5

host1.sinkgroups.group1.processor.priority.sink2 = 10

host1.sinkgroups.group1.processor.maxpenalty = 10000


LoadBalancingSinkProcessor实现分析

loadBalaneingSinkProcessor 用于实现sink的负载均衡,其功能通过SinkSelector实现。类似于ChannelSelector和Channel的关系

flume-ng源码分析-核心组件分析

flume-ng源码分析-核心组件分析


SinkSelector中模式有三种实现

1.固定顺序

2.轮询

3.随机

LoadBalancingSinkProcessor 中使用均衡负载的方式

flume-ng源码分析-核心组件分析

在上面的解释中,最大的两个疑惑就是:

    这个Sink迭代器也就是createSinkIterator() 是如何实现的

    发生异常后SinkSelector的处理是如何实现的

先来看createSinkIterator 的实现。首先看RoundRobinSinkSelector的实现

flume-ng源码分析-核心组件分析

如上图所示RoundRobinSinkSelector 内部包含一个OrderSelector的属性。

flume-ng源码分析-核心组件分析

内部通过一个RoundRobinOrderSelector 来实现。查看起createIterator实现

flume-ng源码分析-核心组件分析

接下来看一下getIndexList 的实现

stateMap是一个LinkedHashMap其中T在这里指的是Sink。

如果没有开启了退避算法,那么会认为每个sink都是存活的,所有的sink都加到IndexList。否则等到了失败补偿时间才会加入到IndexList。可以通过processor.backoff = true配置开启

最后分析一下当sink处理失败SinkSelector是如何处理的

CONSIDER_SEQUENTIAL_RANGE 是一个常量 只为1小时。

EXP_BACKOFF_COUNTER_LIMIT 为期望最大的退避次数,值为16。如果上次失败到现在的是哪在上次退避等待时间超过一个小时后 或者 退避次数超过EXP_BACKOFF_COUN

TER_LIMIT 那么退避的等待时间将不再增加。



版权声明:本站内容全部来自于腾讯微信公众号,属第三方自助推荐收录。《flume-ng源码分析-核心组件分析》的版权归原作者「17聊技术」所有,文章言论观点不代表Lambda在线的观点, Lambda在线不承担任何法律责任。如需删除可联系QQ:516101458

文章来源: 阅读原文

相关阅读

关注17聊技术微信公众号

17聊技术微信公众号:gh_0c70060310cf

17聊技术

手机扫描上方二维码即可关注17聊技术微信公众号

17聊技术最新文章

精品公众号随机推荐