浅谈Java中BIO、NIO
真的就只是浅谈
浅谈Java中的BIO、NIO
到底什么是“IO Block”
很多人说BIO不好,会“blcok”,但到底什么是IO的Blcok呢?看一下下面两种情况:
- 用系统调用
read()
从socket里读取数据 - 用系统调用
read()
从一个磁盘文件读取一段数据到内存
如果你认为这两种都算“Block”,那么很遗憾,你与Linux理解不同,Linux中这么认为:
- 对于第一种情况,算作block,因为Linux无法知道网络上对方是否会发数据。如果没数据发过来,对于调用
read
的程序来说,就只能“等”。 - 对于第二种情况,不算做block。
是的,对于磁盘文件IO,Linux总是不视作Block。
你可能会说,这不科学啊,磁盘读写偶尔也会因为硬件而卡壳啊,怎么能不算Block呢?但实际就是不算。
“
一个解释是,所谓“Block”是指操作系统可以预见这个Block会发生才会主动Block。例如当读取TCP连接的数据时,如果发现Socket buffer里没有数据就可以确定定对方还没有发过来,于是Block;而对于普通磁盘文件的读写,也许磁盘运作期间会抖动,会短暂暂停,但是操作系统无法预见这种情况,只能视作不会Block,照样执行。
基于这个基本的设定,在讨论IO时,一定要严格区分网络IO和磁盘文件IO。NIO和后文讲到的IO多路复用只对网络IO有意义。
“
严格的说,O_NONBLOCK和IO多路复用,对标准输入输出描述符、管道和FIFO也都是有效的。但本文侧重于讨论高性能网络服务器下各种IO的含义和关系,所以本文做了简化,只提及网络IO和磁盘文件IO两种情况。
本文先着重讲一下网络IO。
BIO的缺点
两个地方有阻塞:
- read()方法有阻塞。当第一个Client连接Server后,read方法会被阻塞等待Client发送数据过来
- accept()方法有阻塞,当一个Client连接Server后,如果阻塞在read方法那里,会导致其他Client不能连接Server
- 所以read()方法那里需要多线程,但是会导致服务端需要开大量的线程,造成资源的浪费
static byte[] bs = new byte[1024];
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8080);
while (true) {
System.out.println("wait conn.....");
// 阻塞
Socket client = serverSocket.accept();
System.out.println("conn success.....");
System.out.println("wait data");
// read也会阻塞
client.getInputStream().read(bs);
System.out.println("data success");
System.out.println(new String(bs));
}
}
NIO的设计思路
利用单线程来处理并发(redis也是单线程,利用epoll),是基于事件驱动
思想
BIO存在的问题
1、read方法有阻塞,怎么减阻塞?设置非阻塞
2、前面已经连接的socket,现在发送数据了,循环在accpet阻塞了,如何解决?需要设置accept为非阻塞
3、还有进行下一次循环,原来的连接丢失,如何解决?保存起来,然后轮询
// 模仿NIO
static ByteBuffer buffer = ByteBuffer.allocate(1024);
static List<SocketChannel> socketList = new ArrayList<>();
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
while (true) {
// 这个accept的方法是非阻塞的
SocketChannel client = serverSocket.accept();
if(client == null){
// 不管有没有人连接,都必须循环一遍,看是否有数据。还需要解决无意义的空轮询
for (SocketChannel socket : socketList) {
// read也会阻塞,必须办法改变这里,设置这里为类似非阻塞效果
int count = socket.read(buffer);
if(count != 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, count));
}
}
}else if (client != null){
// 设置非阻塞
client.configureBlocking(false);
socketList.add(client);
// 不管有没有人连接,都必须循环一遍,看是否有数据
for (SocketChannel socket : socketList) {
// read也会阻塞,必须设置这里也为非阻塞
int count = socket.read(buffer);
System.out.println(count + "=======");
if(count != 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, count));
}
}
}
}
}
上面程序的存在问题?
- 假如1000个连接,只有100在活跃,for循环太浪费,可以把for从JVM执行交给OS执行(通过JNI)
- 解决无意义的for循环(有数据才循环它)====>selector、epoll
NIO的服务端Demo
[collapse title="SelectorTest.java"]
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 绑定一个socket对象,监听8899
ServerSocket socket = serverSocketChannel.socket();
socket.bind(new InetSocketAddress(8899));
// 创建一个Selector对象
Selector selector = Selector.open();
// 把连接事件注册到Selector对象上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 死循环,不断轮询
while (true){
// 每次进行轮询,阻塞
selector.select();
Set<SelectionKey> selectionKeys = selector.keys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext() ){
SelectionKey selectionKey = iterator.next();
// 分发事件
// 1. 连接建立
if (selectionKey.isAcceptable()){
// 获取连接对象
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = null;
try {
// 和客户端的连接socket
socketChannel = serverChannel.accept();
if (socketChannel != null){
// 设置非阻塞
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
}catch (IOException e){
e.printStackTrace();
}
}
// 2. 读事件处理
if (selectionKey.isReadable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
int read = 0;
try {
// 读取到buffer中
read = channel.read(buffer);
} catch (IOException e) {
e.printStackTrace();
}
if (read > 0) {
// 切换为读模式
buffer.flip();
Charset charset = StandardCharsets.UTF_8;
char[] array = charset.decode(buffer).array();
String msg = new String(array);
System.out.println(msg);
buffer.clear();
}
}
}
}
}
[/collapse]
NIO分析
1、HeapByteuffer
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
所有通过allocate方法创建的buffer都是堆buffer。
2、DirectByteBuffer
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
所有通过allocateDirect方法创建的buffer都是堆外buffer。
FileChannelImpl源码分析
read()方法
public int read(ByteBuffer dst) throws IOException {
ensureOpen();
if (!readable)
throw new NonReadableChannelException();
synchronized (positionLock) {
int n = 0;
int ti = -1;
try {
begin();
ti = threads.add();
if (!isOpen())
return 0;
do {
// read方法是通过IOUtil的read实现
n = IOUtil.read(fd, dst, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end(n > 0);
assert IOStatus.check(n);
}
}
}
IOUtil的read实现
static int read(FileDescriptor fd, ByteBuffer dst, long position,
NativeDispatcher nd) IOException {
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd);
// Substitute(替换) a native buffer
// 1. 首先申请一块和缓存同大小的DirectByteBuffer bb。
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
// 2. 读取数据到缓存bb,底层由NativeDispatcher的read实现。
int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip();
if (n > 0)
// 3. 把bb的数据读取到dst(用户定义的缓存,在jvm中分配内存)
dst.put(bb);
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
read方法导致数据复制了两次(一次到TemporaryDirectBuffer,第二次在jvm分配的内存)。同样write方法也是复制了两次。