vlambda博客
学习文章列表

ELK + kafka + logback搭建游戏日志系统

ELK

本篇文章是该日志系统的最基础的搭建过程,所有中间件的版本都选择现在最新的发布版。elasticsearch,logstash,kibana,filebeat,metricbeat一系列Elastic中间件都使用7.5.1版本。
       在根目录下新建一个 /app目录,所有的中间件都是源码安装在该目录下,新建 /app/data目录和 /app/logs目录。
       在搭建ELK的时候大部分中间件需要设置后台运行,如果搭建集群和做负载均衡,这样的后台运行会更多建议使用 daemontools来管理。
       启动顺序:先启动kafka和elasticsearch,接着启动logstash,最后启动filebeat或metricbeat

elasticsearch

要求安装的版本必须和kibana的版本一致,新建目录 /app/logs/es
es的配置文件 elasticsearch.yml

 
   
   
 
  1. cluster.name: myNode

  2. node.name: master-node

  3. path.data: /app/data

  4. path.logs: /app/logs/es

  5. bootstrap.memory_lock: false

  6. network.host: 0.0.0.0

  7. http.port: 9200

  8. #transport.tcp.port: 9300

  9. http.cors.enabled: true

  10. http.cors.allow-origin: "*"

创建一个 output索引

 
   
   
 
  1. curl -XPUT 'http://localhost:9200/output'

       ES安装插件ingest-geoip(地理位置插件),因为es不能用root用户启动,所以在安装插件的时候切换到es的管理用户进行安装,需要提前配置该用户可以使用sudo命令(在启动filebeat的时候报错 Thismodulerequires anElasticsearchplugin that provides the geoip processor.)。

 
   
   
 
  1. sudo ./bin/elasticsearch-plugin install ingest-geoip

      上面这种安装插件的方法会出现一个问题。elasticsearch源码安装在安装的时候有两种包,一种是普通的tar.gz结尾的包,一种是tar.gz.sha结尾的包。第二种包需要经过 SHA加密算法验证。用上面的安装方法,elasticsearch在安装ingest-geoip的时候可能会出现错误 SHA-512mismatch网上有一种方案是说查看系统中是否有 shasum命令,没有就安装;这种方案试了一下没成功。
