启用并测试 Kafka 的 SASL + ACL 认证授权

Kafka 默认情况下是没有启用安全机制,这让能连接到 Broker 的客户端可以为所欲为,自 Kafka 0.9.0.0 版本引入了安全配置,但是需要进行一些配置来开启它。Kafka 安全主要包含三个方面:认证(authentication),授权(authorization), 和信道加密(encryption)。其中认证机制和授权分别通过 SASL(Simple Authentication and Security Layer)和  ACL(Access Control List) 来实现。本篇主要演示 SASL + ACL 的配置,未涉及 SSL 信道加道,所以没有配置 Kerberos, 客户端与 Broker 之间的数据传输仍然以明文(PLAINTEXT) 传输,这在内网使用 Kafka 基本没问题。

开启 SASL 和 ACL 需要在 Broker 和 Client 端进行相应的配置,要为两端创建包含用户认证信息的 JAAS(Java Authentication and Authorization Service) 文件。听其名就是为 Java 代码服务的,不知客户端要支持别的语言(如 Python) 时应该如何配置客户端。

本例测试用的当前最新版的 Kafka 2.3.1。在 Kafka 的安装目录(kafka_2.12-2.3.1/config) 中创建服务端配置文件 jaas.conf, 填入内容

注意最后两行后的分别必不可少,以上往 Kafka Server 端加了三个用户,分别为 admin, reader, 和 write, 以及各自等号后对应的密码.

再到 Kafka 的配置文件 config/server.properties 后加上以下内容

指明用 SASL_PLAINTEXT 协议,及超级用户为 admin

现在可以依次启动 ZooKeeper 和 Kafka 了

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ export EXTRA_ARGS="-Djava.security.auth.login.config=config/jaas.conf"; bin/kafka-server-start.sh config/server.properties

至此 Kafka 服务端就支持用户认证,注意到 ZooKeeper 尚未支持用户认证,所以通过 --zookeeper ubuntu-server-1:2181 来执行的命令会绕过安全验证。假设 ZooKeeper 和 Kafka 均运行在主机名为 ubuntu-server-1 的机器上。

注:为什么可以用 EXTRA_ARGS?如果我们打开  bin/kafka-server-start.sh 脚本,发现有

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" 

同时发现它调用的是 bin/kafka-run-class.sh,在其中有

if [ -z "$KAFKA_OPTS" ]; then
    KAFKA_OPTS=""
fi
........
exec $JAVA $KAFKA_HEAPS_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -c $CLASSPATH $KAFKA_OPTS "$@"

所以也可以用 KAFKA_OPTS 来替代 EXTRA_ARGS 的功效。同样的 kafka-console-producer, kafka-console-consumer 也是调用了 kafka-run-class.sh, 所以后也是用了 KAFKA_OPTS 环境变量来指定这两命令的系统属性。

现在我们可以在客户端用以下命令创建一个 Topic test1

$ kafka-topics --zookeeper ubuntu-server-1:2181 --create --topic test1 --partitions 2 --replication-factor 1
Created topic test1.

因为 ZooKeeper 未启用安全,所以通过 ZooKeeper 仍然可以创建 Topic, 如果是指定 --bootstrap-server ubuntu-server-1:9092 的话

$ kafka-topics --bootstrap-server ubuntu-server-1:9092 --create --topic test1 --partitions 2 --replication-factor 1

以上命令会在 Kafka 服务端输出下面类似错误信息

[2019-10-29 03:50:07,396] INFO [SocketServer brokerId=0] Failed authentication with /172.28.128.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

因为 Kafka 服务端开启了安全认证,这时候试图以未认证的方式来启动一个 kafka-console-produer 后会有如下错误

$ kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1
>[2019-10-28 22:50:07,721] WARN [Producer clientId=console-producer] Bootstrap broker ubuntu-server-1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2019-10-28 22:50:08,083] WARN [Producer clientId=console-producer] Bootstrap broker ubuntu-server-1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

也就是说客户端也要加入安全认证,所以在客户端也要创建 JAAS 配置文件,我们分别测试 producer  和 consumer 需要的权限,所以先创建 producer 使用的 producer_jaas.conf 文件,写入内容

