1.Netty and Java NIO APIs

本章内容主要介绍:

  • Netty 架构
  • 我们为什么需要非阻塞IO
  • 阻塞IO vs 非阻塞IO
  • 了解 JDK 的 NIO实现的问题和 Netty 的解决方法

前面关于 Netty 架构的省略。

异步的设计

整个 Netty 的 API 都是异步的。异步处理并不新鲜,已经出现有一段时间了。在这些年里,IO 经常出现瓶颈,所以异步处理变得越来越重要。

在使用资源时调用异步处理可以变得更有效率,因为当任务进行时,我们可以去干其他事,直到我们收到任务完成的信息。

Callbacks

回调经常在异步处理中使用,回调被传入到方法中直到方法完成才执行。我们可以经常在 JavaScript 中看到异步回调,因为它是 JS 语言的核心。

以下代码展示了当获取到数据后使用的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
interface FetchCallback {
void onData(Data data);

void onError(Throwable cause);
}

interface Fetcher {
// 该 FetchCallback 类型参数有两个回调方法:
// onData :接收到数据后的操作
// onError:出错后的操作
void fetchData(FetchCallback callback);
}

class Worker {
public void doWork() {
Fetcher fetcher = ...
fetcher.fetchData(new FetchCallback() {
@Override
public void onData(Data data) {
// 接收到数据
}

@Override
public void onError(Throwable cause) {
// 出现错误
}
});
...
}
}

如果写过 Android,应该对以上代码很熟悉。

有个问题就是,当你使用一大堆异步方法并回调时,就会出现很多意大利面式的冗余代码。一些人会认为这会导致代码的难读,但我认为这更多的是影响代码的风格。比如 Node.js,它基于 JavaScript,变得越来越流行。它就使用了大量的回调,但依然有很多人觉得很好读很好写。

Futures

第二种就是使用 Futures。Future 是抽象,是对将要变得可用的值的呈现。

Future 接口在 java.util.concurrent 包中,通过为异步处理服务的 Executor 进行使用。

以下例子中,我们传入了 Runnable 对象给 ExecutorService.submit() 方法,方法会返回 Future 对象给我们,利用它我们可以检查到方法是否执行完成。

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService executor = Executors.newCachedThreadPool();
// 无返回值
Runnable task1 = () -> doSomeHeavyWork();
// 带返回值
Callable<Integer> task2 = () -> doSomeHeavyWorkWithResult();
// 提交执行
Future<?> future1 = executor.submit(task1);
Future<Integer> future2 = executor.submit(task2);
// 循环查看是否完成
while (!future1.isDone() || !future2.isDone()) {
// 完成之后做些什么
}

现在让我们尝试将回调例子改造一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
interface Fetcher {
Future<Data> fetchData();
}

class Worker {
public void doWork() {
Fetcher fetcher = ...
Future<Data> future = fetcher.fetchData();
try {
while (!future.isDone()){
// 完成之后做些什么
}
// 接收到数据
System.out.println("Data received: "+ future.get());
}catch (Throwable cause){
// 出现错误
}
}
}

有时候用 Futures 会感到很恶心,因为你需要每隔一段时间检查一下是否完成,而通过回调你可以第一时间收到完成消息。

通过以上两个关于异步执行的例子,你也许会想哪一种会比较好。这里没有明确的答案。如果使用 Netty,它将为你提供最好的操作。

JVM 中 阻塞IO vs 非阻塞IO

早期的 Java 1.4 就有了 NIO 的 API,而 Java7 则提供了新的 NIO.2 API,其包含更多高层次的 API。

如果要用 Java 写和网络相关的程序,有两种方法:

  1. 使用 IO,也叫 阻塞 IO
  2. 使用 NIO,也叫 新/非阻塞 IO

New 还是 Non-Blocking?
NIO 的 N 很明显应该是 Non-Blocking。因为 NIO 出现很久了,没人会说它是 New 的了。

