实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。
这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是 BlockingQuque
来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize
, 的那些记录可主动调用 completeAll()
函数或在达到 timeout 后来触发批处理,并且结束队列内的循环线程。
注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。
下面是 BatchQueue 的简单实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
package cc.unmi; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public class BatchQueue<T> { private final int batchSize; private final Consumer<List<T>> consumer; private final int timeoutInMs; private AtomicBoolean isLooping = new AtomicBoolean(false); private BlockingQueue<T> queue = new LinkedBlockingQueue<>(); private AtomicLong start = new AtomicLong(System.currentTimeMillis()); public BatchQueue(int batchSize, int timeoutInMs, Consumer<List<T>> consumer) { this.batchSize = batchSize; this.timeoutInMs = timeoutInMs; this.consumer = consumer; } public BatchQueue(int batchSize, Consumer<List<T>> consumer) { this(batchSize, 500, consumer); } public boolean add(T t) { boolean result = queue.add(t); if(!isLooping.get() && result) { isLooping.set(true); startLoop(); } return result; } public void completeAll() { while (!queue.isEmpty()) { drainToConsume(); } } private void startLoop() { new Thread(() -> { start = new AtomicLong(System.currentTimeMillis()); while(true) { long last = System.currentTimeMillis() - start.get() ; if (queue.size() >= batchSize || (!queue.isEmpty() && last > timeoutInMs)) { drainToConsume(); } else if(queue.isEmpty()) { isLooping.set(false); break; } } }).start(); } private void drainToConsume() { List<T> drained = new ArrayList<>(); int num = queue.drainTo(drained, batchSize); if(num > 0) { consumer.accept(drained); start.set(System.currentTimeMillis()); } } } |
客户端 Client 的使用代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
package cc.unmi; import java.util.Scanner; public class Client { public static void main(String[] args) { BatchQueue<String> batchQueue = new BatchQueue<>(3, System.out::println); while (true) { String line = new Scanner(System.in).nextLine(); if (line.equals("done")) { batchQueue.completeAll(); break; } batchQueue.add(line); } } } |
运行效果
循环线程只在添加新数据到队列才开始启动,如果队列为空了,会在下一次循环中把检测队列的线程关闭掉,如果有新数据进来再次开启队列检测线程。
调用 compleateAll()
方法或者在队列空闲了指定的 timeout 时间后,队列中剩下的不足数额的记录也会被处理掉。
如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。
本文链接 https://yanbin.blog/simple-java-auto-batch-queue/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
batchQueue.shutdown();哪来呢?
还是你看的仔细,是
completeAll()
方法,不是shutdown()
, 改过的方法名。