java开发

技术学习笔记

Java几种IO模式

IO模式基本分为三种:同步阻塞 同步非阻塞 异步非阻塞



同步阻塞IO

Java中提供的标准IO输入输出, 同步IO, 一个数据流即需要一个线程去读取, 此时, 线程将会等待数据流准备时间, 如果未准备好, 则需要一直阻塞, 可以采取线程池的方式对该读取方式进行优化

@Test
fun testBlockingIO() {
    val serverSocket = ServerSocket(9000)
    println("server: 服务端启动,等待连接...")
    // 启动客户端线程
    Thread {
        println("client: 开始连接服务端...")
        // 指定服务端地址及端口
        val socket = Socket("localhost", 9000)
        val input = DataInputStream(socket.getInputStream())
        val output = DataOutputStream(socket.getOutputStream())
        println("client: 处理3秒钟")
        Thread.sleep(3000)
        println("client: 发送消息")
        output.writeUTF("hello server!")
        output.flush()
        println("client: 接收数据[${input.readUTF()}]")\
    }.start()
    // 服务器阻塞5秒钟, 等待客户端连接
    val socket = serverSocket.accept()
    println("server: 有客户端连接了")
    val input = DataInputStream(socket.getInputStream())
    val output = DataOutputStream(socket.getOutputStream())
    println("server: 接收数据[${input.readUTF()}]")
    println("server: 服务处理5秒钟...")
    Thread.sleep(5000L)
    println("server: 发送消息")
    output.writeUTF("hello client!")
    output.flush()
}
 

同步非阻塞IO

示例代码如下, 与普通IO最大的区别在于, 当去读取IO数据时, 无论数据有没有准备好, 会立即返回, 线程需要不断的轮询去查看socket是否准备好, 相对于同步阻塞IO, 线程不用等在那里等待IO就绪, 但同时浪费了大量cpu的轮询时间

@Test
fun testNonBlockingIO(){
    // 服务端
    val channel = DatagramChannel.open().apply {
        configureBlocking(false) // 非阻塞
        bind(InetSocketAddress(9000)) // 绑定端口
    }
    Thread {
        Thread.sleep(5000)
        val datagramChannel = DatagramChannel.open().apply {
            connect(InetSocketAddress("localhost", 9000))
        }
        println("client: 发送数据了")
        datagramChannel.write(ByteBuffer.wrap("hello nio server".toByteArray()))
    }.start()
    while(true){
        val byteBuffer = ByteBuffer.allocate(1024)
        if(channel.receive(byteBuffer) !== null ) {
            val length = byteBuffer.position()
            byteBuffer.flip()
            val data = ByteArray(length).apply {
                System.arraycopy(byteBuffer.array(), 0, this, 0, length)
            }
            println("server: 接收到数据 - [${String(data)}]")
            break;
        } else {
            println("server: 没有读取到数据, 休眠3秒再试试")
            Thread.sleep(3000)
        }
    }
}
 

AIO(Asynchronous IO) 异步非阻塞IO

主要融合了Java的Future模式, 提供了回调的方式, 当调用阻塞操作时, 启用子线程去处理, 处理完成后, 回调后续的业务操作, 注意, 此时的线程不再是之前主线程, 如果想保持在主线程中执行后续的业务逻辑, 则可以使用 future.get() 方式, 使得主线程阻塞直到任务完成


@Test
fun testAIO() {
    val channel = AsynchronousServerSocketChannel.open().apply {
        bind(InetSocketAddress(9000)) // 绑定端口
    }
    // 异步处理 accept 事件
    class AcceptHandler(serverSocketChannel: AsynchronousServerSocketChannel)
        : CompletionHandler<AsynchronousSocketChannel, Nothing?> {
        override fun completed(socketChannel: AsynchronousSocketChannel, attachment: Nothing?) {
            println("server: 有client接入了 - $socketChannel")
            // 或使用 write(ByteArray byteArray, A attachment, CompletionHandler<Long,? super A> handler); 异步触发消息发送完毕事件
            socketChannel.write(ByteBuffer.wrap("hello client!".toByteArray())).get()
        }
        override fun failed(exc: Throwable?, attachment: Nothing?) {
        }
    }
    channel.accept(null, AcceptHandler(channel))
    // AIO Client
    val byteBuffer = ByteBuffer.allocate(1024)
    val socketChannel = AsynchronousSocketChannel.open().apply {
        connect(InetSocketAddress("localhost", 9000)).get()
    }
    // 可指定超时时间, 使用 CompletionHandler 异步获取执行结果
    socketChannel.read(byteBuffer).get()
    val length = byteBuffer.position()
    byteBuffer.flip()
    val data = ByteArray(length)
    System.arraycopy(byteBuffer.array(), 0, data, 0, length)
    println("client: 接收到数据 - ${String(data)}")
}
 

发表评论:

Powered By Z-BlogPHP 1.7.1

唐云飞个人日记