分享

Kafka基本原理和java简单使用教程

 关平藏书 2018-09-04

Apache Kafka学习(一):Kafka基本原理

1、什么是Kafka?

Kafka是一个使用Scala编写的消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。

Kafka是一种分布式的,基于发布/订阅的消息系统。

Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。同时借助zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。

2、kafka的特性

(1)以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。

(2)高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。

(3)支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序存储和传输。

(4)同时支持离线数据处理(Offline)和实时数据处理(Online)。

(5)Scale out:支持在线水平扩展。无需停机即可扩展机器。

(6)支持定期删除数据机制。可以按照时间段来删除,也可以按照文档大小来删除。

(7)Consumer采用pull的方式消费数据,消费状态由Consumer控制,减轻Broker负担。

3、Kafka架构

(1)Broker:和RabbitMQ中的Broker概念类似。一个kafka服务器就是一个Broker,而一个kafka集群包含一个或多个Broker。Broker会持久化数据到相应的Partition中,不会有cache压力。

(2)Topic:主题。每条消息都有一个类别,这个类别就叫做Topic。Kafka的Topic可以理解为RabbitMQ的Queue消息队列,相同类别的消息被发送到同一个Topic中,然后再被此Topic的Consumer消费。Topic是逻辑上的概念,而物理上的实现就是Partition。

(3)Partition:分区。分区是物理上的概念,每个Topic包含一个或者多个Partition,每个Partition都是一个有序队列。发送给Topic的消息经过分区算法(可以自定义),决定消息存储在哪一个Partition当中。每一条数据都会被分配一个有序id:Offset。注意:kafka只保证按一个partition中的顺序将消息发给Consumer,不保证一个Topic的整体(多个partition间)的顺序。

(4)Replication:备份。Replication是基于Partition而不是Topic的。每个Partition都有自己的备份,且分布在不同的Broker上。

(5)Offset:偏移量。kafka的存储文件都是用offset来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.log的文件即可。当然the first offset就是00000000000.log。注意:每个Partition中的Offset都是各不影响的从0开始的有序数列。

(6)Producer:消息生产者。

(7)Consumer:消息消费者。Consumer采用pull的方式从Broker获取消息,并且Consumer要维护消费状态,因此Kafaka系统中,业务重心一般都在Consumer身上,而不像RabbitMQ那样Broker做了大部分的事情。

(8)Consumer Group:消费组。每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。每个Topic可以被多个Group订阅,每个Group中可以有多个Consumer。发送到Topic的一条消息会被每个Group中的一个Consumer消费,多个Consumer之间将交错消费整个Topic的消息,实现负载均衡。

(9)Record:消息。每一个消息由一个Key、一个Value和一个时间戳构成。

注意:同一个partition内的消息只能被同一个组中的一个consumer消费,不过一个Consumer可以同时消费多个partitions中的消息.。当消费者数量多于partition的数量时,多余的消费者将会空闲。也就是说如果只有一个partition你在同一组启动多少个consumer都没用,partition的数量决定了此topic在同一组中被可被均衡的程度,例如partition=4,则可在同一组中被最多4个consumer均衡消费。

\

Kafka内部结构图(图片源于网络)

\

Kafka拓扑结构图(图片源于网络)

4、Topic、Partition文件存储

4.1、Topic与Partition的关系

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹。partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。

4.2、Partition文件存储的特点

(1)每个partition目录相当于一个巨型文件被平均分配到多个大小相等segment数据文件中。但每个segment file消息数量不一定相等,这种特性方便old segment file快速被删除。

(2)每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

(3)segment file组成:由2大部分组成,分别为index file(后缀“.index”)和data file(后缀“.log”),此2个文件一一对应,成对出现。

(4)segment文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

\

Segment file结构图(图片来源于网络)

以上述图2中一对segment file文件为例,说明segment中index和log文件对应关系物理结构如下:

\

Index和log文件对应图(图片来源于网络)

其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partition表示第368772个message)、以及该消息的物理偏移地址为497。

