一个简单的 Java 自动批处理队列

实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。


这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是 BlockingQuque 来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize, 的那些记录可主动调用 completeAll() 函数或在达到 timeout 后来触发批处理,并且结束队列内的循环线程。

注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。

下面是 BatchQueue 的简单实现
 1package cc.unmi;
 2
 3import java.util.ArrayList;
 4import java.util.List;
 5import java.util.concurrent.BlockingQueue;
 6import java.util.concurrent.LinkedBlockingQueue;
 7import java.util.concurrent.atomic.AtomicBoolean;
 8import java.util.concurrent.atomic.AtomicLong;
 9import java.util.function.Consumer;
10
11public class BatchQueue<T> {
12    private final int batchSize;
13    private final Consumer<List<T>> consumer;
14    private final int timeoutInMs;
15
16    private AtomicBoolean isLooping = new AtomicBoolean(false);
17    private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
18
19    private AtomicLong start = new AtomicLong(System.currentTimeMillis());
20
21    public BatchQueue(int batchSize, int timeoutInMs, Consumer<List<T>> consumer) {
22        this.batchSize = batchSize;
23        this.timeoutInMs = timeoutInMs;
24        this.consumer = consumer;
25    }
26
27    public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
28        this(batchSize, 500, consumer);
29    }
30
31    public boolean add(T t) {
32        boolean result = queue.add(t);
33        if(!isLooping.get() && result) {
34            isLooping.set(true);
35            startLoop();
36        }
37        return result;
38    }
39
40    public void completeAll() {
41        while (!queue.isEmpty()) {
42            drainToConsume();
43        }
44    }
45
46    private void startLoop() {
47        new Thread(() -> {
48            start = new AtomicLong(System.currentTimeMillis());
49            while(true) {
50                long last = System.currentTimeMillis() - start.get() ;
51                if (queue.size() >= batchSize || (!queue.isEmpty() && last > timeoutInMs)) {
52                    drainToConsume();
53                } else if(queue.isEmpty()) {
54                    isLooping.set(false);
55                    break;
56                }
57            }
58        }).start();
59    }
60
61    private void drainToConsume() {
62        List<T> drained = new ArrayList<>();
63        int num = queue.drainTo(drained, batchSize);
64        if(num > 0) {
65            consumer.accept(drained);
66            start.set(System.currentTimeMillis());
67        }
68    }
69}

客户端 Client 的使用代码如下:
 1package cc.unmi;
 2
 3import java.util.Scanner;
 4
 5public class Client {
 6    public static void main(String[] args) {
 7        BatchQueue<String> batchQueue = new BatchQueue<>(3, System.out::println);
 8        while (true) {
 9            String line = new Scanner(System.in).nextLine();
10            if (line.equals("done")) {
11                batchQueue.completeAll();
12                break;
13            }
14            batchQueue.add(line);
15        }
16    }
17}

运行效果

循环线程只在添加新数据到队列才开始启动,如果队列为空了,会在下一次循环中把检测队列的线程关闭掉,如果有新数据进来再次开启队列检测线程。

调用 compleateAll() 方法或者在队列空闲了指定的 timeout 时间后,队列中剩下的不足数额的记录也会被处理掉。

如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。 永久链接 https://yanbin.blog/simple-java-auto-batch-queue/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。