ELK + kafka + logback搭建游戏日志系统
ELK
/app
目录,所有的中间件都是源码安装在该目录下,新建
/app/data
目录和
/app/logs
目录。
在搭建ELK的时候大部分中间件需要设置后台运行,如果搭建集群和做负载均衡,这样的后台运行会更多建议使用
daemontools
来管理。
启动顺序:先启动kafka和elasticsearch,接着启动logstash,最后启动filebeat或metricbeat
elasticsearch
要求安装的版本必须和kibana的版本一致,新建目录 /app/logs/es
es的配置文件 elasticsearch.yml
cluster.name: myNode
node.name: master-node
path.data: /app/data
path.logs: /app/logs/es
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
#transport.tcp.port: 9300
http.cors.enabled: true
http.cors.allow-origin: "*"
创建一个 output
索引
curl -XPUT 'http://localhost:9200/output'
ES安装插件ingest-geoip(地理位置插件),因为es不能用root用户启动,所以在安装插件的时候切换到es的管理用户进行安装,需要提前配置该用户可以使用sudo命令(在启动filebeat的时候报错 Thismodulerequires anElasticsearchplugin that provides the geoip processor.
)。
sudo ./bin/elasticsearch-plugin install ingest-geoip
上面这种安装插件的方法会出现一个问题。elasticsearch源码安装在安装的时候有两种包,一种是普通的tar.gz结尾的包,一种是tar.gz.sha结尾的包。第二种包需要经过 SHA
加密算法验证。用上面的安装方法,elasticsearch在安装ingest-geoip的时候可能会出现错误 SHA-512mismatch
网上有一种方案是说查看系统中是否有 shasum
命令,没有就安装;这种方案试了一下没成功。
解决方案:下载[ingest-geoip 插件源码](https://artifacts.elastic.co/downloads/elasticsearch-plugins/ingest-geoip/ingest-geoip-6.6.2.zip "ingest-geoip")进行安装,elasticsearch使用zip文件安装插件下面是把安装包放在 /app/soft
下进行安装
sudo ./bin/elasticsearch-plugin install file:///app/soft/ingest-geoip.zip
logstash
(可以搭建多个logstash做负载均衡)
logstash的配置文件需要自己定义,logstash有三个模块,输入,过滤,输出。由于logstash比较消耗资源,暂时考虑数据处理交给elasticsearch做 手动写一个配置文件,在 /etc
目录下创建一个目录 elk
,在 /etc/elk
中创建一个 elk.conf
文件。logstash的每个模块都有很多插件可用,可以查看Gemfile文件,也可以用命令查看
./bin/logstash-plugin list
在elk.conf中添加内容
#输入模块
input{
#从kafka中获取数据
kafka{
#kafka集群主机和端口号,可配置多个
bootstrap_servers => ["localhost:9092"]
#指定获取获取的topic
topics => ["topic-test"]
}
}
filter{
#kafka为了考虑效率的问题,将json对象作为一条数据发送,下面是将输入的json解析为一条一条的数据,并作为顶级属性展示
json{
source => "message"
#配置target可以使json中的内容展示在jsonContent属性下
#target => "jsonContent"
}
}
#输出模块
output{
elasticsearch{
#指定elasticsearch集群的主机和端口,可配置多个
hosts => ["localhost:9200"]
#指定输出到elasticsearch中的索引
index => "output-%{+YYYY.MM.dd}"
}
}
启动logstash, --config.test_and_index
(可选)会测试配置文件的正确性,给出错误信息
./bin/logstash -f /etc/elk/elk.conf --config.test_and_exit
logback
logback是springboot项目默认的日志插件,项目会自动导入logback-core,和logback-classic
在pom.xml中加入logback-kafka依赖
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC1</version>
</dependency>
在resource目录下创建一个logback.xml文件
<?xml version="1.0" encoding="UTF-8" ?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">
<!--配置日志输出到kafka中 -->
<appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%message %n</pattern>
<charset>utf8</charset>
</encoder>
<!--指定kafka中的一个topic -->
<topic>topic-test</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"></keyingStrategy>
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"></deliveryStrategy>
<producerConfig>bootstrap.servers=47.105.193.24:9092</producerConfig>
<producerConfig>retries=1</producerConfig>
<producerConfig>acks=0</producerConfig>
<producerConfig>linger.ms=1000</producerConfig>
<producerConfig>block.on.buffer.full=false</producerConfig>
<!--<appender-ref ref="kafka"></appender-ref>-->
<!--<producerConfig>batch-size=16384</producerConfig>
<producerConfig>buffer-memory=33554432</producerConfig>
<producerConfig>properties.max.request.size=2097152</producerConfig>-->
</appender>
<!--配置日志在控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%message %n</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!--指定哪些包下的日志需要输出 additivity判断是否需要像上级传递,如果是true当配置了root的时候,日志会输出两次 -->
<logger name="com.ga" level="INFO" additivity="false">
<!--指定才用那种模式输出,对应appender的name -->
<appender-ref ref="kafka"></appender-ref>
</logger>
<root level="INFO">
<appender-ref ref="kafka"></appender-ref>
<appender-ref ref="console"></appender-ref>
</root>
</configuration>
kafka
kafka安装(如果采用源码安装的方式,注意不要下载中间有src字样的压缩包,这个是源码包,不能运行)如果filebeat启动的时候连不上kafka,修改kafka配置文件 config/server.properties
,添加 host.name
, port
, listeners
, advertised.listeners
,listeners中的ip路径为ifconfig命令中对应的路径, host.name
和 advertised.listeners
是真实路径
broker.id=0
host.name=47.105.193.24
port=9092
listeners=PLAINTEXT://172.31.252.88:9092
advertised.listeners=PLAINTEXT://47.105.193.24:9092
kafka高版本自带zookeeper,不需要再安装zookeeper,启动zookeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> /app/logs/zookeeper.log &
启动kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties >> /app/logs/kafka.log &
查看当前服务器中的所有topic
./bin/kafka-topics.sh --zookeeper ip:2181 --list
创建topic --replication-factor3
指定副本数 --partitions1
指定分区数
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic topic-test
发送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-test
消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-test --from-beginning
获取kafka中所有组
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
kibana
kibana用来展示es中的数据 kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://localhost:9200"]
kibana.index: ".kibana"
运行kibana
./bin/kibana
在浏览器打开输入 127.0.0.1:5601
就可以访问kibana了
filebeat
使用filebeat收集系统信息和数据库日志信息
查看filebeat中的所有模块
./filebeat modules list
启动filebeat中的mysql模块
./filebeat modules enable mysql
启动filebeat中的system模块,只要没有改过 /etc/rsyslog.conf
文件中日志的输出路径就不需要修改system模块的配置文件
./filebeat modules enable system
禁用filebeat中的某个模块
./filebeat modules disable XXX
进入module.d目录,所有的模块的配置文件都在该目录下,修改mysql.yml文件
- module: mysql
#错误日志配置
error:
enabled: true
#设置mysql的错误日志的路径
var.paths: ["/app/data/err.log"]
#慢查询日志配置
slowlog:
enabled: true
#设置慢查询日志路径
var.paths: ["/app/data/slow.log"]
修改filebeat.yml文件
#filebeat输入
filebeat.inputs:
- type: log
#如果不需要自定义输入日志文件就先设置为false
enabled: false
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
#使用kafka输出时注释掉elasticsearch和kibana相关的配置信息
#setup.template.settings:
#设置elasticsearch中的索引的分片数
# index.number_of_shards: 1
#配置输出为elasticsearch
#setup.kibana
output.kafka:
#配置kafka集群节点
hosts: ["localhost:9200"]
topic: topic-test
启动filebeat, -e
可以输出具体信息,不加-e可以在filebeat下的logs目录查看filebeat文件
nohup ./filebeat -e >> /app/logs/filebeat.log &
metricbeat
收集系统的状态信息,例如CPU,磁盘,内存等信息,也可以用来收集一些中间件的状态信息,操作和filebeat类似,metricbeat默认启动system模块
查看metricbeat模块列表
./metricbeat modules list
修改配置文件metricbeat.yml
metricbeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
output.kafka:
hosts: ["47.105.193.24:9092"]
topic: "topic-test"
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
启动metricbeat
nohup ./metricbeat -e >> /app/logs/metricbeat.log &