Kafka Producer 设置 Interceptor 来统计消息

Kafka 消息的 Producer 在调用 producer.send() 方法发送消息时会先把消息放到本地缓冲中,然后由  Kafka 网络线程从缓冲中提取消息再送到 Kafka 代理上去。本地缓冲区大小由 buffer.memory 来配置,默认为 32M(32 * 1024 * 1024L)。如果发消息到网络慢于提交消息到缓冲区的话,缓冲区就可能会满就无法接受新的消息,这时候就要依照 block.on.buffer.full 设置是否暂停还是抛出异常,默认为暂停 producer.send();暂停时间由 max.block.ms 决定,默认为 60 秒。producer.send() 返回一个 Future<RecordMetadata>, 也就是每次调用 send() 方法在缓冲区满后要等待 60 秒才能获得结果(异常)。

这里的关系是 send() --a--> 缓冲区 --b--> 发送到 Kafka 代理,自然要在 a 与 b 之间进行流量控制,如果 b 太慢,缓冲区满的话必须把 a 放慢下来。如果能基于缓冲区已使用大小来放缓 a 也是也行的,留待以后进行研究。本文提供另一种实现参考,为 Producer 配置一个 Interceptor 能够大致统计多少消息提交到缓冲区,多少消息从缓冲区取出。

Kafka 的所有配置项常量可以在这个页面 https://kafka.apache.org/0100/javadoc/constant-values.html 找到。对 interceptor.classes 的解释是:可以为 Producer 配置一个或多个 Interceptor(需要实现 ProducerInterceptor)。另外 Consumer 也有自己的  Interceptor(实现 ConsumerInterceptor)。 阅读全文 >>

Java 8 Map 中新增的方法使用记录

得益于 Java 8 的 default 方法特性,Java 8 对 Map 增加了不少实用的默认方法,像 getOrDefault, forEach, replace, replaceAll, putIfAbsent, remove(key, value), computeIfPresent, computeIfAbsent, compute 和merge 方法。另外与 Map 相关的 Map.Entry 也新加了多个版本的 comparingByKey 和 comparingByValue 方法。

为达到熟练运用上述除 getOrDefault 和 forEach 外的其他方法,有必要逐一体验一番,如何调用,返回值以及调用后的效果如何。看看每个方法不至于 Java 8 那么多年还总是  if(map.containsKey(key))... 那样的老套操作。

前注:Map 新增方法对  present 的判断是 map.containsKey(key) && map.get(key) != null,简单就是  map.get(key) != null,也就是即使 key 存在,但对应的值为 null 的话也视为 absent。absent 就是 map.get(key) == null。

不同 Map 实现对 key/value 是否能为 null 有不同的约束, HashMap, LinkedHashMap, key 和 value 都可以为 null 值,TreeMap 的 key 为不能为 null, 但 value 可以为 null, 而 Hashtable, ConcurrentMap 则 key 和 value 都不同为 null。一句话 absent/present 的判断是 map.get(key) 是否为 null。

方法介绍的顺序是它们相对于本人的生疏程度而定的。每个方法介绍主要分两部分,参考实现代码与示例代码执行效果。参考实现代码摘自 JDK 官方的 Map JavaDoc

getOrDefault 方法

本想忽略这个方法的测试,因为涉及到 key 存在,值为 null 的情况。当 key 不存在或相关联的值为 null 时,返回默认值,否则返回实际值。不要认为 key 存在时总是返回 map.get(key) 的值。

参考实现:

阅读全文 >>

Spring Boot 与 Logback 日志配置

本文记录 SpringBoot 与 Logback 是如何工作的,即观察 SpringBoot 中 Logback  是怎么一步一步初始化的。用以测试的 SpringBoot 版本是 1.5.16, 而非最新的 SpringBoot 2。关于 SpringBoot 日志的官方文档在 Logging, 但不太详细或透彻。本文也不承诺说就理解得更有深度,只是为官方文档提供更多方面的参考。

SpringBoot 默认使用 Slf4J + Logback 来记录日志,对于一个基本的依赖于

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

的 Spring Boot 项目,上面组件依赖了 spring-boot-starter-logging 组件,而该组件又引入了以下几个依赖

  1. logback-classic:   依赖了 Slf4J
  2. jcl-over-slf4j
  3. jul-to-slf4j
  4. log4j-over-slf4j

相当于把其他的日志框架全桥接到了 Slf4J + Logback 上去了。 阅读全文 >>

Amazon SQS 触发 AWS Lambda 及重试/DLQ

