【Kafka】Kafka 1.0.1案例详解之快速入门


本博客文章如无特别说明,均为原创!转载请注明出处:Big data enthusiast(http://www.lubinsu.com/)

本文链接地址:【Kafka】Kafka 1.0.1案例详解之快速入门(http://www.lubinsu.com/index.php/archives/475)

这个章节我们将从Kafka集群的安装部署讲起,并测试topic的创建,消息的发布订阅等功能。希望对你有所帮助。
废话不多说,我们开始
  • 单机模式的安装
  1. 下载Kafka组件
[hadoop@master kafka]$ wget http://mirrors.hust.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
–2018-05-08 11:39:56– http://mirrors.hust.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
Resolving mirrors.hust.edu.cn… 202.114.18.160
Connecting to mirrors.hust.edu.cn|202.114.18.160|:80… connected.
HTTP request sent, awaiting response… 200 OK
Length: 49766096 (47M) [application/octet-stream]
Saving to: “kafka_2.11-1.0.1.tgz”
  1. 下载安装包并解压,配置zookeeper节点
因为Kafka用到了Zookeeper,这里需要先安装zookeeper集群:
[hadoop@master zookeeper]$ wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
–2018-05-08 11:47:33– http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
Resolving mirrors.hust.edu.cn… 202.114.18.160
Connecting to mirrors.hust.edu.cn|202.114.18.160|:80… connected.
HTTP request sent, awaiting response… 200 OK
Length: 35042811 (33M) [application/octet-stream]
Saving to: “zookeeper-3.4.10.tar.gz”
 
解压并修改zookeeper配置文件:
[hadoop@master zookeeper-3.4.10]$ tar -xzf zookeeper-3.4.10.tar.gz
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/hadoop/zookeeper/zookeeper-3.4.10/tmp
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to “0” to disable auto purge feature
#autopurge.purgeInterval=1
  1. 启动Kafka
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-server-start.sh config/server.properties
  1. 创建topic
现在我们创建一个名字为test的topic
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
Created topic “test”.
然后检查看zookeeper中是否已经有了该topic
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-topics.sh –list –zookeeper localhost:2181
test
 
  1. 发送测试消息
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
>this is a message
>this is another message
  1. 接收消息
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
this is a message
this is another message
 
至此单机模式的kafka已经可以正常收发消息了,但是在生产环境中我们肯定是需要搭建分布式集群的,下面我们来看下集群模式的安装。
 
  • 集群模式的安装
在Kafka 1.x中我们发现已经将zookeeper集成进去了,如果直接是用kafka自带的zookeeper,那么部署起来更方便了。
  1. 修改zookeeper配置文件
[hadoop@master kafka_2.11-1.0.1]$ vim config/zookeeper.properties
dataDir=/home/hadoop/kafka/kafka_2.11-1.0.1/zData
dataLogDir=/home/hadoop/kafka/kafka_2.11-1.0.1/zLog
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=192.168.0.181:2888:3888
server.2=192.168.0.182:2888:3888
server.3=192.168.0.183:2888:3888
 
添加myid(另外两台服务器的id必须不一样,myid是zookeeper互相识别用的id):
echo “1” > /home/hadoop/kafka/kafka_2.11-1.0.1/zData/myid
 
  1. 修改kafka配置文件
vim config/server.properties
broker.id=0(另外两台服务器的id必须不一样)
listeners=PLAINTEXT://:9092
log.dirs=/home/hadoop/kafka/kafka_2.11-1.0.1/log
  1. 复制kafka整个安装目录到集群的各个节点
scp -r kafka_2.11-1.0.1 slaver01:/home/hadoop/kafka/
scp -r kafka_2.11-1.0.1 slaver02:/home/hadoop/kafka/
  1. 分别在其他节点上修改zookeeper的myid和kafka的broker.id
echo “2” > /home/hadoop/kafka/kafka_2.11-1.0.1/zData/myid
echo “3” > /home/hadoop/kafka/kafka_2.11-1.0.1/zData/myid
 
vim config/server.properties
broker.id=1
broker.id=2
  1. 启动集群
启动各个节点的zookeeper:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >> zoo.out 2>&1 &
启动各个节点的kafka:
nohup bin/kafka-server-start.sh config/server.properties >> kafka.out 2>&1 &
  1. 创建一个多副本的topic
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic my-replicated-topic
Created topic “my-replicated-topic”.
 
然后我们通过以下命令可以发现该topic下的各个副本均匀分布到了各个节点broker上
 
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
 
  1. 发送接收消息测试
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-console-producer.sh –broker-list localhost:9092 –topic my-replicated-topic
>message 1
>message 2
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –from-beginning –topic my-replicated-topic
message 1
message 2
  1. 高可用测试
我们kill掉leader所在的broker节点
kill -9 19681
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
 
可以发现leader已经变成2了
  1. 使用Kafka Conect导入导出数据
echo -e “foo\nbar” > test.txt
启动连接:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
检查接收到的消息:
[hadoop@master kafka_2.11-1.0.1]$ tail -f test.sink.txt
foo
bar
 
通过consumer接收消息:
[hadoop@master kafka_2.11-1.0.1]$ bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic connect-test –from-beginning
{“schema”:{“type”:”string”,”optional”:false},”payload”:”foo”}
{“schema”:{“type”:”string”,”optional”:false},”payload”:”bar”}
 
支持Kafka 1.x安装使用已经基本讲完了,个人能力有限,难免有所纰漏,欢迎大家指正。

发表评论

电子邮件地址不会被公开。 必填项已用*标注