Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。官网下载地址:http://kafka./downloads 安装 https: -zxvf kafka_2.-. kafka_2.-. /usr/local//usr/local/kafka 启动 Kafka使用ZooKeeper,所以您需要先启动一个ZooKeeper服务器。如果没有安装。您可以使用随Kafka一起打包的便捷脚本来获取一个快速但是比较粗糙的单节点ZooKeeper实例。 1、启动ZooKeeper 查看ZooKeeper配置 # Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www./licenses/LICENSE-2.0# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # the directory where the snapshot is stored. dataDir=/tmp/zookeeper # the port at which the clients will connect clientPort=2181 ########## zookeeper的默认端口 2181,在后面会用到 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0# Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false# admin.serverPort=8080 启动命令: bin/zookeeper-server-start.sh config/zookeeper.properties 2、Kafka基本配置 Kafka在config目录下提供了一个基本的配置文件。为了保证可以远程访问Kafka,我们需要修改两处配置。 打开config/server.properties文件,在很靠前的位置有listeners和 advertised.listeners两处配置的注释,去掉这两个注释,并且根据当前服务器的IP修改如下: vim config/server.properties listeners=PLAINTEXT://:9092advertised.listeners=PLAINTEXT://127.0.0.1:9092 #你需要修改为外网或局域网可以访问到的服务器IP 3、启动Kafka bin/kafka-server-start.sh config/server.properties 或者使用后台启动 bin/kafka-server-start.sh -daemon config/server.properties 这里碰到几个问题,我的云服务器只有1G内存: java内存不足的问题 vim jvm.options #修改为-Xms512m-Xmx512m kafaka启动时日式内存不足 vim bin/kafka-server-start.sh#修改为if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"fi 3、创建 Topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 查看 topic 列表 bin/kafka-topics.sh --list --zookeeper localhost:2181 查看描述 topics 信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
返回信息说明: 第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。 “Leader”: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。 “Replicas”: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。 “Isr”: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。 4、启动生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 5、启动消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --from-beginning 表示从起始位读取 测试 在生产者窗口输入,可以在消费者端查看输入的内容 生产端: 消费端: 特别提示: 一定要先启动ZooKeeper 再启动Kafka 顺序不可以改变。 先关闭kafka ,再关闭zookeeper。 本文转载自:https://www./item/251 |
|