4.3、在partition中如何通过offset查找message

例如读取offset=368776的message,需要通过下面2个步骤查找。

(1)第一步查找segment file

上图为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始消息为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始消息为737338=737337 + 1,只要根据offset 进行二分查找文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index|log。

(2)第二步通过segment file查找message

通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

4.4、Kafka集群中Partition分布规则

首先来看一条在Linux下创建topic的命令:

bin/kafka-topics.sh --create --zookeeper ip1:2181,ip2:2181,ip3:2181,ip4:2181 --replication-factor 2 --partitions 4 --topic test

此命令的意思是在四个Broker的kafka集群上创建一个名为test的Topic,并且有4个分区2个备份(此处比较容易搞混,2个Replication表示Leader和Follower一共加起来有2个)。此时在四台机器上面就有8个Partition,如图所示。

\

Kafka集群Partition分布图1(图片来源于网络)

当集群中新增2节点,Partition增加到6个时分布情况如下:

\

Kafka集群Partition分布图2(图片来源于网络)

在Kafka集群中,每个Broker都有均等分配Leader Partition机会。

上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。每个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。

副本分配算法

(1)将所有n个Broker和待分配的i个Partition排序。

(2)将第i个Partition分配到第(i mod n)个Broker上。

(3)将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上

例如图2中的第三个Partition:partition-2,将被分配到Broker3((3 mod 6)=3)上,partition-2的副本将被分配到Broker4上((3+1) mod 6=4)。

4.5、kafka文件存储特点

(1)Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。可以设置segment文件大小定期删除消息过期时间定期删除

(2)通过索引信息可以快速定位message。

(3)通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。

(4)通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

4.6、Consumer和Partition的关系

对于多个Partition,多个Consumer

(1)如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数。

(2)如果consumer比partition少,一个consumer会对应于多个partition,这里要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目。

(3)如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

(4)增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

(5)High-level接口中获取不到数据的时候是会block的。

关于zookeeper中Offset初始值的问题:

Zookeeper中Offset的初始值默认是非法的,因此通过设置Consumer的参数auto.offset.reset来告诉Consumer读取到Offset非法时该怎么做。

auto.offset.reset有三个值:

(1)smallest : 自动把zookeeper中的offset设为Partition中最小的offset;

(2)largest : 自动把zookeeper中offset设为Partition中最大的offset;

(3)anything else: 抛出异常;

auto.offset.reset默认值是largest,此种情况下如果producer先发送了10条数据到某个Partition,然后Consumer启功后修改zookeeper中非法Offset值为Partition中的最大值9(Offset从0开始),这样Consumer就忽略了这10条消息。就算现在再次设置成smallest也读取不到之前的10条数据了,因为此时Offset是合法的了。

所以,想要读取之前的数据,就需要在一开始指定auto.offset.reset=smallest。

5、Replication副本同步机制

Replication是基于Partition而不是Topic的。每个Partition都有自己的备份,且分布在不同的Broker上。这些Partition当中有一个是Leader,其他都是Follower。Leader Partition负责读写操作,Follower Partition只负责从Leader处复制数据,使自己与Leader保持一致。Zookeeper负责两者间的故障切换(fail over,可以理解为Leader选举)。

消息复制延迟受最慢的Follower限制,Leader负责跟踪所有Follower的状态,如果Follower“落后”太多或者失效,Leader就将此Follower从Replication同步列表中移除,但此时Follower是活着的,并且一直从Leader拉取数据,直到差距小于replica.lag.max.messages值,然后重新加入同步列表。当一条消息被所有的Follower保存成功,此消息才被认为是“committed”,Consumer才能消费这条消息。这种同步方式就要求Leader和Follower之间要有良好的网络环境。

