分享

使用Fluentd进行简单流处理

 RealPython 2021-03-14

在某些日志采集场景中,我们需要对数据流进行一些转换。比如,我们可能需要从日志记录中提取某些字段以进行错误告警,或向日志记录中插入新的字段用以后续的分析。

本文简单介绍一下使用Fluentd进行数据操作的技术细节。

  1. 根据日志字段取值来过滤事件
    谈到过滤,我们通常会想到正则表达式,在linux中通常使用grep来进行文本查找和过滤。Fluentd内置了filter_grep过滤插件,可对数据流进行正则过滤。

    假设我们正在使用一个web服务,比如Apache,我们需要对其访问日志进行监控。由输入插件产生的事件类似如下结构:
    { "host": "192.168.1.1", "method": "GET", "path": "/index.html", "code": 200, "size": 2344, "referer": null}

    这其中的code字段表示用户请求状态,我们可能对状态为2xx的请求不太关心,这样就可以将这类事件过滤掉,专门处理用户请求可能发生的异常情况。

    我们可以通过在Fluentd中增加如下<filter>配置来实现事件过滤。

    <filter apache.**> @type grep <exclude> key code pattern ^2\d\d$ </exclude></filter>

    使用grep过滤插件,通过key指定code字段为过滤字段,通过pattern匹配code值为2xx的事件,将这些事件排除(exclude)掉。

    filter_grep还可以对多个字段进行过滤。比如,保留状态码为5xx的事件,但过滤掉url中以/test/开头的请求。如下所示:

    <filter apache.**> @type grep <regexp> key code pattern ^5\d\d$ </regexp> <exclude> key path pattern ^/test/ </exclude></filter>
  2. 向事件中插入定制字段
    我们可以在某个处理阶段向日志记录中插入一些字段,供后续使用。这可以通过Fluentd内置的filter_record_transformer过滤器插件来实现。
    假设我们是以集群的方式来部署web服务的,我们可能需要标记用户请求是由哪台服务器来处理的。
    在Fluentd中进行如下配置即可实现此类需求:
    <filter apache.**> @type record_transformer <record> server "${hostname}" </record></filter>

    这里,record_transformer插件向事件record中插入了一个server字段,其值为web服务器的主机名。新的日志record就更新为如下格式:

    { "host": "192.168.1.1", "method": "GET", "path": "/index.html", "code": 200, "size": 2344, "referer": null, "server": "app1"}

    filter_record_transformer除了可以直接插入预定义的一些变量,如${hostname},还可以插入其他变量或者使用ruby表达式来计算字段值。具体可参考此插件的使用说明,我们也会在后续的插件系列中进行介绍。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多