在 NodeJS 中进行异步操作很简单,而 Java 到了 7 开始才支持异步的 IO 操作。虽然之前的版本有引入非阻塞 IO,但编码中还不易体现出它的优越性。亮一下 NodeJS 用异步 IO 的例子:
var fs = require('fs');
fs.readFile('Test.scala', 'utf-8', function(err, data){
if( !err ) {
console.log(data);
}
});
console.log('continue doing other thins');
执行输出是
continue doing other things
CONTENT FROM FILE Test.scala
对的,理想中的异步操作就是,传递回调函数来读取文件,读取完成后招待回调,且不阻塞主线程。
在 Java 8 之前,因为没有 Lambda 支持只能应用内部类的方式。JDK 提供了以下异步 Channel 来实现异步操作
AsynchronousFileChannel
AsynchronousSocketChannel
AsynchronousServerSocketChannel
AsynchronousDatagramChannel
获取异步结果的方式有 Future 和 CompletionHandler
例子看下两种方式的代码实现:
1. CompletionHandler 回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public static void main(String[] args) throws IOException, InterruptedException { Path path = Paths.get("Test.Scala"); AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(path); ByteBuffer buffer = ByteBuffer.allocate(100); asynchronousFileChannel.read(buffer, 0, "attachment information", new CompletionHandler<Integer, Object>() { @Override public void completed(Integer readCount, Object attachment) { System.out.println(attachment); System.out.println(new String(buffer.array())); } @Override public void failed(Throwable exc, Object attachment) { System.out.println("Error:" + exc); } }); System.out.println("continue doing other things"); Thread.sleep(1000); } |
参数 Object attachment 可以用来携带你自己需要的信息,上面代码只读取一次。如果大文件需要多次读缓冲,这就要用递归,麻烦些了。
完整的读取文件所有内容的代码可参考:Reading from a file using the AsynchronousFileChannel class
上面代码输出
continue doing other things
attachment information
CONTENT FROM FILE Test.scala
也说明了读取时不会阻断程序继续往下执行,读取就绪后呼叫 CompletionHandler 实例。
2. Future 方式
1 2 3 4 5 6 7 8 9 10 11 |
public static void main(String[] args) throws Exception { AsynchronousFileChannel asynFileChannel = AsynchronousFileChannel.open(Paths.get("Test.Scala"), StandardOpenOption.READ); ByteBuffer buffer = ByteBuffer.allocate(100); Future<Integer> future = asynFileChannel.read(buffer, 0); while (!future.isDone()) { System.out.println(new String(buffer.array(), 0, result.get())); } } |
在 while(!result.isDone()) 处仍然阻碍了主线程继续往下执行。
或者要自己启动线程来处理 future 而影响主线程往下执行,或者要 ExecutorService
帮忙。
个人认为只有像 NodeJS 或 CompletionHandler 那样隐藏线程细节的异步才是真异步编程。又由于 CompletionHandler 不是一个 SAM 类型的接口,所以也就无法直接转换为 Lambda 表达式。
我们可以把 CompletionHandler 拆成两个 SAM 来应用 Java8 的 Lambda 表达式,下面是我做的一个尝试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
package test; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Paths; public class TestNIO2 { public static void main(String[] args) throws IOException, InterruptedException { read(AsynchronousFileChannel.open(Paths.get("Test.Scala")), ByteBuffer.allocate(100), 0, "start reading", (readCount, buffer, attachment) -> { System.out.println(attachment); System.out.println(new String(buffer.array(), 0, readCount)); }, (exc, attachment) -> { System.out.println("Error:" + exc); }); System.out.println("continue doing other things"); Thread.sleep(1000); } static <A> void read(AsynchronousFileChannel channel, ByteBuffer buffer, long position, A attachment, final ReadSuccess<A> success, final ReadFailed<A> failure) { channel.read(buffer, 0, attachment, new CompletionHandler<Integer, A>() { @Override public void completed(Integer readCount, A attachment) { success.apply(readCount, buffer, attachment); } @Override public void failed(Throwable exc, A attachment) { failure.apply(exc, attachment); } }); } } interface ReadSuccess<A> { void apply(Integer readCount, ByteBuffer buffer, A attachment); } interface ReadFailed<A> { void apply(Throwable exc, A attachment); } |
注意,上面的代码仍然只读取了一次数据到 ByteBuffer 中,文件大小超过 100 的字节将读不全,需要进一步递归处理。若只用来读取文件我们可以把 read() 的第二,三位置上的参数 ByteBuffer buffer, Long position 隐藏起来,可完全实现 NodeJS 的调用方式。
本文链接 https://yanbin.blog/java-nio2-aio-readfile/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。