Skip to content

kafka 单机部署

一、下载

shell
# 下载kafka-华为镜像站下载的3.0.0版本
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.12-3.0.0.tgz --no-check-certificate

二、安装

shell
# 解压
tar -zxvf kafka_2.12-3.0.0.tgz -C /usr/local/
# 修改文件夹名称
mv /usr/local/kafka_2.12-3.0.0/ /usr/local/kafka

三、配置、启动

1、修改 logs 文件地址(可选)

shell
mkdir -p /usr/local/kafka/logdirs
#注释掉原有的配置
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/server.properties  -i
#加入新配置
grep -q '^#log.dirs=' /usr/local/kafka/config/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs' /usr/local/kafka/config/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs' >> /usr/local/kafka/config/server.properties

2、修改 zookeeper 配置(可选)

如果需要,也可以修改 zookeeper 的配置

properties
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000

3、启动 zookeeper 和 kafka

shell
# 如果需要启动zookeeper的话
cd /usr/local/kafka && ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/server.properties

4、关闭 kafka

shell
# 关闭命令
 cd /usr/local/kafka && ./bin/kafka-server-stop.sh

四、Kraft 模式

1、修改配置文件

shell
mkdir -p /usr/local/kafka/logdirs2
#注释掉原有的配置
sed "s/^advertised.listeners=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
sed "s/^log.dirs=.*/#&/" /usr/local/kafka/config/kraft/server.properties  -i
#加入新配置 **注意监听的ip为公网ip或内网ip
grep -q '^#advertised.listeners=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#advertised.listeners=.*/a\advertised.listeners=PLAINTEXT://119.91.217.34:9092' /usr/local/kafka/config/kraft/server.properties || echo 'advertised.listeners=PLAINTEXT://ip:9092' >> /usr/local/kafka/config/kraft/server.properties
grep -q '^#log.dirs=' /usr/local/kafka/config/kraft/server.properties && sed -i '/^#log.dirs=.*/a\log.dirs=/usr/local/kafka/logdirs2' /usr/local/kafka/config/kraft/server.properties || echo 'log.dirs=/usr/local/kafka/logdirs2' >> /usr/local/kafka/config/kraft/server.properties

2、初始化集群数据目录

生成存储目录唯一 ID 并格式化 kafka 存储目录。

properties
#单机可以直接执行改命令,集群需要分开执行,集群所有机子的唯一id要相同
/usr/local/kafka/bin/kafka-storage.sh random-uuid|xargs -n1 -I {} /usr/local/kafka/bin/kafka-storage.sh format -t {} -c /usr/local/kafka/config/kraft/server.properties

3、启动 kafka

shell
# 启动命令,如果需要后台启动,则加上 -daemon 参数即可
 cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

4、关闭 kafka

shell
# 关闭命令
 cd /usr/local/kafka && ./bin/kafka-server-stop.sh

五、验证

1.首先创建一个名为 chenlj 的 topic :

shell
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chenlj

2.创建完成以后,可以使⽤命令来列出⽬前已有的 topic 列表

shell
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

3.查看 chenlj 主题的详情:

shell
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092  --topic chenlj

4.接下来创建一个生产者,用于在 chenlj 这个 topic 上生产消息:

shell
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic chenlj

5.接下来创建一个消费者,用于在 chenlj 这个 topic 上获取消息:

shell
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chenlj

此时生产者发出的消息,在消费者端可以获取到。

六、配置环境变量(全局可以使用 kafka 命令)

shell
cat >>/etc/profile.d/kafka.sh <<EOF
export KAFKA_HOME=/usr/local/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF
source /etc/profile

七、配置远程访问

1、开启防火墙端口

shell
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload

八、设置开机自启

shell
#待完善