Kafka 集群间数据镜像实测

由于数据安全,网速等要求,许多公司都会建立多个数据中心,每个数据中心有独立的 Kafka 集群。为保持不同中心间的数据同步,就有必要在 Kafka 集群间进行数据镜像。kafka-mirror-maker 命令或应用 Kafka Connect 可用于在多个 Kafka 集群相同的 Topic 之间互间同步数据。

这里就来体验一下不同的 Kafka 集群间如何用 kafka-mirror-maker 进行 topic 数据镜像。测试环境选择用两个 Vagrant 虚拟机,当然同一个主机上在不同的 ZooKeeper chroot 或不同的端口中也能演示同样的功能。

首先要两启两个 Vagrant 虚拟机,这里用的是 Ubuntu Server 18.04。需要在本地建立两个目录, 分别是 ubuntu-server-1 和 ubuntu-server-2, 在各自目录中建立 Vagrantfile 文件,内容如下:

以下启动 Vagrant 虚拟机,安装 JDK8 和 启动 ZooKeeper, Kafka 分别要在两个目录 (ubuntu-server-1 和 ubuntu-server-2) 中各执行一遍。

注意:config.vm.hostname 对 Kafka 很重要,因为它内部是用主机名(或域名) 来定位的,如果不分别指定 config.vm.hostname 的话,它们会是相同的 ubuntu-disco64 机器,后面的 Kafka 命令可能总是会指向到同一个虚拟机中去,即使命令中指定具体的 IP 地址也不行。

启动 Vagrant

终端下进到 ubuntu-server-1(或 ubuntu-server-2) 目录,执行 vagrant up 选择连接外网的网络接口,将以此从 DHCP 上获得 IP 地址。再 vagrant ssh 进到 Vagrant 虚拟机终端。用 ipconfig 查看下 IP 地址,比如分别是 172.28.128.3172.28.128.4.

在 Vagrant 虚拟机中安装 JDK 8

安装 Amazon Corretoo 8 的话可参考 Amazon Corretto 8 Installation Instructions for Debian-Based and RPM-Based Linux Distributions)。在 Vagrant 虚拟机终端下,执行以下命令

安装启动 ZooKeeper 和 Kafka

仍然在 Vagrant 虚拟机的终端下,这里安装当前(2019/10/27) 最新版 2.3.1, 分别执行以下命令

注意,后两步需要在同一个虚拟机的两个终端中执行,或用 tmux; 或带上 -daemon 参数,或用 & 把 zookeeper 或 zookeeper 和 kafka 送入后台。

至此,两个独立的 Kafka 集群便创建好了,只不过它们都只有一个 kafka 节点,创建 Topic 时 --replication-factor 只能为 1。

创建 Kafka Topic

现在可以回到宿主机的终端下了,在 Mac OS X 下可以用 brew install kafka 安装的 Kafka 相关的命令, 如 kafka-topics,没有 .sh 后缀,或者也可用下载的 kafka_2.12-2.3.1.tgz, 解压后在它的 bin 中有一系列 Kafka 相关命令,比如 kafka-topics.sh。本文在 Mac OS X 下操作,所以直接用 brew install kafka 安装的命令,在其他平台下请写成相应的 .sh 命令。

注意,因为前面提到过 Kafka 内部是用主机名(或域名) 来定位机器,所以最好先在本机的 /etc/hosts 中加上

172.28.128.3 ubuntu-server-1
172.28.128.4 ubuntu-server-2

分别在两个 Kafka 集群中创建相同的 Topic test1

测试 Mirror Maker

我们可以使用 kafka-console-producerkafka-console-consumer 这两个 Kafka 内置的命令来测试。希望达到的效果是,往 ubuntu-server-1 的 test1 Topic 发送消息,在 ubuntu-server-2 的同样的 Topic test1 消费到消息,简单说就是从 ubuntu-server-1:test1 进去,从 ubuntu-server-2:test1 出来。这就需要在 ubuntu-server-1 与 ubuntu-server-2 两个集群间加一个桥梁,而 kafka-mirror-maker 就是担当的这一角色,它实质上做的事情就是作为一个 consumer 从  ubuntu-server-1:test1 取得消息,而后作为一个 producer 往 ubuntu-server-2:test1 发送消息。

 为此,我们可以事先打开两个终端

