Kafka Producer 设置 Interceptor 来统计消息

Kafka 消息的 Producer 在调用 producer.send() 方法发送消息时会先把消息放到本地缓冲中,然后由  Kafka 网络线程从缓冲中提取消息再送到 Kafka 代理上去。本地缓冲区大小由 buffer.memory 来配置,默认为 32M(32 * 1024 * 1024L)。如果发消息到网络慢于提交消息到缓冲区的话,缓冲区就可能会满就无法接受新的消息,这时候就要依照 block.on.buffer.full 设置是否暂停还是抛出异常,默认为暂停 producer.send();暂停时间由 max.block.ms 决定,默认为 60 秒。producer.send() 返回一个 Future<RecordMetadata>, 也就是每次调用 send() 方法在缓冲区满后要等待 60 秒才能获得结果(异常)。

这里的关系是 send() --a--> 缓冲区 --b--> 发送到 Kafka 代理,自然要在 a 与 b 之间进行流量控制,如果 b 太慢,缓冲区满的话必须把 a 放慢下来。如果能基于缓冲区已使用大小来放缓 a 也是也行的,留待以后进行研究。本文提供另一种实现参考,为 Producer 配置一个 Interceptor 能够大致统计多少消息提交到缓冲区,多少消息从缓冲区取出。

Kafka 的所有配置项常量可以在这个页面 https://kafka.apache.org/0100/javadoc/constant-values.html 找到。对 interceptor.classes 的解释是:可以为 Producer 配置一个或多个 Interceptor(需要实现 ProducerInterceptor)。另外 Consumer 也有自己的  Interceptor(实现 ConsumerInterceptor)。 阅读全文 >>

类别: Kafka. 标签: . 阅读(18). 评论(0) »

Kafka 发布消息时如何选择 Partition

本文旨在了解 Kafka 发送消息到有多个 Partition 的 Topic 时如何选择 Partition。或许多数人已经知道 Kafka 默认(当 key 为 null) 时采用 Round-robin 策略,也就是雨露均沾,风水轮流转,实现类是 DefaultPartitioner。但我们实际应用中为保持相关消息按序到,就必须送到指定的 Partition,方法可以有

  1. 指定 Partition 编号
  2. 指定 Key
  3. 自定义 Partitioner - 实现 org.apache.kafka.clients.producer.Partitioner, 并通过属性注册

还应考究当指定了 Key 或 Partition 编号发送消息后,后续消息 key 为 null 会选用哪个 Partition。最后再思考一个问题,Consumer 每次  poll 时是获得的消息列表是否只包含一个 Partition 源还是可以多个 Partiton 源。

为完成本次实验,可以本地搭建一个 Kafka 环境,参考 简单搭建 Apache Kafka 分布式消息系统。待 Zookeeper 和 Kafka 正常启动后,我们用下面的命令创建一个 Partition 数量为 3 的 Topic partition-test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic partition-test

验证一下该 Topic 的信息 阅读全文 >>

类别: Mid-Ware. 标签: . 阅读(1,227). 评论(0) »

Kafka 生产消费 Avro 序列化数据

本文实践了如何连接 Kafka 生产和消费 Avro 序列化格式的数据, 不能像 NgAgo-gDNA 那样, 为保证实验内容及结果的可重复性, 文中所用的各中间件和组件版本如下:

  1. Apache Kafka: kafka_2.11-0.10.0.1, 这个版本在初始始化生产者消费者的属性与之前版本有所不同.
  2. kafka-clients: Java API 客户端, 版本为  0.10.0.1
  3. Apache Avro: 1.8.1. 关于 Avro 序列化的内容可参见 Apache Avro 序列化与反序列化 (Java 实现)
  4. Java 8

Apache Kafka 消息系统设计为可以传输字符串, 二进制等数据, 但直接用于传输生产消费两端都能理解的对象数据会更友好.  所以我们这里用 Avro 的 Schema  来定义要传输的数据格式, 通信时采用自定义的序列化和反序列化类进行对象与字节数组间的转换.

以下是整个实验过程

本地启动 Apache Kafka 服务

请参考 简单搭建 Apache Kafka 分布式消息系统 启动 ZooKeeper 和 Kafka 即可. 程序运行会自动创建相应的主题. 启动后 Kafka 开启了本地的 9092 端口, 程序中只需要连接这个端口, 不用管 ZooKeeper 的  2181 端口. 阅读全文 >>

类别: Java/JEE, Mid-Ware. 标签: , . 阅读(4,109). 评论(0) »

简单搭建 Apache Kafka 分布式消息系统

早先都是用的基于 JMS 规范的消息系统, 像 ActiveMQ, IBM MQSeries 等. 随着互联网的发展, 大约是要适应当今大数据, 高可用性, 高效的需求, 于是诞生了 Apache Kafka 这一新时代的分布式消息系统. Apache Kafka 也是发布-订阅式的消息系统, 用 Scala 语言写的, 它最初由 LinedIn 开发并贡献到 Apache 基金会.

Kafka 的集群实质是依赖于 ZooKeeper 的集群来协同管理, 所以这里可以参照之前的 ZooKeeper 快速搭建与体验 来搭建一个 ZooKeeper 集群(其实这是一个伪集群, 实际产品中应该把 ZooKeeper 集群分布在不同的机器上).

本文主要是参考官方的 Kafka Quickstart 来快速体验 Kafka 消息系统, 下载的 Kafka 自带了 ZooKeeper, 默认只启动了一个  ZooKeeper 节点. 如需 ZooKeeper 集群可以不依赖于 Kafka 自带的 ZooKeeper 而单独搭建.

下面开始演示建立一个最简单的 Kafka 系统 阅读全文 >>

类别: Java/JEE. 标签: . 阅读(614). 评论(0) »