解决方案:下载[ingest-geoip 插件源码](https://artifacts.elastic.co/downloads/elasticsearch-plugins/ingest-geoip/ingest-geoip-6.6.2.zip "ingest-geoip")进行安装,elasticsearch使用zip文件安装插件下面是把安装包放在 /app/soft下进行安装

 
   
   
 
  1. sudo ./bin/elasticsearch-plugin install file:///app/soft/ingest-geoip.zip

logstash

(可以搭建多个logstash做负载均衡)
       logstash的配置文件需要自己定义,logstash有三个模块,输入,过滤,输出。由于logstash比较消耗资源,暂时考虑数据处理交给elasticsearch做 手动写一个配置文件,在 /etc目录下创建一个目录 elk,在 /etc/elk中创建一个 elk.conf文件。logstash的每个模块都有很多插件可用,可以查看Gemfile文件,也可以用命令查看

 
   
   
 
  1. ./bin/logstash-plugin list

在elk.conf中添加内容

 
   
   
 
  1. #输入模块

  2. input{

  3. #从kafka中获取数据

  4. kafka{

  5. #kafka集群主机和端口号,可配置多个

  6. bootstrap_servers => ["localhost:9092"]

  7. #指定获取获取的topic

  8. topics => ["topic-test"]

  9. }

  10. }

  11. filter{

  12. #kafka为了考虑效率的问题,将json对象作为一条数据发送,下面是将输入的json解析为一条一条的数据,并作为顶级属性展示

  13. json{

  14. source => "message"

  15. #配置target可以使json中的内容展示在jsonContent属性下

  16. #target => "jsonContent"

  17. }

  18. }

  19. #输出模块

  20. output{

  21. elasticsearch{

  22. #指定elasticsearch集群的主机和端口,可配置多个

  23. hosts => ["localhost:9200"]

  24. #指定输出到elasticsearch中的索引

  25. index => "output-%{+YYYY.MM.dd}"

  26. }

  27. }

       启动logstash, --config.test_and_index(可选)会测试配置文件的正确性,给出错误信息

 
   
   
 
  1. ./bin/logstash -f /etc/elk/elk.conf --config.test_and_exit

logback

       logback是springboot项目默认的日志插件,项目会自动导入logback-core,和logback-classic
在pom.xml中加入logback-kafka依赖

 
   
   
 
  1. <dependency>

  2. <groupId>com.github.danielwegener</groupId>

  3. <artifactId>logback-kafka-appender</artifactId>

  4. <version>0.2.0-RC1</version>

  5. </dependency>

在resource目录下创建一个logback.xml文件

 
   
   
 
  1. <?xml version="1.0" encoding="UTF-8" ?>

  2. <configuration debug="false" scan="true" scanPeriod="1 seconds">

  3. <!--配置日志输出到kafka中 -->

  4. <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">

  5. <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">

  6. <pattern>%message %n</pattern>

  7. <charset>utf8</charset>

  8. </encoder>

  9. <!--指定kafka中的一个topic -->

  10. <topic>topic-test</topic>

  11. <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"></keyingStrategy>

  12. <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"></deliveryStrategy>

  13. <producerConfig>bootstrap.servers=47.105.193.24:9092</producerConfig>

  14. <producerConfig>retries=1</producerConfig>

  15. <producerConfig>acks=0</producerConfig>

  16. <producerConfig>linger.ms=1000</producerConfig>

  17. <producerConfig>block.on.buffer.full=false</producerConfig>

  18. <!--<appender-ref ref="kafka"></appender-ref>-->

  19. <!--<producerConfig>batch-size=16384</producerConfig>

  20. <producerConfig>buffer-memory=33554432</producerConfig>

  21. <producerConfig>properties.max.request.size=2097152</producerConfig>-->

  22. </appender>


  23. <!--配置日志在控制台输出 -->

  24. <appender name="console" class="ch.qos.logback.core.ConsoleAppender">

  25. <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">

  26. <pattern>%message %n</pattern>

  27. <charset>utf8</charset>

  28. </encoder>

  29. </appender>

  30. <!--指定哪些包下的日志需要输出 additivity判断是否需要像上级传递,如果是true当配置了root的时候,日志会输出两次 -->

  31. <logger name="com.ga" level="INFO" additivity="false">

  32. <!--指定才用那种模式输出,对应appender的name -->

  33. <appender-ref ref="kafka"></appender-ref>

  34. </logger>

  35. <root level="INFO">

  36. <appender-ref ref="kafka"></appender-ref>

  37. <appender-ref ref="console"></appender-ref>

  38. </root>

  39. </configuration>

kafka

       kafka安装(如果采用源码安装的方式,注意不要下载中间有src字样的压缩包,这个是源码包,不能运行)如果filebeat启动的时候连不上kafka,修改kafka配置文件 config/server.properties,添加 host.name, port, listeners, advertised.listeners,listeners中的ip路径为ifconfig命令中对应的路径, host.nameadvertised.listeners是真实路径

 
   
   
 
  1. broker.id=0

  2. host.name=47.105.193.24

  3. port=9092

  4. listeners=PLAINTEXT://172.31.252.88:9092

  5. advertised.listeners=PLAINTEXT://47.105.193.24:9092

kafka高版本自带zookeeper,不需要再安装zookeeper,启动zookeeper

 
   
   
 
  1. nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> /app/logs/zookeeper.log &

启动kafka

 
   
   
 
  1. nohup ./bin/kafka-server-start.sh ./config/server.properties >> /app/logs/kafka.log &

查看当前服务器中的所有topic

 
   
   
 
  1. ./bin/kafka-topics.sh --zookeeper ip:2181 --list

创建topic --replication-factor3指定副本数 --partitions1指定分区数

./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic topic-test

发送消息

 
   
   
 
  1. ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-test

消费消息

 
   
   
 
  1. ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-test --from-beginning

获取kafka中所有组

 
   
   
 
  1. ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

kibana

kibana用来展示es中的数据 kibana.yml

 
   
   
 
  1. server.port: 5601

  2. server.host: "0.0.0.0"

  3. elasticsearch.hosts: ["http://localhost:9200"]

  4. kibana.index: ".kibana"

运行kibana

 
   
   
 
  1. ./bin/kibana

在浏览器打开输入 127.0.0.1:5601就可以访问kibana了

filebeat

使用filebeat收集系统信息和数据库日志信息
查看filebeat中的所有模块

 
   
   
 
  1. ./filebeat modules list

启动filebeat中的mysql模块

 
   
   
 
  1. ./filebeat modules enable mysql

启动filebeat中的system模块,只要没有改过 /etc/rsyslog.conf文件中日志的输出路径就不需要修改system模块的配置文件

 
   
   
 
  1. ./filebeat modules enable system

禁用filebeat中的某个模块

 
   
   
 
  1. ./filebeat modules disable XXX

进入module.d目录,所有的模块的配置文件都在该目录下,修改mysql.yml文件

 
   
   
 
  1. - module: mysql

  2. #错误日志配置

  3. error:

  4. enabled: true

  5. #设置mysql的错误日志的路径

  6. var.paths: ["/app/data/err.log"]

  7. #慢查询日志配置

  8. slowlog:

  9. enabled: true

  10. #设置慢查询日志路径

  11. var.paths: ["/app/data/slow.log"]

修改filebeat.yml文件

 
   
   
 
  1. #filebeat输入

  2. filebeat.inputs:

  3. - type: log

  4. #如果不需要自定义输入日志文件就先设置为false

  5. enabled: false

  6. filebeat.config.modules:

  7. path: ${path.config}/modules.d/*.yml

  8. reload.enabled: false

  9. #使用kafka输出时注释掉elasticsearch和kibana相关的配置信息

  10. #setup.template.settings:

  11. #设置elasticsearch中的索引的分片数

  12. # index.number_of_shards: 1

  13. #配置输出为elasticsearch

  14. #setup.kibana

  15. output.kafka:

  16. #配置kafka集群节点

  17. hosts: ["localhost:9200"]

  18. topic: topic-test

启动filebeat, -e可以输出具体信息,不加-e可以在filebeat下的logs目录查看filebeat文件

 
   
   
 
  1. nohup ./filebeat -e >> /app/logs/filebeat.log &

metricbeat

       收集系统的状态信息,例如CPU,磁盘,内存等信息,也可以用来收集一些中间件的状态信息,操作和filebeat类似,metricbeat默认启动system模块
查看metricbeat模块列表

 
   
   
 
  1. ./metricbeat modules list

修改配置文件metricbeat.yml

 
   
   
 
  1. metricbeat.config.modules:

  2. path: ${path.config}/modules.d/*.yml

  3. reload.enabled: false

  4. output.kafka:

  5. hosts: ["47.105.193.24:9092"]

  6. topic: "topic-test"

  7. processors:

  8. - add_host_metadata: ~

  9. - add_cloud_metadata: ~

  10. - add_docker_metadata: ~

  11. - add_kubernetes_metadata: ~

启动metricbeat

 
   
   
 
  1. nohup ./metricbeat -e >> /app/logs/metricbeat.log &