分享

创建Kafka消费者

 关平藏书 2018-09-04

在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似:把想要传给消费者的属性放在Properties对象里。一般只需要使用3个必要的属性:bootstrap.servers、key.deserializer和value .deserializer。
第1个属性bootstrap.servers指定了Kafka集群的连接字符串。它的用途与在KafkaProducer中的用途是一样的。另外两个属性key.deserializer和value.deserializer与生产者的serializer定义也很类似,不过它们不是使用指定的类把Java对象转成字节数组,而是使用指定的类把字节数组转成Java对象。
第4个属性group.id不是必需的,不过它指定了KafkaConsuner属于哪一个消费者群组。创建不属于任何一个群组的消费者也是可以的,只是这样做不太常见,下面的代码片段演示了如何创建一个KafkaConsuner对象:

  • Properties props = new Properties();
  • props.put("bootstrap.servers","broker1:9092,broker2:9092);
  • props.put("group.id","CountryCounter");
  • props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  • props.put("value.deserializer","org.apache.kafka.common.sertalizatlon.StringDesertalizer");
  • KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);

我们假设消费的键和值都是字符串类型,所以使用的是内置的StringDeserializer,并且使用字符串类型创建了KafkaConsumer对象。唯一不同的是新增了group.id属性,它指定了消费者所属群组的名字。

创建好消费者之后,下一步可以开始订阅主题了。subscribe()方祛接受一个主题列表作为参数,使用起来很简单:
consumer.subscribe(Collections.singletonlist("customerCountries"));
为了简单起见,我们创建了一个只包含单个元素的列表,主题的名字叫作"customerCountries"

我们也可以在调用subscribe()方法时传入一个正则表达式。正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在Kafka和其他系统之间复制数据时,使用正则表达式的方式订阅多个主题是很常见的做法。要订阅所有与test相关的主题,可以这样做: consumer.subscribe("test.*");

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多