一个partition的follower落后于leader足够多时,会被认为不在同步副本列表或处于滞后状态。在Kafka-0.8.2.x中,副本滞后判断依据是副本落后于leader最大消息数量(replica.lag.max.messages)或replication响应Leader partition的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本,而后者是用来检测失效或死亡的副本。假设replica.lag.max.messages设置为4,表明只要follower落后leader的消息数小于4,就不会从同步副本列表中移除。replica.lag.time.max设置为500 ms,表明只要follower向leader发送拉取数据请求时间间隔超过500 ms,就会被标记为死亡,并且会从同步副本列表中移除。

当Leader处于流量高峰时,比如一瞬间就收到了4条数据,此时所有Follower将被认为是“out-of-sync”并且从同步副本列表中移除,然后Follower拉取数据赶上Leader过后又重新加入同步列表,就这样Follower频繁在副本同步列表移除和重新加入之间来回切换。

即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可(注意:不同于其他分布式存储,比如hbase需要"多数派"存活才行)。

当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower。kafka中leader选举并没有采用"投票多数派"的算法,因为这种算法对于"网络稳定性"/"投票参与者数量"等条件有较高的要求,而且kafka集群的设计,还需要容忍N-1个replicas失效。对于kafka而言,每个partition中所有的replicas信息都可以在zookeeper中获得,那么选举leader将是一件非常简单的事情。选择follower时需要兼顾一个问题,就是新leader 所在的server服务器上已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力。在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader。在整个集群中,只要有一个replicas存活,那么此partition都可以继续接受读写操作。

6、Consumer均衡算法

当一个Group中,有Consumer加入或者离开时,会触发Partitions均衡。均衡的最终目的,是提升Topic的并发消费能力。

(1)假如topic1,具有如下partitions: P0,P1,P2,P3

(2)加入group中,有如下consumer: C0,C1

(3)首先根据partition索引号对partitions排序: P0,P1,P2,P3

(4)根据consumer.id排序: C0,C1

(5)计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

(6)然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

通过此算法,就能知道具体Consumer消费的是哪个分区中的数据。

7、Producer消息路由机制