以新的方式启动 kafka-console-producer

$ export KAFKA_OPTS="-Djava.security.auth.login.config=producer_jaas.conf"; kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

这时候输入一条消息后会出现 Topic test1 不能被访问的错误

$ export KAFKA_OPTS="-Djava.security.auth.login.config=producer_jaas.conf"; kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>hello
[2019-10-28 23:28:05,256] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-10-28 23:28:05,257] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test1] (org.apache.kafka.clients.Metadata)
[2019-10-28 23:28:05,258] ERROR Error when sending message to topic test1 with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test1]

原因为未给该 writer 用户授权写 test1 Topic 的权限,还需要配置 ACL 规则,须执行的命令是

$ unset KAFKA_OPTS                        //如果未设置 KAFKA_OPTS 就不需要这一步
$ kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=ubuntu-server-1:2181 --add --allow-principal User:writer --operation Write --topic test1
Adding ACLs for resource Topic:LITERAL:test1:
User:writer has Allow permission for operations: Write from hosts: *

Current ACLs for resource Topic:LITERAL:test1:
User:writer has Allow permission for operations: Write from hosts: *
User:reader has Allow permission for operations: Write from hosts: *

执行前请先把  KAFKA_OPTS 环境变量清除掉,授权后现在可以用 writer 用户向 test1 发送消息了。如果要让该用户能向所有 Topic 发送消息的话可以用 --topic * 指定所有。

Kafka ACL 规则还能配置允许 Client 的 IP, 更详细的规则可直接看 kafka-acls 的命令帮助或请参考官方文档:https://docs.confluent.io/current/kafka/authorization.html。默认时 Kafka 服务端的  allow.everyone.if.no.acl.found=false, 所以它工作在白名单机制,只加给用户授权了才能访问相应资源,否则什么也访问不了。

$ export KAFKA_OPTS="-Djava.security.auth.login.config=producer_jaas.conf"; kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>hello
>kafka
>test done
>^C

要使用 reader 用户从 Topic test1 中读取消息的话,也需要建立一个 JAAS 文件,如 consumer_jaas.conf,内容为

这儿直接说吧,一个 Consumer 需要的 ACL 规则得对相应 Topic 的 Read 权限再加上 Consumer Group 的 Read  权限,简单一点用下面的命令直接让 reader 用户有对所有 Topic 所有消费组的读权限

$ unset KAFKA_OPTS                 //如果未设置 KAFKA_OPTS 就不需要这一步
$ kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=ubuntu-server-1:2181 --add --allow-principal User:reader --operation Read --topic "*" --group "*"
Adding ACLs for resource Topic:LITERAL:*:
User:reader has Allow permission for operations: Read from hosts: *

Adding ACLs for resource Group:LITERAL:*:
User:reader has Allow permission for operations: Read from hosts: *

Current ACLs for resource Topic:LITERAL:*:
User:reader has Allow permission for operations: Read from hosts: *

Current ACLs for resource Group:LITERAL:*:
User:reader has Allow permission for operations: Read from hosts: *

启动带安全验证的 kafka-console-consumer

export KAFKA_OPTS="-Djava.security.auth.login.config=consumer_jaas.conf"; kafka-console-consumer --bootstrap-server ubuntu-server:9092 --topic test1 --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --consumer-property group.id=test-group
hello
kafka
test done

其他的需要用 admin 用户的就为 admin 建立一个相应的 jaas.conf 文件,由于 admin 在 Kafka 服务端指定成了 super.user, 所以 admin 是不受 ACL 限制的,它默认拥有所有的权限。

本文用的是 Kafka 的命令来验证 Producer 与 Consumer, 如果是用 Java 写的代码的话,在创建 Producer 或  Consumer 时只要设置前面用到的必须的属性即可。倘若还需再进一步启用 SSL 进行数据传输加密,可以用 Java 的 keytool 命令创建证书,再分发到客户端,此处不再多讲。据说启用了 SSL 之后 Kafka 的吞吐量会有 10% -40% 的下降。

类别: Kafka. 标签: , , . 阅读(20). 订阅评论. TrackBack.

Leave a Reply

avatar