Kafka Producer 设置 Interceptor 来统计消息

Kafka 消息的 Producer 在调用 producer.send() 方法发送消息时会先把消息放到本地缓冲中,然后由  Kafka 网络线程从缓冲中提取消息再送到 Kafka 代理上去。本地缓冲区大小由 buffer.memory 来配置,默认为 32M(32 * 1024 * 1024L)。如果发消息到网络慢于提交消息到缓冲区的话,缓冲区就可能会满就无法接受新的消息,这时候就要依照 block.on.buffer.full 设置是否暂停还是抛出异常,默认为暂停 producer.send();暂停时间由 max.block.ms 决定,默认为 60 秒。producer.send() 返回一个 Future<RecordMetadata>, 也就是每次调用 send() 方法在缓冲区满后要等待 60 秒才能获得结果(异常)。

这里的关系是 send() --a--> 缓冲区 --b--> 发送到 Kafka 代理,自然要在 a 与 b 之间进行流量控制,如果 b 太慢,缓冲区满的话必须把 a 放慢下来。如果能基于缓冲区已使用大小来放缓 a 也是也行的,留待以后进行研究。本文提供另一种实现参考,为 Producer 配置一个 Interceptor 能够大致统计多少消息提交到缓冲区,多少消息从缓冲区取出。

Kafka 的所有配置项常量可以在这个页面 https://kafka.apache.org/0100/javadoc/constant-values.html 找到。对 interceptor.classes 的解释是:可以为 Producer 配置一个或多个 Interceptor(需要实现 ProducerInterceptor)。另外 Consumer 也有自己的  Interceptor(实现 ConsumerInterceptor)。 阅读全文 >>