Amazon 在 2018 年 6 月份宣布可以设置用 SQS 来触发 Lambda,SQS 不再是单纯用于 ECS 服务中,或用于伸缩控制的。这儿就来亲自尝试一下用 SQS 驱动的 Lambda,以及要注意的要素。

首先使用 Java 编写 Lambda 的话,AWS 在 com.amazonaws:aws-lambda-java-events:2.20 版本开始加入了 com.amazonaws.services.lambda.runtime.events.SQSEvent 类,可是这个版本的 aws-lambda-java-events 是有所限的,因为 SQSEvent.SQSMessage 类是私有的,这就造成不能获取到 SQSEvent 中的记录数据。

//下面的操作代码无法编译,因为 SQSEvent.SQSMessage 是私有的,不可访问
SQSEvent.SQSMessage sqs = sqsEvent.getRecords().get(0); 
sqsEvent.getRecords().get(0).getBody();

Java 使用 SQS 来驱动 Lambda 的话,至少需要 com.amazonaws:aws-lambda-java-events:2.2.1 版本,从此 SQSEvent.SQSMessage 变成 public 了。该版本是于  2018 年 6 月传到 Maven 官方中央仓库的,这就是那时才能真正用来写 Java 的 SQS 触发的 Lambda.

同时此篇也是作为上文 AWS Lambda 重试与死信队列(DLQ) 的一个很重要的补充。在此也会验证 SQS 触发的 Lambda 的重试机制以及 DLQ 相关的内容。 阅读全文 >>

AWS Lambda 重试与死信队列(DLQ)

AWS Lambda 允许设置 Debugging and error handling, 在 Lambda 出现异常,达到最大的重试次数后,把以下信息放到选择的 SNS 或 SQS 主题作为死信队列(DLQ - Dead Letter Queue),包括

  1. 原始 Lambda 接收到的消息(基于 SNS 和 SQS 消息的总大小,可能会被截取,本人猜测,尤其是 Kinesis 的消息会比较大)
  2. 原始 Lambda 的 RequestId
  3. ErrorCode(三位数字的 HTTP 错误码)
  4. ErrorMessage, 即原 Lambda 抛出 Exception 的 getMessage() 信息,截取 1 KB 字符串

并且 Lambda 要使用 DLQ 的话还必须设置当前 Lambda 的 IAM role 有对于 SNS/SQS 主题相应的 sns:Publish 和 sqs:SendMessage 权限。

AWS Lambda 基本重试规则:对于 Kinesis 消息会无限重试直至消息过期,对于 SNS 或 SQS 的消息出现异常后会再重试两次。参考:AWS Lambda Retry Behavior

而在重试次数用完后仍然失败,并且设置了 DLQ 的话就会发送消息到 DLQ 中去。 阅读全文 >>

跳过构造函数创建 Java 对象(测试)

如果一个 Java 类在初始化时会有外部依赖,这就给单元测试创建它的实例时造成困难。当然被测试类可以改造为依赖全部构造时注入或创建实例后延迟注入,这里不考虑这种改造。

可以参看我以前一篇类似的日志:使用 JMockit 来 mock 构造函数

来说下面的例子

假如上面的代码是不能改动的,并且在 new PriceInquiry() 时依赖于网络环境,所以单机情况不能创建成功。也就使得测试时试图

new OrderService();

会失败。并且试图用 Mockito 的 @InjectMocks 也不行 阅读全文 >>

如何快乐的使用 Java 8 的 Lambda

Java 8 的 Lambda 特性较之于先前的泛型加入更能鼓舞人心的,我对 Lambda 的理解是它得以让 Java 以函数式思维的方式来写代码。而写出的代码是否是函数式,并不单纯在包含了多少 Lambda 表达式,而在思维,要神似。

实际中看过一些代码,为了 Lambda 表达式而 Lambda(函数式),有一种少年不识愁滋味,为赋新词强说愁的味道。从而致使原本一个简单的方调用硬生生的要显式的用类如 apply(), accept(obj) 等形式。不仅造成代码可读性差,且可测试性也变坏了。

为什么说的是快乐的使用 Java 8 的 Lambda 呢?我窃以为第一个念头声明 Lambda 表达式为实例/类变量(像本文第一段代码那样),而不是方法的,一定会觉得如此使用方式很快乐的。所谓独乐乐,不如众乐乐;独乐乐,众不乐定然是更大的快乐; 更极致一些,不管什么时候必须是:我快乐,所以你也快乐。