一个线程处理一个连接,它们是 1:1 的关系,因此在 JVM 中会限制你可以创建的线程数。

阻塞 IO

非阻塞 IO 则可以让你使用一个 selector 处理多个连接,它们是 1:n 的关系。

非阻塞 IO

请记住上面的关系图,让我们更深入阻塞和非阻塞。

接下来我们将写一个简单的 echo 服务端去描述这两种 IO 的不同。

基于阻塞 IO 的 echo 服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class PlainEchoServer {
public void serve(int port) throws IOException {
ServerSocket socket = new ServerSocket(port);
try {
while (true) {
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from" + clientSocket);
// 每个新连接都要开启一个线程
// 和使用线程池的本质是一样的
// 一旦需要处理成千上万的并发连接,就会出现灾难
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
// 将读到的数据写回去
PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
while (true) {
writer.println(reader.readLine());
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException ignored) {
}
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

非阻塞 IO 基础

Java7 提供的新 NIO API 被称为 NIO.2,当然你也可以用 NIO。

尽管新的 API 也是异步的,但在实现和对外接口上并不完全相同,但都有相同的特性。比如,两者的实现都使用了名为 ByteBuffer 的抽象作为数据容器。


ByteBuffer

ByteBuffer 是新旧 NIO API 的基础,对于 Netty 来说也是。ByteBuffer 内存可以被分配在堆上,或在直接内存上,这意味着它可以被存储在堆空间以外。通常,当将它传到 channel 上时,在直接内存上的访问会更快,但分配和回收的花费会更高。

ByteBuffer 允许相同数据在 ByteBuffer 实例间分享,而不用去做内存拷贝。它还支持切片和其他操作去限制数据的可见性。

ByteBuffer 的典型使用场景:

  • 向 ByteBuffer 写数据
  • 调用 ByteBuffer.flip() 切换读写模式
  • 从 ByteBuffer 读数据
  • 调用 ByteBuffer.clear() 清除整个 ByteBuffer
  • 调用 ByteBuffer.compact() 清除已从内存拷贝中读取的数据

当你向 ByteBuffer 写入数据时,它会去更新缓存已写的下标位置(即每写1个数据,position+1);当然你也可以手动完成。

当你准备好读取数据时,可以调用 ByteBuffer.flip() 把写模式切换到读模式。调用 ByteBuffer.flip() 会将 ByteBuffer 的 limit 设置为当前 position,并更新 position = 0。这样你就可以读取 ByteBuffer 的所有数据了。

ByteBuffer.compact() 将所有未读取的数据移动到 ByteBuffer 开头,然后调整 position。

下面是使用的场景例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Channel inChannel = ...
ByteBuffer buf = ByteBuffer.allocate(48);

int byteRead = -1;
do {
// 从 Channel 中读取数据到 ByteBuffer
byteRead = inChannel.read(buf);
if (byteRead != -1) {
// 切换到读模式
buf.flip();
while (buf.hasRemaining()) {
// 读取 ByteBuffer 的字节
// 每次 get() 会更新 position+1
System.out.print((char) buf.get());
}
// 使 ByteBuffer 准备下一次写
buf.clear();
}
} while (byteRead != -1);
inChannel.close();

现在基本了解了 ByteBuffer,下一步让我们认识 selectors。


NIO Selector

selector 是决定一个或多个 channels 是否准备好 读或写的 NIO 组件。单个 selector 可以被用于处理多个连接,可以缓和阻塞IO下1个线程处理1个连接的模型需求。

要使用 selector,你需要完成以下步骤:

  1. 创建一个或多个 selectors 去给已开启的 channels(sockets)进行注册。
  2. 当一个 channel 被注册了,要去指定需要监听的事件。以下是可以监听的四种事件(或操作):
  • OP_ACCEPT:连接 accept
  • OP_CONNECT:连接 connect
  • OP_READ:读
  • OP_WRITE:写
  1. 当多个 channels 被注册了,可以调用 Selector.select() 方法去阻塞,直到这些事件中的一个发生。
  2. 当方法不阻塞了,你就可以获得所有的 SelectionKey 实例(它们有注册 channel 的引用和选定的监听事件)。一个 SelectedKey 可以包含多个事件。

为了看它是怎么工作的,让我们实现一个非阻塞版本的 echo 服务端。

基于 NIO 的 echo 的服务端

该版本的服务端使用异步 NIO API,可以让一个线程处理上千个并发客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class PlainNioEchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port: " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
// 绑定端口地址
ss.bind(address);
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
// 把 channel 注册到 selector,并关注连接的 accept 事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
try {
// 阻塞直到被选中
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
// 获得所有 SelectedKey 的实例
Set readyKeys = selector.selectedKeys();
Iterator iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
// 将 SelectedKey 移出迭代器
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 接受客户端的连接
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
client.configureBlocking(false);
// 把连接注册到 selector 并设置 ByteBuffer
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));
}
// 检查 SelectedKey 是否可读
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
// 将数据读到 ByteBuffer
client.read(output);
}
// 检查 SelectedKey 是否可写
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
// 将 ByteBuffer 里的数据写到 channel
client.write(output);
output.compact();
}
} catch (IOException e) {
key.channel();
try {
key.channel().close();
} catch (IOException ignored) {
}
}
}
}
}
}

