Kafka 发布消息时如何选择 Partition

本文旨在了解 Kafka 发送消息到有多个 Partition 的 Topic 时如何选择 Partition。或许多数人已经知道 Kafka 默认(当 key 为 null) 时采用 Round-robin 策略,也就是雨露均沾,风水轮流转,实现类是 DefaultPartitioner。但我们实际应用中为保持相关消息按序到,就必须送到指定的 Partition,方法可以有

  1. 指定 Partition 编号
  2. 指定 Key
  3. 自定义 Partitioner - 实现 org.apache.kafka.clients.producer.Partitioner, 并通过属性注册

还应考究当指定了 Key 或 Partition 编号发送消息后,后续消息 key 为 null 会选用哪个 Partition。最后再思考一个问题,Consumer 每次  poll 时是获得的消息列表是否只包含一个 Partition 源还是可以多个 Partiton 源。

为完成本次实验,可以本地搭建一个 Kafka 环境,参考 简单搭建 Apache Kafka 分布式消息系统。待 Zookeeper 和 Kafka 正常启动后,我们用下面的命令创建一个 Partition 数量为 3 的 Topic partition-test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic partition-test

验证一下该 Topic 的信息

Topic 已准备好了,接下来我们用 Java 代码进行测试

默认选择 Partition: Round-robin

这里贴出比较完整的源代码

这里创建 ProducerRecord 是用的 new ProducerRecord<>("partition-test", i), 它实际是通过

this(topic, null, null, null, value, null);

调用方法

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

所以 partition, timestamp, key, headers 都是 null, 当 partition 和 key 都为 null 时就要采用 Round-robin 了,看 DefaultPartitioner 的注释

The default partitioning strategy:

  1. If a partition is specified in the record, use it (指定了 Partition, 则用指定的 Partition)
  2. If no partition is specified but a key is present choose a partition based on a hash of the key (如果没有指定 Partition, 但指定了 Key, 则通过 Key 算出 hash 值定位到 Partition)
  3. If no partition or key is present choose a partition in a round-robin fashion (Partition 和 Key 都未指定则采用 Round-robin)

所以 ProducerRecord 没有 partition 和 key 的情况下,发送 10 消息所用的 Partition 是下面那样子的

Sent to partition: 0, offset: 0
Sent to partition: 2, offset: 0
Sent to partition: 1, offset: 1
Sent to partition: 0, offset: 1
Sent to partition: 2, offset: 1
Sent to partition: 1, offset: 2
Sent to partition: 0, offset: 2
Sent to partition: 2, offset: 2
Sent to partition: 1, offset: 3
Sent to partition: 0, offset: 3

注意到这个 DefaultPartitioner 中的 Round-robin 算法,在三个 Partition 时并不是以自然顺序 0, 1, 2 的顺序。这里三个 Partition 时顺序是 0, 2, 1 这样的顺序,而且每次从哪个索引号开始也是随机的,所以有时会是 2, 1, 0, 或 1, 0, 2, 反正每个 Partition 机会均等。

指定 Partition 编号

上面的代码 main 方法稍加变化

通过 producer.partitionsFor(topic) 获得 Partition 列表, Partition 编号是从 0 开始的,随机选择一下 Partition, 然后构建 ProducerRecord 是用 new ProducerRecord<>(topic, partition, null, i), 实际调用是

this(topic, partition, null, key, value, null)

所以指定了 Partition, 但 key 是 null. 看下执行效果

Partition size: 3
Sent to partition: 1, offset: 4
Sent to partition: 1, offset: 5
Sent to partition: 1, offset: 6
Sent to partition: 1, offset: 7
Sent to partition: 1, offset: 8
Sent to partition: 1, offset: 9
Sent to partition: 1, offset: 10
Sent to partition: 1, offset: 11
Sent to partition: 1, offset: 12
Sent to partition: 1, offset: 13

执行多次可以看到每次从 3 个 Partition 中随机选择了一个 Partition。

我们也可以测试一下只要 Partition 不为 null, 即使指了 Key, 这个 Key 也不会参与决策使用哪一个 Partition. 比如下面的代码

只要写死了 Partition 为 1,任凭 Key 怎么个随机法都改变不了往 1 号 Partition 发布消息的事实

Sent to partition: 1, offset: 24, key: yBtcg
Sent to partition: 1, offset: 25, key: MgNbO
Sent to partition: 1, offset: 26, key: BkzIH
Sent to partition: 1, offset: 27, key: TjvPr
Sent to partition: 1, offset: 28, key: JoUhs
Sent to partition: 1, offset: 29, key: bvuUp
Sent to partition: 1, offset: 30, key: LGWeQ
Sent to partition: 1, offset: 31, key: NhFnX
Sent to partition: 1, offset: 32, key: BgZzK
Sent to partition: 1, offset: 33, key: UyZhP

指定 Key

在没有指定 Partition(null 值) 时, 如果有 Key, Kafka 将依据 Key 哈稀出 Partition 编号来。下面是测试代码

