Kafka Connect 介绍和使用

继续把 Kafka 捋一捋,还剩两个主要的组件了,分别为 Kafka Connect 和 Kafka Streams。而其中的 Kafka Connect 是在 Kafka 0.9.0.0 开始加入的我,Connect 的出现让 Kafka 与外部世界更紧密连接起来了,进而可以让其他外围组件通过 Connect 的 Source 与 Sink 紧密的团结在以 Kafka 为核心的消息中心。从此不再总是以标准的 Kafka Consumer 和 Producer 与外部联络。

Kafka Connect 主要由两部分组成,Source Connector 和  Sink Connector,这两个来自于 Akka Stream 这一 Reactive 框架的概念,即往 Kafka 流入数据的 Connector 是 Source, 从 Kafka 导出数据的是 Sink。 要自己实现 Kafka 的 Connector  需要用到 org.apache.kafka:connect-api 组件,不包含在 kafka-clients 依赖中,其中定义了两个主要抽像类

  1. org.apache.kafka.connect.source.SourceConnector extends Connector
  2. org.apache.kafka.connect.sink.SinkConnector extends Connector

Kafka 服务器端只提供了 connect-file  组件,在 Kafka 安装目录 libs/ 下有 connect-file-2.3.1.jar(当前 Kafka 版本 2.3.1), 即 FileStreamSourceConnector 和 FileStreamSinkConnector 两个实现。更多 Connector 实现可以网上找到,如 HDFS, Hadoop 的 Sink, JDBC Source/Sink, Elasticserach Sink, Kinesis Source/Sink 等等。

本文来体验一下 Kafka 内置的 FileStream Source/Sink 的使用,看如何把一个文件从一端传送到 Kafka 的另一端,完全配置实现。Kafka Connect 有两种运行模式,Standalone 和 Distributed。假设 Kafka 已启动,并创建好了 connect-test 这么一个 Topic。

先来试下 Standalone 的运行方式

可以直接运行 Kafka 自带的配置,在 Kafka 的安装目录 kafka_2.12-2.3.1/config 中有 connect 相关的配置文件,要完成文件内容的搬运要用到其中的

  1. connect-standalone.properties
  2. connect-file-source.properties
  3. connect-file-sink.properties

它们的内容别如下

connect-standalone.properties

要连接到哪个 Kafka broker, key/value 怎么转换,offset 保存在哪里,其实是具体 connector 的公共配置

connect-file-source.properties

Connector 的实现为 FileStreamSource, 最好写成类的全限名称,源文件是 test.txt

connect-file-sink.properties

Connector 的实现为 FileStreamSink, 也最好写成类的全限名称,目标文件是 test.sink.txt

以上三个文件的大意是,将要以 standalone 方式(connect-standalone.properties) 方式启动两个 connector, 它们分别是 connect-file-source.properties 和 connect-file-sink.properties 对应的配置。 更深入的理解前面三个文件的配置项的含义请参数带注释的 <KAFKA_HOME>/config 下的文件。

现在尝试用下面的命令启动它

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

connect-standalone.sh 命令的格式是

$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

第一个文件是 connector 的配置,后面是每一个 Source/Sink 的配置,也就是一下可以启动多个 Source/Sink,把 Source 和  Sink 分开来一个个启动也行。

控制台一直显示

[2019-10-31 22:43:32,932] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
[2019-10-31 22:43:33,936] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)

找不到 test.txt 文件,目标文件 test.sink.txt 已经被创建了,只是空的而已。那现在创建 test.txt 文件,一行行写入内容就,同时查看 test.sink.txt  的内容

从上面的测试我们可以看到 Source, Sink 之间怎么工作的,以及 connect.offsets 是如何记录文件中已发送消息的位置。

再检查一下 Topic connect-test 中的消息

kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"world"}
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"world"}

最后把文件 test.txt 擦除后写入的 done 并未被发出去,因为 connect.offsets 记一下上次的位置,如果把 connect.offsets 文件删除就可以了。同时也看到 Kafka 中消息长什么样子的,也就是前面配置中的 key.convertervalue.converter 所产生的效果。

再来见识一下 Distributed Connect

为什么叫做分布式,因为它不是只在一个节点上执行,而是可以分布在整个集群的 Broker 中去。它不需要我们创建 Sourc/Sink  的配置文件,而是通过 REST API 来创建和管理 Connector,先要用下面命令起动 REST 服务

$ bin/connect-distributed.sh config/connect-distributed.properties

connect-distributed.properties 的内容

会默认启动服务端口为 8083 的 REST 服务,查看 https://kafka.apache.org/documentation/#connect_rest 找到有哪些 REST API  可调用。

试试如何简单的用 REST API 管理 Connector 的

这个 Connector 的行为与前面用 standalone 运行的是一样的。查看它们的配置与状态

$ curl http://localhost:8083/connectors/test-file-source/config
{"connector.class":"FileStreamSource","file":"test.txt","tasks.max":"1","name":"test-file-source","topic":"connect-test"}
$ curl http://localhost:8083/connectors/test-file-source/status
{"name":"test-file-source","connector":{"state":"RUNNING","worker_id":"192.168.86.238:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.86.238:8083"}],"type":"source"}

最后,至于如何开发自己的 Connector 就不具体说了,首先需要引入 org.apache.kafka:connect-api 依赖,关键 API 就是 SourceConnector, SinkConnector, SourceTask, SinkTask, SourceRecord, SinkRecord, Converter 以及 Transformation。

connect 会在 Kafka 中创建相应的 Topic

$ kafka-topics --zookeeper localhost:2181 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
connect-test

conect-configs 有 25 个 partition, 配置存储的地方在

kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-configs --from-beginning
{"properties":{"connector.class":"FileStreamSource","tasks.max":"1","topic":"connect-test","file":"test.txt","name":"test-file-source"}}
{"properties":{"topic":"connect-test","file":"test.txt","task.class":"org.apache.kafka.connect.file.FileStreamSourceTask","batch.size":"2000"}}
{"tasks":1}
{"properties":{"connector.class":"FileStreamSink","tasks.max":"1","topics":"connect-test","file":"test.sink.txt","name":"test-file-sink"}}
{"properties":{"file":"test.sink.txt","task.class":"org.apache.kafka.connect.file.FileStreamSinkTask","topics":"connect-test"}}
{"tasks":1}

Kafka 正在用自己内部 Topic 来实现许多新特性,就像 Redis 的集群,哨兵也是基本用自己的消息发布订阅机制来实现的。

链接:

  1. Kafka Connect

类别: Kafka. 标签: . 阅读(19). 订阅评论. TrackBack.

Leave a Reply

avatar