在kafka-Client-0.11.0.0.jar中,提供的有默认的KafkaProducer和DefaultPartitioner实现。其中DefaultPartitioner主要提供了Producer发送消息到分区的路由算法,如果给定Key值,就通过Key的哈希值和分区个数取余来计算;如果没有给定Key,就通过ThreadLocalRandom.current().nextInt()产生的随机数与分区数取余(其中涉及复杂步奏参考如下代码)。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class DefaultPartitioner implements Partitioner {
 
    private final ConcurrentMap<string, atomicinteger=""> topicCounterMap = new ConcurrentHashMap<>();
 
    public void configure(Map<string,> configs) {}
 
    /**
     * 计算给定记录的分区
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<partitioninfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<partitioninfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
 
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
 
    public void close() {}
 
}</partitioninfo></partitioninfo></string,></string,>

我们也可以设置自己的Partition路由规则,需要继承Partitioner类实现

1
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);。

方法。

8、kafka消息投递保证(delivery保证)

Kafka的消息delivery保证主要有三种:

(1)At most once 最多一次。消息可能会丢失,但绝不会重复传输。

(2)At least once 最少一次。消息绝不会丢失,但可能会重复传输。

(3)Exactly once 正好一次。每条消息正好被传输一次和消费一次。

8.1、Producer delivery保证

Producer的delivery保证可以通过参数request.required.acks设置来保证:

(1)request.required.acks=0。

相当于消息异步发送。消息一发送完毕马上发送下一条。由于不需要ack,可能会造成数据丢失,相当于实现了At most once。

(2)request.required.acks=1。

消息发送给Leader Partition,在Leader Partition确认消息并ack 生产者过后才发下一条。

(3)request.required.acks=-1。

消息发送给Leader,在Leader收到所有Follower确认保存消息的ack后对producer进行ack才发送下一条。

所以一条消息从Producer到Broker至少是确保了At least once的,因为有Replication的存在,只要消息到达Broker就不会丢失。如果ack出现问题,比如网络中断,有可能会导致producer收不到ack而重复发送消息。Exactly once这种方式,没有查到相关的实现。

第(3)种方式的具体步奏如下:

a. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader

b. producer 将消息发送给该 leader

c. leader 将消息写入本地 log

d. followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK

e. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

8.2、Consumer delivery保证

Consumer从Broker拉取数据过后,可以选择commit,此操作会在zookeeper中存下此Consumer读取对应Partition消息的Offset,以便下一次拉取数据时会从Partition的下一个Offset消费,避免重复消费。

同样,Consumer可以通过设置参数enable.auto.commit=true来自动确认消息,即Consumer一收到消息立刻自动commit。如果只看消息的读取过程,kafka是确保了Exactly once的,但是实际情况中Consumer不可能读取到数据就结束了,往往还需要处理读取到的数据。因此Consumer处理消息和commit消息的顺序就决定了delivery保证的类别。

(1)先处理后commit

这种方式实现了At least once。Consumer收到消息先处理后提交,如果在处理完成后机器崩溃了,导致Offset没有更新,Consumer下次启动时又会重新读取上一次消费的数据,实际上此消息已经处理过了。

(2)先commit后处理

这种方式实现了At most once。Consumer收到消息过后立刻commit,更新zookeeper上的Offset,然后再处理消息。如果处理还未结束Consumer崩溃了,等Consumer再次启动的时候会读取Offset更新过后的下一条数据,这就导致了数据丢失。

9、High Level API和Low Level API

Kafka提供了两种Consumer API,选用哪种API需要视具体情况而定。

9.1、High Level Consumer API

High Level Consumer API围绕着Consumer Group这个逻辑概念展开,它屏蔽了每个Topic的每个Partition的Offset管理(自动读取zookeeper中该Partition的last offset )、Broker失败转移以及增减Partition、Consumer时的负载均衡(当Partition和Consumer增减时,Kafka自动进行Rebalance)。

9.2、Low Level Consumer API

Low Level Consumer API,作为底层的Consumer API,提供了消费Kafka Message更大的控制,用户可以实现重复读取、跳读等功能。

使用Low Level Consumer API,是没有对Broker、Consumer、Partition增减进行处理,如果出现这些的增减时,需要自己处理负载均衡。

Low Level Consumer API提供更大灵活控制是以增加复杂性为代价的:

(1)Offset不再透明

(2)Broker自动失败转移需要处理

(3)增加Consumer、Partition、Broker需要自己做负载均衡

Apache Kafka学习(二):java使用Kafka

1、maven依赖

1
2
3
4
5
6
7
8
9
<dependency>
 
<groupid>org.apache.kafka</groupid>
 
kafka-clients</artifactid>
 
<version>0.11.0.0</version>
 
</dependency>

2、Producer

2.1、producer发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 最简单的kafka producer
 */
public class ProducerDemo {
 
    public static void main(String[] args) {
        Properties properties =new Properties();
        //zookeeper服务器集群地址,用逗号隔开
        properties.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //自定义producer拦截器
        properties.put("interceptor.classes", "com.lt.kafka.producer.MyProducerInterceptor");
        //自定义消息路由规则(消息发送到哪一个Partition中)
        //properties.put("partitioner.class", "com.lt.kafka.producer.MyPartition");
         
        Producer<string, string=""> producer = null;
        try {
            producer = new KafkaProducer<string, string="">(properties);
            for (int i = 20; i < 40; i++) {
                String msg = "This is Message:" + i;
                 
                /**
                 * kafkaproducer中会同时调用自己的callback的onCompletion方法和producerIntercepter的onAcknowledgement方法。
                 * 关键源码:Callback interceptCallback = this.interceptors == null
                 * callback : new InterceptorCallback<>(callback,
                 * this.interceptors, tp);
                 */
                producer.send(new ProducerRecord<string, string="">("leixiang", msg),new MyCallback());
            }
        } catch (Exception e) {
            e.printStackTrace();
 
        } finally {
            if(producer!=null)
            producer.close();
        }
    }
 
}</string,></string,></string,>

2.2、自定义producer拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.Map;
 
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * 自定义producer拦截器
 */
public class MyProducerInterceptor implements ProducerInterceptor<string,string> {
 
    /**
     * 打印配置相关信息
     */
    public void configure(Map<string,> configs) {
        // TODO Auto-generated method stub
        System.out.println(configs.toString());
    }
 
    /**
     * producer发送信息拦截方法
     */
    public ProducerRecord<string,string> onSend(ProducerRecord<string, string=""> record) {
        System.out.println("拦截处理前》》》");
        String topic=record.topic();
        String value=record.value();
        System.out.println("拦截处理前的消息:"+value);
        ProducerRecord<string,string> record2=new ProducerRecord<string, string="">(topic, value+" (intercepted)");
        System.out.println("拦截处理后的消息:"+record2.value());
        System.out.println("拦截处理后《《《");
        return record2;
    }
 
    /**
     * 消息确认回调函数,和callback的onCompletion方法相似。
     * 在kafkaProducer中,如果都设置,两者都会调用。
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (metadata != null)
            System.out.println("MyProducerInterceptor onAcknowledgement:RecordMetadata=" + metadata.toString());
        if (exception != null)
            exception.printStackTrace();
    }
 
    /**
     * interceptor关闭回调
     */
    public void close() {
        System.out.println("MyProducerInterceptor is closed!");
    }
 
}</string,></string,string></string,></string,string></string,></string,string>

2.3、自定义消息路由规则

自定义路由规则,可以根据自己的需要定义消息发送到哪个分区。自定义路由规则需要实现Partitioner。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.Map;
 
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
 
public class MyPartition implements Partitioner{
 
    public void configure(Map<string,> arg0) {
        // TODO Auto-generated method stub
         
    }
 
    public void close() {
        // TODO Auto-generated method stub
         
    }
 
    public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) {
        // TODO Auto-generated method stub
        return 0;
    }
 
}</string,>

