背景在基于elk的日志系统中,filebeat几乎是其中必不可少的一个组件,例外是使用性能较差的logstash file input插件或自己造个功能类似的轮子:)。 在使用和了解filebeat的过程中,笔者对其一些功能上的实现产生了疑问,诸如:
为了找到答案,笔者阅读了filebeat和部分libbeat的源码(read the fucking source code),本文即是对此过程的一次总结。一方面是方便日后回顾,另一方面也希望能解答大家对filebeat的一些疑惑。 本文主要内容包括filebeat基本介绍、源码解析两个部分,主要面向的是:想要了解filebeat实现、想改造或扩展filebeat功能或想参考filebeat开发自定义 filebeat基本介绍filebeat是一个开源的日志运输程序,属于beats家族中的一员,和其他beats一样都基于libbeat库实现。其中,libbeat是一个提供公共功能的库,功能包括: 配置解析、日志打印、事件处理和发送等。 对于任一种beats来说,主要逻辑都包含两个部分
其中第二点已由libbeat实现,因此各个beats实际只需要关心如何收集数据并生成事件后发送给libbeat的Publisher。beats和libeat的交互如下图所示: 具体到filebeat,它能采集数据的类型包括: log文件、标准输入、redis、udp和tcp包、容器日志和syslog,其中最常见的是使用log类型采集文件日志发送到Elasticsearch或Logstash。而后续的源码解析,也主要基于这种使用场景。 基于libbeat实现的filebeat,主要拥有以下几个特性
filebeat源码解析模块结构下图是filebeat及使用libbeat的一些主要模块,为笔者根据源码的理解所作。 1. filebeat主要模块
2. libbeat主要模块
filebeat目录组织├── autodiscover # 包含filebeat的autodiscover适配器(adapter),当autodiscover发现新容器时创建对应类型的输入 ├── beater # 包含与libbeat库交互相关的文件 ├── channel # 包含filebeat输出到pipeline相关的文件 ├── config # 包含filebeat配置结构和解析函数 ├── crawler # 包含Crawler结构和相关函数 ├── fileset # 包含module和fileset相关的结构 ├── harvester # 包含Harvester接口定义、Reader接口及实现等 ├── input # 包含所有输入类型的实现(比如: log, stdin, syslog) ├── inputsource # 在syslog输入类型中用于读取tcp或udp syslog ├── module # 包含各module和fileset配置 ├── modules.d # 包含各module对应的日志路径配置文件,用于修改默认路径 ├── processor # 用于从容器日志的事件字段source中提取容器id ├── prospector # 包含旧版本的输入结构Prospector,现已被Input取代 ├── registrar # 包含Registrar结构和方法 └── util # 包含beat事件和文件状态的通用结构Data └── ... 除了以上目录注释外,以下将介绍一些个人认为比较重要的文件的详细内容,读者可作为阅读源码时的一个参考。 /beater包含与libbeat库交互相关的文件:
/channelfilebeat输出(到pipeline)相关的文件
/input包含Input接口及各种输入类型的Input和Harvester实现
/harvester包含Harvester接口定义、Reader接口及实现等
重要数据结构beats通用事件结构( type Event struct { Timestamp time.Time // 收集日志时记录的时间戳,对应es文档中的@timestamp字段 Meta common.MapStr // meta信息,outpus可选的将其作为事件字段输出。比如输出为es且指定了pipeline时,其pipeline id就被包含在此字段中 Fields common.MapStr // 默认输出字段定义在field.yml,其他字段可以在通过fields配置项指定 Private interface{} // for beats private use } Crawler( // Crawler 负责抓取日志并发送到libbeat pipeline type Crawler struct { inputs map[uint64]*input.Runner // 包含所有输入的runner inputConfigs []*common.Config out channel.Factory wg sync.WaitGroup InputsFactory cfgfile.RunnerFactory ModulesFactory cfgfile.RunnerFactory modulesReloader *cfgfile.Reloader inputReloader *cfgfile.Reloader once bool beatVersion string beatDone chan struct{} } log类型Input( // Input contains the input and its config type Input struct { cfg *common.Config config config states *file.States harvesters *harvester.Registry // 包含Input所有Harvester outlet channel.Outleter // Input共享的Publisher client stateOutlet channel.Outleter done chan struct{} numHarvesters atomic.Uint32 meta map[string]string } log类型Harvester( type Harvester struct { id uuid.UUID config config source harvester.Source // the source being watched // shutdown handling done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex // internal harvester state state file.State states *file.States log *Log // file reader pipeline reader reader.Reader encodingFactory encoding.EncodingFactory encoding encoding.Encoding // event/state publishing outletFactory OutletFactory publishState func(*util.Data) bool onTerminate func() } Registrar( type Registrar struct { Channel chan []file.State out successLogger done chan struct{} registryFile string // Path to the Registry File fileMode os.FileMode // Permissions to apply on the Registry File wg sync.WaitGroup states *file.States // Map with all file paths inside and the corresponding state gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write gcEnabled bool // gcEnabled indictes the registry contains some state that can be gc'ed in the future flushTimeout time.Duration bufferedStateUpdates int } libbeat Pipeline( type Pipeline struct { beatInfo beat.Info logger *logp.Logger queue queue.Queue output *outputController observer observer eventer pipelineEventer // wait close support waitCloseMode WaitCloseMode waitCloseTimeout time.Duration waitCloser *waitCloser // pipeline ack ackMode pipelineACKMode ackActive atomic.Bool ackDone chan struct{} ackBuilder ackBuilder // pipelineEventsACK eventSema *sema processors pipelineProcessors } 执行逻辑filebeat启动filebeat启动流程如下图所示: 1. 执行root命令 在 对于任意一个beats来说,都需要有:1) 实现 beater接口定义(beat/beat.go): type Beater interface { // The main event loop. This method should block until signalled to stop by an // invocation of the Stop() method. Run(b *Beat) error // Stop is invoked to signal that the Run method should finish its execution. // It will be invoked at most once. Stop() } 2. 初始化和运行Filebeat
3. Filebeat运行
日志收集从收集日志、到发送事件到publisher,其数据流如下图所示:
以log类型为例
reader关于log类型的reader处理链,如下图所示:
Reader包括:
除了Line Reader外,这些reader都实现了 type Reader interface { Next() (Message, error) } Reader通过内部包含 各Reader的Next方法的通用形式像是这样: func (r *SomeReader) Next() (Message, error) { message, err := r.reader.Next() if err != nil { return message, err } // do some processing... return message, nil } 事件处理和队列在Crawler收集日志并转换成事件后,其就会通过调用Publisher对应client的Publish接口将事件送到Publisher,后续的处理流程也都将由libbeat完成,事件的流转如下图所示: 事件处理器processor在harvester调用 通过官方文档了解到,processor包含两种:在Input内定义作为局部(Input独享)的processor,其只对该Input产生的事件生效;在顶层配置中定义作为全局processor,其对全部事件生效。 其对应的代码实现方式是: filebeat在使用libbeat pipeline的 任一Processor都实现了Processor接口:Run函数包含处理逻辑,String返回Processor名。 type Processor interface { Run(event *beat.Event) (*beat.Event, error) String() string } 关于支持的processors及其使用,读者可以参考官方文档Filter and enhance the exported data这一小节 队列queue在事件经过处理器处理后,下一步将被发往Publisher的队列。在 队列对象被包含在 type Queue interface { io.Closer BufferConfig() BufferConfig Producer(cfg ProducerConfig) Producer Consumer() Consumer } 主要的,Producer方法生成Producer对象,用于向队列中push事件;Consumer方法生成Consumer对象,用于从队列中取出事件。 type Producer interface { Publish(event publisher.Event) bool TryPublish(event publisher.Event) bool Cancel() int } type Consumer interface { Get(sz int) (Batch, error) Close() error } 在配置中没有指定队列配置时,默认使用了 Broker结构(memqueue在代码中实际对应的结构名是Broker): type Broker struct { done chan struct{} logger logger bufSize int // buf brokerBuffer // minEvents int // idleTimeout time.Duration // api channels events chan pushRequest requests chan getRequest pubCancel chan producerCancelRequest // internal channels acks chan int scheduledACKs chan chanList eventer queue.Eventer // wait group for worker shutdown wg sync.WaitGroup waitOnClose bool } 根据是否需要ack分为forgetfullProducer和ackProducer两种producer: type forgetfullProducer struct { broker *Broker openState openState } type ackProducer struct { broker *Broker cancel bool seq uint32 state produceState openState openState } consumer结构: type consumer struct { broker *Broker resp chan getResponse done chan struct{} closed atomic.Bool } 三者的运作方式如下图所示:
事件发送在队列消费者将事件放入output工作队列后,事件将在 其中, 此时如果发送失败会发生什么呢? 在 ack机制和registrar记录文件状态在事件发送成功后, 其ack的数据流如下图所示:
通过ack机制和registrar模块,filebeat实现了对已发送成功事件对应文件状态的记录,这使它即使在程序crash后重启的情况下也能从之前的文件位置恢复并继续处理,保证了日志数据(事件)被至少发送一次。 总结至此,本篇文章关于filebeat源码解析的内容已经结束。 从整体看,filebeat的代码没有包含复杂的算法逻辑或底层实现,但其整体代码结构还是比较清晰的,即使对于不需要参考filebeat特性实现去开发自定义beats的读者来说,仍属于值得一读的源码。 参考
|
|