一个简单的 Java 自动批处理队列

实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。

这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是 BlockingQuque 来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize, 的那些记录可主动调用 completeAll() 函数或在达到 timeout 后来触发批处理,并且结束队列内的循环线程。

注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。

下面是 BatchQueue 的简单实现 阅读全文 >>