本文实验的版本为 kafka_2.12-2.8.0
。
从官网下载包文件后,可以直接在本地启动一个实例:
# 先启动 zookeeper 服务
kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties
# 然后启动 Kafka server
kafka/bin/kafka-server-start.sh kafka/config/server.properties
Kafka 使用测试
Kafka 安装包提供了一些测试用的 shell 脚本,可以直接使用
# 创建一个 Topic
kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看已经创建的 Topic
kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
# 创建生产者,运行下面命令后,直接在 cmd 敲内容 + Enter
kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 创建消费者
kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
配置 Kafka Server SASL/PLAIN 认证
配置认证好像比较复杂,网上的文章以及官方文档,写的内容都很多,理解起来费劲,最后明白只需要看这里的配置就行: https://kafka.apache.org/documentation/#security_sasl_plain
首先创建配置文件 kafka/config/kafka_server_jaas.conf
cd kafka_2.12-2.8.0/config
vim kafka_server_jaas.conf
# 内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_test="testpass";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_test="testpass";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};
这里定义用户的方式非常奇葩,很难理解。username 和 password 两字段定义 Kafka brokers 内部沟通的用户密码,user_用户名
配置的是 client 连接 broker 时用的用户、密码。据我尝试,必须定义一个 username 用户对应的 user_ 字段,否则连不上。就像上面,有个 username=”testuser” ,所以必须再定义一次 user_testuser 且密码保持一致。此外可以再添加新用户,如添加 user_alice=”alice-secret” 。
KafkaServer 和 KafkaClient 是 Kafka Brokers 之间,以及 Kafka 客户端通讯的配置,Server 和 Client 是 Kafka 与 Zookeeper 之间的配置。注意这里和 Zookeeper 配置的 jaas 有点不一样,Kafka 这里是 org.apache.kafka.common.security.plain.PlainLoginModule ,Zookeeper 是 org.apache.zookeeper.server.auth.DigestLoginModule 。
然后编辑启动脚本 kafka/bin/kafka-server-start.sh
,在后最添加一行 export 语句
...
export KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
再编辑配置文件 kafka/config/server.properties
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
# 参数 allow.everyone.if.no.acl.found
# 设置为 true,ACL 机制改为黑名单机制,只有黑名单中的用户无法访问
# 设置为 false,ACL 机制改为白名单机制,只有白名单中的用户可以访问,默认值为 false
auto.create.topics.enable=false
super.users=User:admin
zookeeper.set.acl=true
最后再重新启动 kafka 就可以了。我们在 jaas 文件中配置了两用户,admin 和 test,其中我们设置了 admin 为超级用户。后面我们创建 topic 和 ACL 授权都将以 admin 用户来操作,而 test 用户只有读写 topic 权限,无法删除 topic 。
配置 Zookeeper
由于 Kafka 的 metadata 数据是保存在 zookeeper 中的,所以需要设置 zookeeper 支持 SASL 验证,然后配置权限,禁止未登录用户随便删除 Topic 等。
编辑配置文件 zookeeper/conf/zoo.cfg
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
其中 authProvider.1 这里的 .1 是 server_id ,多个 zookeeper 节点每个都要配置,例如你有 3 台 zk,那么需要再加上 authProvider.2 和 authProvider.3 等。
添加 jaas 配置文件 zookeeper/conf/zookeeper_jaas.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};
修改 zookeeper/bin/zkEnv.sh ,添加一行
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR}/../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS"
这里因为我是用的单独的 Zookeeper 程序包,如果你用的是 Kafka 自带的 Zookeeper,那 SERVER_JVMFLAGS 要改成 KAFKA_OPTS 。滚动重启 Zookeeper 。
配置 zkCli.sh 使用上鉴权。创建配置文件 zookeeper/conf/adminclient_jaas.conf
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="adminpass";
};
然后修改 zookeeper/bin/zkCli.sh ,在 java 后面添加
"$JAVA" "-Djava.security.auth.login.config=${ZOOBIN}/../conf/adminclient_jaas.conf" 后面接上原来的参数
配置 Kafka ACL 权限
先修改 Kafka 里的 kafka/bin/zookeeper-security-migration.sh 文件,添加
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/release/dp_kafka/config/kafka_server_jaas.conf"
然后运行命令 ,参考这里的文档
kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl secure
这样 Zookeeper 里面的 metadata 都加上权限了。可以上 Zookeeper 验证下
zookeeper/bin/zkCli.sh
# 登录进 shell 后,执行 getAcl 指令
getAcl /brokers/topics
'sasl,'admin
: cdrwa
'world,'anyone
: r
为了用上 Kafka 自带的 shell 工具,我们要配置 jaas 认证,新建一个 kafka/config/adminclient-configs.conf
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="admin" password="adminpass";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
使用
kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list
一些 ACL 命令使用,参考文档
# 列出 temp 这个 topic 的 ACL 列表
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list --topic "temp"
# 对 temp 这个 topic 对用户 test 授权
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --add --allow-principal User:test --operation Read --operation Write --operation AlterConfigs --operation Describe --operation DescribeConfigs --operation Alter --topic "temp"
# 去掉 delete 权限
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --remove --allow-principal User:test --operation Delete --topic "temp"
配置 Consumer 和 Producer
如果 Kafka 配置了认证,再用脚本消费数据就会报错
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
WARN [Consumer clientId=consumer-console-consumer-93884-1, groupId=console-consumer-93884] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
# 以下是 Kafka Server 端报错日志
INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
修改 config/consumer.properties
,添加以下配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="adminpass";
config/producer.properties
参考上面一样的修改。
最后再尝试运行上面的命令。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--consumer.config config/consumer.properties --topic test --from-beginning
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test \
--producer.config=config/producer.properties
./bin/kafka-topics.sh --list --bootstrap-server=localhost:9092 \
--command-config config/consumer.properties
Python 端添加认证代码如下
from kafka import KafkaConsumer
consumer = KafkaConsumer('topic_name', group_id='test', bootstrap_servers=["127.0.0.1:9092"],
auto_offset_reset='earliest', enable_auto_commit=False,
security_protocol= 'SASL_PLAINTEXT',
sasl_mechanism= 'PLAIN',
sasl_plain_username= 'testuser',
sasl_plain_password= 'testpass',
)
for message in consumer:
print(message)
Java 端认证代码如下
import org.apache.kafka.clients.consumer.KafkaConsumer;
String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(jaasTemplate, "testuser", "testpass");
props = new Properties();
props.put("bootstrap.servers", brokers);
...
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", jaasCfg);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
配置管理界面 Kowl
这里使用 https://github.com/cloudhut/kowl 作为 Web 管理工具,参考项目里的 Dockerfile,可以直接从源码编译运行
git clone https://github.com/cloudhut/kowl.git
cd kowl/backend
go build -o ./bin/kowl ./cmd/api/
# 可执行文件在 ./kowl/backend/bin/kowl
cp ./kowl/backend/bin/kowl ./kowl/
# 编译前端
cd kowl/frontend
npm install && npm run build
cp -r build ./kowl
# 创建配置文件
cd kowl && wget https://github.com/cloudhut/kowl/blob/master/docs/config/kowl.yaml
# 运行 kowl
./kowl -config.filepath kowl.yaml
# 最终目录结构是,kowl 可执行文件与前端 build 文件夹在一起
kowl 示例配置
kafka:
brokers:
- 127.0.0.1:9092
# clientId: kowl
# rackId: # In multi zone Kafka clusters you can reduce traffic costs by consuming messages from replica brokers in the same zone
sasl:
enabled: true
username: testuser
password: testpass # This can be set via the --kafka.sasl.password flag as well
mechanism: PLAIN # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER and AWS_MSK_IAM are supported
已经加上鉴权的 zookeeper 去掉鉴权
如果对已经加上鉴权的 zookeeper、kafka 集群,现在不想要鉴权了,要回退回去,操作如下。
首先运行命令,把 kafka 在 zookeeper 里面的 metadata 数据去掉认证:
kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl unsecure
去掉后,可以上 zookeeper 使用 getAcl 命令确认。
然后,修改 zookeeper 文件 zookeeper/bin/zkEnv.sh ,去掉之前加的
# 把这一行注释掉
# export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR}/../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS"
滚动重启 zookeeper。
再来修改 kafka 配置,修改 kafka/config/kafka_server_jaas.conf ,删除连接 zookeeper 的配置 Client 和 Server 。再修改 kafka/config/server.properties ,删除配置项 zookeeper.set.acl=true 。
再滚动重启 kafka 。