
Flume Interceptors

行者花雕, published 2022-12-24 in Beijing

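This post walks through a user-defined Flume interceptor. Start by creating a Maven project named flume-demo.

Add the following dependency to the project's pom.xml: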

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
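
Create the interceptor class:

package com.kpwong.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No setup required.
    }

    // Intercept a single event: set the "topic" header based on the event body.
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String body = new String(event.getBody(), StandardCharsets.UTF_8);

        if (body.contains("hello")) {
            headers.put("topic", "letter");
        } else {
            headers.put("topic", "number");
        }

        return event;
    }

    // Intercept a batch of events by applying the single-event logic to each one.
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    // Flume instantiates the interceptor through this Builder, which is the
    // class named in the agent configuration.
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
            // This interceptor takes no configuration parameters.
        }
    }
}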
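
The tagging rule in intercept(Event) can be tried out without a Flume installation. The following is only a minimal sketch and not part of the original project: the class name TopicTagger and the method tag are hypothetical, and a plain Map stands in for the headers that Event.getHeaders() would return.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch of the tagging rule used by CustomInterceptor.
// A plain Map stands in for the Flume event headers; no Flume classes are used.
public class TopicTagger {

    // Sets "topic" to "letter" if the body contains "hello", otherwise to
    // "number", and returns the same headers map.
    public static Map<String, String> tag(Map<String, String> headers, byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        headers.put("topic", text.contains("hello") ? "letter" : "number");
        return headers;
    }

    public static void main(String[] args) {
        for (String line : new String[] {"hello flume", "12345"}) {
            Map<String, String> headers =
                    tag(new HashMap<>(), line.getBytes(StandardCharsets.UTF_8));
            System.out.println(line + " -> " + headers.get("topic"));
        }
    }
}
```

Running main prints "hello flume -> letter" and "12345 -> number". Note that every body that does not contain "hello" is tagged "number", whether or not it is numeric.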

Package the project as a jar file and copy it into the /flume/lib directory.

 

 

Configure the agents. Prepare three machines (hadoop202, hadoop203, hadoop204).

On hadoop202, create flume2.conf:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

# Interceptor and channel selector
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop203
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop204
a2.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

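The interceptor and selector settings are the key part of this configuration. The interceptor writes a topic header on each event, and the multiplexing selector routes on that header: letter goes to channel c1 and number goes to channel c2.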

a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2
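
To make the selector settings concrete, here is a rough stand-alone model of the lookup they describe. It is a sketch under stated assumptions, not Flume's implementation: the class name SelectorSketch and the method channelFor are hypothetical, and the model only looks up the header value in the mapping table. What Flume itself does with a missing or unmapped header (for example through selector.default, which this configuration does not set) is outside the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-alone model of the multiplexing selector settings above.
public class SelectorSketch {

    // Mirrors: selector.header = topic
    private static final String HEADER = "topic";

    // Mirrors: selector.mapping.letter = c1, selector.mapping.number = c2
    private static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("letter", "c1");
        MAPPING.put("number", "c2");
    }

    // Returns the mapped channel name, or empty when the header is missing
    // or its value has no mapping.
    public static Optional<String> channelFor(Map<String, String> headers) {
        return Optional.ofNullable(MAPPING.get(headers.get(HEADER)));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "letter");
        System.out.println(channelFor(headers).orElse("(no mapping)"));
    }
}
```

Because CustomInterceptor always sets topic to either letter or number, every event passing through this source has a matching mapping entry.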

On hadoop203, create flume3.conf:

a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop203
a3.sources.r1.port = 4141
a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

On hadoop204, create flume4.conf:

a4.sources = r1
a4.sinks = k1
a4.channels = c1
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop204
a4.sources.r1.port = 4142
a4.sinks.k1.type = logger
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1

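Start the downstream agents first so that their avro sources are listening before hadoop202 tries to connect. On hadoop204, run: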

bin/flume-ng agent -c conf/ -f job/interceptor/flume4.conf  -n a4 -Dflume.root.logger=INFO,console

On hadoop203, run:

bin/flume-ng agent -c conf/ -f job/interceptor/flume3.conf -n a3 -Dflume.root.logger=INFO,console

On hadoop202, run:

bin/flume-ng agent -c conf/ -f job/interceptor/flume2.conf -n a2

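Then, in another terminal on hadoop202 (the netcat source is bound to localhost), send test lines with:

nc localhost 44444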

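Results: given the configuration above, lines containing "hello" should appear in the logger output on hadoop203, and all other lines in the logger output on hadoop204.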

 
