Kafka Connect 介绍和使用

继续把 Kafka 捋一捋,还剩两个主要的组件了,分别为 Kafka Connect 和 Kafka Streams。而其中的 Kafka Connect 是在 Kafka 0.9.0.0 开始加入的我,Connect 的出现让 Kafka 与外部世界更紧密连接起来了,进而可以让其他外围组件通过 Connect 的 Source 与 Sink 紧密的团结在以 Kafka 为核心的消息中心。从此不再总是以标准的 Kafka Consumer 和 Producer 与外部联络。

Kafka Connect 主要由两部分组成,Source Connector 和  Sink Connector,这两个来自于 Akka Stream 这一 Reactive 框架的概念,即往 Kafka 流入数据的 Connector 是 Source, 从 Kafka 导出数据的是 Sink。 要自己实现 Kafka 的 Connector  需要用到 org.apache.kafka:connect-api 组件,不包含在 kafka-clients 依赖中,其中定义了两个主要抽像类

  1. org.apache.kafka.connect.source.SourceConnector extends Connector
  2. org.apache.kafka.connect.sink.SinkConnector extends Connector

阅读全文 >>

启用并测试 Kafka 的 SASL + ACL 认证授权

Kafka 默认情况下是没有启用安全机制,这让能连接到 Broker 的客户端可以为所欲为,自 Kafka 0.9.0.0 版本引入了安全配置,但是需要进行一些配置来开启它。Kafka 安全主要包含三个方面:认证(authentication),授权(authorization), 和信道加密(encryption)。其中认证机制和授权分别通过 SASL(Simple Authentication and Security Layer)和  ACL(Access Control List) 来实现。本篇主要演示 SASL + ACL 的配置,未涉及 SSL 信道加道,所以没有配置 Kerberos, 客户端与 Broker 之间的数据传输仍然以明文(PLAINTEXT) 传输,这在内网使用 Kafka 基本没问题。

开启 SASL 和 ACL 需要在 Broker 和 Client 端进行相应的配置,要为两端创建包含用户认证信息的 JAAS(Java Authentication and Authorization Service) 文件。听其名就是为 Java 代码服务的,不知客户端要支持别的语言(如 Python) 时应该如何配置客户端。 阅读全文 >>

Kafka 集群间数据镜像实测

由于数据安全,网速等要求,许多公司都会建立多个数据中心,每个数据中心有独立的 Kafka 集群。为保持不同中心间的数据同步,就有必要在 Kafka 集群间进行数据镜像。kafka-mirror-maker 命令或应用 Kafka Connect 可用于在多个 Kafka 集群相同的 Topic 之间互间同步数据。

这里就来体验一下不同的 Kafka 集群间如何用 kafka-mirror-maker 进行 topic 数据镜像。测试环境选择用两个 Vagrant 虚拟机,当然同一个主机上在不同的 ZooKeeper chroot 或不同的端口中也能演示同样的功能。

首先要两启两个 Vagrant 虚拟机,这里用的是 Ubuntu Server 18.04。需要在本地建立两个目录, 分别是 ubuntu-server-1 和 ubuntu-server-2, 在各自目录中建立 Vagrantfile 文件,内容如下:

以下启动 Vagrant 虚拟机,安装 JDK8 和 启动 ZooKeeper, Kafka 分别要在两个目录 (ubuntu-server-1 和 ubuntu-server-2) 中各执行一遍。 阅读全文 >>

Kafka Producer 设置 Interceptor 来统计消息

Kafka 消息的 Producer 在调用 producer.send() 方法发送消息时会先把消息放到本地缓冲中,然后由  Kafka 网络线程从缓冲中提取消息再送到 Kafka 代理上去。本地缓冲区大小由 buffer.memory 来配置,默认为 32M(32 * 1024 * 1024L)。如果发消息到网络慢于提交消息到缓冲区的话,缓冲区就可能会满就无法接受新的消息,这时候就要依照 block.on.buffer.full 设置是否暂停还是抛出异常,默认为暂停 producer.send();暂停时间由 max.block.ms 决定,默认为 60 秒。producer.send() 返回一个 Future<RecordMetadata>, 也就是每次调用 send() 方法在缓冲区满后要等待 60 秒才能获得结果(异常)。

这里的关系是 send() --a--> 缓冲区 --b--> 发送到 Kafka 代理,自然要在 a 与 b 之间进行流量控制,如果 b 太慢,缓冲区满的话必须把 a 放慢下来。如果能基于缓冲区已使用大小来放缓 a 也是也行的,留待以后进行研究。本文提供另一种实现参考,为 Producer 配置一个 Interceptor 能够大致统计多少消息提交到缓冲区,多少消息从缓冲区取出。

Kafka 的所有配置项常量可以在这个页面 https://kafka.apache.org/0100/javadoc/constant-values.html 找到。对 interceptor.classes 的解释是:可以为 Producer 配置一个或多个 Interceptor(需要实现 ProducerInterceptor)。另外 Consumer 也有自己的  Interceptor(实现 ConsumerInterceptor)。 阅读全文 >>