Kafka集群搭建

1、环境CentOS6.8 + jdk1.8.0_151+zookeeper-3.4.11

      jdk安装请参考CentOS 搭建JDK环境 

      Zookeeper安装请参考Zookeeper集群搭建

2、下载 kafka_2.12-0.11.0.2.tgz,下载地址: http://kafka.apache.org/downloads

3、安装

 3.1、上传kafka_2.12-0.11.0.2.tgz至/usr/dev/kafka安装目录下

 3.2、将kafka_2.12-0.11.0.2.tgz解压至kafka目录

tar -zxvf kafka_2.12-1.0.1.tgz

 3.3、修改配置文件

 进入到config目录

cd kafka_2.12-1.0.1/config/

 修改server.properties配置文件

 主要修改:server.properties 这个文件即可,我们可以发现在目录下:

 有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群

vi server.properties
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://10.202.107.207:9092
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/dev/kafka/kafka_2.12-1.0.1/logs#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 #设置zookeeper的连接端口
 

 上面是参数的解释,实际的修改项为(修改如下三项即可):

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://10.202.107.207:9092
zookeeper.connect=10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 #设置zookeeper的连接端口

将上述修改后的kafka附件包拷贝到其他虚拟机下,修改 server.properties配置文件,修改完配置文件完成集群安装

4、启动并测试

 4.1、启动

进入bin目录:
cd /usr/dev/kafka/kafka_2.12-1.0.1/bin/	
启动kafka服务:
非守护程序方式启动:
./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties
守护程序方式启动:nohup  &
nohup ./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties &

  4.2、检查服务是否启动

jps

 

 4.3

1、kafka架构
  生产者、消费者(组)、zookeeper、broker(组)、主题-->分区-->副本因子
2、主题:创建、修改、删除(标记-设置特殊的属性-server.properties)
3、分区:从主题下进行分区
    单独的一个分区内的消息是有序且不可修改
	但是多个分区之间的数据是无序
	消息怎么知道进入特定的分区?--消息的组成部分key-value
4、复本
    复本因子数应当小于等于可用的broker数
	leader和follower  leader与外界的读写。然后follower云采用拉的方式与leader同步
5、消费组
    同一消费组中的消费者应该小于等于所操作主题下的分区数。如果小于的话,最好成位数
6、同步与异步
    同步:消息发送给kafka之后,在些等待返回结果
	异步:消息先发送到阻塞队列中,通过轮询的方式,将阻塞队列中的消费发送给kafka
	回调回来的消息,是由用户(生产者)自己来决定
7、kafka与zookeeper的节点说明
    多个broker之间会存在一个controller broker的角色	
	10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181

进入bin目录:
cd /usr/dev/kafka/kafka_2.12-1.0.1/bin/	
启动kafka服务:
非守护程序方式启动:
./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties
守护程序方式启动:nohup  &
nohup ./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties &
停止kafka服务:
./kafka-server-stop.sh 

创建Topic
./kafka-topics.sh --create --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 --topic mytopic --partitions 3 --replication-factor 3
查看Topic信息(会列出分区数、副本数、副本leader节点、副本节点、活着的副本节点)
./kafka-topics.sh --describe --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 --topic mytopic
列出所有topic:
./kafka-topics.sh --list --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 

将IP为207的kafka停点,测试集群(kill)

创建消费者
./kafka-console-consumer.sh --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 --topic mytopic
创建生产者
./kafka-console-producer.sh --broker-list IP:9092,IP:9092,IP:9092 --topic mytopic
创建topic:
bin/kafka-topics.sh  --create  --zookeeper  localhost:2181  --replication-factor 1  --partitions  1  --topic test 


查看topic信息(包括分区、副本情况等): kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ,会列出分区数、副本数、副本leader节点、副本节点、活着的副本节点

往某topic生产消息: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

从某topic消费消息: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (默认用一个线程消费指定topic的所有分区的数据)

删除某个Kafka groupid:连接Zookeeper后用rmr命令,如删除名为JSI的消费组: rmr /consumers/JSI