netty


Netty

1. IO模型

  • BIO 适用于连接数目较小且固定的架构,这种方式对于服务器资源要求较高,有并发局限,JDK1.4以前的唯一选择,但程序简单易理解。基于字节流和字符流,以流的方式进行,阻塞。
  • NIO 适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。基于channel和buffer,以块的方式进行,不阻塞。
  • AIO 适用于连接数目多且连接时间长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

netty基于NIO。

1.1 NIO

1.1.1 buffer

(为什么)它是为了实现缓冲区的概念而产生的。

(是什么)是一种缓冲区的抽象类(abstract class)。本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。

(怎么做)四个属性:

  1. capacity 容量,即可以容纳的最大数据量。在缓冲区创建时被设定并且不能改变。
  2. limit 缓冲区当前终点,不能对缓冲区超过极限位置进行读写操作。且极限位置可以修改的
  3. position 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变值,为下次读写作准备。
  4. mark 标记
  • 注意

    buffer中存放的数据类型,放入和取除的数据类型必须一致。否则抛出BufferUnderflowException异常。

使用:

1.ReadOnlyBuffer
ByteBuffer readBuffer = buffer.asReadOnlyBuffer();
while (readOnlyBuffer.hasRemaining()) {
    System.out.println(readOnlyBuffer.get());
}
// 错误的异常
readOnlyBuffer.put((byte)100);//ReadOnlyBufferException
2. MappedByteBuffer

这个buffer可以让文件直接在内存中进行修改,而如何同步到文件由NIO来完成。

@Test
    public void run() throws IOException {
        RandomAccessFile accessFile = new RandomAccessFile("1.txt", "rw");
        //获取对应的通道
        FileChannel channel = accessFile.getChannel();
        /**
         * 使用的读写模式
         * 可以直接修改的读写位置
         * 映射到内存的大小,即将 1.txt 的多少个字节映射到内存
         * 可以直接修改的范围是 0-5
         */
        java.nio.MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
        mappedByteBuffer.put(0, (byte) 'H');
        //抛出异常,超出索引范围
        mappedByteBuffer.put(5, (byte) '0');

        accessFile.close();

    }
3. buffer分散和聚集
@Test
    public void run() throws Exception{
        // 使用ServerSocketChannel 和 SocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);

        serverSocketChannel.socket().bind(inetSocketAddress);

        ByteBuffer[] byteBuffers = new ByteBuffer[2];
        byteBuffers[0] = ByteBuffer.allocate(5);
        byteBuffers[1] = ByteBuffer.allocate(3);

        SocketChannel socketChannel = serverSocketChannel.accept();

        // 等客户端连接
        // 循环读取
        int clientByteLen = 8;
        while (true) {
            int byteRead = 0;
            while (byteRead < clientByteLen) {
                long read = socketChannel.read(byteBuffers);
                byteRead += read;
                System.out.println("byteRead = " + byteRead);

                // 使用流打印,看看当前这个buffer的position 和 limit
                Arrays.asList(byteBuffers).stream().map(buffer -> "posiiton" + buffer.position() + ", limit=" + buffer.limit()).forEach(System.out:: println);
                // 将所有的buffer进行反转
                Arrays.asList(byteBuffers).forEach(byteBuffer -> byteBuffer.flip());

                // 将数据读出显示到客户端
                long byteWrite = 0;
                while (byteWrite < clientByteLen) {
                    long write = socketChannel.write(byteBuffers);
                    byteWrite += write;
                }
                Arrays.asList(byteBuffers).forEach(byteBuffer -> byteBuffer.clear());
                System.out.println("byteRead:=" + byteRead + " byteWrite = " + byteWrite);
            }
        }

1.2 Channel 通道

(是什么)是一个接口(interface)

  • 通道可以同时读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 可以从缓冲区读数据,也可以写数据到缓冲
public interface Channel extends Closeable {

    /**
     * Tells whether or not this channel is open.
     *
     * @return <tt>true</tt> if, and only if, this channel is open
     */
    public boolean isOpen();

    /**
     * Closes this channel.
     *
     * <p> After a channel is closed, any further attempt to invoke I/O
     * operations upon it will cause a {@link ClosedChannelException} to be
     * thrown.
     *
     * <p> If this channel is already closed then invoking this method has no
     * effect.
     *
     * <p> This method may be invoked at any time.  If some other thread has
     * already invoked it, however, then another invocation will block until
     * the first invocation is complete, after which it will return without
     * effect. </p>
     *
     * @throws  IOException  If an I/O error occurs
     */
    public void close() throws IOException;

}

常用的Channel类有:FileChannelDatagramChannelServerSocketChannelSocketChannel

FileChannel用于文件的读写,DatagramChannel用于UDP的数据读写,ServerSocketChannelSocketChannel用于TCP的数据读写。

1.2 Selector

是什么?

Selector能够检测多个注册的通道上是否有事件发生。注意:多个Channel以事件的方式可以注册到同一个Selector。如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。

怎么做?

Netty的IO线程NioEventLoop聚合了Selector(选择器)(也叫多路复用器),可以同时并发处理成百上千个客户端连接。

当线程从某客户端Socket通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。

1. 实现

Selector类是一个抽象类

  • select()阻塞。

  • select(long timeout) 监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回,参数用来设置超时时间。

  • wakeup() 唤醒selector

  • selectNow() 不阻塞,立马返回。

  • Set selectedKeys(); //从内部集合中得到所有的SelectionKey。

2. NIO 非阻塞网络编程原理分析图
  1. 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
  2. 将socketChannel注册到Selector上,register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel。
  3. 注册后返回一个SelectionKey,会和该Selector关联(集合)。
  4. Selector进行监听select方法,返回有事件发生的通道的个数。
  5. 进一步得到各个SelectionKey。
  6. 再通过SelectionKey得到注册的Channel。
  7. 通过得到的Channel完成业务处理。

2. 零拷贝

零拷贝指的是从操作系统角度看的,没有CPU拷贝。

传统IO,四次拷贝,三次切换。

3.1 mmap优化

  • mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数。

拷贝减少为3次,内核态和用户态的切换仍然为3次。

适合小数据量。

3.2 sendfile 优化

基本原理:直接从内核缓冲区进入到Socket Buffer。同时,由于和用户态完全无关,减少了一次上下文切换。

适合大文件传输。

只需要一次系统调用,最少2次数据拷贝。

3. netty 架构

3.1 其他线程模型

  • 传统阻塞IO模式
  • Reactor模式
    • 单Reactor单线程
    • 单Reactor多线程
    • 主从Reactor多线程

3.2 netty线程模型

基于主从Reactor多线程模型,并且对其进行了一定的改进,其中主从Reactor多线程模型有多个Reactor。

  1. Reactor模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
  2. 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式。
  3. Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程),这点就是网络服务器高并发处理关键。

文章作者: 小灰灰
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 小灰灰 !
评论
  目录