IO模式基本分为三种:同步阻塞 同步非阻塞 异步非阻塞
同步阻塞IO
Java中提供的标准IO输入输出, 同步IO, 一个数据流即需要一个线程去读取, 此时, 线程将会等待数据流准备时间, 如果未准备好, 则需要一直阻塞, 可以采取线程池的方式对该读取方式进行优化
@Test fun testBlockingIO() { val serverSocket = ServerSocket(9000) println("server: 服务端启动,等待连接...") // 启动客户端线程 Thread { println("client: 开始连接服务端...") // 指定服务端地址及端口 val socket = Socket("localhost", 9000) val input = DataInputStream(socket.getInputStream()) val output = DataOutputStream(socket.getOutputStream()) println("client: 处理3秒钟") Thread.sleep(3000) println("client: 发送消息") output.writeUTF("hello server!") output.flush() println("client: 接收数据[${input.readUTF()}]")\ }.start() // 服务器阻塞5秒钟, 等待客户端连接 val socket = serverSocket.accept() println("server: 有客户端连接了") val input = DataInputStream(socket.getInputStream()) val output = DataOutputStream(socket.getOutputStream()) println("server: 接收数据[${input.readUTF()}]") println("server: 服务处理5秒钟...") Thread.sleep(5000L) println("server: 发送消息") output.writeUTF("hello client!") output.flush() }
同步非阻塞IO
示例代码如下, 与普通IO最大的区别在于, 当去读取IO数据时, 无论数据有没有准备好, 会立即返回, 线程需要不断的轮询去查看socket是否准备好, 相对于同步阻塞IO, 线程不用等在那里等待IO就绪, 但同时浪费了大量cpu的轮询时间
@Test fun testNonBlockingIO(){ // 服务端 val channel = DatagramChannel.open().apply { configureBlocking(false) // 非阻塞 bind(InetSocketAddress(9000)) // 绑定端口 } Thread { Thread.sleep(5000) val datagramChannel = DatagramChannel.open().apply { connect(InetSocketAddress("localhost", 9000)) } println("client: 发送数据了") datagramChannel.write(ByteBuffer.wrap("hello nio server".toByteArray())) }.start() while(true){ val byteBuffer = ByteBuffer.allocate(1024) if(channel.receive(byteBuffer) !== null ) { val length = byteBuffer.position() byteBuffer.flip() val data = ByteArray(length).apply { System.arraycopy(byteBuffer.array(), 0, this, 0, length) } println("server: 接收到数据 - [${String(data)}]") break; } else { println("server: 没有读取到数据, 休眠3秒再试试") Thread.sleep(3000) } } }
AIO(Asynchronous IO) 异步非阻塞IO
主要融合了Java的Future模式, 提供了回调的方式, 当调用阻塞操作时, 启用子线程去处理, 处理完成后, 回调后续的业务操作, 注意, 此时的线程不再是之前主线程, 如果想保持在主线程中执行后续的业务逻辑, 则可以使用 future.get() 方式, 使得主线程阻塞直到任务完成
@Test fun testAIO() { val channel = AsynchronousServerSocketChannel.open().apply { bind(InetSocketAddress(9000)) // 绑定端口 } // 异步处理 accept 事件 class AcceptHandler(serverSocketChannel: AsynchronousServerSocketChannel) : CompletionHandler<AsynchronousSocketChannel, Nothing?> { override fun completed(socketChannel: AsynchronousSocketChannel, attachment: Nothing?) { println("server: 有client接入了 - $socketChannel") // 或使用 write(ByteArray byteArray, A attachment, CompletionHandler<Long,? super A> handler); 异步触发消息发送完毕事件 socketChannel.write(ByteBuffer.wrap("hello client!".toByteArray())).get() } override fun failed(exc: Throwable?, attachment: Nothing?) { } } channel.accept(null, AcceptHandler(channel)) // AIO Client val byteBuffer = ByteBuffer.allocate(1024) val socketChannel = AsynchronousSocketChannel.open().apply { connect(InetSocketAddress("localhost", 9000)).get() } // 可指定超时时间, 使用 CompletionHandler 异步获取执行结果 socketChannel.read(byteBuffer).get() val length = byteBuffer.position() byteBuffer.flip() val data = ByteArray(length) System.arraycopy(byteBuffer.array(), 0, data, 0, length) println("client: 接收到数据 - ${String(data)}") }