该例子比之前的 echo 服务端版本都要复杂。但这种复杂是一种权衡。异步代码明显要比同步部分复杂。

下面我们要实现 NIO.2 版本的 echo 服务端。

基于 NIO.2 的 echo 服务端

不像原生 NIO 的实现,NIO.2 允许你发出 IO 操作和提供一个完成时的处理器(CompletionHandler 类)。这个完成时处理器会在 IO 操作完成后执行,完成时处理器的执行是由底层系统驱动的,开发者不用关心它的实现。而且支持在同一时间只执行一个在 channel 中的 CompletionHandler。这可以帮助简化代码,因为它除去了多线程执行带来的复杂度。

原生 NIO 和 NIO.2 的最主要不同点:你不必去检查一个在 channel 中的事件是否发生了,然后再去触发一些动作。

在 NIO.2 中,你只需要触发 IO 操作,和注册一个完成时处理器给它,这个处理器会在操作完成的第一时间得到通知。这可以移除你在自己程序里写逻辑去检查操作完成的必要。

现在让我们看下同样是异步 echo 服务端,用 NIO.2 实现是怎样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class PlainNio2EchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
// 绑定端口地址
serverChannel.bind(address);
final CountDownLatch latch = new CountDownLatch(1);
// 开始接受新的客户端连接
// 一旦有一个客户端被 accepted,CompletionHandler 就会被调用
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment) {
// 再一次接受新的客户端连接
serverChannel.accept(null, this);
ByteBuffer buffer = ByteBuffer.allocate(100);
// 在 channel 中触发读操作
// 一旦读到什么 CompletionHandler 就会第一时间被通知
channel.read(buffer, buffer, new EchoCompletionHandler(channel));
}

@Override
public void failed(Throwable throwable, Object attachment) {
try {
// 有错误就关闭套接字
serverChannel.close();
} catch (IOException ignored) {
} finally {
latch.countDown();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel channel;

EchoCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}

@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
// 在 channel 中触发写操作
// 一旦写了什么 CompletionHandler 就会第一时间被通知
channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
// 如果还有东西在 ByteBuffer 中
// 就再次触发写操作
channel.write(buffer, buffer, this);
} else {
buffer.compact();
channel.read(buffer, buffer, EchoCompletionHandler.this);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException ignored) {
}
}
});
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException ignored) {
}
}
}
}

第一眼看上去,比之前使用 NIO 的代码更多了。但要注意到 NIO.2 为你处理好了线程和事件循环的创建。这会简化在创建多线程 NIO 程序时的代码,尽管在这个例子中不太明显。随着程序变得越来越复杂,你产出的代码将会更简洁,简化效果会变得越来越明显。