consumer

kafka-console-consumer --bootstrap-server ubuntu-server-2:9092 --topic test1 --from-beginning

producer

kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1
>hello
>world
>

现在我们无论怎么发送消息,消息都不能从集群 ubuntu-server-1 传送到集群 ubuntu-server-2 去。

执行 kafka-mirror-maker 命令必须指定 --consumer.config--producer.config 两个配置文件,即该 mirror maker 同时作为 consumer 与 producer 时的基本配置,它们分别为

consumer.properties

bootstrap.server=ubuntu-server-1:9092
group.id=mirror

producer.properties

bootstrap.server=ubuntu-server-2:9092

最后执行命令

kafka-mirror-maker --consumer.config consumer.properties --producer.config producer.properties --whitelist test1

现在回到前面那个 producer 再输入新消息的话就能在 consumer 端收到消息了,表明 ubuntu-server-1:test1 --> ubuntu-server-2:test2 之间连接起来了。

kafka-mirror-maker 除两配置文件外还有一些比较重要的参数,下面列出几个,全部参数可用 --help 查看到

  1. --whitelist:这是一个正则表达式指定要镜像哪些 Topic
  2. --blacklist:   也是一个正则表达式指定不要镜像哪些 Topic,比如用 --whitelist ".+" 指定了镜像所以 topic  时可用 --blacklist 进行排除
  3. --abort.on.send.failure: 若设置为 true, 当发送失败时则关闭 MirrorMaker。默认为 true
  4. --num.streams: 指定 MirrorMaker 线程数,默认为 1
  5. --offset.commit.interval.ms: 设定 MirrorMaker offset 提交间隔,默认为每 1 分钟

进一步测试,同名的 Topic 在两 Kafka 集群间有不同数量的 Partition 也没关系,因为 kafka-mirror-maker 只是从源集群取消息再发送到目标集群的同名 Topic 中去,不用理会多少个 Partition。而且虽说是镜像,如果中间有数据出入,或人为的往目标集群的 Topic 上发了消息,所以两边的 Offset 也没有严格的相关性。

前面我们启动了一个从  ubuntu-server-1:test1 往 ubuntu-server-2:test1 搬运的 kafka-mirror-maker, 头脑发热一下,假如我们同时启动一个从 ubuntu-server-2:test1 往 ubuntu-server-1:test1 搬运的反方向的 kafka-mirror-maker  会将如何呢?Kafka 并不排斥这个逆向的 Mirror, 因此这盛世真如你所愿,一条消息就能产生风暴,不停的在两个集群间来回晃动,持续写日志,死循环,暴硬盘。

相关链接:

  1. Kafka系列 (八) 跨集群数据镜像

2019-10-29: 单个 ZooKeeper 应用 chroot 来实现独立 ZooKeeper + Kafka 集群的配置方式:ZooKeeper 的启动方式不变。

$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

只需要启动一个 ZooKeeper 实例,只要 Kafka 的配置文件加个 ZooKeeper 的目录,并且 Kafka 日志指向不同地方。如

第一个 Kafka 实例的配置 server1.properties

第二个 Kafka 实例的配置 server2.properties

分别用

$ bin/kafka-server-start.sh -daemon config/server1.properties
$ bin/kafka-server-start.sh -daemon config/server2.properties

启动后用 ZooKeeper 的客户端连接上查看节点可以看到 kafka0 和 kafka1 为各个 Kafka 实例用到的 ZooKeeper 节点,实际表现为两个不同的 Kafka 集群

$ zkCli -server localhost
......................
[zk: localhost(CONNECTED) 0] ls /
[zookeeper, kafka1, kafka0]
[zk: localhost(CONNECTED) 1] ls /kafka0
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

类别: Kafka. 标签: , . 阅读(14). 订阅评论. TrackBack.

Leave a Reply

avatar