分享

KAFKA-消息队列

 BIGDATA云 2018-07-13
大数据领域中操作的数据类型:
Bounded:有界----->一般批处理(一个文件或者一批文件,不管文件多大,都是可以度量,eg 1T 1PB 1EB 1NB)
HADOOP-MR Hive SparkCore。。。
Unbounded:无界----->源源不断的流水一样(流数据,就像水流一样)
Storm Spark-Streaming Flink。。。
---------------------------------------------------------------------------------
消息队列(Message Queue)
消息 Message
  网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。
队列 Queue
  一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素。入队、出队。
消息队列 MQ
  消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。
MQ主要分为两类:点对点(p2p)、发布订阅(Pub/Sub)
共同点:
  消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。
不同点:
  p2p模型包括:消息队列(Queue)、发送者(Sender)、接收者(Receiver)
一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中)。比如说打电话。
  Pub/Sub包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)
  每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。
  那么在大数据领域呢,为了满足日益增长的数据量,也有一款可以满足百万级别消息的生成和消费,分布式、持久稳定的产品——Kafka。
---------------------------------------------------------------------------------
Kafka简介
Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,于2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。
三大特点:
高吞吐量
  可以满足每秒百万级别消息的生产和消费——生产消费。
持久性
  有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。
分布式
  基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性。
Kafka的组件
一个MQ需要哪些部分?生产、消费、消息类别、存储等等。
 对于kafka而言,kafka服务就像是一个大的水池。不断的生产、存储、消费着各种类别的消息。那么kafka由何组成呢?
> Kafka服务:
 > Topic:主题,Kafka处理的消息的不同分类。
 > Broker:消息代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。
 > Partition:Topic物理上的分组,一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定。
 > Message:消息,是通信的基本单位,每个消息都属于一个partition
> Kafka服务相关
 > Producer:消息和数据的生产者,向Kafka的一个topic发布消息。
 > Consumer:消息和数据的消费者,定于topic并处理其发布的消息。
 > Zookeeper:协调kafka的正常运行。
---------------------------------------------------------------------------------
Kafka的安装
单机安装
解压:  opt]# tar -zxvf soft/kafka_2.10-0.10.0.1.tgz
重命名:opt]# mv kafka_2.10-0.10.0.1/ kafka
添加KAFKA_HOME至环境变量:/etc/profile.d/hadoop-etc.sh
export KAFKA_HOME=/opt/kafaka
export PATH=$PATH:$KAFKA_HOME/bin
配置相关参数:$KAFKA_HOME/config/server.properties
主要参数:broker.id、log.dirs、zookeeper.connect
broker.id=0
log.dirs=/opt/logs/kafka               [kafka数据的存放目录]
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181
kafka实例broker监听默认端口9092,配置listeners=PLAINTEXT://:9092
启动:
$KAFKA_HOME/bin/kafka-server-start.sh [-daemon] config/server.properties 
-daemon 可选,表示后台启动kafka服务
---------------------------------------------------------------------------------
Kafka的操作--topic
创建Topic 
hadoop
kafka]# bin/kafka-topics.sh --create --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 1 --replication-factor 1
kafka]# bin/kafka-topics.sh --create --topic hive --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 1 --replication-factor 1
kafka]# bin/kafka-topics.sh --create --topic hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 1
创建topic过程的问题,replication-factor个数不能超过broker的个数
bin/kafka-topics.sh --create --topic sqoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
Error while executing topic command : replication factor: 3 larger than available brokers: 1

查看Topic列表
kafka]# bin/kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
查看某一个具体的Topic
kafka]# bin/kafka-topics.sh --describe --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 
Topic:hadoopPartitionCount:1ReplicationFactor:1
Configs:
Topic: hadoopPartition: 0Leader: 0Replicas: 0Isr: 0
PartitionCount:topic对应的partition的个数
ReplicationFactor:topic对应的副本因子,说白就是副本个数
Partition:partition编号,从0开始递增
Leader:当前partition起作用的breaker.id
Replicas: 当前副本数据坐在的breaker.id,是一个列表,排在最前面的其作用
Isr:当前kakfa集群中可用的breaker.id列表
修改Topic
不能修改replication-factor,以及只能对partition个数进行增加,不能减少
bin/kafka-topics.sh --alter --topic hive --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3
partition由3变为2的时,抛出的异常:
    ERROR kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased
删除Topic
kafka]# bin/kafka-topics.sh --delete --topic hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic hbase is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
彻底删除一个topic,需要在server.properties中配置delete.topic.enable=true,否则只是标记删除
配置完成之后,需要重启kafka服务
---------------------------------------------------------------------------------
kafka-topic-生产消费数据(控制台命令行的操作方式)
生产数据:kafka]# bin/kafka-console-producer.sh --topic hive --broker-list uplooking01:9092
消费数据:bin/kafka-console-consumer.sh --blacklist hive --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
--from-beginning从头开始消费数据
--blacklist黑名单过滤
kafka]# bin/kafka-console-producer.sh --topic hbase --broker-list uplooking01:9092
hbase
kafka]# bin/kafka-console-producer.sh --topic sqoop --broker-list uplooking01:9092
sqoop
kafka]# bin/kafka-console-producer.sh --topic hive --broker-list uplooking01:9092
hive
bin/kafka-console-consumer.sh --blacklist hive,hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
sqoop
--whitelist白名单过滤
bin/kafka-console-consumer.sh --whitelist hive,hbase --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
hive
hbase
---------------------------------------------------------------------------------
Kafka分布式集群的安装
在单机的基础之上,只要修改server.properties配置文件中的broker.id是其在集群能够保证唯一即可,
kafka集群中的节点没有主从之分,大家都是一样的
在每一台机器上启动方式和单机启动一致
---------------------------------------------------------------------------------

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多