一方面也在于 Java 还没有进化到 JavaScript 或  Scala 那样的水平,JavaScript 的函数类型变量,不一定要用 apply 或 call, 直接括号就能实现方法调用。Scala 的函数类型用括号调用也会自动匹配到 apply 或 update 等方法上去。 阅读全文 >>

Jackson 序列化忽略指定类型的属性

本文准确来讲是探讨如何用 Jackson 来序列化 Apache avro 对象,因为简单用 Jackson 来序列化 Apache avro 对象会报错。原因是序列化 Schema getSchema() 时会报错,后面会讲到,需要序列化时忽略该属性。那么能不能在 getSchema() 上加上 @JsonIgnore 来忽略该属性呢?原理上是通的。不过手工修改的 avsc 生成的 Java 文件随时会因为重新编译而还原,所以不太具有实际可操作性,当然通过定制编译 avsc 用的模板文件来加入 @JsonIgnore 是另一回事。

由于不能在要忽略的字段上添加 JsonIgnore 来控制,而如果我们明确了要忽略的字段类型的话,是能够定制 Jackson 的  ObjectMapper  来屏蔽某个特定的类型。来看下面序列化 Apache avro 对象的例子:

假设我们有一个 Apache 的 Schema 文件 user.avsc, 内容如下: 阅读全文 >>

Java8 Optional 几个常见错误用法

Java 8 引入的 Optional 类型,基本是把它当作 null 值优雅的处理方式。其实也不完全如此,Optional 在语义上更能体现有还是没有值。所以它不是设计来作为 null 的替代品,如果方法返回 null 值表达了二义性,没有结果或是执行中出现异常。

在 Oracle  做  Java 语言工作的  Brian Goetz 在 Stack Overflow 回复  Should Java 8 getters return optional type? 中讲述了引入  Optional 的主要动机。

Our intention was to provide a limited mechanism for library method return types where there needed to be a clear way to represent “no result”, and using null for such was overwhelmingly likely to cause errors.

说的是  Optional 提供了一个有限的机制让类库方法返回值清晰的表达有与没有值,避免很多时候 null 造成的错误。并非有了  Optional 就要完全杜绝 NullPointerException。

在 Java 8 之前一个实践是方法返回集合或数组时,应返回空集合或数组表示没有元素; 而对于返回对象,只能用 null 来表示不存在,现在可以用  Optional 来表示这个意义。

自 Java8 于  2014-03-18 发布后已 5 年有余,这里就列举几个我们在项目实践中使用 Optional 常见的几个用法。

Optional 类型作为字段或方法参数

这儿把 Optional  类型用为字段(类或实例变量)和方法参数放在一起来讲,是因为假如我们使用 IntelliJ IDEA 来写 Java 8 代码,IDEA 对于  Optional 作为字段和方法参数会给出同样的代码建议: 阅读全文 >>

Java 9 - 说说响应式流

最初看到 Java 9 的这个新特性没太在意,及至重新关注到 Spring 5/Springboot 2 的响应式编程的时候才真正重视起 Reactive Streams(响应式流或反应式流)。应用响应式流的编程也就叫做响应式编程(Reactive Programming),无论是翻译成反应式编程都有些令人摸不准头脑。与此对应的在 Web 设计方面有一个叫做响应式 Web 设计(Responsive web design),两个词都译作响应式,却有些差别,大概是 Reactive 被译为反应的原因之一。

通过这里对  Reactive Streams 的学习,主要目的是为了进一步掌握 Spring 5/Springboot 2 的响应式 MVC 作铺垫的,不至于猛然间见 Flux, Mono 而不知所措。

函数式响应式编程概念最早来自于九十年代末,这也激发了微软的 Erik Meijer 设计开发了 .NET 的  Rx(Reactive eXtension) 库,以及到后来 Netflix 的  RxJava 也与他有关系。Reactive Stream 更像是一种编程模式,致力于解决一个生产者产生一系列消息,一个或多个去消费它们的问题。两者的名词我们会用: producer-consumer(生产者-消费者), source/sink(水源/水槽, Akka Stream 用了这个概念), publisher-subscriber(发布者-订阅者)。

既然 Reactive Stream 和 Java 8 引入的 Stream 都叫做流,它们之间有什么关系呢?有一点关系,Java 8 的 Stream 主要关注在流的过滤,映射,合并,而  Reactive Stream 更进一层,侧重的是流的产生与消费,即流在生产与消费者之间的协调。 阅读全文 >>