3、Consumer

3.1、自动提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.Arrays;
import java.util.Properties;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
public class AutoCommitConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        props.put("group.id", "leixiang");
        props.put("enable.auto.commit", "true");
        //想要读取之前的数据,必须加上
        //props.put("auto.offset.reset", "earliest");
        /* 自动确认offset的时间间隔 */
        props.put("auto.commit.interval.ms", "1000");
        /*
         * 一旦consumer和kakfa集群建立连接,
         * consumer会以心跳的方式来高速集群自己还活着,
         * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence
         */
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。
        //props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");
         
        @SuppressWarnings("resource")
        KafkaConsumer<string, string=""> consumer = new KafkaConsumer<string, string="">(props);
        try {
            /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/
            consumer.subscribe(Arrays.asList("leixiang"));
            while (true) {
                //轮询数据。如果缓冲区中没有数据,轮询等待的时间为毫秒。如果0,立即返回缓冲区中可用的任何记录,则返回空
                ConsumerRecords<string, string=""> records = consumer.poll(100);
                for (ConsumerRecord<string, string=""> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                            record.value());
            }
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
}</string,></string,></string,></string,>

3.2、手动提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.Arrays;
import java.util.Properties;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
public class ManualCommitConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.0.218:9092,172.16.0.219:9092,172.16.0.217:9092");
        props.put("group.id", "leixiang");
        props.put("enable.auto.commit", "false");//手动确认
        /* 自动确认offset的时间间隔 */
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");//想要读取之前的数据,必须加上
        /*
         * 一旦consumer和kakfa集群建立连接,
         * consumer会以心跳的方式来高速集群自己还活着,
         * 如果session.timeout.ms 内心跳未到达服务器,服务器认为心跳丢失,会做rebalence
         */
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //配置自定义的拦截器,可以在拦截器中引入第三方插件实现日志记录等功能。
        props.put("interceptor.classes", "com.lt.kafka.consumer.MyConsumerInterceptor");
         
        KafkaConsumer<string, string=""> consumer = new KafkaConsumer<string, string="">(props);
        /* 消费者订阅的topic, 可同时订阅多个 ,用逗号隔开*/
        consumer.subscribe(Arrays.asList("leixiang"));
        while (true) {
            ConsumerRecords<string, string=""> records = consumer.poll(100);
            for (ConsumerRecord<string, string=""> record : records) {
                //处理消息
                saveMessage(record);
                //手动提交,并且设置Offset提交回调方法
                //consumer.commitAsync(new MyOffsetCommitCallback());
                consumer.commitAsync();
            }
        }
    }
     
    public static void saveMessage(ConsumerRecord<string, string=""> record){
        System.out.printf("处理消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                            record.value());
    }
}</string,></string,></string,></string,></string,>

