使用 Java 转换 Apache Avro 为 Parquet 数据格式(依赖更新)

在上篇 使用 Java 转换 Apache Avro 为 Parquet 数据格式 实现把 Avro 数据转换为 Parquet 文件或内存字节数组,并支持 LogicalType。其中使用到了 hadoop-core 依赖,注意到它传递的依赖都非常老旧,到官方 Maven 仓库一看才发现还不是一般的老

长时间无人问津的项目,那一定有它的替代品。对啦,据说 hadoop-core 在 2009 年 7 月份更名为 hadoop-common 了,没找到官方说明,只看到 StackOverflow 的 Differences between Hadoop-coomon, Hadoop-core and Hadoop-client? 是这么说的。 应该是这么个说法,不然为何 hadoop-core 一直停留在  1.2.1 的版本,而且原来 hadoop-core 中的类在 hadoop-common 中可以找到,如类 org.apache.hadoop.fs.Path。不过在 hadoop-core-1.2.1 中的 fs/s3 包不见,这么重要的 s3 文件系统没了。 阅读全文 >>

Mockito 3.4.0 开始可 Mock 静态方法

Java 单元测试最趁手的 Mock 组件当属 Mockito,虽然它最初是基于继承来实现  Mock 的,所以对私有方法,私有属性,静态方法,final 类,final 方法,构造函数无能为力。于是有时不得不引入 JMockit 或 PowerMockit 来辅助。不过现在的 Mockito 功力有所增强。

首先是 Mockito 2.1.0 开始可以 Mock final 类和 final 方法,要在 classpath 下创建个文件 mockito-extensions/org.mockito.plugins.MockMaker, 内容为 mock-maker-inline。之前写过一篇介绍:Mockto 也能 Mock final 类和 final 方法了,其中也探索了它的实现细节,使用到了 ByteBuddy 修改字节码。

Mockito 3.4.0 通过类似的 mockto-extensions 扩展的方式,实现了对静态方法的 Mock。所有使用到的接口是 org.mockito.MockedStatic,它当前在 Mockito 3.7.7 中还是一个试验性方法 @Incubating,能拿来用就行。 阅读全文 >>

解决 jvisualvm 启动后长时间 Computing description... 的问题

Java 虚拟机分析工具用 JDK 自带的 jconsole, jvisualvm, 和  jmc(Java Mission Control) 就已经非常好了,还真极少情况下(甚至没有)非得用商业的 Profiler 工具如 YourKit Java Profiler 或 JProfiler 的情况。用于实时观察 JVM 的内存, CPU, 线程等运行状况,对比 Heap 快照,发现线程死锁的应用情景,我比较喜欢用 jvisualvm(VisualVM)。

有很长一段时间,因为在家办公司,只要连接到公司的 VPN 后再执行 jvisualvm 来打开 VisualVM 时,会有很长的时间(可能长达 10 几分钟)卡在窗口右下角状态栏的 Computing description...,要等到它消失后才能开始连接 JVM,这时候我的 Java 应用可能早就退出了。要是本地不连 VPN 的话就正常,启动 VisualVM 是正常的,但调试有些工作项目又必须连接公司的 VPN。

这种使用 VisualVM 的体验有如恶梦一般,还是有经常要用到 VisualVM 的需求,所以再也不能忍受这种无谓的等待。依然是 Google + StackOverflow 的模式,找到原来罪魁祸首是 /etc/hosts 中的 127.0.0.1 这个条目。 阅读全文 >>

Mockito 的 anyString(), any(Foo.class) 等不能匹配 null 值

使用 Mockito Mock 方法式,一直以为可以用 anyString(), any(Foo.class) 等匹配 null 值,其实不行,null 值必须显式的用 null, 或 eq(null) 来匹配。anyString(), anyInt() 等只能匹配非 null 值,查看它们的返回值实际是 "" 和 0 等, 而更为特别的是 any(Foo.class) 看到的是 null, 仍然不能匹配 null 值。进一步用 Mockito.mockingDetails(mock).printInvocations() 打印出的内容,anyString(), any(Foo.class) 都会显示为 null 值。

说的有点罗嗦,看下面的例子, 被测试类 UserDao,sql 和 sqlArguments 由各自的 setter 方法来控制,默认它们都为 null 阅读全文 >>

Java 普通线程池与 ForkJoinPool 的效果对比

Java 多线程编程常用的一个接口是 ExecutorService, 其实就一个线程池的接口,一般由两种方式创建线程池,一为 Executors 的工厂方法,二则创建 ForkJoinPool 实例,当然也有直接使用 ThreadPoolExecutor 的。

关于什么时候用 ForkJoinPool 或普通的线程池(如 Executors.newFixedThreadPool(2) 或 new ThreadPoolExecutor(...)) 不过多的述说。如果要运用到 ForkJoinTask 的话就要用 ForkJoinPool, 它是 Java7 新引入的线程池类型。

关于 Java7 的 fork-join 框架可参考很多年前的一篇 Java 的 fork-join 框架实例备忘。ForkJoinPool 的一个典型特征是能够进行 Work stealing。它也是 Akka actor 效率高效的一个有力保证。

本文只能某一种情形下在选择普通线程池与 ForkJoinPool 的区别,直接说吧,普通线程更容易造成死锁,而 ForkJoinPool 却能应对相同的状况。 阅读全文 >>

运行时动态创建 Spring Bean

