分享

干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

 新用户248587GZ 2022-02-14

  ELK 大家应该比较熟悉了,zabbix 应该也不陌生,那么将 ELK 和 zabbix 放到一起的话,可能大家就会有疑问了?这两个放到一起是什么目的呢,听我细细道来

  ELK 是一套日志收集套件,它其实是由 Elasticsearch、Logstash 和 Kibana 三个软件组成,通过 ELK 可以收集系统日志、网站日志、应用系统日志等各种日志数据,并且还可以对日志进行过滤、清洗,然后进行集中存放并可用于实时检索、分析。这是 ELK 的基础功能。

  但是有些时候,我们希望在收集日志的时候,能够将日志中的异常信息(警告、错误、失败等信息)及时地提取出来,因为日志中的异常信息意味着操作系统、应用程序可能存在故障,如果能将日志中的故障信息及时的告知运维人员,那么运维就可以第一时间去进行故障排查和处理,进而也就可以避免很多故障的发生。

  那么如何才能做到将 ELK 收集的日志数据中出现的异常信息及时地告知运维人员呢,这就需要用到 zabbix 了,ELK(更确切的说应该是 logstash)可以实时地读取日志的内容,并且还可以过滤日志信息,通过 ELK 的读取和过滤功能,就可以将日志中的一些异常关键字(error、failed、OutOff、Warning)过滤出来,然后通过 logstash 的 zabbix 插件将这个错误日志信息发送给 zabbix,那么 zabbix 在接收到这个数据后,结合自身的机制,然后发起告警动作,这样就实现了日志异常 zabbix 实时告警的功能了。

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  Logstash 支持多种输出介质,比如 syslog、HTTP、TCP、elasticsearch、kafka 等,而有时候我们想将收集到的日志中一些错误信息输出,并告警时,就用到了 logstash-output-zabbix 这个插件,此插件可以将 Logstash 与 zabbix 进行整合,也就是将 Logstash 收集到的数据进行过滤,将有错误标识的日志输出到 zabbix 中,最后通过 zabbix 的告警机制进行触发、告警。

  logstash-output-zabbix 是一个社区维护的插件,它默认没有在 Logstash 中安装,但是安装起来也很容易,直接在 logstash 中运行如下命令即可:

  [root@elk-master bin]# /usr/share/logstash/bin/logstash-plugin install logstash-output-zabbix

  其中,/usr/share/logstash/bin/是 Logstash 的 yum 默认安装目录,如果是源码安装的需要自己修改

  此外,logstash-plugin 命令还有多种用法,我们来看一下:

  列出所有已安装的插件

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin list

  列出已安装的插件及版本信息

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin list --verbose

  列出包含 namefragment 所有已安装插件

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin list "*namefragment*"

  列出特定组的所有已安装插件( input,filter,codec,output)

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin --group output

  要安装某个插件,例如安装 kafka 插件,可执行如下命令:

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin install logstash-output-kafka

  要使用此命令安装插件,需要你的电脑可以访问互联网。此插件安装方法,会检索托管在公共存储库(RubyGems)上的插件,然后下载到本地机器并在 Logstash 安装之上进行自动安装

  每个插件有自己的发布周期和版本更新,这些更新通常是独立于 Logstash 的发布周期的。因此,有时候需要单独更新插件,可以使用 update 子命令获得最新版本的插件

  更新所有已安装的插件

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin update

  仅更新指定的插件

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin update logstash-output-kafka

  如果需要从 Logstash 插件中删除插件,可执行如下命令:

  [root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin remove logstash-output-kafka

  这样就删除了 logstash-output-kafka 插件

  logstash-output-zabbix 安装好之后,就可以在 logstash 配置文件中使用了,下面是一个 logstash-output-zabbix 使用的例子:

  zabbix {

  zabbix_host=>"[@metadata][zabbix_host]"zabbix_key=>"[@metadata][zabbix_key]"zabbix_server_host=>"x.x.x.x"zabbix_server_port=>"xxxx"

  zabbix_value="xxxx"

  }

  其中:

  zabbix_host:表示 Zabbix 主机名字段名称, 可以是单独的一个字段, 也可以是 @metadata 字段的子字段, 是必需的设置,没有默认值。

  zabbix_key:表示 Zabbix 项目键的值,也就是 zabbix 中的 item,此字段可以是单独的一个字段, 也可以是 @metadata 字段的子字段,没有默认值。

  zabbix_server_host:表示 Zabbix 服务器的 IP 或可解析主机名,默认值是 “localhost”,需要设置为 zabbix server 服务器所在的地址。

  zabbix_server_port:表示 Zabbix 服务器开启的监听端口,默认值是 10051。

  zabbix_value:表示要发送给 zabbix item 监控项的值对应的字段名称,默认值是 “message”,也就是将”message”字段的内容发送给上面 zabbix_key 定义的 zabbix item 监控项,当然也可以指定一个具体的字段内容发送给 zabbix item 监控项。

  这里我们以 logstash 收集日志,然后对日志进行读取,最后选择关键字进行过滤并调用 zabbix 告警的流程,来看看如何配置 logstash 实现 zabbix 告警。

  先说明一下我们的应用需求:通过对系统日志文件的监控,然后去过滤日志信息中的一些关键字,例如 ERROR、Failed、WARNING 等,将日志中这些信息过滤出来,然后发送到 zabbix 上,最后借助 zabbix 的报警功能实现对系统日志中有上述关键字的告警。

  对于过滤关键字,进行告警,不同的业务系统,可能关键字不尽相同,例如对 http 系统,可能需要过滤 500、403、503 等这些错误码,对于 java 相关的系统,可能需要过滤 OutOfMemoryError、PermGen、Java heap 等关键字。在某些业务系统的日志输出中,可能还有一些自定义的错误信息,那么这些也需要作为过滤关键字来使用。

  接下来就是创建一个 logstash 事件配置文件,这里将配置文件分成三个部分来介绍,首先是 input 部分,内容如下:

  input {

  file {

  path="/var/log/secure"type="system"

  start_position="beginning"

  }

  }

  input 部分是从/var/log/secure 文件中读取数据,start_position 表示从 secure 文件开头读取内容。接着是 filter 部分,内容如下:

  filter {

  grok {

  match=> { "message"=> "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" }#这里通过 grok 对 message 字段的数据进行字段划分,这里将 message 字段划分了 5 个子字段。其中,message_content 字段会在 output 中用到

  }

  mutate {

  add_field=> ["[zabbix_key]","oslogs"] #新增的字段,字段名是 zabbix_key,值为 oslogsadd_field=> ["[zabbix_host]","Zabbix server"] #新增的字段,字段名是 zabbix_host,值可以在这里直接定义,也可以引用字段变量来获取。这里的%{host}获取的就是日志数据的主机名,这个主机名与 zabbix web 中“主机名称”需要保持一致 remove_field=> ["@version","message"] #这里是删除不需要的字段

  }

  date { #这里是对日志输出中的日期字段进行转换,其中 message_timestamp 字段是默认输出的时间日期字段,将这个字段的值传给 @timestamp 字段 match=> [ "message_timestamp","MMM d HH:mm:ss","MMM dd HH:mm:ss","ISO8601"]

  }

  }

  filter 部分是个重点,在这个部分中,重点关注的是 message_timestamp 字段、message_content 字段

  最后是 output 部分,内容如下

  output {

  elasticsearch{ #这部分是为把日志次发送到 es 中便于 kibana 中进行展示,如果没有部署的可以去掉 index=> "oslogs-%{+YYYY.MM.dd}"hosts=> ["192.168.73.133:9200"]

  user=> "elastic"password=> "Goldwind@2019"sniffing=> false

  }

  if [message_content]=~ /(ERR|error|ERROR|Failed)/ { #定义在 message_content 字段中,需要过滤的关键字信息,也就是在 message_content 字段中出现给出的这些关键字,那么就将这些信息发送给 zabbixzabbix {

  zabbix_host=>"[zabbix_host]" #这个 zabbix_host 将获取上面 filter 部分定义的字段变量%{host}的值 zabbix_key=>"[zabbix_key]" #这个 zabbix_key 将获取上面 filter 部分中给出的值 zabbix_server_host=>"192.168.73.133" #这是指定 zabbix server 的 IP 地址 zabbix_server_port=>"10051" #这是指定 zabbix server 的监听端口 zabbix_value=>"message_content" #这个很重要,指定要传给 zabbix 监控项 item(oslogs)的值, zabbix_value 默认的值是"message"字段,因为上面我们已经删除了"message"字段,因此,这里需要重新指定,根据上面 filter 部分对"message"字段的内容划分,这里指定为"message_content"字段,其实,"message_content"字段输出的就是服务器上具体的日志内容

  }

  }

  #stdout{ codec=> rubydebug }

  } #这里是开启调试模式,当第一次配置的时候,建议开启,这样过滤后的日志信息直接输出的屏幕,方便进行调试,调试成功后,即可关闭

  将上面三部分内容合并到一个文件 file_to_zabbix.conf 中,然后启动 logstash 服务:

  [root@logstashserver ~]#cd /usr/local/logstash

  [root@logstashserver logstash]#nohup bin/logstash -f config/file_to_zabbix.conf --path.data /tmp/ &

  这里的—path.data 是指定此 logstash 进程的办统招文凭数据存储目录,用于在一个服务器上启动多个 logstash 进程的环境中

  登录 zabbix web 平台,选择配置—->模板—->创建模板,名称定为 logstash-output-zabbix,如下图所示:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  接着,在此模块下创建一个应用集,点击应用集——->创建应用集,如下图所示:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  然后,在此模块下创建一个监控项,点击监控项——->创建监控项,如下图所示:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  到此为止,zabbix 监控 logstash 的日志数据配置完成

  这里我们以客户端 192.168.73.135 主机为例,也就是监控 192.168.73.135 主机上的系统日志数据,当发现日志异常就进行告警

  在上面创建了一个模板和监控项,接着还需要将此模板链接到 192.168.73.135 主机上,选择“配置”—-“主机”,然后选中 192.168.73.135 主机,选择“模板”标签,将上面创建的模板加入到 192.168.73.135 主机上,如下图所示:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  这样,上面创建的监控项在 192.168.73.135 主机上就自动生效了

  下面我们模拟一个故障,在任意主机通过 ssh 登录 192.168.73.135 主机,然后输入一个错误密码,让系统的/var/log/secure 文件产生错误日志,然后看看 logstash 是否能够过滤到,是否能够发送到 zabbix 中

  首先让系统文件/var/log/secure 产生类似如下内容:

  [root@k8s-node03 ~]# tail /var/log/secure

  Dec 25 22:57:44 k8s-node03 sshd[49803]: Failed password for root from 192.168.73.1 port 60256 ssh2

  Dec 25 22:57:48 k8s-node03 sshd[49803]: error: Received disconnect from 192.168.73.1 port 60256:13: The user canceled authentication. [preauth]

  Dec 25 22:57:48 k8s-node03 sshd[49803]: Disconnected from 192.168.73.1 port 60256 [preauth]

  Dec 25 23:02:40 k8s-node03 sshd[49908]: reverse mapping checking getaddrinfo for bogon [192.168.73.1] failed - POSSIBLE BREAK-IN ATTEMPT!

  Dec 25 23:02:43 k8s-node03 unix_chkpwd[49911]: password check failed for user (root)

  Dec 25 23:02:43 k8s-node03 sshd[49908]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.73.1 user=root

  Dec 25 23:02:43 k8s-node03 sshd[49908]: pam_succeed_if(sshd:auth): requirement "uid >=1000" not met by user "root"

  Dec 25 23:02:44 k8s-node03 sshd[49908]: Failed password for root from 192.168.73.1 port 60757 ssh2

  Dec 25 23:02:46 k8s-node03 sshd[49908]: error: Received disconnect from 192.168.73.1 port 60757:13: The user canceled authentication. [preauth]

  Dec 25 23:02:46 k8s-node03 sshd[49908]: Disconnected from 192.168.73.1 port 60757 [preauth]

  这里面有我们要过滤的关键字 Failed,因此 logstash 会将此内容过滤出来,发送到 zabbix 上

  接着,登录 zabbix web 平台,点击监测中——->最新数据,如果 zabbix 能够接收到日志,就可以看到下图的最新数据:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  点击历史记录,可以查看详细内容,如下图所示:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  可以看到,红框中的内容就是在 logstash 中定义的 message_content 字段的内容,到这里为止,zabbix 已经可以收到 logstash 的发送过来的数据了,但是要实现报警,还需要在 zabbix 中创建一个触发器,进入配置——->模板,选择 logstash-output-zabbix 这个模板,然后点击上面的触发器,继续点击右上角的创建触发器,如下图所示:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  这里注意触发器创建中,表达式的写法,这里触发器的触发条件是:

  如果接收到 logstash 发送过来的数据,就进行告警,也就是说接收到的数据,如果长度大于 0 就告警

  触发器配置完成后,如果配置正常,就会进行告警了,使用的是钉钉进行图文告警,告警内容如下图所示:

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  Kibana 上查看安全日志

  干货!运维Zabbix 与 ELK 整合实现对安全日志数据的实时监控告警

  首先我们来捋一下思路:

  我们的架构基本不变,仍然是 filebat 收集日志推送到 kibana 消息队列,然后由 Logstash 前去拉取日志数据,经过处理最后中转出去;只不过是中转输出到 zabbix 上面而已;能够实现这个功能的,最核心的功臣就是 Logsatsh 的插件(logstash-output-zabbix);

  在这里需要注意的是:filebeat 收集端的 IP 一定要与 zabbix 监控主机的 IP 相对应,否则日志是过不来的~

  分享一个小技巧:通过该命令可以测试定义在 zabbix 上的键值;出现以下输出变为正常~,如果 failed 非零值表示失败

  #在 server 端使用 zabbix_sender 向 zabbix 发送测试

  [root@localhost zabbix_sender]# /usr/local/zabbix/bin/zabbix_sender -s 192.168.73.135 -z 192.168.73.133 -k "oslogs" -o 1

  info from server: "processed: 1; failed: 0; total: 1; seconds spent: 0.000081"

  sent: 1; skipped: 0; total: 1

  详解:-s:指定本地 agent 端

  -z:指定 zabbix 服务端

  -k:指定键值

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多