4. 使用Kafka的Producer API来完成消息的推送
1) Kafka 0.9.0.1的java client依赖: Xml代码
2) 写一个KafkaUtil工具类,用于构造Kafka Client Java代码
KafkaProducer<K,V>的K代表每条消息的key类型,V代表消息类型。消息的key用于决定此条消息由哪一个partition接收,所以我们需要保证每条消息的key是不同的。 Producer端的常用配置
更多的Producer配置见官网:http://kafka./documentation.html#producerconfigs
3) 写一个简单的Producer端,每隔1秒向Kafka集群发送一条消息: Java代码
在调用KafkaProducer的send方法时,可以注册一个回调方法,在Producer端完成发送后会触发回调逻辑,在回调方法的 metadata对象中,我们能够获取到已发送消息的offset和落在的分区等信息。注意,如果acks配置为0,依然会触发回调逻辑,只是拿不到 offset和消息落地的分区信息。 跑一下,输出是这样的: message send to partition 0, offset: 28
message send to partition 1, offset: 26 message send to partition 0, offset: 29 message send to partition 1, offset: 27 message send to partition 1, offset: 28 message send to partition 0, offset: 30 message send to partition 0, offset: 31 message send to partition 1, offset: 29 message send to partition 1, offset: 30 message send to partition 1, offset: 31 message send to partition 0, offset: 32 message send to partition 0, offset: 33 message send to partition 0, offset: 34 message send to partition 1, offset: 32 乍一看似乎offset乱掉了,但其实这是因为消息分布在了两个分区上,每个分区上的offset其实是正确递增的。
5. 使用Kafka的Consumer API来完成消息的消费
1) 改造一下KafkaUtil类,加入Consumer client的构造。 Java代码
同样,我们介绍一下Consumer常用配置
全部的Consumer配置见官方文档:http://kafka./documentation.html#newconsumerconfigs
2) 编写Consumer端: Java代码
运行输出: fetched from partition 0, offset: 28, message: this is message0
fetched from partition 0, offset: 29, message: this is message2 fetched from partition 0, offset: 30, message: this is message5 fetched from partition 0, offset: 31, message: this is message6 fetched from partition 0, offset: 32, message: this is message10 fetched from partition 0, offset: 33, message: this is message11 fetched from partition 0, offset: 34, message: this is message12 fetched from partition 1, offset: 26, message: this is message1 fetched from partition 1, offset: 27, message: this is message3 fetched from partition 1, offset: 28, message: this is message4 fetched from partition 1, offset: 29, message: this is message7 fetched from partition 1, offset: 30, message: this is message8 fetched from partition 1, offset: 31, message: this is message9 fetched from partition 1, offset: 32, message: this is message13
说明:
如果不想让kafka控制consumer拉取数据时在partition间的负载均衡,也可以手工控制: Java代码
使用consumer.assign()方法为consumer线程指定1个或多个partition。
此处的坑: 在测试中我发现,如果用手工指定partition的方法拉取消息,不知为何kafka的自动提交offset机制会失效,必须使用手动方式才能正确提交已消费的消息offset。
题外话: 在 真正的应用环境中,Consumer端将消息拉取下来后要做的肯定不止是输出出来这么简单,在消费消息时很有可能需要花掉更多的时间。1个 Consumer线程消费消息的速度很有可能是赶不上Producer产生消息的速度,所以我们不得不考虑Consumer端采用多线程模型来消费消息。
然而KafkaConsumer并不是线程安全的,多个线程操作同一个KafkaConsumer实例会出现各种问题,Kafka官方对于Consumer端的多线程处理给出的指导建议如下: 1. 每个线程都持有一个KafkaConsumer对象 好处:
弊端:
2. 解耦,1个Consumer线程负责拉取消息,数个Worker线程负责消费消息
弊端:
个人认为第二种方式更加可取,consumer数不能超过partition数这个限制是很要命的,不可能为了提高Consumer消费消息的效率而把Topic分成更多的partition,partition越多,集群的高可用性就越低。
6. Kafka集群高可用性测试
1) 查看当前Topic的状态: Shell代码
输出: Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 可以看到,partition0的leader是broker1,parition1的leader是broker0
2) 启动Producer向Kafka集群发送消息 输出: message send to partition 0, offset: 35
message send to partition 1, offset: 33 message send to partition 0, offset: 36 message send to partition 1, offset: 34 message send to partition 1, offset: 35 message send to partition 0, offset: 37 message send to partition 0, offset: 38 message send to partition 1, offset: 36 message send to partition 1, offset: 37
3) 登录SSH将broker0,也就是partition 1的leader kill掉
再次查看Topic状态: Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1 Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1 可以看到,当前parition0和parition1的leader都是broker1了
此时再去看Producer的输出:
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with /10.0.0.100 disconnected
java.net.ConnectException: Connection refused: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54) at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72) at org.apache.kafka.common.network.Selector.poll(Selector.java:274) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) at java.lang.Thread.run(Thread.java:745) [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 7 to Cluster(nodes = [Node(1, 10.0.0.101, 9092)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = test, partition = 0, leader = 1, replicas = [1,], isr = [1,]])
能看到Producer端的DEBUG日志显示与broker0的链接断开了,此时Kafka立刻开始更新集群metadata,更新后的metadata表示broker1现在是两个partition的leader,Producer进程很快就恢复继续运行,没有漏发任何消息,能够看出Kafka集群的故障切换机制还是很厉害的
4) 我们再把broker0启动起来 Shell代码
然后再次检查Topic状态: Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: test Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0 我们看到,broker0启动起来了,并且已经是in-sync状态(注意Isr从1变成了1,0),但此时两个partition的leader还都是 broker1,也就是说当前broker1会承载所有的发送和拉取请求。这显然是不行的,我们要让集群恢复到负载均衡的状态。 这时候,需要使用Kafka的选举工具触发一次选举: Shell代码
选举完成后,再次查看Topic状态: Topic:test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0 可以看到,集群重新回到了broker0挂掉之前的状态 但此时,Producer端产生了异常: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
原因是Producer端在尝试向broker1的parition0发送消息时,partition0的leader已经切换成了broker0,所以消息发送失败。 此时用Consumer去消费消息,会发现消息的编号不连续了,确实漏发了一条消息。这是因为我们在构造Producer时设定了retries=0,所以在发送失败时Producer端不会尝试重发。 将retries改为3后再次尝试,会发现leader切换时再次发生了同样的问题,但Producer的重发机制起了作用,消息重发成功,启动Consumer端检查也证实了所有消息都发送成功了。
每 次集群单点发生故障恢复后,都需要进行重新选举才能彻底恢复集群的leader分配,如果嫌每次这样做很麻烦,可以在broker的配置文件(即 server.properties)中配置auto.leader.rebalance.enable=true,这样broker在启动后就会自动进 行重新选举
至此,我们通过测试证实了集群出现单点故障和恢复的过程中,Producer端能够保持正确运转。接下来我们看一下Consumer端的表现:
5) 同时启动Producer进程和Consumer进程 此时Producer一边在生产消息,Consumer一边在消费消息
6) 把broker0干掉,观察Consumer端的输出: 能看到,在broker0挂掉后,consumer也端产生了一系列INFO和WARN输出,但同Producer端一样,若干秒后自动恢复,消息仍然是连续的,并未出现断点。
7) 再次把broker0启动,并触发重新选举,然后观察输出: fetched from partition 0, offset: 418, message: this is message48
fetched from partition 0, offset: 419, message: this is message49 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Offset commit for group 1 failed due to NOT_COORDINATOR_FOR_GROUP, will find new coordinator and retry [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147483646 dead. [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed: This is not the correct coordinator for this group. fetched from partition 1, offset: 392, message: this is message50 fetched from partition 0, offset: 420, message: this is message51 能看到,重选举后Consumer端也输出了一些日志,意思是在提交offset时发现当前的调度器已经失效了,但很快就重新获取了新的有效调度器,恢复 了offset的自动提交,验证已提交offset的值也证明了offset提交并未因leader切换而发生错误。
如上,我们也通过测试证实了Kafka集群出现单点故障时,Consumer端的功能正确性。
|
|