一个简单的 Java 自动批处理队列
实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。
这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是
注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。
下面是 BatchQueue 的简单实现
客户端 Client 的使用代码如下:
运行效果
循环线程只在添加新数据到队列才开始启动,如果队列为空了,会在下一次循环中把检测队列的线程关闭掉,如果有新数据进来再次开启队列检测线程。
调用
如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。 永久链接 https://yanbin.blog/simple-java-auto-batch-queue/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是
BlockingQuque 来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize, 的那些记录可主动调用 completeAll() 函数或在达到 timeout 后来触发批处理,并且结束队列内的循环线程。注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。
下面是 BatchQueue 的简单实现
1package cc.unmi;
2
3import java.util.ArrayList;
4import java.util.List;
5import java.util.concurrent.BlockingQueue;
6import java.util.concurrent.LinkedBlockingQueue;
7import java.util.concurrent.atomic.AtomicBoolean;
8import java.util.concurrent.atomic.AtomicLong;
9import java.util.function.Consumer;
10
11public class BatchQueue<T> {
12 private final int batchSize;
13 private final Consumer<List<T>> consumer;
14 private final int timeoutInMs;
15
16 private AtomicBoolean isLooping = new AtomicBoolean(false);
17 private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
18
19 private AtomicLong start = new AtomicLong(System.currentTimeMillis());
20
21 public BatchQueue(int batchSize, int timeoutInMs, Consumer<List<T>> consumer) {
22 this.batchSize = batchSize;
23 this.timeoutInMs = timeoutInMs;
24 this.consumer = consumer;
25 }
26
27 public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
28 this(batchSize, 500, consumer);
29 }
30
31 public boolean add(T t) {
32 boolean result = queue.add(t);
33 if(!isLooping.get() && result) {
34 isLooping.set(true);
35 startLoop();
36 }
37 return result;
38 }
39
40 public void completeAll() {
41 while (!queue.isEmpty()) {
42 drainToConsume();
43 }
44 }
45
46 private void startLoop() {
47 new Thread(() -> {
48 start = new AtomicLong(System.currentTimeMillis());
49 while(true) {
50 long last = System.currentTimeMillis() - start.get() ;
51 if (queue.size() >= batchSize || (!queue.isEmpty() && last > timeoutInMs)) {
52 drainToConsume();
53 } else if(queue.isEmpty()) {
54 isLooping.set(false);
55 break;
56 }
57 }
58 }).start();
59 }
60
61 private void drainToConsume() {
62 List<T> drained = new ArrayList<>();
63 int num = queue.drainTo(drained, batchSize);
64 if(num > 0) {
65 consumer.accept(drained);
66 start.set(System.currentTimeMillis());
67 }
68 }
69}客户端 Client 的使用代码如下:
1package cc.unmi;
2
3import java.util.Scanner;
4
5public class Client {
6 public static void main(String[] args) {
7 BatchQueue<String> batchQueue = new BatchQueue<>(3, System.out::println);
8 while (true) {
9 String line = new Scanner(System.in).nextLine();
10 if (line.equals("done")) {
11 batchQueue.completeAll();
12 break;
13 }
14 batchQueue.add(line);
15 }
16 }
17}运行效果
循环线程只在添加新数据到队列才开始启动,如果队列为空了,会在下一次循环中把检测队列的线程关闭掉,如果有新数据进来再次开启队列检测线程。调用
compleateAll() 方法或者在队列空闲了指定的 timeout 时间后,队列中剩下的不足数额的记录也会被处理掉。如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。 永久链接 https://yanbin.blog/simple-java-auto-batch-queue/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。