继续把 Kafka 捋一捋,还剩两个主要的组件了,分别为 Kafka Connect 和 Kafka Streams。而其中的 Kafka Connect 是在 Kafka 0.9.0.0 开始加入的我,Connect 的出现让 Kafka 与外部世界更紧密连接起来了,进而可以让其他外围组件通过 Connect 的 Source 与 Sink 紧密的团结在以 Kafka 为核心的消息中心。从此不再总是以标准的 Kafka Consumer 和 Producer 与外部联络。
Kafka Connect 主要由两部分组成,Source Connector 和 Sink Connector,这两个来自于 Akka Stream 这一 Reactive 框架的概念,即往 Kafka 流入数据的 Connector 是 Source, 从 Kafka 导出数据的是 Sink。 要自己实现 Kafka 的 Connector 需要用到 org.apache.kafka:connect-api
组件,不包含在 kafka-clients 依赖中,其中定义了两个主要抽像类
- org.apache.kafka.connect.source.SourceConnector extends Connector
- org.apache.kafka.connect.sink.SinkConnector extends Connector
Kafka 服务器端只提供了 connect-file 组件,在 Kafka 安装目录 libs/ 下有 connect-file-2.3.1.jar(当前 Kafka 版本 2.3.1), 即 FileStreamSourceConnector 和 FileStreamSinkConnector 两个实现。更多 Connector 实现可以网上找到,如 HDFS, Hadoop 的 Sink, JDBC Source/Sink, Elasticserach Sink, Kinesis Source/Sink 等等。
本文来体验一下 Kafka 内置的 FileStream Source/Sink 的使用,看如何把一个文件从一端传送到 Kafka 的另一端,完全配置实现。Kafka Connect 有两种运行模式,Standalone 和 Distributed。假设 Kafka 已启动,并创建好了 connect-test
这么一个 Topic。
先来试下 Standalone 的运行方式
可以直接运行 Kafka 自带的配置,在 Kafka 的安装目录 kafka_2.12-2.3.1/config 中有 connect 相关的配置文件,要完成文件内容的搬运要用到其中的
- connect-standalone.properties
- connect-file-source.properties
- connect-file-sink.properties
它们的内容别如下
connect-standalone.properties
1 2 3 4 5 6 7 |
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 |
要连接到哪个 Kafka broker, key/value 怎么转换,offset 保存在哪里,其实是具体 connector 的公共配置
connect-file-source.properties
1 2 3 4 5 |
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test |
Connector 的实现为 FileStreamSource, 最好写成类的全限名称,源文件是 test.txt
connect-file-sink.properties
1 2 3 4 5 |
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test |
Connector 的实现为 FileStreamSink, 也最好写成类的全限名称,目标文件是 test.sink.txt
以上三个文件的大意是,将要以 standalone 方式(connect-standalone.properties) 方式启动两个 connector, 它们分别是 connect-file-source.properties 和 connect-file-sink.properties 对应的配置。 更深入的理解前面三个文件的配置项的含义请参数带注释的 <KAFKA_HOME>/config 下的文件。
现在尝试用下面的命令启动它
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
connect-standalone.sh 命令的格式是
$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
第一个文件是 connector 的配置,后面是每一个 Source/Sink 的配置,也就是一下可以启动多个 Source/Sink,把 Source 和 Sink 分开来一个个启动也行。
控制台一直显示
[2019-10-31 22:43:32,932] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
[2019-10-31 22:43:33,936] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
找不到 test.txt
文件,目标文件 test.sink.txt
已经被创建了,只是空的而已。那现在创建 test.txt
文件,一行行写入内容就,同时查看 test.sink.txt
的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
$ cat test.txt $ cat: test.txt: No such file or directory $ cat /tmp/connect.offsets $ cat: /tmp/connect.offsets: No such file or directory $ cat test.sink.txt $ echo hello >> test.txt $ cat test.sink.txt hello $ cat /tmp/connect.offsets ��srjava.util.HashMap���`�F loadFactorI thresholdxp?@ ur[B��T�xp-["local-file-source",{"filename":"test.txt"}]uq~{"position":6}x $ echo world >> test.txt $ cat test.sink.txt hello world $ cat /tmp/connect.offsets ��srjava.util.HashMap���`�F loadFactorI thresholdxp?@ ur[B��T�xp-["local-file-source",{"filename":"test.txt"}]uq~{"position":12}x echo done > test.txt $ cat test.txt done $ cat test.sink.txt hello world $ cat /tmp/connect.offsets ��srjava.util.HashMap���`�F loadFactorI thresholdxp?@ ur[B��T�xp-["local-file-source",{"filename":"test.txt"}]uq~{"position":12}x |
从上面的测试我们可以看到 Source, Sink 之间怎么工作的,以及 connect.offsets 是如何记录文件中已发送消息的位置。
再检查一下 Topic connect-test
中的消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"world"}
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"world"}
最后把文件 test.txt
擦除后写入的 done
并未被发出去,因为 connect.offsets 记一下上次的位置,如果把 connect.offsets 文件删除就可以了。同时也看到 Kafka 中消息长什么样子的,也就是前面配置中的 key.converter
和 value.converter
所产生的效果。
再来见识一下 Distributed Connect
为什么叫做分布式,因为它不是只在一个节点上执行,而是可以分布在整个集群的 Broker 中去。它不需要我们创建 Sourc/Sink 的配置文件,而是通过 REST API 来创建和管理 Connector,先要用下面命令起动 REST 服务
$ bin/connect-distributed.sh config/connect-distributed.properties
connect-distributed.properties 的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
bootstrap.servers=localhost:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 config.storage.topic=connect-configs config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 offset.flush.interval.ms=10000 #rest.port=8083 |
会默认启动服务端口为 8083
的 REST 服务,查看 https://kafka.apache.org/documentation/#connect_rest 找到有哪些 REST API 可调用。
试试如何简单的用 REST API 管理 Connector 的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
$ curl http://localhost:8083/connectors [] $ curl -X POST http://localhost:8083/connectors \ -H "Content-type:application/json" \ -H "Accept:application/json" \ -d ' { "name": "test-file-source", "config": { "connector.class": "FileStreamSource", "tasks.max": "1", "topic": "connect-test", "file": "test.txt" } } ' {"name":"test-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","topic": "connect-test","file":"test.txt","name":"test-file-source"},"tasks":[],"type":"source"} curl http://localhost:8083/connectors ["test-file-source"] $curl -X POST http://localhost:8083/connectors \ -H "Content-type:application/json" \ -H "Accept:application/json" \ -d ' { "name": "test-file-sink", "config": { "connector.class": "FileStreamSink", "tasks.max": "1", "topics": "connect-test", "file": "test.sink.txt" } } ' {"name":"test-file-sink","config":{"connector.class":"FileStreamSink","tasks.max":"1","topics": "connect-test","file":"test.sink.txt","name":"test-file-sink"},"tasks":[],"type":"sink"} $ curl http://localhost:8083/connectors ["test-file-source","test-file-sink"] |
这个 Connector 的行为与前面用 standalone 运行的是一样的。查看它们的配置与状态
$ curl http://localhost:8083/connectors/test-file-source/config
{"connector.class":"FileStreamSource","file":"test.txt","tasks.max":"1","name":"test-file-source","topic":"connect-test"}
$ curl http://localhost:8083/connectors/test-file-source/status
{"name":"test-file-source","connector":{"state":"RUNNING","worker_id":"192.168.86.238:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.86.238:8083"}],"type":"source"}
最后,至于如何开发自己的 Connector 就不具体说了,首先需要引入 org.apache.kafka:connect-api
依赖,关键 API 就是 SourceConnector, SinkConnector, SourceTask, SinkTask, SourceRecord, SinkRecord, Converter 以及 Transformation。
connect 会在 Kafka 中创建相应的 Topic
$ kafka-topics --zookeeper localhost:2181 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
connect-test
conect-configs 有 25 个 partition, 配置存储的地方在
kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-configs --from-beginning
{"properties":{"connector.class":"FileStreamSource","tasks.max":"1","topic":"connect-test","file":"test.txt","name":"test-file-source"}}
{"properties":{"topic":"connect-test","file":"test.txt","task.class":"org.apache.kafka.connect.file.FileStreamSourceTask","batch.size":"2000"}}
{"tasks":1}
{"properties":{"connector.class":"FileStreamSink","tasks.max":"1","topics":"connect-test","file":"test.sink.txt","name":"test-file-sink"}}
{"properties":{"file":"test.sink.txt","task.class":"org.apache.kafka.connect.file.FileStreamSinkTask","topics":"connect-test"}}
{"tasks":1}
Kafka 正在用自己内部 Topic 来实现许多新特性,就像 Redis 的集群,哨兵也是基本用自己的消息发布订阅机制来实现的。
链接:
本文链接 https://yanbin.blog/kafka-connect-how-to/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。