由于数据安全,网速等要求,许多公司都会建立多个数据中心,每个数据中心有独立的 Kafka 集群。为保持不同中心间的数据同步,就有必要在 Kafka 集群间进行数据镜像。kafka-mirror-maker
命令或应用 Kafka Connect 可用于在多个 Kafka 集群相同的 Topic 之间互间同步数据。
这里就来体验一下不同的 Kafka 集群间如何用 kafka-mirror-maker
进行 topic 数据镜像。测试环境选择用两个 Vagrant 虚拟机,当然同一个主机上在不同的 ZooKeeper chroot 或不同的端口中也能演示同样的功能。
首先要两启两个 Vagrant 虚拟机,这里用的是 Ubuntu Server 18.04。需要在本地建立两个目录, 分别是 ubuntu-server-1 和 ubuntu-server-2, 在各自目录中建立 Vagrantfile 文件,内容如下:
1 2 3 4 5 6 7 8 |
Vagrant.configure("2") do |config| config.vm.box = "ubuntu/disco64" config.vm.provider "virtualbox" do |vb| vb.memory = "2048" end config.vm.network "private_network", type: "dhcp" config.vm.hostname = "ubuntu-server-1" # 另一台机器指定 ubuntu-server-2 end |
以下启动 Vagrant 虚拟机,安装 JDK8 和 启动 ZooKeeper, Kafka 分别要在两个目录 (ubuntu-server-1 和 ubuntu-server-2) 中各执行一遍。
注意:config.vm.hostname
对 Kafka 很重要,因为它内部是用主机名(或域名) 来定位的,如果不分别指定 config.vm.hostname
的话,它们会是相同的 ubuntu-disco64
机器,后面的 Kafka 命令可能总是会指向到同一个虚拟机中去,即使命令中指定具体的 IP 地址也不行。
启动 Vagrant
终端下进到 ubuntu-server-1(或 ubuntu-server-2) 目录,执行 vagrant up
选择连接外网的网络接口,将以此从 DHCP 上获得 IP 地址。再 vagrant ssh
进到 Vagrant 虚拟机终端。用 ipconfig
查看下 IP 地址,比如分别是 172.28.128.3
和 172.28.128.4
.
在 Vagrant 虚拟机中安装 JDK 8
安装 Amazon Corretoo 8 的话可参考 Amazon Corretto 8 Installation Instructions for Debian-Based and RPM-Based Linux Distributions)。在 Vagrant 虚拟机终端下,执行以下命令
1 2 3 |
sudo apt-get update && sudo apt-get install java-common wget https://d3pxv6yz143wms.cloudfront.net/8.232.09.1/java-1.8.0-amazon-corretto-jdk_8.232.09-1_amd64.deb sudo dpkg --install java-1.8.0-amazon-corretto-jdk_8.232.09-1_amd64.deb |
安装启动 ZooKeeper 和 Kafka
仍然在 Vagrant 虚拟机的终端下,这里安装当前(2019/10/27) 最新版 2.3.1, 分别执行以下命令
1 2 3 4 5 |
wget http://mirror.reverse.net/pub/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz tar xzvf kafka_2.12-2.3.1.tgz cd kafka_2.12-2.3.1 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties |
注意,后两步需要在同一个虚拟机的两个终端中执行,或用 tmux; 或带上 -daemon 参数,或用 &
把 zookeeper 或 zookeeper 和 kafka 送入后台。
至此,两个独立的 Kafka 集群便创建好了,只不过它们都只有一个 kafka 节点,创建 Topic 时 --replication-factor
只能为 1。
创建 Kafka Topic
现在可以回到宿主机的终端下了,在 Mac OS X 下可以用 brew install kafka
安装的 Kafka 相关的命令, 如 kafka-topics
,没有 .sh
后缀,或者也可用下载的 kafka_2.12-2.3.1.tgz
, 解压后在它的 bin
中有一系列 Kafka 相关命令,比如 kafka-topics.sh
。本文在 Mac OS X 下操作,所以直接用 brew install kafka
安装的命令,在其他平台下请写成相应的 .sh
命令。
注意,因为前面提到过 Kafka 内部是用主机名(或域名) 来定位机器,所以最好先在本机的 /etc/hosts
中加上
172.28.128.3 ubuntu-server-1
172.28.128.4 ubuntu-server-2
分别在两个 Kafka 集群中创建相同的 Topic test1
1 2 |
kafka-topics --bootstrap-server ubuntu-server-1:9092 --create --topic test1 --partitions 2 --replication-factor 1 kafka-topics --bootstrap-server ubuntu-server-2:9092 --create --topic test1 --partitions 2 --replication-factor 1 |
测试 Mirror Maker
我们可以使用 kafka-console-producer
和 kafka-console-consumer
这两个 Kafka 内置的命令来测试。希望达到的效果是,往 ubuntu-server-1 的 test1
Topic 发送消息,在 ubuntu-server-2 的同样的 Topic test1
消费到消息,简单说就是从 ubuntu-server-1:test1
进去,从 ubuntu-server-2:test1
出来。这就需要在 ubuntu-server-1 与 ubuntu-server-2 两个集群间加一个桥梁,而 kafka-mirror-maker
就是担当的这一角色,它实质上做的事情就是作为一个 consumer 从 ubuntu-server-1:test1
取得消息,而后作为一个 producer 往 ubuntu-server-2:test1
发送消息。
为此,我们可以事先打开两个终端
consumer
kafka-console-consumer --bootstrap-server ubuntu-server-2:9092 --topic test1 --from-beginning
producer
kafka-console-producer --broker-list ubuntu-server-1:9092 --topic test1
>hello
>world
>
现在我们无论怎么发送消息,消息都不能从集群 ubuntu-server-1 传送到集群 ubuntu-server-2 去。
执行 kafka-mirror-maker
命令必须指定 --consumer.config
和 --producer.config
两个配置文件,即该 mirror maker 同时作为 consumer 与 producer 时的基本配置,它们分别为
consumer.properties
bootstrap.server=ubuntu-server-1:9092
group.id=mirror
producer.properties
bootstrap.server=ubuntu-server-2:9092
最后执行命令
kafka-mirror-maker --consumer.config consumer.properties --producer.config producer.properties --whitelist test1
现在回到前面那个 producer 再输入新消息的话就能在 consumer 端收到消息了,表明 ubuntu-server-1:test1 --> ubuntu-server-2:test2 之间连接起来了。
kafka-mirror-maker
除两配置文件外还有一些比较重要的参数,下面列出几个,全部参数可用 --help
查看到
- --whitelist:这是一个正则表达式指定要镜像哪些 Topic
- --blacklist: 也是一个正则表达式指定不要镜像哪些 Topic,比如用 --whitelist ".+" 指定了镜像所以 topic 时可用 --blacklist 进行排除
- --abort.on.send.failure: 若设置为 true, 当发送失败时则关闭 MirrorMaker。默认为 true
- --num.streams: 指定 MirrorMaker 线程数,默认为 1
- --offset.commit.interval.ms: 设定 MirrorMaker offset 提交间隔,默认为每 1 分钟
进一步测试,同名的 Topic 在两 Kafka 集群间有不同数量的 Partition 也没关系,因为 kafka-mirror-maker
只是从源集群取消息再发送到目标集群的同名 Topic 中去,不用理会多少个 Partition。而且虽说是镜像,如果中间有数据出入,或人为的往目标集群的 Topic 上发了消息,所以两边的 Offset 也没有严格的相关性。
前面我们启动了一个从 ubuntu-server-1:test1 往 ubuntu-server-2:test1 搬运的 kafka-mirror-maker
, 头脑发热一下,假如我们同时启动一个从 ubuntu-server-2:test1 往 ubuntu-server-1:test1 搬运的反方向的 kafka-mirror-maker
会将如何呢?Kafka 并不排斥这个逆向的 Mirror, 因此这盛世真如你所愿,一条消息就能产生风暴,不停的在两个集群间来回晃动,持续写日志,死循环,暴硬盘。
相关链接:
2019-10-29: 单个 ZooKeeper 应用 chroot 来实现独立 ZooKeeper + Kafka 集群的配置方式:ZooKeeper 的启动方式不变。
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
只需要启动一个 ZooKeeper 实例,只要 Kafka 的配置文件加个 ZooKeeper 的目录,并且 Kafka 日志指向不同地方。如
第一个 Kafka 实例的配置 server1.properties
1 2 3 4 |
broker.id=0 listeners=PLAINTEXT://localhost:9092 log.dirs=/tmp/kafka-logs/broker0 zookeeper.connect=localhost:2181/kafka0 |
第二个 Kafka 实例的配置 server2.properties
1 2 3 4 |
broker.id=1 listeners=PLAINTEXT://localhost:9092 log.dirs=/tmp/kafka-logs/broker1 zookeeper.connect=localhost:2181/kafka1 |
分别用
$ bin/kafka-server-start.sh -daemon config/server1.properties
$ bin/kafka-server-start.sh -daemon config/server2.properties
启动后用 ZooKeeper 的客户端连接上查看节点可以看到 kafka0 和 kafka1 为各个 Kafka 实例用到的 ZooKeeper 节点,实际表现为两个不同的 Kafka 集群
$ zkCli -server localhost
......................
[zk: localhost(CONNECTED) 0] ls /
[zookeeper, kafka1, kafka0]
[zk: localhost(CONNECTED) 1] ls /kafka0
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
本文链接 https://yanbin.blog/kafka-mirror-maker-in-action/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。