大数据领域中操作的数据类型: Bounded:有界----->一般批处理(一个文件或者一批文件,不管文件多大,都是可以度量,eg 1T 1PB 1EB 1NB) HADOOP-MR Hive SparkCore。。。 Unbounded:无界----->源源不断的流水一样(流数据,就像水流一样) Storm Spark-Streaming Flink。。。 --------------------------------------------------------------------------------- 消息队列(Message Queue) 消息 Message 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。 队列 Queue 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素。入队、出队。 消息队列 MQ 消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。 MQ主要分为两类:点对点(p2p)、发布订阅(Pub/Sub) 共同点: 消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。 不同点: p2p模型包括:消息队列(Queue)、发送者(Sender)、接收者(Receiver) 一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中)。比如说打电话。 Pub/Sub包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。 那么在大数据领域呢,为了满足日益增长的数据量,也有一款可以满足百万级别消息的生成和消费,分布式、持久稳定的产品——Kafka。 --------------------------------------------------------------------------------- Kafka简介 Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,于2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。 三大特点: 高吞吐量 可以满足每秒百万级别消息的生产和消费——生产消费。 持久性 有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。 分布式 基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性。 Kafka的组件 一个MQ需要哪些部分?生产、消费、消息类别、存储等等。 对于kafka而言,kafka服务就像是一个大的水池。不断的生产、存储、消费着各种类别的消息。那么kafka由何组成呢? > Kafka服务: > Topic:主题,Kafka处理的消息的不同分类。 > Broker:消息代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。 > Partition:Topic物理上的分组,一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定。 > Message:消息,是通信的基本单位,每个消息都属于一个partition > Kafka服务相关 > Producer:消息和数据的生产者,向Kafka的一个topic发布消息。 > Consumer:消息和数据的消费者,定于topic并处理其发布的消息。 > Zookeeper:协调kafka的正常运行。 --------------------------------------------------------------------------------- Kafka的安装 单机安装 解压: opt]# tar -zxvf soft/kafka_2.10-0.10.0.1.tgz 重命名:opt]# mv kafka_2.10-0.10.0.1/ kafka 添加KAFKA_HOME至环境变量:/etc/profile.d/hadoop-etc.sh export KAFKA_HOME=/opt/kafaka export PATH=$PATH:$KAFKA_HOME/bin 配置相关参数:$KAFKA_HOME/config/server.properties 主要参数:broker.id、log.dirs、zookeeper.connect broker.id=0 log.dirs=/opt/logs/kafka [kafka数据的存放目录] zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181 kafka实例broker监听默认端口9092,配置listeners=PLAINTEXT://:9092 启动: $KAFKA_HOME/bin/kafka-server-start.sh [-daemon] config/server.properties -daemon 可选,表示后台启动kafka服务 --------------------------------------------------------------------------------- Kafka的操作--topic 创建Topic hadoop kafka]# bin/kafka-topics.sh --create --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 1 --replication-factor 1 kafka]# bin/kafka-topics.sh --create --topic hive --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 1 --replication-factor 1 kafka]# bin/kafka-topics.sh --create --topic hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 1 创建topic过程的问题,replication-factor个数不能超过broker的个数 bin/kafka-topics.sh --create --topic sqoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3 Error while executing topic command : replication factor: 3 larger than available brokers: 1 查看Topic列表 kafka]# bin/kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 查看某一个具体的Topic kafka]# bin/kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 Topic:hadoopPartitionCount:1ReplicationFactor:1 Configs: Topic: hadoopPartition: 0Leader: 0Replicas: 0Isr: 0 PartitionCount:topic对应的partition的个数 ReplicationFactor:topic对应的副本因子,说白就是副本个数 Partition:partition编号,从0开始递增 Leader:当前partition起作用的breaker.id Replicas: 当前副本数据坐在的breaker.id,是一个列表,排在最前面的其作用 Isr:当前kakfa集群中可用的breaker.id列表 修改Topic 不能修改replication-factor,以及只能对partition个数进行增加,不能减少 bin/kafka-topics.sh --alter --topic hive --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 partition由3变为2的时,抛出的异常: ERROR kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased 删除Topic kafka]# bin/kafka-topics.sh --delete --topic hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 Topic hbase is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. 彻底删除一个topic,需要在server.properties中配置delete.topic.enable=true,否则只是标记删除 配置完成之后,需要重启kafka服务 --------------------------------------------------------------------------------- kafka-topic-生产消费数据(控制台命令行的操作方式) 生产数据:kafka]# bin/kafka-console-producer.sh --topic hive --broker-list uplooking01:9092 消费数据:bin/kafka-console-consumer.sh --blacklist hive --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --from-beginning从头开始消费数据 --blacklist黑名单过滤 kafka]# bin/kafka-console-producer.sh --topic hbase --broker-list uplooking01:9092 hbase kafka]# bin/kafka-console-producer.sh --topic sqoop --broker-list uplooking01:9092 sqoop kafka]# bin/kafka-console-producer.sh --topic hive --broker-list uplooking01:9092 hive bin/kafka-console-consumer.sh --blacklist hive,hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 sqoop --whitelist白名单过滤 bin/kafka-console-consumer.sh --whitelist hive,hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 hive hbase --------------------------------------------------------------------------------- Kafka分布式集群的安装 在单机的基础之上,只要修改server.properties配置文件中的broker.id是其在集群能够保证唯一即可, kafka集群中的节点没有主从之分,大家都是一样的 在每一台机器上启动方式和单机启动一致 ---------------------------------------------------------------------------------
|
|
来自: BIGDATA云 > 《Kafka消息队列》