vlambda博客
学习文章列表

logback通过kafka接入ELK详解

数据流向图示参考(图片来自百度以图搜图,向原作者致敬)

logback通过kafka接入ELK详解

logback通过kafka接入ELK详解

开发人员如何使用elk

开发人员需要做的就是将日志信息通过各种渠道发到kafka,本实例以logback为例进行说明,其他接入方式请自行搜索。请先行准备好可以正常使用的已经接入logback的系统,==如果你没有准备好,本文将假装你已经准备好。==

logback接入elk

  1. pom文件引入如下依赖

<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.11</version>
</dependency>
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.1.0</version>
<scope>runtime</scope>
</dependency>

2.在logback配置文件中增加相应的appender

<appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="net.logstash.logback.layout.LogstashLayout" >
<includeContext>true</includeContext>
<includeCallerData>true</includeCallerData>
<customFields>{"system":"test"}</customFields>
<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
</layout>
<charset>UTF-8</charset>
</encoder>
<!--kafka topic 需要与配置文件里面的topic一致 否则kafka会沉默并鄙视你-->
<topic>applog</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
<producerConfig>bootstrap.servers=10.39.232.218:9092</producerConfig>
</appender>

<!--你可能还需要加点这个玩意儿-->
<logger name="Application_ERROR">
<appender-ref ref="KafkaAppender"/>
</logger>

<!--还有这个玩意儿-->
<root>
<level value="INFO" />
<appender-ref ref="CONSOLE" />
<appender-ref ref="KafkaAppender" />
</root>
  1. 代码中使用

  • 如果你将KafkaAppender添加到了root标签下 那么你的INFO极其更严格的基本的日志都将发给kafka,例如你可以这样使用

Logger logger = LoggerFactory.getLogger(this.getClass());
logger.debug("This is a debug message"); 此条日志收日志级别限制 不会同步到kafka
logger.info("This is an info message");
logger.warn("This is a warn message");
logger.error("This is an error message");
  • 如果你将KafkaAppender添加到了Application_ERROR下 那么你可以这样使用

Logger logger= LoggerFactory.getLogger("Application_ERROR");
logger.debug("This is a debug message");
logger.info("This is an info message");
logger.warn("This is a warn message");
logger.error("This is an error message");
  1. 数据查询 在一切顺利的情况下,你可以通过kibana查到类似的记录logback通过kafka接入ELK详解

配置解释

前面我们按照配置成功的将项目的日志信息写入到了elk中,并在kibana上查到了相应的日志信息。接下来我们相信解析一下前面的配置。

是否包含上下文

<includeContext>true</includeContext>

开启的话会包含hostname等logback的context信息

logback通过kafka接入ELK详解


是否包含日志来源

<includeCallerData>false</includeCallerData>

差异如下

"caller": {
"class": "com.example.elkdemo.com.example.elktest.utils.LogHelper",
"method": "helpMethod",
"file": "LogHelper.java",
"line": 11
}

自定义附加字段

<customFields>{"system":"test"}</customFields>

自定义字段的简称

<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>

内容如下

public class ShortenedFieldNames extends LogstashFieldNames {
public static final String FIELD_LOGGER = "logger";
public static final String FIELD_THREAD = "thread";
public static final String FIELD_LEVEL_VAL = "levelVal";
public static final String FIELD_CALLER = "caller";
public static final String FIELD_CLASS = "class";
public static final String FIELD_METHOD = "method";
public static final String FIELD_FILE = "file";
public static final String FIELD_LINE = "line";
public static final String FIELD_STACKTRACE = "stacktrace";

public ShortenedFieldNames() {
this.setLogger("logger");
this.setThread("thread");
this.setLevelValue("levelVal");
this.setCaller("caller");
this.setCallerClass("class");
this.setCallerMethod("method");
this.setCallerFile("file");
this.setCallerLine("line");
this.setStackTrace("stacktrace");
}
}

前面我们讲解了,生成的JSON数据日志的一些layout属性配置,下面我们讲解一下appender将日志信息发送给kafka的时候的一些配置

概念准备

本文使用的logback-kafka-appender相对于kafka集群来说就是kafka集群消息的生产者。

  • producer:  消息生产者,发布消息到 kafka 集群的终端或服务。

  • broker:  kafka 集群中包含的服务器。

  • topic:  每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。

  • partition:  partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。

  • consumer:  从 kafka 集群中消费消息的终端或服务。

  • Consumer group:  high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。

  • replica:  partition 的副本,保障 partition 的高可用。

  • leader:  replica 中的一个角色, producer 和 consumer 只跟 leader 交互。

  • follower:  replica 中的一个角色,从 leader 中复制数据。

  • controller:  kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。

  • zookeeper:  kafka 通过 zookeeper 来存储集群的 meta 信息。

kafka消息提交策略(deliveryStrategy标签用)

由于logback的日志需要通过网络提交到kafka集群,难免会存在因为网络不稳定、kafka集群不稳定等原因导致日志信息不能抵达的情况,此时配置好相应的策略告诉logback-kafka-appender怎么解决这个问题就显得至关重要。

logback-kafka-appender为我们提供了两种策略,异步提交策略(AsynchronousDeliveryStrategy)和阻塞提交策略(BlockingDeliveryStrategy)

异步提交策略(AsynchronousDeliveryStrategy)

任何消息提交给kafka生产者,如果因为某些原因导致交付,该消息会被分发给备胎appenders,可是在网络(比如与kafka服务器的连接断了)有问题时这种交付策略会在生产者的发送缓冲区塞满了以后堵塞。为了避免这个阻塞的产生,我们可以开启producerConfig block.on.buffer.full = false。开启后所有不能快速通过网络抵达kafka集群的消息都会被分发到备胎appender。

