一个简单的 Java 自动批处理队列

实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。

这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是 BlockingQuque 来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize, 的那些记录可主动调用 completeAll() 函数或在达到 timeout 后来触发批处理,并且结束队列内的循环线程。

注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。

下面是 BatchQueue 的简单实现

客户端 Client 的使用代码如下:

运行效果

循环线程只在添加新数据到队列才开始启动,如果队列为空了,会在下一次循环中把检测队列的线程关闭掉,如果有新数据进来再次开启队列检测线程。

调用 compleateAll() 方法或者在队列空闲了指定的 timeout 时间后,队列中剩下的不足数额的记录也会被处理掉。

如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。

本文链接 https://yanbin.blog/simple-java-auto-batch-queue/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

2 Comments
Inline Feedbacks
View all comments
who
who
5 years ago

batchQueue.shutdown();哪来呢?