本文旨在了解 Kafka 发送消息到有多个 Partition 的 Topic 时如何选择 Partition。或许多数人已经知道 Kafka 默认(当 key 为 null) 时采用 Round-robin 策略,也就是雨露均沾,风水轮流转,实现类是 DefaultPartitioner。但我们实际应用中为保持相关消息按序到,就必须送到指定的 Partition,方法可以有
- 指定 Partition 编号
- 指定 Key
- 自定义 Partitioner - 实现 org.apache.kafka.clients.producer.Partitioner, 并通过属性注册
还应考究当指定了 Key 或 Partition 编号发送消息后,后续消息 key 为 null 会选用哪个 Partition。最后再思考一个问题,Consumer 每次 poll 时是获得的消息列表是否只包含一个 Partition 源还是可以多个 Partiton 源。
为完成本次实验,可以本地搭建一个 Kafka 环境,参考 简单搭建 Apache Kafka 分布式消息系统。待 Zookeeper 和 Kafka 正常启动后,我们用下面的命令创建一个 Partition 数量为 3 的 Topic partition-test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic partition-test
验证一下该 Topic 的信息
Topic 已准备好了,接下来我们用 Java 代码进行测试
默认选择 Partition: Round-robin
这里贴出比较完整的源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public class MyKafkaProducer { public static void main(String[] args) throws Exception { KafkaProducer<String, Integer> producer = new KafkaProducer<>(getProperties()); for (int i = 0; i < 10; i++) { ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("partition-test", i); RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset()); } } private static Properties getProperties() { Properties props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(CLIENT_ID_CONFIG, "PartitionTestProducer"); props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); return props; } } |
这里创建 ProducerRecord 是用的 new ProducerRecord<>("partition-test", i)
, 它实际是通过
this(topic, null, null, null, value, null);
调用方法
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
所以 partition, timestamp, key, headers 都是 null, 当 partition 和 key 都为 null 时就要采用 Round-robin 了,看 DefaultPartitioner 的注释
The default partitioning strategy:
- If a partition is specified in the record, use it (指定了 Partition, 则用指定的 Partition)
- If no partition is specified but a key is present choose a partition based on a hash of the key (如果没有指定 Partition, 但指定了 Key, 则通过 Key 算出 hash 值定位到 Partition)
- If no partition or key is present choose a partition in a round-robin fashion (Partition 和 Key 都未指定则采用 Round-robin)
所以 ProducerRecord 没有 partition 和 key 的情况下,发送 10 消息所用的 Partition 是下面那样子的
Sent to partition: 0, offset: 0
Sent to partition: 2, offset: 0
Sent to partition: 1, offset: 1
Sent to partition: 0, offset: 1
Sent to partition: 2, offset: 1
Sent to partition: 1, offset: 2
Sent to partition: 0, offset: 2
Sent to partition: 2, offset: 2
Sent to partition: 1, offset: 3
Sent to partition: 0, offset: 3
注意到这个 DefaultPartitioner 中的 Round-robin 算法,在三个 Partition 时并不是以自然顺序 0, 1, 2 的顺序。这里三个 Partition 时顺序是 0, 2, 1 这样的顺序,而且每次从哪个索引号开始也是随机的,所以有时会是 2, 1, 0, 或 1, 0, 2, 反正每个 Partition 机会均等。
指定 Partition 编号
上面的代码 main 方法稍加变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public static void main(String[] args) throws Exception { KafkaProducer<String, Integer> producer = new KafkaProducer<>(getProperties()); String topic = "partition-test"; int partitionSize = producer.partitionsFor(topic).size(); System.out.println("Partition size: " + partitionSize); int partition = new Random().nextInt(partitionSize); for (int i = 0; i < 10; i++) { ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>(topic, partition, null, i); RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset()); } } |
通过 producer.partitionsFor(topic) 获得 Partition 列表, Partition 编号是从 0 开始的,随机选择一下 Partition, 然后构建 ProducerRecord 是用 new ProducerRecord<>(topic, partition, null, i)
, 实际调用是
this(topic, partition, null, key, value, null)
所以指定了 Partition, 但 key 是 null. 看下执行效果
Partition size: 3
Sent to partition: 1, offset: 4
Sent to partition: 1, offset: 5
Sent to partition: 1, offset: 6
Sent to partition: 1, offset: 7
Sent to partition: 1, offset: 8
Sent to partition: 1, offset: 9
Sent to partition: 1, offset: 10
Sent to partition: 1, offset: 11
Sent to partition: 1, offset: 12
Sent to partition: 1, offset: 13
执行多次可以看到每次从 3 个 Partition 中随机选择了一个 Partition。
我们也可以测试一下只要 Partition 不为 null, 即使指了 Key, 这个 Key 也不会参与决策使用哪一个 Partition. 比如下面的代码
1 2 3 4 5 6 |
for (int i = 0; i < 10; i++) { String randomKey = RandomStringUtils.randomAlphabetic(5); ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>(topic, 1, randomKey, i); RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset() + ", key: " + randomKey); } |
只要写死了 Partition 为 1,任凭 Key 怎么个随机法都改变不了往 1 号 Partition 发布消息的事实
Sent to partition: 1, offset: 24, key: yBtcg
Sent to partition: 1, offset: 25, key: MgNbO
Sent to partition: 1, offset: 26, key: BkzIH
Sent to partition: 1, offset: 27, key: TjvPr
Sent to partition: 1, offset: 28, key: JoUhs
Sent to partition: 1, offset: 29, key: bvuUp
Sent to partition: 1, offset: 30, key: LGWeQ
Sent to partition: 1, offset: 31, key: NhFnX
Sent to partition: 1, offset: 32, key: BgZzK
Sent to partition: 1, offset: 33, key: UyZhP
指定 Key
在没有指定 Partition(null 值) 时, 如果有 Key, Kafka 将依据 Key 哈稀出 Partition 编号来。下面是测试代码
1 2 3 4 5 6 |
String[] keys = new String[]{"yBtcg", "yBtcg", "yBtcg", "MgNbO", "BkzIH", "TjvPr"}; for (int i = 0; i < 6; i++) { ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>(topic, null, keys[i], i); RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset() + ", key: " + keys[i]); } |
只要 Partition 的数目不变,上面的代码执行千百遍所选择的 Partition 都会是一样的
Sent to partition: 1, offset: 37, key: yBtcg
Sent to partition: 1, offset: 38, key: yBtcg
Sent to partition: 1, offset: 39, key: yBtcg
Sent to partition: 2, offset: 14, key: MgNbO
Sent to partition: 0, offset: 25, key: BkzIH
Sent to partition: 0, offset: 26, key: TjvPr
一般来说我们不建议使用 Key 来算出 Partition 编号,因为极有可能消息不能平均的分布到每一个 Partition, 除非是一个 UUID。比如用户输入的不确定的字符串,或是一个数字序列 123450001, 123450002, 123450003, 变化部分在后端,很容易使得个别 Partition 很繁忙,而有些却闲得蛋疼,降低了 Topic 运输数据的效率。
采用 Partition 编号或 Key 来指定 Partition 后对 Round-robin 的影响
我们知道 Kafka 默认采用 Round-robin 来选择 Partition, 即如果 Partition 数目为 3,没有指定 Partition 或 Key 的情况下,Partition 的选择策略是 0, 2, 1, 0, 2, 1, 0, 2, 1.....
。我们假设下面的情形
- 未指定 Partition 和 Key, 发送了一条消息到了 Partition 0
- 然后指定 Partition 1 发送了又一条消息
- 再次不指定 Partition 和 Key, 发送一条消息(这条消息应该是发送到步骤 1 中往后推的下一个 Partition 2 还是 1 呢?)
代码验证
1 2 3 4 5 6 7 8 |
RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, 100)).get(); System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset()); metadata = producer.send(new ProducerRecord<>(topic, 1, null, 200)).get(); System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset()); metadata = producer.send(new ProducerRecord<>(topic, 300)).get(); System.out.println("Sent to partition: " + metadata.partition() + ", offset: " + metadata.offset()); |
第一次执行
Sent to partition: 2, offset: 55
Sent to partition: 1, offset: 99
Sent to partition: 1, offset: 100
第二次执行
Sent to partition: 0, offset: 71
Sent to partition: 1, offset: 101
Sent to partition: 2, offset: 56
所以指定了 Partition 或 Key 时,不会改变 DefaultPartitioner固有顺序。这未指定 Partition 或 Key 时,消息总是按照 0, 2, 1 的循环来走。
因此如果有需求同一批次的许多消息要放在同一个 Partition, 随机选取的 Partition 要是恰好是 Round-robin 的下一个 Partition, 那么下次不指定 Partition 或 Key 的消息就会使用同一个 Partition. 更有甚至两次随机选到了同一个 Partition, 那两批次的消息全部会挤到同一个 Partition 中去。
为一批消息指定 Partition 的另一种实现方式是,不采用随机选择 Partition, 而是可以选发送批次中的第一条,然后在回调中使用 RecordMetadata.partition() 来发批次中的其他消息,这个 Partition 便是 Round-robin 返回的,如此下回不指定 Partition 或 Key 时不会用到相同的 Partition。最后的办法是如果一批次中有 5 条消息,选择了 Round-robin 的下一个 Partition 后,Round-robin 应该跳到该 Partition 4 次,这样更能让消息均匀的分布到每一个 Partition 中去。
如何实现自己的 Partitioner
自定义的 Partitioner 需要实现 org.apache.kafka.clients.producer.Partition
接口(含三个实现方法)。下面是一个粗陋的自定义实现
1 2 3 4 5 6 7 8 9 10 |
public class SimplePartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Integer accountId = (Integer) key; return cluster.partitionCountForTopic(topic) % accountId; } //实现另两方法 } |
在创建 Kafka Producer 时需设置 partitioner.class
属性
props.put(PARTITIONER_CLASS_CONFIG, "cc.unmi.SimplePartitioner");
然后创建 ProducerRecord 就用用 AccountId 作为 Key
producer.send(new ProducerRecord<>(topic, 12345, 200))
SimplePartitioner 中把 AccountId 对 Partition 数目进行求模获得 Partition 编号,这样就能保证相同 AccountId 的消息总是往同一个 Partition 中发送。
最后一个问题,Consumer poll 时是只从一个 Partition 中抓若干条记录,还是同时从多个 Partition 中抓一批消息,这个看看
- org.apache.kafka.clients.consumer.KafkaConsumer
- org.apache.kafka.clients.consumer.internals.Fetcher
两个类的源代码就知道 Kafka 的 Java 客户端轮询消息时会逐个轮询每一个 Topic, 并组装在一个 ConsumerRecords<K, V> 对象中。
2017-07-08:以上关于 Consumer 如何从 Partition 中拉消息的描述不太准确。这与 Topic 有多少个 Partition, 以及相同 Group ID 中有多少个 Consumer 有关, 在相同的 Topic 和 Group ID 范围内,一个 Partition 中会被一个 Consumer 消费,这就是说当 Consumer 比 Partition 少,某些 Consumer 要消费多个 Partition; 如果 Consumer 比 Partition 多,那么就出现多出来的 Consumer 无所事事。这会产生一个 Consumer 数目增减时的 Rebalance 的过程。
本文链接 https://yanbin.blog/how-kafka-select-partition/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
[…] 以下转自:https://yanbin.blog/how-kafka-select-partition/ […]