自定义Consumer拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
 
public class MyConsumerInterceptor implements ConsumerInterceptor<string, string=""> {
 
    public void configure(Map<string,> configs) {
        System.out.println("MyConsumerInterceptor configs>>>"+configs.toString());
    }
 
    public ConsumerRecords<string, string=""> onConsume(ConsumerRecords<string, string=""> records) {
        System.out.println("onConsume");
        return records;
    }
 
    public void onCommit(Map<topicpartition, offsetandmetadata=""> offsets) {
        System.out.println("onCommit");
    }
 
    public void close() {
        System.out.println("MyConsumerInterceptor is closed!");
    }
 
}</topicpartition,></string,></string,></string,></string,>

自定义Offset提交回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.Map;
 
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
 
public class MyOffsetCommitCallback implements OffsetCommitCallback {
 
    public void onComplete(Map<topicpartition, offsetandmetadata=""> offsets, Exception exception) {
        if (offsets != null)
            System.out.println("offsets>>>" + offsets.toString());
        if (exception != null)
            exception.printStackTrace();
    }
 
}</topicpartition,>

Apache Kafka学习(三):Kafka常用命令

1、开启zookeeper(在安装目录下使用命令)

Linux:bin/zkServer.sh start

windows:bin\zkServer.cmd

2、启动kafka(安装目录下使用命令)

Linux:bin/kafka-server-start.sh start config/server.properties

windows:bin\windows\kafka-server-start.bat config\server.properties

3、查看topic名称列表

Linux:bin/kafka-topics.sh -list --zookeeper 172.16.0.99:2181,172.16.0.218:2181

windows:bin\windows\kafka-topics.bat -list --zookeeper 172.16.0.99:2181,172.16.0.218:2181

4、查看topic详情

Linux:bin/kafka-topics.sh -zookeeper localhost:2181 --topic test --describe

windows:bin\windows\kafka-topics.bat --zookeeper localhost:2181 --topic test --describe

5、删除topic

Linux:bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic "test"

windows:bin\windows\kafka-topics.bat --zookeeper localhost:2181 --delete --topic "test"

注意:集群中一台机器删除了topic,其他机器同步删除相同topic

6、创建topic

Linux:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

windows:bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

注意:

replication-factor:副本个数,一般为小于等于Broker个数。

partitions:分区个数。如果副本个数为1,分区为4,则4个分区会均匀的分布在各个Broker上。如果Broker为2,副本为2,分区为4,则每个Broker上面都有4个分区。

7、创建Consumer

Linux:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

windows:bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --form-beginning

注意:form beginning表示从头拉取。

8、创建Producer

Linux:bin/kafka-console-producer.sh --broker-list 172.16.0.99:9020,172.16.0.218:9020 --topic test

windows:bin\windows\kafka-console-producer.bat --broker-list 172.16.0.99:9092,172.16.0.218:9080 --topic test

注意:此处是kafka的端口,而且在集群里如果此处填localhost,会报一个连接错误,猜想应该是消息没有到达集群,因此此处将集群的ip都填上。

9、查询topic所有分区的offset值

Linux:bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.162.160.115:9092 --topic s1mmetest --time -1

10、查询kafka集群当前topic所有分区中的消息数目

Linux:bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.162.160.115:9092 --topic s1mmetest --time -2

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多