下一步我们看看 JDK 的 NIO 存在的一些问题。

NIO 问题以及 Netty 是如何解决的

跨平台和兼容性问题

NIO 比较底层,取决于操作系统是如何处理 IO 的。Java 有统一的 API,这让它可以在所有操作系统上都能工作。

当你使用 NIO 时,你可能会发现在 Linux 下的代码工作正常,但在 Windows 下却有问题。我的建议就是,即使你不使用 NIO,你要想支持所有操作系统,你都要进行测试。所以就算所有的测试在你的 Linux 下都通过了,也要在其他操作系统上进行验证。如果你不打算这样,那就有得你爽喽。

NIO.2 只支持 Java7 以上,如果你在 Java6 以下,那就用不了了。而且,在写这篇文章时,NIO.2 还没有支持 datagram channels(针对 UDP 协议),所以只能限制使用 TCP 协议。

Netty 解决了这个问题,可以运行在 Java6 和 7 上。不用再担心向下兼容,而且还能享受到简单统一的 API。

ByteBuffer 可扩展性

如你所见,ByteBuffer 是数据的容器。不幸的是,JDK 没有包含 ByteBuffer 的实现,所以不支持对 ByteBuffer 实例的封装。如果你想压缩内存拷贝,这会很有用。如果你还是想“我自己实现它“。不用浪费你的时间,ByteBuffer 的构造器是私有的,所以不太可能进行扩展。

Netty 提供了自己的 ByteBuffer 实现,打破了上面的限制,而且通过提供一些构造方法,使用,和生成方法 API。

分散和聚集可能内存泄漏

许多 channel 的实现支持分散和聚集。这个特性可以让多个 ByteBuffer 实例在同一时间写入或读出,从而提高性能。这是内核/OS进行的读/写处理,因为更接近底层,所以可以更高效。

在你想切割不同 ByteBuffer 实例去分别处理缓存时,通常就要使用分散/聚集。比如你想在一个 ByteBuffer 中放入 header,而把 body 放到其他 ByteBuffer。

下图就展示了分散读是怎样的:

分散读

传入一组 ByteBuffer 实例到 ScatteringByteChannel,然后数据就会从 channel 分散读到 buffer。

写也差不多,只是数据是从 buffer 写到 channel:

聚集写

传入一组 ByteBuffer 实例到 GatheringByteChannel.write() 方法,然后数据就会从 buffer 写到 channel。

不幸的是,这特性可能会导致内存泄露,从而引发 OutOfMemoryError,所以你要谨慎使用分散/聚集。

著名的 epoll bug

在类 Linux 操作系统中,selector 使用 epoll - IO 事件发现。这是一种可以让操作系统在网络栈中异步工作的高性能技术。但直到今天,epoll 仍然有一个可以让 selector 变为 invalid 状态的 bug,会导致 CPU 占用达到100%。唯一的解决办法就是回收旧的 selector,把之前注册的 channel 实例都注册到新的 selector 上。

这里主要是由于 Selector.select() 方法停止阻塞,然后立即返回了,即使没有 SelectionKeys 被选中。这和它的本质是有冲突的,因为当它的事件没有被选中时是不能变为非阻塞的。

解决 epoll 的方法被限制了,但 Netty 尝试自动检测和预防这种情况。

下面是会出现 epoll bug 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
while (true) {
// 立即返回
// 并且当没有事件被选中时返回的是0
int selected = selector.select();
Set<SelectedKeys> readyKeys = selector.selectedKeys();
// 获得所有 SelectedKeys
// 没东西被选中时,迭代器会是空
Iterator iterator = readyKeys.iterator();
// 空迭代器,则无法进入
while (iterator.hasNext()){
...
}
}
...

最主要的影响是那段 while 循环会吃 CPU:

1
2
3
4
5
...
while (true) {

}
...
Author

Zoctan

Posted on

2018-10-18

Updated on

2023-03-14

Licensed under