Distributed Message Queue Kafka: Learning Notes
Kafka Overview
Kafka describes itself as "a distributed streaming platform".
Kafka Architecture and Core Concepts
producer: the producer; it makes steamed buns.
consumer: the consumer; it eats the buns.
broker: the basket that holds the buns.
topic: a label attached to each bun; buns tagged topic A are for you, buns tagged topic B are for your younger brother.
ZooKeeper Cluster Deployment
Extract the archive:
tar -xzvf zookeeper-3.4.5.tar.gz -C /export/servers
Edit the ZooKeeper configuration file:
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
#Data directory. Can be any directory; both dataDir and dataLogDir must be created in advance.
#Note: choose the storage location carefully. A dedicated log device greatly improves performance; placing logs on a busy device hurts it significantly.
dataDir=/export/servers/data/zookeeper
#Log directory. Can also be any directory; if unset, it defaults to the dataDir setting. Must likewise be created in advance.
dataLogDir=/export/servers/logs/zookeeper
# hostname:quorum (follower-to-leader) port:leader-election port
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888
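The entries above cover only the settings being changed; a working zoo.cfg also needs the base timing and client-port settings. A minimal complete file might look like the sketch below (tickTime/initLimit/syncLimit values are common defaults, not taken from these notes; clientPort=2181 matches the zookeeper.connect addresses used later for Kafka):

```
# Minimal zoo.cfg sketch (timing values are assumed defaults)
tickTime=2000                               # base time unit in ms
initLimit=10                                # ticks a follower may take to sync initially
syncLimit=5                                 # ticks a follower may lag behind the leader
clientPort=2181                             # port clients (including Kafka) connect to
dataDir=/export/servers/data/zookeeper
dataLogDir=/export/servers/logs/zookeeper
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888
```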
Write each node's myid into its data directory:
# zk01
mkdir -p /export/servers/data/zookeeper
echo 1 > /export/servers/data/zookeeper/myid
cat /export/servers/data/zookeeper/myid
Distribute ZooKeeper to the other nodes:
sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk02:/export/servers/
# zk02
mkdir -p /export/servers/data/zookeeper
echo 2 > /export/servers/data/zookeeper/myid
sudo scp -r /export/servers/zookeeper-3.4.5 hadoop@zk03:/export/servers/
# zk03
mkdir -p /export/servers/data/zookeeper
echo 3 > /export/servers/data/zookeeper/myid
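The three per-node steps follow one pattern, so they can be sketched as a loop. The sketch below simulates it locally under /tmp/myid-demo (a made-up demo path) rather than running on the real hosts:

```shell
#!/bin/sh
# Simulate writing each ZooKeeper node's myid file locally.
# On a real cluster you would run the mkdir/echo on each host (or via ssh);
# /tmp/myid-demo is a demo-only stand-in for /export/servers/data/zookeeper.
base=/tmp/myid-demo
id=1
for host in zk01 zk02 zk03; do
    mkdir -p "$base/$host/data/zookeeper"
    echo "$id" > "$base/$host/data/zookeeper/myid"
    id=$((id + 1))
done
cat "$base/zk02/data/zookeeper/myid"   # prints 2
```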
Configure the environment variables:
vi /etc/profile
export ZK_HOME=/export/servers/zookeeper-3.4.5
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin
source /etc/profile
Startup:
# start the server
zkServer.sh start
# check cluster status and leader/follower roles
zkServer.sh status
# check the Java processes
jps -m
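zkServer.sh status reports each node's role on a `Mode:` line (leader or follower). A small helper to pull that role out might look like the sketch below; since no live cluster is assumed here, sample_status is a hypothetical stand-in with typical output hard-coded:

```shell
#!/bin/sh
# Hypothetical helper: extract the node role from `zkServer.sh status` output.
# sample_status stands in for the real command; its output mimics a follower.
sample_status() {
    printf 'ZooKeeper JMX enabled by default\n'
    printf 'Mode: follower\n'
}
mode=$(sample_status | awk -F': ' '/^Mode:/ {print $2}')
echo "node role: $mode"   # node role: follower
```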
How export variable scope works:
A variable defined with export A=1 is visible to the shell process that defines it and to its child processes.
A plain assignment B=1 is visible only to the shell process that defines it.
When you run source script.sh in the current login shell, the variables defined in the script become part of that shell.
So to make variables defined in a script visible in the parent shell, source the script instead of executing it.
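The rules above can be checked directly in a shell: an exported variable reaches a child process, a plain assignment does not.

```shell
#!/bin/sh
# Exported variables reach child processes; plain assignments do not.
export A=1          # visible to this shell and its children
B=1                 # visible only to this shell
child_sees_A=$(sh -c 'echo "${A:-unset}"')
child_sees_B=$(sh -c 'echo "${B:-unset}"')
echo "A in child: $child_sees_A"   # A in child: 1
echo "B in child: $child_sees_B"   # B in child: unset
```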
ZooKeeper cluster start/stop scripts. Set up passwordless SSH between the cluster machines first.
#!/bin/sh
# start-zkServer-cluster.sh
zkServers="zk01 zk02 zk03"
echo "start zkServer..."
for i in $zkServers
do
    echo "start zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh start > /dev/null 2>&1 &"
done
#!/bin/sh
# stop-zkServer-cluster.sh
zkServers="zk01 zk02 zk03"
echo "stop zkServer..."
for i in $zkServers
do
    echo "stop zkServer on ${i} ..."
    ssh $i "source /etc/profile;nohup sh /export/servers/zookeeper-3.4.5/bin/zkServer.sh stop > /dev/null 2>&1 &"
done
Kafka Cluster Deployment and Usage
Extract the archive:
tar -xzvf kafka_2.11-0.9.0.1.tar.gz -C /export/servers
Edit $KAFKA_HOME/config/server.properties; broker.id and host.name must be changed on every node of the cluster.
#Globally unique broker id; must not be duplicated
broker.id=0
#Port to listen on; producers and consumers connect here
port=9092
#Path where Kafka stores its log (data) files
log.dirs=/export/servers/logs/kafka
#ZooKeeper ensemble the broker uses to store metadata
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#host.name must be this machine's hostname/IP (important); otherwise clients fail with: Producer connection to localhost:9092 unsuccessful
host.name=kafka01
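Since only broker.id and host.name differ per node, the per-broker files can be generated from a single template. The sketch below does this locally with sed; /tmp/kafka-props-demo and the generated filenames are demo-only assumptions, not the real deployment layout:

```shell
#!/bin/sh
# Generate per-broker server.properties files from one template by
# rewriting broker.id and host.name. /tmp/kafka-props-demo is demo-only.
dir=/tmp/kafka-props-demo
mkdir -p "$dir"
cat > "$dir/server.properties.template" <<'EOF'
broker.id=0
port=9092
log.dirs=/export/servers/logs/kafka
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
host.name=kafka01
EOF

id=0
for host in kafka01 kafka02 kafka03; do
    sed -e "s/^broker\.id=.*/broker.id=$id/" \
        -e "s/^host\.name=.*/host.name=$host/" \
        "$dir/server.properties.template" > "$dir/server.properties.$host"
    id=$((id + 1))
done
grep '^broker\.id=' "$dir/server.properties.kafka03"   # broker.id=2
```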
Kafka cluster start/stop scripts:
#!/bin/bash
#start-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "Starting kafka on ${i} ..."
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]]; then
        echo "Start kafka on ${i} is OK!"
    fi
done
echo "All kafka brokers are started!"
exit 0
#!/bin/bash
#stop-kafka-cluster.sh
brokers="kafka01 kafka02 kafka03"
kafka_home="/export/servers/kafka_2.11-0.9.0.1"
for i in $brokers
do
    echo "Stopping kafka on ${i} ..."
    # kafka-server-stop.sh takes no arguments
    ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-stop.sh > /dev/null 2>&1 &"
    if [[ $? -eq 0 ]]; then
        echo "Stop kafka on ${i} is OK!"
    fi
done
echo "All kafka brokers are stopped!"
exit 0
Start Kafka: start-kafka-cluster.sh
Create a topic (the generic quickstart form, then the cluster version):
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic hello_topic
List all topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181
kafka-topics.sh --list --zookeeper localhost:2181
Produce messages (the broker list must point at Kafka brokers, not ZooKeeper):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafka-console-producer.sh --broker-list kafka01:9092 --topic hello_topic
Consume messages:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic --from-beginning
View topic details:
kafka-topics.sh --describe --zookeeper zk01:2181 --topic hello_topic
Integrating Flume and Kafka for Real-Time Data Collection
In application requirement 3 of the Flume learning notes, logs on server A were collected in real time to server B and printed to the console. To integrate Flume with Kafka, replace the logger sink with a Kafka sink. The Kafka sink plays the producer role; a console consumer is then started to verify the messages.
Technology selection:
exec-memory-avro.conf: exec source + memory channel + avro sink
avro-memory-kafka.conf: avro source + memory channel + kafka sink
# avro-memory-kafka.conf
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.169.100
avro-memory-kafka.sources.avro-source.port = 44444
# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = kafka01:9092,kafka02:9092,kafka03:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
Verification. Start avro-memory-kafka.conf first, since it listens on port 44444 of 192.168.169.100:
flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
Then start exec-memory-avro.conf:
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf/myconf \
--conf-file $FLUME_HOME/conf/myconf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
Start a consumer in the console to verify:
kafka-console-consumer.sh --zookeeper zk01:2181 --topic hello_topic
Append test lines to the file tailed by the exec source:
echo hellokafka1 >> data.log
echo hellokafka2 >> data.log
This article was first published on Steem. Thanks for reading; please credit the source when reposting.