Kafka 默认情况下是没有启用安全机制,这让能连接到 Broker 的客户端可以为所欲为,自 Kafka 0.9.0.0 版本引入了安全配置,但是需要进行一些配置来开启它。Kafka 安全主要包含三个方面:认证(authentication),授权(authorization), 和信道加密(encryption)。其中认证机制和授权分别通过 SASL(Simple Authentication and Security Layer)和 ACL(Access Control List) 来实现。本篇主要演示 SASL + ACL 的配置,未涉及 SSL 信道加道,所以没有配置 Kerberos, 客户端与 Broker 之间的数据传输仍然以明文(PLAINTEXT) 传输,这在内网使用 Kafka 基本没问题。
开启 SASL 和 ACL 需要在 Broker 和 Client 端进行相应的配置,要为两端创建包含用户认证信息的 JAAS(Java Authentication and Authorization Service) 文件。听其名就是为 Java 代码服务的,不知客户端要支持别的语言(如 Python) 时应该如何配置客户端。
本例测试用的当前最新版的 Kafka 2.3.1。在 Kafka 的安装目录(kafka_2.12-2.3.1/config) 中创建服务端配置文件 jaas.conf
, 填入内容
1 2 3 4 5 6 7 8 |
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username = "admin" password = "admin_password" user_admin = "admin_password" user_reader = "reader_password" user_writer = "writer_password"; }; |
注意最后两行后的分别必不可少,以上往 Kafka Server 端加了三个用户,分别为 admin, reader, 和 write, 以及各自等号后对应的密码.
再到 Kafka 的配置文件 config/server.properties
后加上以下内容
1 2 3 4 5 6 |
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer listeners=SASL_PLAINTEXT://:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN super.users=User:admin |
指明用 SASL_PLAINTEXT 协议,及超级用户为 admin
。
现在可以依次启动 ZooKeeper 和 Kafka 了
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ export EXTRA_ARGS="-Djava.security.auth.login.config=config/jaas.conf"; bin/kafka-server-start.sh config/server.properties
至此 Kafka 服务端就支持用户认证,注意到 ZooKeeper 尚未支持用户认证,所以通过 --zookeeper ubuntu-server-1:2181
来执行的命令会绕过安全验证。假设 ZooKeeper 和 Kafka 均运行在主机名为 ubuntu-server-1
的机器上。
注:为什么可以用 EXTRA_ARGS
?如果我们打开 bin/kafka-server-start.sh
脚本,发现有
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
同时发现它调用的是 bin/kafka-run-class.sh
,在其中有
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS=""
fi
........
exec $JAVA $KAFKA_HEAPS_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -c $CLASSPATH $KAFKA_OPTS "$@"
所以也可以用 KAFKA_OPTS
来替代 EXTRA_ARGS
的功效。同样的 kafka-console-producer
, kafka-console-consumer
也是调用了 kafka-run-class.sh
, 所以后也是用了 KAFKA_OPTS
环境变量来指定这两命令的系统属性。
现在我们可以在客户端用以下命令创建一个 Topic test1
$ kafka-topics --zookeeper ubuntu-server-1:2181 --create --topic test1 --partitions 2 --replication-factor 1
Created topic test1.
因为 ZooKeeper 未启用安全,所以通过 ZooKeeper 仍然可以创建 Topic, 如果是指定 --bootstrap-server ubuntu-server-1:9092
的话
$ kafka-topics --bootstrap-server ubuntu-server-1:9092 --create --topic test1 --partitions 2 --replication-factor 1
以上命令会在 Kafka 服务端输出下面类似错误信息
[2019-10-29 03:50:07,396] INFO [SocketServer brokerId=0] Failed authentication with /172.28.128.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
因为 Kafka 服务端开启了安全认证,这时候试图以未认证的方式来启动一个 kafka-console-produer
后会有如下错误
$ kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1
>[2019-10-28 22:50:07,721] WARN [Producer clientId=console-producer] Bootstrap broker ubuntu-server-1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2019-10-28 22:50:08,083] WARN [Producer clientId=console-producer] Bootstrap broker ubuntu-server-1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
也就是说客户端也要加入安全认证,所以在客户端也要创建 JAAS 配置文件,我们分别测试 producer 和 consumer 需要的权限,所以先创建 producer 使用的 producer_jaas.conf
文件,写入内容
1 2 3 4 5 |
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username = "writer" password = "writer_password"; }; |
以新的方式启动 kafka-console-producer
$ export KAFKA_OPTS="-Djava.security.auth.login.config=producer_jaas.conf"; kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
这时候输入一条消息后会出现 Topic test1 不能被访问的错误
$ export KAFKA_OPTS="-Djava.security.auth.login.config=producer_jaas.conf"; kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>hello
[2019-10-28 23:28:05,256] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-10-28 23:28:05,257] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test1] (org.apache.kafka.clients.Metadata)
[2019-10-28 23:28:05,258] ERROR Error when sending message to topic test1 with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test1]
原因为未给该 writer
用户授权写 test1
Topic 的权限,还需要配置 ACL 规则,须执行的命令是
$ unset KAFKA_OPTS //如果未设置 KAFKA_OPTS 就不需要这一步
$ kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=ubuntu-server-1:2181 --add --allow-principal User:writer --operation Write --topic test1
Adding ACLs for resourceTopic:LITERAL:test1
:
User:writer has Allow permission for operations: Write from hosts: *Current ACLs for resource
Topic:LITERAL:test1
:
User:writer has Allow permission for operations: Write from hosts: *
User:reader has Allow permission for operations: Write from hosts: *
执行前请先把 KAFKA_OPTS
环境变量清除掉,授权后现在可以用 writer
用户向 test1
发送消息了。如果要让该用户能向所有 Topic 发送消息的话可以用 --topic *
指定所有。
Kafka ACL 规则还能配置允许 Client 的 IP, 更详细的规则可直接看 kafka-acls
的命令帮助或请参考官方文档:https://docs.confluent.io/current/kafka/authorization.html。默认时 Kafka 服务端的 allow.everyone.if.no.acl.found=false
, 所以它工作在白名单机制,只加给用户授权了才能访问相应资源,否则什么也访问不了。
$ export KAFKA_OPTS="-Djava.security.auth.login.config=producer_jaas.conf"; kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>hello
>kafka
>test done
>^C
要使用 reader
用户从 Topic test1
中读取消息的话,也需要建立一个 JAAS 文件,如 consumer_jaas.conf
,内容为
这儿直接说吧,一个 Consumer 需要的 ACL 规则得对相应 Topic 的 Read 权限再加上 Consumer Group 的 Read 权限,简单一点用下面的命令直接让 reader
用户有对所有 Topic 所有消费组的读权限
$ unset KAFKA_OPTS //如果未设置 KAFKA_OPTS 就不需要这一步
$ kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=ubuntu-server-1:2181 --add --allow-principal User:reader --operation Read --topic "*" --group "*"
Adding ACLs for resourceTopic:LITERAL:*
:
User:reader has Allow permission for operations: Read from hosts: *Adding ACLs for resource
Group:LITERAL:*
:
User:reader has Allow permission for operations: Read from hosts: *Current ACLs for resource
Topic:LITERAL:*
:
User:reader has Allow permission for operations: Read from hosts: *Current ACLs for resource
Group:LITERAL:*
:
User:reader has Allow permission for operations: Read from hosts: *
启动带安全验证的 kafka-console-consumer
export KAFKA_OPTS="-Djava.security.auth.login.config=consumer_jaas.conf"; kafka-console-consumer --bootstrap-server ubuntu-server:9092 --topic test1 --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --consumer-property group.id=test-group
hello
kafka
test done
其他的需要用 admin
用户的就为 admin
建立一个相应的 jaas.conf
文件,由于 admin
在 Kafka 服务端指定成了 super.user
, 所以 admin
是不受 ACL 限制的,它默认拥有所有的权限。
本文用的是 Kafka 的命令来验证 Producer 与 Consumer, 如果是用 Java 写的代码的话,在创建 Producer 或 Consumer 时只要设置前面用到的必须的属性即可。倘若还需再进一步启用 SSL 进行数据传输加密,可以用 Java 的 keytool 命令创建证书,再分发到客户端,此处不再多讲。据说启用了 SSL 之后 Kafka 的吞吐量会有 10% -40% 的下降。
本文链接 https://yanbin.blog/kafka-enable-saslacl/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
consumer_jaas.conf的内容跟producer_jaas.conf一样吗
一样的,consumer 和 producer 只有验证后对 topic 的操作权限不同
[…] 启用并测试 Kafka 的 SASL + ACL 认证授权 […]
清晰流畅,难得一见的优质文章