Kafka Connect 介绍和使用
继续把 Kafka 捋一捋,还剩两个主要的组件了,分别为 Kafka Connect 和 Kafka Streams。而其中的 Kafka Connect 是在 Kafka 0.9.0.0 开始加入的我,Connect 的出现让 Kafka 与外部世界更紧密连接起来了,进而可以让其他外围组件通过 Connect 的 Source 与 Sink 紧密的团结在以 Kafka 为核心的消息中心。从此不再总是以标准的 Kafka Consumer 和 Producer 与外部联络。
Kafka Connect 主要由两部分组成,Source Connector 和 Sink Connector,这两个来自于 Akka Stream 这一 Reactive 框架的概念,即往 Kafka 流入数据的 Connector 是 Source, 从 Kafka 导出数据的是 Sink。 要自己实现 Kafka 的 Connector 需要用到
Kafka 服务器端只提供了 connect-file 组件,在 Kafka 安装目录 libs/ 下有 connect-file-2.3.1.jar(当前 Kafka 版本 2.3.1), 即 FileStreamSourceConnector 和 FileStreamSinkConnector 两个实现。更多 Connector 实现可以网上找到,如 HDFS, Hadoop 的 Sink, JDBC Source/Sink, Elasticserach Sink, Kinesis Source/Sink 等等。
本文来体验一下 Kafka 内置的 FileStream Source/Sink 的使用,看如何把一个文件从一端传送到 Kafka 的另一端,完全配置实现。Kafka Connect 有两种运行模式,Standalone 和 Distributed。假设 Kafka 已启动,并创建好了
它们的内容别如下
connect-standalone.properties
要连接到哪个 Kafka broker, key/value 怎么转换,offset 保存在哪里,其实是具体 connector 的公共配置
connect-file-source.properties
Connector 的实现为 FileStreamSource, 最好写成类的全限名称,源文件是 test.txt
connect-file-sink.properties
Connector 的实现为 FileStreamSink, 也最好写成类的全限名称,目标文件是 test.sink.txt
以上三个文件的大意是,将要以 standalone 方式(connect-standalone.properties) 方式启动两个 connector, 它们分别是 connect-file-source.properties 和 connect-file-sink.properties 对应的配置。 更深入的理解前面三个文件的配置项的含义请参数带注释的 <KAFKA_HOME>/config 下的文件。
现在尝试用下面的命令启动它
控制台一直显示
从上面的测试我们可以看到 Source, Sink 之间怎么工作的,以及 connect.offsets 是如何记录文件中已发送消息的位置。
再检查一下 Topic
$ bin/connect-distributed.sh config/connect-distributed.properties
connect-distributed.properties 的内容
会默认启动服务端口为
试试如何简单的用 REST API 管理 Connector 的
这个 Connector 的行为与前面用 standalone 运行的是一样的。查看它们的配置与状态
connect 会在 Kafka 中创建相应的 Topic
链接:
永久链接 https://yanbin.blog/kafka-connect-how-to/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
Kafka Connect 主要由两部分组成,Source Connector 和 Sink Connector,这两个来自于 Akka Stream 这一 Reactive 框架的概念,即往 Kafka 流入数据的 Connector 是 Source, 从 Kafka 导出数据的是 Sink。 要自己实现 Kafka 的 Connector 需要用到
org.apache.kafka:connect-api 组件,不包含在 kafka-clients 依赖中,其中定义了两个主要抽像类- org.apache.kafka.connect.source.SourceConnector extends Connector
- org.apache.kafka.connect.sink.SinkConnector extends Connector
Kafka 服务器端只提供了 connect-file 组件,在 Kafka 安装目录 libs/ 下有 connect-file-2.3.1.jar(当前 Kafka 版本 2.3.1), 即 FileStreamSourceConnector 和 FileStreamSinkConnector 两个实现。更多 Connector 实现可以网上找到,如 HDFS, Hadoop 的 Sink, JDBC Source/Sink, Elasticserach Sink, Kinesis Source/Sink 等等。
本文来体验一下 Kafka 内置的 FileStream Source/Sink 的使用,看如何把一个文件从一端传送到 Kafka 的另一端,完全配置实现。Kafka Connect 有两种运行模式,Standalone 和 Distributed。假设 Kafka 已启动,并创建好了
connect-test 这么一个 Topic。先来试下 Standalone 的运行方式
可以直接运行 Kafka 自带的配置,在 Kafka 的安装目录 kafka_2.12-2.3.1/config 中有 connect 相关的配置文件,要完成文件内容的搬运要用到其中的- connect-standalone.properties
- connect-file-source.properties
- connect-file-sink.properties
它们的内容别如下
connect-standalone.properties
1bootstrap.servers=localhost:9092
2key.converter=org.apache.kafka.connect.json.JsonConverter
3value.converter=org.apache.kafka.connect.json.JsonConverter
4key.converter.schemas.enable=true
5value.converter.schemas.enable=true
6offset.storage.file.filename=/tmp/connect.offsets
7offset.flush.interval.ms=10000要连接到哪个 Kafka broker, key/value 怎么转换,offset 保存在哪里,其实是具体 connector 的公共配置
connect-file-source.properties
1name=local-file-source
2connector.class=FileStreamSource
3tasks.max=1
4file=test.txt
5topic=connect-testConnector 的实现为 FileStreamSource, 最好写成类的全限名称,源文件是 test.txt
connect-file-sink.properties
1name=local-file-sink
2connector.class=FileStreamSink
3tasks.max=1
4file=test.sink.txt
5topics=connect-testConnector 的实现为 FileStreamSink, 也最好写成类的全限名称,目标文件是 test.sink.txt
以上三个文件的大意是,将要以 standalone 方式(connect-standalone.properties) 方式启动两个 connector, 它们分别是 connect-file-source.properties 和 connect-file-sink.properties 对应的配置。 更深入的理解前面三个文件的配置项的含义请参数带注释的 <KAFKA_HOME>/config 下的文件。
现在尝试用下面的命令启动它
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.propertiesconnect-standalone.sh 命令的格式是
$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]第一个文件是 connector 的配置,后面是每一个 Source/Sink 的配置,也就是一下可以启动多个 Source/Sink,把 Source 和 Sink 分开来一个个启动也行。
控制台一直显示
[2019-10-31 22:43:32,932] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)找不到
[2019-10-31 22:43:33,936] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
test.txt 文件,目标文件 test.sink.txt 已经被创建了,只是空的而已。那现在创建 test.txt 文件,一行行写入内容就,同时查看 test.sink.txt 的内容 1$ cat test.txt
2$ cat: test.txt: No such file or directory
3$ cat /tmp/connect.offsets
4$ cat: /tmp/connect.offsets: No such file or directory
5$ cat test.sink.txt
6$ echo hello >> test.txt
7$ cat test.sink.txt
8hello
9$ cat /tmp/connect.offsets
10��srjava.util.HashMap���`�F
11loadFactorI thresholdxp?@
12ur[B��T�xp-["local-file-source",{"filename":"test.txt"}]uq~{"position":6}x
13$ echo world >> test.txt
14$ cat test.sink.txt
15hello
16world
17$ cat /tmp/connect.offsets
18��srjava.util.HashMap���`�F
19loadFactorI thresholdxp?@
20ur[B��T�xp-["local-file-source",{"filename":"test.txt"}]uq~{"position":12}x
21echo done > test.txt
22$ cat test.txt
23done
24$ cat test.sink.txt
25hello
26world
27$ cat /tmp/connect.offsets
28��srjava.util.HashMap���`�F
29loadFactorI thresholdxp?@
30ur[B��T�xp-["local-file-source",{"filename":"test.txt"}]uq~{"position":12}x从上面的测试我们可以看到 Source, Sink 之间怎么工作的,以及 connect.offsets 是如何记录文件中已发送消息的位置。
再检查一下 Topic
connect-test 中的消息kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning最后把文件
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"world"}
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"world"}
test.txt 擦除后写入的 done 并未被发出去,因为 connect.offsets 记一下上次的位置,如果把 connect.offsets 文件删除就可以了。同时也看到 Kafka 中消息长什么样子的,也就是前面配置中的 key.converter 和 value.converter 所产生的效果。再来见识一下 Distributed Connect
为什么叫做分布式,因为它不是只在一个节点上执行,而是可以分布在整个集群的 Broker 中去。它不需要我们创建 Sourc/Sink 的配置文件,而是通过 REST API 来创建和管理 Connector,先要用下面命令起动 REST 服务$ bin/connect-distributed.sh config/connect-distributed.properties
connect-distributed.properties 的内容
1bootstrap.servers=localhost:9092
2
3group.id=connect-cluster
4
5key.converter=org.apache.kafka.connect.json.JsonConverter
6value.converter=org.apache.kafka.connect.json.JsonConverter
7key.converter.schemas.enable=true
8value.converter.schemas.enable=true
9
10offset.storage.topic=connect-offsets
11offset.storage.replication.factor=1
12config.storage.topic=connect-configs
13config.storage.replication.factor=1
14
15status.storage.topic=connect-status
16status.storage.replication.factor=1
17
18offset.flush.interval.ms=10000
19
20#rest.port=8083会默认启动服务端口为
8083 的 REST 服务,查看 https://kafka.apache.org/documentation/#connect_rest 找到有哪些 REST API 可调用。试试如何简单的用 REST API 管理 Connector 的
1$ curl http://localhost:8083/connectors
2[]
3$ curl -X POST http://localhost:8083/connectors \
4 -H "Content-type:application/json" \
5 -H "Accept:application/json" \
6 -d '
7{
8 "name": "test-file-source",
9 "config": {
10 "connector.class": "FileStreamSource",
11 "tasks.max": "1",
12 "topic": "connect-test",
13 "file": "test.txt"
14 }
15}
16'
17{"name":"test-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","topic":
18"connect-test","file":"test.txt","name":"test-file-source"},"tasks":[],"type":"source"}
19curl http://localhost:8083/connectors
20["test-file-source"]
21$curl -X POST http://localhost:8083/connectors \
22 -H "Content-type:application/json" \
23 -H "Accept:application/json" \
24-d '
25{
26 "name": "test-file-sink",
27 "config": {
28 "connector.class": "FileStreamSink",
29 "tasks.max": "1",
30 "topics": "connect-test",
31 "file": "test.sink.txt"
32 }
33}
34'
35{"name":"test-file-sink","config":{"connector.class":"FileStreamSink","tasks.max":"1","topics":
36"connect-test","file":"test.sink.txt","name":"test-file-sink"},"tasks":[],"type":"sink"}
37$ curl http://localhost:8083/connectors
38["test-file-source","test-file-sink"]这个 Connector 的行为与前面用 standalone 运行的是一样的。查看它们的配置与状态
$ curl http://localhost:8083/connectors/test-file-source/config最后,至于如何开发自己的 Connector 就不具体说了,首先需要引入
{"connector.class":"FileStreamSource","file":"test.txt","tasks.max":"1","name":"test-file-source","topic":"connect-test"}
$ curl http://localhost:8083/connectors/test-file-source/status
{"name":"test-file-source","connector":{"state":"RUNNING","worker_id":"192.168.86.238:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.86.238:8083"}],"type":"source"}
org.apache.kafka:connect-api 依赖,关键 API 就是 SourceConnector, SinkConnector, SourceTask, SinkTask, SourceRecord, SinkRecord, Converter 以及 Transformation。connect 会在 Kafka 中创建相应的 Topic
$ kafka-topics --zookeeper localhost:2181 --listconect-configs 有 25 个 partition, 配置存储的地方在
__consumer_offsets
connect-configs
connect-offsets
connect-status
connect-test
kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-configs --from-beginningKafka 正在用自己内部 Topic 来实现许多新特性,就像 Redis 的集群,哨兵也是基本用自己的消息发布订阅机制来实现的。
{"properties":{"connector.class":"FileStreamSource","tasks.max":"1","topic":"connect-test","file":"test.txt","name":"test-file-source"}}
{"properties":{"topic":"connect-test","file":"test.txt","task.class":"org.apache.kafka.connect.file.FileStreamSourceTask","batch.size":"2000"}}
{"tasks":1}
{"properties":{"connector.class":"FileStreamSink","tasks.max":"1","topics":"connect-test","file":"test.sink.txt","name":"test-file-sink"}}
{"properties":{"file":"test.sink.txt","task.class":"org.apache.kafka.connect.file.FileStreamSinkTask","topics":"connect-test"}}
{"tasks":1}
链接:
永久链接 https://yanbin.blog/kafka-connect-how-to/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。