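When a Kafka Producer calls producer.send(), the record is first placed in a local buffer; a Kafka network thread then takes records out of the buffer and sends them to the Kafka broker. The buffer size is set by buffer.memory and defaults to 32 MB (32 * 1024 * 1024L). If records go out over the network more slowly than they are submitted to the buffer, the buffer can fill up and stop accepting new records. What happens then depends on block.on.buffer.full, which decides whether producer.send() blocks or throws an exception (the 0.10 client documentation marks this option as deprecated in favor of max.block.ms). By default producer.send() blocks for at most max.block.ms, which defaults to 60 seconds. producer.send() returns a Future<RecordMetadata>, so once the buffer is full each send() call can wait up to 60 seconds before it yields a result (an exception).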
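As a rough illustration of the settings mentioned above, the sketch below spells out the two buffer-related options with their default values. It uses plain string keys so that it runs without the Kafka client on the classpath; the class name BufferSettings and the broker address are assumptions made for this example.

```java
import java.util.Properties;

public class BufferSettings {

    /** Builds producer properties that make the buffering defaults explicit. */
    public static Properties bufferProps() {
        Properties props = new Properties();
        // Assumed local broker, same address as in the example further down.
        props.put("bootstrap.servers", "localhost:9092");
        // Size of the local record buffer in bytes (32 MB, the default).
        props.put("buffer.memory", String.valueOf(32L * 1024 * 1024));
        // Upper bound in milliseconds on how long send() may block when the buffer is full.
        props.put("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        bufferProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The same Properties object could be extended with the serializer and interceptor settings before being handed to a KafkaProducer.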
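The relationship is send() --a--> buffer --b--> Kafka broker, so flow control naturally belongs between a and b: if b is too slow and the buffer fills up, a has to be slowed down. Throttling a based on how much of the buffer is in use would also work; that is left for later study. This post offers another approach for reference: configure an Interceptor on the Producer that roughly counts how many records are submitted to the buffer and how many are taken out of it.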
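All Kafka configuration constants are listed at https://kafka.apache.org/0100/javadoc/constant-values.html. According to the description of interceptor.classes, a Producer can be configured with one or more Interceptors (each must implement ProducerInterceptor). The Consumer has its own Interceptor as well (implementing ConsumerInterceptor).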
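ProducerInterceptor has four interface methods:
- void close(): called when the Interceptor is closed, before the Producer finishes closing.
- ProducerRecord<K, V> onSend(ProducerRecord<K, V> record): called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback), before the key and value are serialized and before a partition is assigned (if none was specified), that is, before the record is placed in the buffer. The method may modify the record and return the modified one.
- void onAcknowledgement(RecordMetadata metadata, Exception exception): called when a record taken from the buffer has been sent and acknowledged, and also when sending fails.
- void configure(Map<String, ?> configs): called with the Producer's configuration when the Interceptor is instantiated, giving it a chance to read its settings.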
The code below demonstrates how to count the records submitted to the buffer, sent successfully, and failed.
Producer code:
```java
package blog.yanbin; // same package as the interceptor configured below

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        String topic = "test_topic";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Register the interceptor by its fully qualified class name
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "blog.yanbin.StatisticsProducerInterceptor");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>(topic, String.valueOf(i), String.valueOf(i)));
        }

        // close() waits for the records still in the buffer to be sent
        producer.close();
        logger.info(StatisticsProducerInterceptor.getRecordStatistics());
    }
}
```
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG above names the Interceptor implementation class, StatisticsProducerInterceptor. Its code is as follows:
```java
package blog.yanbin; // matches the class name given in INTERCEPTOR_CLASSES_CONFIG

import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatisticsProducerInterceptor implements ProducerInterceptor<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(StatisticsProducerInterceptor.class);

    // LongAdder because onSend and onAcknowledgement run on different threads
    private static final LongAdder submittedRecords = new LongAdder();
    private static final LongAdder deliveredRecords = new LongAdder();
    private static final LongAdder failedRecords = new LongAdder();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Demonstrates modifying a record: values that compare (as strings) >= "3" get a "+U" suffix
        ProducerRecord<String, String> updatedRecord = record.value().compareTo("3") < 0 ? record :
            new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                record.value() + "+U");

        logger.info("record: {} to be sent, updated value from {} to {}",
            updatedRecord, record.value(), updatedRecord.value());

        // If the record cannot be serialized later on, it is not placed in the buffer
        // and onAcknowledgement() is triggered with an exception
        submittedRecords.increment();
        return updatedRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            deliveredRecords.increment();
            // checksum() is available in the 0.10 client used here; later client versions deprecate it
            logger.info("sent message: topic: {}, partition: {}, offset: {}, timestamp: {}, checksum: {}",
                metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp(), metadata.checksum());
        } else {
            failedRecords.increment();
            logger.error("failed to send message: {}", metadata, exception);
        }
        logger.info(getRecordStatistics());
    }

    @Override
    public void close() {
        logger.info("producer closed");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        logger.info("configuration: {}", configs);
    }

    public static String getRecordStatistics() {
        return String.format("record statistics, submitted: %s, delivered: %s, failed: %s",
            submittedRecords.longValue(), deliveredRecords.longValue(), failedRecords.longValue());
    }
}
```
The output looks roughly like this:
```
00:33:23 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
00:33:23 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
00:33:23 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=2, value=2, timestamp=null) to be sent, updated value from 2 to 2
00:33:23 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=3, value=3+U, timestamp=null) to be sent, updated value from 3 to 3+U
00:33:23 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=4, value=4+U, timestamp=null) to be sent, updated value from 4 to 4+U
00:33:23 [main] KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
00:33:23 [kafka-producer-network-thread | producer-1] - sent message: topic: test_topic, partition: 0, offset: 6351, timestamp: 1541050403463, checksum: 1478612472
00:33:23 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 5, delivered: 1, failed: 0
00:33:23 [kafka-producer-network-thread | producer-1] - sent message: topic: test_topic, partition: 0, offset: 6352, timestamp: 1541050403475, checksum: 4199907714
00:33:23 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 5, delivered: 2, failed: 0
00:33:23 [kafka-producer-network-thread | producer-1] - sent message: topic: test_topic, partition: 0, offset: 6353, timestamp: 1541050403475, checksum: 3855131286
00:33:23 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 5, delivered: 3, failed: 0
00:33:23 [kafka-producer-network-thread | producer-1] - sent message: topic: test_topic, partition: 0, offset: 6354, timestamp: 1541050403475, checksum: 1502822821
00:33:23 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 5, delivered: 4, failed: 0
00:33:23 [kafka-producer-network-thread | producer-1] - sent message: topic: test_topic, partition: 0, offset: 6355, timestamp: 1541050403475, checksum: 3673351358
00:33:23 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 5, delivered: 5, failed: 0
00:33:23 [main] - producer closed
00:33:23 [main] Main - record statistics, submitted: 5, delivered: 5, failed: 0
```
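The log shows that 5 records were submitted, 5 were sent successfully, and none failed. Records can be modified in onSend(..). It also looks as if sending starts only after every record has been placed in the buffer; changing the loop count in main to 10 gives much the same picture, with all 10 onSend(..) calls finishing before anything is actually sent to the network. Note, however, that onSend(..) and onAcknowledgement(..) are called from different threads, so no fixed ordering between them should be assumed.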
To verify this, send 2000 records in one go: change the loop count in main to 2000, run it again, and check the log. Here are some excerpts:
```
01:01:40 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
01:01:40 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
01:01:40 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=821, value=821+U, timestamp=null) to be sent, updated value from 820 to 820+U
01:01:40 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 821, delivered: 1, failed: 0
01:01:40 [kafka-producer-network-thread | producer-1] - sent message: topic: test_topic, partition: 0, offset: 19357, timestamp: 1541052100757, checksum: 791494235
01:01:40 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=855, value=855+U, timestamp=null) to be sent, updated value from 855 to 855+U
01:01:40 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 855, delivered: 2, failed: 0
01:01:40 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=1612, value=1612, timestamp=null) to be sent, updated value from 1612 to 1612
01:01:40 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 1611, delivered: 242, failed: 0
01:01:40 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=1614, value=1614, timestamp=null) to be sent, updated value from 1614 to 1614
01:01:40 [main] - record: ProducerRecord(topic=test_topic, partition=null, key=1999, value=1999, timestamp=null) to be sent, updated value from 1999 to 1999
01:01:40 [main] KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
01:01:41 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 2000, delivered: 1999, failed: 0
01:01:41 [kafka-producer-network-thread | producer-1] - sent message: topic: test_topic, partition: 0, offset: 21355, timestamp: 1541052100856, checksum: 2489747570
01:01:41 [kafka-producer-network-thread | producer-1] - record statistics, submitted: 2000, delivered: 2000, failed: 0
01:01:41 [main] - producer closed
01:01:41 [main] Main - record statistics, submitted: 2000, delivered: 2000, failed: 0
```
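The log shows that taking records from the buffer and sending them to the network does not wait until all records have been placed in the buffer; the two activities run on different threads. In the end, every pending record was successfully delivered to the Kafka broker.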
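The interleaving seen in the log can be reproduced with a small model that does not involve Kafka at all. In the sketch below a bounded queue stands in for the Producer's buffer and a second thread plays the role of the network thread. The class TwoThreadModel, the queue capacity of 64, and the thread name are all invented for this illustration; this is not how KafkaProducer is implemented internally.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;

public class TwoThreadModel {

    /** Submits {@code total} items from the calling thread while another thread drains them. */
    public static long[] run(int total) {
        LongAdder submitted = new LongAdder();
        LongAdder delivered = new LongAdder();
        // Bounded queue standing in for the producer buffer (capacity is an arbitrary choice).
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(64);

        // Stand-in for the network thread: takes items as soon as they are available.
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    buffer.take();
                    delivered.increment();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "model-network-thread");
        sender.start();

        try {
            for (int i = 0; i < total; i++) {
                submitted.increment();
                buffer.put(i); // blocks while the queue is full, like send() on a full buffer
            }
            sender.join(); // comparable to producer.close() waiting for pending records
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while submitting", e);
        }
        return new long[] {submitted.sum(), delivered.sum()};
    }

    public static void main(String[] args) {
        long[] counts = run(2000);
        System.out.println("submitted: " + counts[0] + ", delivered: " + counts[1]);
    }
}
```

Because the queue holds at most 64 items, the draining thread has to run while the submitting loop is still in progress, which mirrors the mixed ordering of the [main] and network-thread lines in the log.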
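With a ProducerInterceptor, the number of records waiting to be sent and the number successfully sent to the network can be counted fairly accurately. If a record cannot be serialized, onAcknowledgement(..) is triggered directly with an exception and the record is counted as failed, which is the result we want.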
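Going further, these three numbers let us compute how many records are still waiting in the local buffer, and that can drive the flow control at point a described earlier. For example, if 5000 records have piled up in the buffer, producer.send(..) can simply be held back until the backlog drops.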
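One possible shape for such a throttle is sketched below, under the assumption that the backlog is approximated as submitted minus delivered minus failed. The class BacklogThrottle and its methods are hypothetical names, not part of the Kafka API, and the polling wait is only one of several ways to hold the caller back.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical helper: holds callers back while too many records are pending. */
public class BacklogThrottle {
    private final LongAdder submitted = new LongAdder();
    private final LongAdder delivered = new LongAdder();
    private final LongAdder failed = new LongAdder();
    private final long maxBacklog;

    public BacklogThrottle(long maxBacklog) {
        this.maxBacklog = maxBacklog;
    }

    /** Intended to be called from onSend(..). */
    public void recordSubmitted() {
        submitted.increment();
    }

    /** Intended to be called from onAcknowledgement(..) with {@code exception == null}. */
    public void recordCompleted(boolean success) {
        (success ? delivered : failed).increment();
    }

    /** Approximate number of records submitted but not yet delivered or failed. */
    public long backlog() {
        // Read the completed counters first so a concurrent update can only overestimate.
        long completed = delivered.sum() + failed.sum();
        return submitted.sum() - completed;
    }

    /** Waits until the backlog is below the limit; returns false on timeout or interrupt. */
    public boolean awaitCapacity(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (backlog() >= maxBacklog) {
            if (Thread.currentThread().isInterrupted() || System.nanoTime() - deadline >= 0) {
                return false;
            }
            LockSupport.parkNanos(1_000_000L); // poll roughly once per millisecond
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        int total = 20_000;
        BacklogThrottle throttle = new BacklogThrottle(5000);

        // Stand-in for the network thread acknowledging records one by one.
        Thread acker = new Thread(() -> {
            int acked = 0;
            while (acked < total) {
                if (throttle.backlog() > 0) {
                    throttle.recordCompleted(true);
                    acked++;
                } else {
                    LockSupport.parkNanos(100_000L);
                }
            }
        }, "model-ack-thread");
        acker.start();

        for (int i = 0; i < total; i++) {
            if (!throttle.awaitCapacity(10_000)) {
                throw new IllegalStateException("backlog did not drain in time");
            }
            throttle.recordSubmitted(); // a real caller would invoke producer.send(..) here
        }
        acker.join();
        System.out.println("final backlog: " + throttle.backlog());
    }
}
```

In the interceptor from this post, recordSubmitted() would go into onSend(..) and recordCompleted(exception == null) into onAcknowledgement(..), with the sending loop calling awaitCapacity(..) before each producer.send(..). The limit of 5000 is taken from the example in the text and would need tuning against record size and buffer.memory.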
