vlambda博客
学习文章列表

zabbix整合ELK收集系统异常日志触发告警

MARK(参考)


logstash支持多种输出介质,比如说syslog,http,tcp,elasticsearch,kafka等,如果我们将logstash收集的日志输出到zabbix告警,就必须要用到logstash-output-zabbix插件,通过这个插件将logstash与zabbix整合,logstash收集到的数据过滤出错误信息的日志输出到zabbix中,最后通过zabbix告警机制触发;

[root@localhost ~]# /usr/local/logstash/bin/logstash-plugin install logstash-output-zabbix      #安装logstash-output-zabbix插件
Validating logstash
-output-zabbix
Installing logstash
-output-zabbix
Installation successful

环境案例需求:

通过读系统日志文件的监控,过滤掉日志信息中的异常关键词,如ERR,error,Failed,warning等信息,将这些带有异常关键词的异常日志信息过滤出来,然后输出到zabbix,通过zabbix告警机制实现触发告警;下面环境是filebeat作为采集端;输出到kafaka消息队列,最后由logsatsh拉取日志并过滤,输出到zabbix

【filebeat】日志采集端


filebeat.inputs:
- type: log
enabled:
true
paths:
- /var/log/secure
- /var/log/messages
- /var/log/cron
fields:
log_topic: system_log
processors:
- drop_fields:
fields: [
"beat", "input", "source", "offset", "prospector"] #这里在filebeat中直接去掉不需要的字段。
filebeat.config.modules:
path: ${path.config}
/modules.d/*.yml
reload.enabled: false
name: 192.168.37.147 #这是日志输出标识,表明日志来自哪个主机,后面再logstash会用到。
output.kafka:
enabled: true
hosts: ["192.168.37.147:9092", "192.168.37.148:9092", "192.168.37.149:9092"] #日志输出到kafka集群
version: "0.10"
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: true
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 10000000

logging.level: debug

 

【Logstash端】

 

[root@localhost ~]# vim /usr/local/logstash/config/etc/system_log.conf


input {
kafka {
bootstrap_servers
=> "192.168.37.147:9092,192.168.37.148:9092,192.168.37.149:9092"
topics
=> ["system_log"]
codec
=> "json"
}
}

filter {
if [fields][log_topic] == "system_log" { #指定filebeat产生的日志主题
mutate {
add_field
=> [ "[zabbix_key]", "oslogs" ] #新增的字段,字段名是zabbix_key,值为oslogs。
add_field
=> [ "[zabbix_host]", "%{[host][name]}" ] #新增的字段,字段名是zabbix_host,值可以在这里直接定义,也可以引用字段变量来获取。这里的%{[host][name]获取的就是日志数据的来源IP,这个来源IP在filebeat配置中的name选项进行定义。
}
}
grok {
match
=> { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" } #这里通过grok对message字段的数据进行字段划分,这里将message字段划分了5个子字段。其中,message_content字段会在output中用到。
}
mutate { #这里是删除不需要的字段
remove_field
=> "@version"
remove_field
=> "message"
}
date { #这里是对日志输出中的日期字段进行转换,其中message_timestamp字段是默认输出的时间日期字段,将这个字段的值传给 @timestamp字段。
match
=> [ "message_timestamp","MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
}
}

output {
if [message_content] =~ /(ERR|error|ERROR|Failed)/ { #定义在message_content字段中,需要过滤的关键字信息,也就是在message_content字段中出现给出的这些关键字,那么就将这些信息发送给zabbix。
zabbix {
zabbix_host
=> "[zabbix_host]" #这个zabbix_host将获取上面filter部分定义的字段变量%{[host][name]的值
zabbix_key
=> "[zabbix_key]" #这个zabbix_key将获取上面filter部分中给出的值
zabbix_server_host
=> "192.168.37.149" #这是指定zabbix server的IP地址
zabbix_server_port
=> "10051" #这是指定zabbix server的监听端口
zabbix_value
=> "message_content" #定要传给zabbix监控项item(oslogs)的值, zabbix_value默认的值是"message"字段,因为上面我们已经删除了"message"字段,因此,这里需要重新指定,根据上面filter部分对"message"字段的内容划分,这里指定为"message_content"字段,其实,"message_content"字段输出的就是服务器上具体的日志内容。
}
}
}


 

[root@localhost logstash]# nohup /usr/local/logstash/bin/logstash -f config/etc/system_log.conf --path.data=/tmp/          #这里的--path.data指定词logstash进程的数据存储目录,用于在一个服务器上启动多个logstash进程环境

zabbix整合ELK收集系统异常日志触发告警

[测试]不确定事件配置文件是否正确,我们可以通过前台运行,输出stdout;验证filebeat收集的日志是够成功的过滤~

stdout {codec => rubydebug}    #我们将这条指令加入output输出端,前台运行测试,看是够能够过滤出来错误日志输出,效果如下~(ok之后记得将这条指令注释掉并后台运行哦)

# /usr/local/logstash/bin/logstash -f config/etc/system_log.conf --path.data=/tmp/

zabbix整合ELK收集系统异常日志触发告警

【zabbix-监控模板创建到 告警一触即发】

1.创建模板

将词模板链接到192.168.37.147上,创建的模板上的监控项就会在192.168.37.147上自动生效了,

zabbix整合ELK收集系统异常日志触发告警

 

2.创建应用集,点击应用集-创建应用集

zabbix整合ELK收集系统异常日志触发告警

3.创建监控项,点击监控项,创建监控项

zabbix整合ELK收集系统异常日志触发告警

4.告警触发,创建 触发器

zabbix整合ELK收集系统异常日志触发告警

zabbix整合ELK收集系统异常日志触发告警

 

将咱们创建的收集日志的模板连接到 需要收集日志的主机,验证告警触发效果

zabbix整合ELK收集系统异常日志触发告警

【模拟告警】

ssh连接192.168.37.147日志收集主机,故意输错密码,让系统产生错误日志,验证是够发送到zabbix端,下面便是我们过滤的错误日志信息,如“error”,"Failed"等~到目前为止,已经成功的收集到错误日志输出了~

 【总结】

首先我们来捋一下思路:

我们的架构基本不变,仍然是filebat收集日志推送到kibana消息队列,然后由Logstash前去拉取日志数据,经过处理最后中转出去;只不过是中转输出到zabbix上面而已;能够实现这个功能的,最核心的功臣就是Logsatsh的插件(logstash-output-zabbix);

在这里需要注意的是:filebeat收集端的IP一定要与zabbix监控主机的IP相对应,否则日志是过不来的~

分享一个小技巧:通过该命令可以测试定义在zabbix上的键值;出现以下输出变为正常~,如果failed非零值表示失败

[root@localhost zabbix_sender]# /usr/local/zabbix/bin/zabbix_sender -s 192.168.37.147 -z 192.168.37.149 -k "oslogs" -o 1
info from server: "processed: 1; failed: 0; total: 1; seconds spent: 0.000081"
sent: 1; skipped: 0; total: 1

详解:-s:指定本地agent端

-z:指定zabbix服务端

-k:指定键值