阻塞提交策略(BlockingDeliveryStrategy)

这个策略会一直阻塞调用线程直到每一个日志消息实际抵达kafka。这种策略会导致比较消极和不开心以及让人难过的影响,因为它对吞吐量有巨大的负面影响。

==警告:这个策略不应使用一起producerConfig linger.ms==

==温馨提示==

异步提交策略(AsynchronousDeliveryStrategy)并不能阻止在向kafka提交数据时候的阻塞。这意味着:假如在logging上下文启动的时候所有的kafka服务器都不可达,或全部kafka服务器在配置的时间段(>metadata.max.age.ms)依然不可达,你的appender最终也将阻塞。这种行为是我们不希望看到的,kafka-clients migitated 0.9版本已改善这种情况(见#16)。在此之前的版本,可使用如下方案。

<configuration>

<!-- This is the kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<!-- Kafka Appender configuration -->
</appender>

<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="kafkaAppender" />
</appender>

<root level="info">
<appender-ref ref="ASYNC" />
</root>
</configuration>

自定义策略

当然 如果你认为以上策略不是你的菜或者你实在看不惯这些策略,你也可以自定义你自己的策略,你只需要继承om.github.danielwegener.logback.kafka.delivery.DeliveryStrategy就可以了

备胎(fallback)appender

假如某些原因导致我们的生产者(logback-kafka-appender)不能发布日志某条消息,这条消息还可以通过配置的备胎appender(比如基于STDOUT和STDERR的ConsoleAppender)记录下来。

作为开发者,我们只需要在相应的可能出现问题的appenders里面加入一个标签appender-ref,所有不能抵达的消息日志都会分发到appender-ref申明的appenders去。

比如  STDOUT就是后备方案

值得注意的是,采用异步提交策略会重用原来的kafka生产者的io线程写消息到后备appenders,因此所有的后备方案的appenders的io速度必须满足要求,否则可能拖慢速度或者导致宕机。

生产者(logback-kafka-appender)配置调优(基于kafka0.8.2,logback-kafka-appender默认配置)

对logback-kafka-appender的所有配置 我们都可以通过 Name=Value 进行同名覆盖,这给予我们更好的微调能力(比如 batch.size, compression.type 和 linger.ms)。

序列化

这个模块提供了一个与logback的LayoutWrappingEncoder类似的LayoutKafkaMessageEncoder(他们的区别就是以创建字节数组取代同步输出流)

LayoutKafkaMessageEncoder使用了常用的ch.qos.logback.core.Layout作为layout-parameter(布局参数)。

这就允许我们使用任何实现了 ILoggingEvent 或者 IAccessEvent 的组件,比如众所周知的PatternLayout和logstash-logback-encoder的LogstashLayout。

自定义序列化(略)

https://github.com/danielwegener/logback-kafka-appender#keying-strategies--partitioning

主键分区策略

kafka的可伸缩性和顺序保证严重依赖分区的概念。对于应用日志来说这就意味着我们需要思考我们想要如何分配日志消息到多个kafka的topic分区。这个决定的一个含义是这些消息在被多个不同消费者消费时有序,因为kafka只提供了在单一分区读取顺序的保证。另一个含义是我们的日志消息如何均匀分布在所有可用分区并且保持不同服务器间的负载均衡。

RoundRobinKeying策略(默认)

这个策略均匀地分配所有写日志消息到所有可用的kafka分区。这一策略可能会导致消费者客户端读取顺序异常。

HostNameKeying策略

这一策略使用HOSTNAME来划分发送到kafka的日志消息。这是有用的因为它确保所有通过这个host发布的日志消息将以正确的顺序交付给任何消费者。但是这种策略会导致日志不均匀的分布少量的主机(相对于分区的数量)。

ContextNameKeying(策略)

这一策略使用logbacks CONTEXT_NAME分区发送到kafka的日志消息。这是确保所有相同日志context的日志消息将可以被任何消费者以正确的顺序消费。但是这种策略会导致日志不均匀分布少量的主机(相对于分区的数量)。这个策略只适用于ILoggingEvents。

ThreadNameKeying策略

这一策略使用调用线程的名字作为分区键。这将确保所有相同线程的消息将可以被任何消费者以正确的顺序消费。但是这种策略会导致日志不均匀分布少量线程(名称)(相对于分区的数量)。这个策略只适用于ILoggingEvents。

LoggerNameKeying策略

*这个策略使用日志记录器的名字(logger name)作为分区键。这将确保所有使用相同logger name的消息将可以被任何消费者以正确的顺序消费。但是这种策略会导致日志不均匀分布与一些少量的不同的loggers上(相对于分区的数量)。这个策略只适用于ILoggingEvents。

自定义主键分区策略

如果你对以上分区策略都不满足你的需求,你也可以很容易的通过实现keyingStrategy类来实现自己的分区策略,比如:

package foo;
import com.github.danielwegener.logback.kafka.keying.KeyingStrategy;

public class LevelKeyingStrategy implements KeyingStrategy<ILoggingEvent> {
@Override
public byte[] createKey(ILoggingEvent e) {
return ByteBuffer.allocate(4).putInt(e.getLevel()).array();
}
}

参考大多数的logback组件,自定义分区策略可能需要实现如下接口

ch.qos.logback.core.spi.ContextAware and ch.qos.logback.core.spi.LifeCycle interfaces.

Q&A

如果想使用不同的kafka topic怎么办?

你只需要为这个topic增加一个appenders即可

如何让log的格式为logstash里面的json格式

使用 logstash-logback-encoder 里面的LogstashLayout 比如

<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
<layout class="net.logstash.logback.layout.LogstashLayout" />
</encoder>