IO模式基本分为三种:同步阻塞 同步非阻塞 异步非阻塞
同步阻塞IO
Java中提供的标准IO输入输出, 同步IO, 一个数据流即需要一个线程去读取, 此时, 线程将会等待数据流准备时间, 如果未准备好, 则需要一直阻塞, 可以采取线程池的方式对该读取方式进行优化
@Test
fun testBlockingIO() {
val serverSocket = ServerSocket(9000)
println("server: 服务端启动,等待连接...")
// 启动客户端线程
Thread {
println("client: 开始连接服务端...")
// 指定服务端地址及端口
val socket = Socket("localhost", 9000)
val input = DataInputStream(socket.getInputStream())
val output = DataOutputStream(socket.getOutputStream())
println("client: 处理3秒钟")
Thread.sleep(3000)
println("client: 发送消息")
output.writeUTF("hello server!")
output.flush()
println("client: 接收数据[${input.readUTF()}]")\
}.start()
// 服务器阻塞5秒钟, 等待客户端连接
val socket = serverSocket.accept()
println("server: 有客户端连接了")
val input = DataInputStream(socket.getInputStream())
val output = DataOutputStream(socket.getOutputStream())
println("server: 接收数据[${input.readUTF()}]")
println("server: 服务处理5秒钟...")
Thread.sleep(5000L)
println("server: 发送消息")
output.writeUTF("hello client!")
output.flush()
}
同步非阻塞IO
示例代码如下, 与普通IO最大的区别在于, 当去读取IO数据时, 无论数据有没有准备好, 会立即返回, 线程需要不断的轮询去查看socket是否准备好, 相对于同步阻塞IO, 线程不用等在那里等待IO就绪, 但同时浪费了大量cpu的轮询时间
@Test
fun testNonBlockingIO(){
// 服务端
val channel = DatagramChannel.open().apply {
configureBlocking(false) // 非阻塞
bind(InetSocketAddress(9000)) // 绑定端口
}
Thread {
Thread.sleep(5000)
val datagramChannel = DatagramChannel.open().apply {
connect(InetSocketAddress("localhost", 9000))
}
println("client: 发送数据了")
datagramChannel.write(ByteBuffer.wrap("hello nio server".toByteArray()))
}.start()
while(true){
val byteBuffer = ByteBuffer.allocate(1024)
if(channel.receive(byteBuffer) !== null ) {
val length = byteBuffer.position()
byteBuffer.flip()
val data = ByteArray(length).apply {
System.arraycopy(byteBuffer.array(), 0, this, 0, length)
}
println("server: 接收到数据 - [${String(data)}]")
break;
} else {
println("server: 没有读取到数据, 休眠3秒再试试")
Thread.sleep(3000)
}
}
}
AIO(Asynchronous IO) 异步非阻塞IO
主要融合了Java的Future模式, 提供了回调的方式, 当调用阻塞操作时, 启用子线程去处理, 处理完成后, 回调后续的业务操作, 注意, 此时的线程不再是之前主线程, 如果想保持在主线程中执行后续的业务逻辑, 则可以使用 future.get() 方式, 使得主线程阻塞直到任务完成
@Test
fun testAIO() {
val channel = AsynchronousServerSocketChannel.open().apply {
bind(InetSocketAddress(9000)) // 绑定端口
}
// 异步处理 accept 事件
class AcceptHandler(serverSocketChannel: AsynchronousServerSocketChannel)
: CompletionHandler<AsynchronousSocketChannel, Nothing?> {
override fun completed(socketChannel: AsynchronousSocketChannel, attachment: Nothing?) {
println("server: 有client接入了 - $socketChannel")
// 或使用 write(ByteArray byteArray, A attachment, CompletionHandler<Long,? super A> handler); 异步触发消息发送完毕事件
socketChannel.write(ByteBuffer.wrap("hello client!".toByteArray())).get()
}
override fun failed(exc: Throwable?, attachment: Nothing?) {
}
}
channel.accept(null, AcceptHandler(channel))
// AIO Client
val byteBuffer = ByteBuffer.allocate(1024)
val socketChannel = AsynchronousSocketChannel.open().apply {
connect(InetSocketAddress("localhost", 9000)).get()
}
// 可指定超时时间, 使用 CompletionHandler 异步获取执行结果
socketChannel.read(byteBuffer).get()
val length = byteBuffer.position()
byteBuffer.flip()
val data = ByteArray(length)
System.arraycopy(byteBuffer.array(), 0, data, 0, length)
println("client: 接收到数据 - ${String(data)}")
}