浅谈Java中BIO、NIO

2020-08-09T15:47:00
真的就只是浅谈

浅谈Java中的BIO、NIO

到底什么是“IO Block”

很多人说BIO不好,会“blcok”,但到底什么是IO的Blcok呢?看一下下面两种情况:

  • 用系统调用read()从socket里读取数据
  • 用系统调用read()从一个磁盘文件读取一段数据到内存

如果你认为这两种都算“Block”,那么很遗憾,你与Linux理解不同,Linux中这么认为:

  • 对于第一种情况,算作block,因为Linux无法知道网络上对方是否会发数据。如果没数据发过来,对于调用read的程序来说,就只能“等”。
  • 对于第二种情况,不算做block

是的,对于磁盘文件IO,Linux总是不视作Block。

你可能会说,这不科学啊,磁盘读写偶尔也会因为硬件而卡壳啊,怎么能不算Block呢?但实际就是不算。

一个解释是,所谓“Block”是指操作系统可以预见这个Block会发生才会主动Block。例如当读取TCP连接的数据时,如果发现Socket buffer里没有数据就可以确定定对方还没有发过来,于是Block;而对于普通磁盘文件的读写,也许磁盘运作期间会抖动,会短暂暂停,但是操作系统无法预见这种情况,只能视作不会Block,照样执行。

基于这个基本的设定,在讨论IO时,一定要严格区分网络IO和磁盘文件IO。NIO和后文讲到的IO多路复用只对网络IO有意义。

严格的说,O_NONBLOCK和IO多路复用,对标准输入输出描述符、管道和FIFO也都是有效的。但本文侧重于讨论高性能网络服务器下各种IO的含义和关系,所以本文做了简化,只提及网络IO和磁盘文件IO两种情况。

本文先着重讲一下网络IO。

BIO的缺点

两个地方有阻塞

  1. read()方法有阻塞。当第一个Client连接Server后,read方法会被阻塞等待Client发送数据过来
  2. accept()方法有阻塞,当一个Client连接Server后,如果阻塞在read方法那里,会导致其他Client不能连接Server
  3. 所以read()方法那里需要多线程,但是会导致服务端需要开大量的线程,造成资源的浪费
static byte[] bs = new byte[1024];
public static void main(String[] args) throws IOException {

    ServerSocket  serverSocket = new ServerSocket(8080);
    while (true) {
        System.out.println("wait conn.....");
        // 阻塞
        Socket client = serverSocket.accept();
        System.out.println("conn success.....");
        System.out.println("wait data");
        // read也会阻塞
        client.getInputStream().read(bs);
        System.out.println("data success");
        System.out.println(new String(bs));
    }
}

NIO的设计思路

​ 利用单线程来处理并发(redis也是单线程,利用epoll),是基于事件驱动思想

BIO存在的问题

​ 1、read方法有阻塞,怎么减阻塞?设置非阻塞

​ 2、前面已经连接的socket,现在发送数据了,循环在accpet阻塞了,如何解决?需要设置accept为非阻塞

​ 3、还有进行下一次循环,原来的连接丢失,如何解决?保存起来,然后轮询

// 模仿NIO
static ByteBuffer buffer = ByteBuffer.allocate(1024);
static List<SocketChannel> socketList =  new ArrayList<>();
public static void main(String[] args) throws IOException {

    ServerSocketChannel serverSocket = ServerSocketChannel.open();
    serverSocket.configureBlocking(false);

    while (true) {
        // 这个accept的方法是非阻塞的
        SocketChannel client = serverSocket.accept();

        if(client == null){
            // 不管有没有人连接,都必须循环一遍,看是否有数据。还需要解决无意义的空轮询
            for (SocketChannel socket : socketList) {
                // read也会阻塞,必须办法改变这里,设置这里为类似非阻塞效果
                int count = socket.read(buffer);
                if(count != 0) {
                    buffer.flip();
                    System.out.println(new String(buffer.array(), 0, count));
                }
            }
        }else  if (client != null){
            // 设置非阻塞
            client.configureBlocking(false);
            socketList.add(client);
            // 不管有没有人连接,都必须循环一遍,看是否有数据
            for (SocketChannel socket : socketList) {
                // read也会阻塞,必须设置这里也为非阻塞
                int count = socket.read(buffer);
                System.out.println(count + "=======");
                if(count != 0) {
                    buffer.flip();
                    System.out.println(new String(buffer.array(), 0, count));
                }
            }
        }
    }
}

上面程序的存在问题?

  1. 假如1000个连接,只有100在活跃,for循环太浪费,可以把for从JVM执行交给OS执行(通过JNI)
  2. 解决无意义的for循环(有数据才循环它)====>selector、epoll

NIO的服务端Demo

[collapse title="SelectorTest.java"]

public static void main(String[] args) throws IOException {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 设置为非阻塞
    serverSocketChannel.configureBlocking(false);
    // 绑定一个socket对象,监听8899
    ServerSocket socket = serverSocketChannel.socket();
    socket.bind(new InetSocketAddress(8899));
    // 创建一个Selector对象
    Selector selector = Selector.open();
    // 把连接事件注册到Selector对象上
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    // 死循环,不断轮询
    while (true){
        // 每次进行轮询,阻塞
        selector.select();

        Set<SelectionKey> selectionKeys = selector.keys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext() ){
            SelectionKey selectionKey = iterator.next();
            // 分发事件
            // 1. 连接建立
            if (selectionKey.isAcceptable()){
                // 获取连接对象
                ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();

                SocketChannel socketChannel = null;
                try {
                    // 和客户端的连接socket
                    socketChannel = serverChannel.accept();

                    if (socketChannel != null){
                        // 设置非阻塞
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
            // 2. 读事件处理
            if (selectionKey.isReadable()){
                SocketChannel channel = (SocketChannel) selectionKey.channel();
                int read = 0;
                try {
                    // 读取到buffer中
                    read = channel.read(buffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (read > 0) {
                    // 切换为读模式
                    buffer.flip();
                    Charset charset = StandardCharsets.UTF_8;
                    char[] array = charset.decode(buffer).array();
                    String msg = new String(array);
                    System.out.println(msg);
                    buffer.clear();
                }
            }

        }

    }
}

[/collapse]

NIO分析

1、HeapByteuffer

public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

所有通过allocate方法创建的buffer都是堆buffer。

2、DirectByteBuffer

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

所有通过allocateDirect方法创建的buffer都是堆外buffer。

FileChannelImpl源码分析

read()方法

public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                // read方法是通过IOUtil的read实现
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

IOUtil的read实现

static int read(FileDescriptor fd, ByteBuffer dst, long position,
                NativeDispatcher nd) IOException {
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, nd);

    // Substitute(替换) a native buffer
   // 1. 首先申请一块和缓存同大小的DirectByteBuffer bb。
    ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
    try {
        // 2. 读取数据到缓存bb,底层由NativeDispatcher的read实现。
        int n = readIntoNativeBuffer(fd, bb, position, nd);
        bb.flip();
        if (n > 0)
            // 3. 把bb的数据读取到dst(用户定义的缓存,在jvm中分配内存)
            dst.put(bb);
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

read方法导致数据复制了两次(一次到TemporaryDirectBuffer,第二次在jvm分配的内存)。同样write方法也是复制了两次。

参考链接

结合代码详细聊聊 Java 网络编程中的 BIO、NIO 和 AIO

NIO - Buffer & Selector & Channel

当前页面是本站的「Baidu MIP」版。发表评论请点击:完整版 »