只要 Partition 的数目不变,上面的代码执行千百遍所选择的 Partition 都会是一样的

Sent to partition: 1, offset: 37, key: yBtcg
Sent to partition: 1, offset: 38, key: yBtcg
Sent to partition: 1, offset: 39, key: yBtcg
Sent to partition: 2, offset: 14, key: MgNbO
Sent to partition: 0, offset: 25, key: BkzIH
Sent to partition: 0, offset: 26, key: TjvPr

一般来说我们不建议使用 Key 来算出 Partition 编号,因为极有可能消息不能平均的分布到每一个 Partition, 除非是一个 UUID。比如用户输入的不确定的字符串,或是一个数字序列 123450001, 123450002, 123450003, 变化部分在后端,很容易使得个别 Partition 很繁忙,而有些却闲得蛋疼,降低了 Topic 运输数据的效率。

采用 Partition 编号或 Key 来指定 Partition 后对 Round-robin 的影响

我们知道 Kafka 默认采用 Round-robin 来选择 Partition, 即如果 Partition 数目为 3,没有指定 Partition 或 Key 的情况下,Partition 的选择策略是 0, 2, 1, 0, 2, 1, 0, 2, 1..... 。我们假设下面的情形

  1. 未指定 Partition 和 Key, 发送了一条消息到了 Partition 0
  2. 然后指定 Partition 1 发送了又一条消息
  3. 再次不指定 Partition 和 Key, 发送一条消息(这条消息应该是发送到步骤 1 中往后推的下一个 Partition  2 还是 1 呢?)

代码验证

第一次执行

Sent to partition: 2, offset: 55
Sent to partition: 1, offset: 99
Sent to partition: 1, offset: 100

第二次执行

Sent to partition: 0, offset: 71
Sent to partition: 1, offset: 101
Sent to partition: 2, offset: 56

所以指定了 Partition 或 Key 时,不会改变 DefaultPartitioner固有顺序。这未指定 Partition 或 Key 时,消息总是按照 0, 2, 1 的循环来走。

因此如果有需求同一批次的许多消息要放在同一个 Partition, 随机选取的 Partition 要是恰好是 Round-robin 的下一个 Partition, 那么下次不指定 Partition 或 Key 的消息就会使用同一个 Partition. 更有甚至两次随机选到了同一个 Partition, 那两批次的消息全部会挤到同一个 Partition 中去。

为一批消息指定 Partition 的另一种实现方式是,不采用随机选择 Partition, 而是可以选发送批次中的第一条,然后在回调中使用 RecordMetadata.partition() 来发批次中的其他消息,这个 Partition 便是 Round-robin 返回的,如此下回不指定 Partition 或 Key 时不会用到相同的 Partition。最后的办法是如果一批次中有 5 条消息,选择了 Round-robin 的下一个 Partition 后,Round-robin 应该跳到该 Partition 4 次,这样更能让消息均匀的分布到每一个 Partition 中去。

如何实现自己的 Partitioner

自定义的 Partitioner 需要实现 org.apache.kafka.clients.producer.Partition 接口(含三个实现方法)。下面是一个粗陋的自定义实现

在创建 Kafka Producer 时需设置 partitioner.class 属性

props.put(PARTITIONER_CLASS_CONFIG, "cc.unmi.SimplePartitioner");

然后创建 ProducerRecord 就用用 AccountId 作为 Key

producer.send(new ProducerRecord<>(topic, 12345, 200))

SimplePartitioner 中把 AccountId 对 Partition 数目进行求模获得 Partition 编号,这样就能保证相同 AccountId 的消息总是往同一个 Partition 中发送。

最后一个问题,Consumer poll 时是只从一个 Partition 中抓若干条记录,还是同时从多个 Partition 中抓一批消息,这个看看

  1. org.apache.kafka.clients.consumer.KafkaConsumer
  2. org.apache.kafka.clients.consumer.internals.Fetcher

两个类的源代码就知道 Kafka 的 Java 客户端轮询消息时会逐个轮询每一个 Topic, 并组装在一个 ConsumerRecords<K, V> 对象中。


2017-07-08:以上关于 Consumer 如何从 Partition 中拉消息的描述不太准确。这与 Topic 有多少个 Partition, 以及相同 Group ID 中有多少个 Consumer 有关, 在相同的 Topic 和 Group ID 范围内,一个 Partition 中会被一个 Consumer 消费,这就是说当 Consumer 比 Partition 少,某些 Consumer 要消费多个 Partition; 如果 Consumer 比 Partition  多,那么就出现多出来的 Consumer 无所事事。这会产生一个 Consumer 数目增减时的 Rebalance 的过程。

本文链接 https://yanbin.blog/how-kafka-select-partition/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

1 Comment
Inline Feedbacks
View all comments
trackback

[…] 以下转自:https://yanbin.blog/how-kafka-select-partition/ […]