通常我们注册 Spring Bean 是通过像 @Named, @Bean, @Component, @Service 这样的注解来注册的,或者用更为古老的 XML 配置文件的方式。难免有时候要根据实际业务需求在 Spring  运行期间动态注册 Spring Bean, 比如基本某种形式的配置文件或系统属性来注册相应的 Bean, 这好像又回到了 XML 文件注册方式,也不尽然。

那为什么在运行期还要去注册 Spring Bean 呢,直接 new 对象不行吗?当然行得通,不过这样的话就不能更好的使用到  Spring IOC 的好处了。像待注册的 Bean 构造函数可以直接用到其他的 Spring  对象,或 @Value 引入环境变量,还有 @PostContruct 这样的行为。

最初思考如何注册 Spring Bean 时还是费了不少周折,如今清晰了许多。了当的说,不管是 Spring  初始时还是运行时,注册 Bean 的关键(应是唯一) 入口就是 BeanDefinitionRegistry 接口的方法 阅读全文 >>

用 Java 把内存中的表格数据合并到 SQL Server 表中

承接近两年前的 用 PreparedStatement 向 SqlServer 中一次性插入多条记录,其文后用 User-Defined Type 可用下面简单的代码把 Java 本地内存中表格数据一股脑的刷入到 SQL Server 数据库表格中

String sql = "INSERT INTO Customers SELECT * FROM ?";
SQLServerPreparedStatement pstmt = (SQLServerPreparedStatement) conn.prepareStatement(sql);
SQLServerDataTable dataTable = ..... // 生成好的本地表格数据
pstmt.setStructured(1, "CustomersTableType", dataTable);
pstmt.execute();

上面的 dataTable 本地表格类型变量容易生成,关键是必须在正式数据库数须预先用 CREATE TYPE 创建好 CustomersTableType 这个用户自定义类型,这会受权限的约束。如果由 DBA 预先完全依照目标表来创建好这个用户自定义类型,又无法确定是否总是要操作该目标表的所有字段。

数据库是允许我们创建临时的用户自定义类型 阅读全文 >>

等待所有的 CompletableFuture 完成

现实中有这样的用法,创建一批在线程池中运行的  CompletableFuture 实例,然后等待它们全部执行完再继续后面的操作。比如说 AWS 的 Lambda, 单单提交任务到线程池,不等待所有任务全部完成便退出主线程的话,AWS 便认为 Lambda 执行完毕,无视线程池中正在执行的任务而强行结束该 Lambda 实例。

以往我们通常的作法如下

如果所有的任务均无异常,上面的代码能得到预想的结果,只要上面打印出 all done 的话真的就是表明所有的任务都完成了。但是在循环 join futures 中的每一个 CompletableFuture 时,只要碰到任意一个任务有异常时,便立即抛出给外部线程,不在乎是否还有其他任务正在执行。此时,如果外部未予捕获,当然 阅读全文 >>

转换 Iterator 为 Java 8 的 Stream

Java 中有关抽象的可遍历的对象有 Iterator, Iterable 和 Java 8 的 Stream, Iterable 可简单的用如下代码转换为 Stream

StreamSupport.stream(iterable.spliterator(), false)

再回过头来,为什么要把 Iterator 或 Iterable 转换为 Stream, 因为 Iterator 和 Iterable 只提供有限的遍历操作,如 Iterator 接口的全部四个方法

hasNext()
next()
forEachRemaining(consumer)
remove()

同样 Iterable 也只有 iterator(), forEach(consumer), 和 spliterator() 方法。而 Java 8 的 Stream 就大不一样的,带有大量的链式操作方法,如 filter, map, flatMap, collect 等。

因此如果我们已有一个 Iterator 类型,能够被转换为 Stream 类型的话将会大大简化后续的转换,处理操作。具体的从 Iterator 到 Stream 的转换方式有两种 阅读全文 >>

试手 RxJava 2.x 及对线程的初步理解

在进行数据流处理过程中,需要一个高效苗条的流处理组件,比如对输入流能进行分组(窗口),能进行流量控制(Back Pressure - 背压),这也就涉及到响应式编程,流处理框架。这方面如果直接基于 Akka actor 来构建 Akka ActorSystem 也是比较复杂,依赖的组件也不少。还有构筑在 Akka actor 之上的 Akka Streams,再往上的 Flink Streaming,它们都有像滑动,滚动窗口的概念,但是依赖更不得了。一个基本的 Flink Streaming 的项目会依赖到 45 M 以上的第三方组件,如果用它来写一个数据流处理的共享组件,那真是要命。Spring 5 也开始带上了自己的 Reactive-Streams 实现 Spring Reactor, 想要把它从 Spring 中单独抽离出也非易事。

Flink Streaming 组件依赖:org.apache.fling:flink-streaming-java_2.12:1.80, 会依赖于其他诸如 akka-stream, akka-actor, flink-core, flink-clients, scala-library 等非常多的东西

而另一个著名的响应式框架 RxJava 2 就清爽多了,完全没有第三方依赖,要说有也就是定义了四个接口的 reactive-streams(2 KB 大小),就自身那个  rxjava-2.2.9.jar 包只有 2.3 M,这才叫轻量级。因为它设计来是能被应用于 Android 客户端应用的,Andriod 上的 rxandriod-1.2.1.aar 只有 9 K。所以 RxJava 2.x 太适合用来写一些小的共享组件了。 阅读全文 >>