被问及 Java 多线程,多会想到 Thread, Runnable,更通常是用 new Thread(){public void run(){...}}.start() 来启动一个线程。那都是 JDK 1.5 之前的年代了,现在还这么回答就 Out 了。用用 JDK 1.5 给我们带来的 java.util.concurrent 吧,更酷了。这里不涉及它的并发集合类,同步互斥机制,只说线程及线程池的应用举例。
1. 新的启动线程的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public static void main(String[] args) throws Exception { Callable<Integer> callable = new Callable<Integer>() { public Integer call() throws Exception { System.out.println("callable executed."); return new Random().nextInt(100); } }; FutureTask<Integer> future = new FutureTask<Integer>(callable); new Thread(future).start(); System.out.println("do your things here"); System.out.println(future.get()); } |
这里的 Callable,与曾经的 Runnable 很相似,只不过它的 call() 方法是有返回值的。因为 FutureTask 实现了 Runnable 和 Future,所以可以放在 new Thread(future) 中 start()。之后可随时获取线程执行的返回值,future.get() 时一定会等待线程执行完。
这就是 Future 模式,在 PlayFramwork 中的异步就是这样使用的,F.Promise 和 Controller 的 await() 的处理方式。
显然既然是线程, call() 方法中的代码会在子线程中执行。
再请记住下面的 Callable.call() 方法都会在新的子线程中执行。
2. 无法割舍的 ExecutorService,submit() 任务方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newSingleThreadExecutor(); Future<Integer> future = threadPool.submit(new Callable<Integer>(){ public Integer call() throws Exception { System.out.println("callable executed."); return new Random().nextInt(200); } }); System.out.println("do your things"); System.out.println(future.get()); threadPool.shutdown(); } |
Executors 中有多个方法可可返回 ExecutorService, 诸如:newSingleThreadExecutor(), newCachedThreadPool(), newFixedThreadPool, newScheduledThreadPool(), newSingleThreadScheduledExecutor()。由这些方法可知它可以用来使用线程池来执行任务,可以进行任务调度。
用完 ExecutorService 后,需调用它的 shutdown() 关掉,否则程序不会结束。
3. 如果有一组任务,它们要被放在线程池中执行,待到全部任务执行完后再汇总结果,该怎么做呢,join 线程吗?那会显得有些高深了,但有了 ExecutorService 这一切便迎韧而解了,看吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newCachedThreadPool(); List<Callable<String>> callables = new ArrayList<Callable<String>>(); for(int i = 1; i < 5; i++){ final int id = i; callables.add(new Callable<String>(){ public String call() throws Exception { return "taskID" + id; } }); } List<Future<String>> futures = threadPool.invokeAll(callables); for(Future<String> future: futures){ System.out.println(future.get()); } threadPool.shutdown(); } |
传统的线程方式,别说是线程池和等待所有线程结束,就是处理最终来处理线程执行得到的结果都不易。
我比较喜欢上面的 invokeAll() 后处理一个 Future 集合的方式。
这里必须等待所有的线程全部执行完毕才去输出结果,倘若要在每个单独的线程执行完后立即输出自己的结果该怎么办呢?也就是不想长时间去等待,最后一大堆结果倒出来,而是希望一个个执行结果逐个飚出来。那就不用关心 Future 的内容,在 Callable 中输出,像
1 2 3 4 5 6 7 8 9 10 11 |
for(int i = 1; i < 5; i++){ final int id = i; callables.add(new Callable<String>(){ public String call() throws Exception { System.out.println("taskID" + id); return "taskID" + id; //这个返回值是装模作样的 } }); } List<Future<String>> futures = threadPool.invokeAll(callables); |
4. 还可以用 ExecutorCompletionService:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newCachedThreadPool(); CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(threadPool); for(int i = 1; i < 5; i++){ final int taskID = i; cs.submit(new Callable<Integer>(){ public Integer call() throws Exception { return taskID; } }); } for(int i=1; i<5; i++){ System.out.println(cs.take().get()); } threadPool.shutdown(); } |
上面, cs.take().get() 依次按序遍历前面相同顺序提交的任务结果,不能直接命中执行的结果。
有必要提一下 ScheduledExecutorService 这个接口,它的 schedule(task, initDelay), scheduleAtFixedRate() 和 scheduleWithFixedDelay() 让你做到比 Timer,TimerTask 更强的任务调度,当然替代不了 Quartz 的。
参考:1. Java线程(六):Callable和Future
2. Java多线程之ExecutorService
3. java.util.concurrent - invokeAll via ExecutorService