将具体代码实现前,先来谈谈Netty的线程模型。正如许多博客所提到的,Netty采用了Reactor模式,但是许多博客也只是提到了而已,同时大家也不会忘记附上几张Doug Lee大神的图,但是并不会深入的解释。为了更好的学习和理解Netty的线程模型,我在这里稍微详细的说一下我对它的理解。
Reactor模式有多个变种,Netty基于Multiple Reactors模式(如下图)做了一定的修改,Mutilple Reactors模式有多个reactor:mainReactor和subReactor,其中mainReactor负责客户端的连接请求,并将请求转交给subReactor,后由subReactor负责相应通道的IO请求,非IO请求(具体逻辑处理)的任务则会直接写入队列,等待worker threads进行处理。
wpid-Multi-reactors3.png (174.77 KB, 下载次数: 3048) 下载附件 保存到相册 Multiple Reactors 8 年前 上传
wpid-Multi-reactors3.png (174.77 KB, 下载次数: 3048)
下载附件 保存到相册
Multiple Reactors
8 年前 上传
Netty的线程模型基于Multiple Reactors模式,借用了mainReactor和subReactor的结构,但是从代码里看来,它并没有Thread Pool这个东东。Netty的subReactor与worker thread是同一个线程,采用IO多路复用机制,可以使一个subReactor监听并处理多个channel的IO请求,我给称之为:「Single Thread with many Channel」。我根据代码整理出下面这种Netty线程模型图:
wpid-Netty-thread-model3.png (227.46 KB, 下载次数: 3121) 下载附件 保存到相册 Netty线程模型 8 年前 上传
wpid-Netty-thread-model3.png (227.46 KB, 下载次数: 3121)
Netty线程模型
上图中的parentGroup和childGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程池,childGroup线程池会被各个subReactor充分利用,parentGroup线程池则只是在bind某个端口后,获得其中一个线程作为mainReactor。上图我将subReactor和worker thread合并成了一个个的loop,具体的请求操作均在loop中完成,下文会对loop有个稍微详细的解释。另附Doug Lee大神的Reactor介绍:Scalable IO in Java
以上均是Nio情况下。Oio采用的是Thread per Channel机制,即每个连接均创建一个线程负责该连接的所有事宜。
3、EventLoop和EventExecutor的实现原理
EventLoop和EventExecutor实现共有4个主要逻辑接口,EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup,内部实现、继承的逻辑表示无法直视,有种擦边球的感觉。具体的类图如下:
7a57723abbfe71b9dba03f22ea44a726.jpg (40.99 KB, 下载次数: 2931)
EventLoop和EventExecutor类图
来源:即时通讯网 - 即时通讯开发者社区!
轻量级开源移动端即时通讯框架。
快速入门 / 性能 / 指南 / 提问
轻量级Web端即时通讯框架。
详细介绍 / 精编源码 / 手册教程
移动端实时音视频框架。
详细介绍 / 性能测试 / 安装体验
基于MobileIMSDK的移动IM系统。
详细介绍 / 产品截图 / 安装体验
一套产品级Web端IM系统。
详细介绍 / 产品截图 / 演示视频
引用此评论
初始化Channel,配置Channel参数,以及Pipeline。其中初始化Pipeline中,需要插入ServerBootstrapAcceptor对象用作acceptor接收客户端连接请求,acceptor也是一种ChannelInboundHandlerAdapter。
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions, currentChildAttrs)); } });
调用channel的unsafe对象注册selector,具体实现类为AbstractChannel$AbstractUnsafe.register。如下:
public final void register(final ChannelPromise promise) { if (eventLoop.inEventLoop()) { // 是否在Channel的loop中 register0(promise); } else { // 不在 try { eventLoop.execute(new Runnable() { // EventLoop执行一个任务 @Override public void run() { register0(promise); } }); } catch (Throwable t) { // ... } } }
eventLoop.execute(runnable);是比较重要的一个方法。在没有启动真正线程时,它会启动线程并将待执行任务放入执行队列里面。启动真正线程(startThread())会判断是否该线程已经启动,如果已经启动则会直接跳过,达到线程复用的目的。启动的线程,主要调用方法是NioEventLoop的run()方法,run()方法在下面有详细介绍:
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); // 启动线程 addTask(task); // 添加任务队列 // ... } if (!addTaskWakesUp) { wakeup(inEventLoop); } }
将 channel 注册到下一个 EventLoop 中。
2.2. 接收连接请求
由NioEventLoop.run()接收到请求:
3.1 AbstractNioMessageChannel$NioMessageUnsafe.read()
3.2 NioServerSocketChannel.doReadMessages()
获得childEventLoopGroup中的EventLoop,并依据该loop创建新的SocketChannel对象。
3.3 pipeline.fireChannelRead(readBuf.get(i));
readBuf.get(i)就是3.2中创建的SocketChannel对象。在2.2初始化Bootstrap的时候,已经将acceptor处理器插入pipeline中,所以理所当然,这个SocketChannel对象由acceptor处理器处理。
3.4 ServerBootstrapAcceptor$ServerBootstrapAcceptor.channelRead();
该方法流程与2.2、2.3类似,初始化子channel,并注册到相应的selector。注册的时候,也会调用eventLoop.execute用以执行注册任务,execute时,启动子线程。即启动了subReactor。
loop:
loop是我自己提出来的组件,仅是代表subReactor的主要运行逻辑。例子可以参考NioEventLoop.run()。
loop会不断循环一个过程:select -> processSelectedKeys(IO操作) -> runAllTasks(非IO操作),如下代码:
protected void run() { for (;;) { // ... try { if (hasTasks()) { // 如果队列中仍有任务 selectNow(); } else { select(); // ... } // ... final long ioStartTime = System.nanoTime(); // 用以控制IO任务与非IO任务的运行时间比 needsToSelectAgain = false; // IO任务 if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; // 非IO任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { // ... } } }
就目前而言,基本上IO任务都会走processSelectedKeysOptimized方法,该方法即代表使用了优化的SelectedKeys。除非采用了比较特殊的JDK实现,基本都会走该方法。
1. selectedKeys在openSelector()方法中初始化,Netty通过反射修改了Selector的selectedKeys成员和publicSelectedKeys成员。替换成了自己的实现——SelectedSelectionKeySet。 2. 从OpenJDK 6/7的SelectorImpl中可以看到,selectedKeys和publicSeletedKeys均采用了HashSet实现。HashSet采用HashMap实现,插入需要计算Hash并解决Hash冲突并挂链,而SelectedSelectionKeySet实现使用了双数组,每次插入尾部,扩展策略为double,调用flip()则返回当前数组并切换到另外一个数据。 3. ByteBuf中去掉了flip,在这里是否也可以呢?
processSelectedKeysOptimized主要流程如下:
final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
在获得attachment后,判断是Channel呢还是其他,其他则是NioTask。找遍代码并没有发现Netty有注册NioTask的行为,同时也没发现NioTask的实现类。只有在NioEventLoop.register方法中有注册NioTask至selector的行为,便判断该行为是由用户调用,可以针对某个Channel注册自己的NioTask。这里就只讲第一个processSelectdKey(k, (AbstractNioChannel) a),但代码就不贴了。
和常规的NIO代码类似,processSelectdKey是判断SeletedKeys的readyOps,并做出相应的操作。操作均是unsafe做的。如read可以参考:AbstractNioByteChannel$NioByteUnsafe.read()。IO操作的流程大致都是:
代码调用的是task.run(),而不是task.start()。即是单线程执行所有任务
protected boolean runAllTasks(long timeoutNanos) { fetchFromDelayedQueue(); Runnable task = pollTask(); if (task == null) { return false; } // 控制时间 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }
以上内容从设计和代码层面总结Netty线程模型的大致内容,中间有很多不成熟的思考与理解,请轻拍与指正。
看源码过程中是比较折磨人的。首先得了解你学习东西的业务价值是哪里?即你学了这个之后能用在哪里,只是不考虑场景仅仅为了看代码而看代码比较难以深入理解其内涵;其次,看代码一定一定得从逻辑、结构层面看,从细节层面看只会越陷越深,有种一叶障目不见泰山的感觉;最后,最好是能够将代码逻辑、结构画出来,或者整理出思维导图啥的,可以用以理清思路。
本人属:兔
精华主题数超过100个。
连续任职达2年以上的合格正式版主
为论区做出突出贡献的开发者、版主等。
Copyright © 2014-2024 即时通讯网 - 即时通讯开发者社区 / 版本 V4.4
苏州网际时代信息科技有限公司 (苏ICP备16005070号-1)
Processed in 0.125000 second(s), 37 queries , Gzip On.