默认
发表评论 9
想开发IM:买成品怕坑?租第3方怕贵?找开源自已撸?尽量别走弯路了... 找站长给点建议
Netty 4.x学习(三):线程模型详解
阅读(81577) | 评论(9 收藏6 淘帖1 1

1、前言


前面两篇文章里已经说完了《Netty 4.x学习(一):ByteBuf详解》和《Netty 4.x学习(二):Channel和Pipeline详解》,这篇开始讲讲前面欠的债——线程模型(EventLoopEventExecutor)。

2、Netty的线程模型介绍


将具体代码实现前,先来谈谈Netty的线程模型。正如许多博客所提到的,Netty采用了Reactor模式,但是许多博客也只是提到了而已,同时大家也不会忘记附上几张Doug Lee大神的图,但是并不会深入的解释。为了更好的学习和理解Netty的线程模型,我在这里稍微详细的说一下我对它的理解。


Reactor模式有多个变种,Netty基于Multiple Reactors模式(如下图)做了一定的修改,Mutilple Reactors模式有多个reactor:mainReactor和subReactor,其中mainReactor负责客户端的连接请求,并将请求转交给subReactor,后由subReactor负责相应通道的IO请求,非IO请求(具体逻辑处理)的任务则会直接写入队列,等待worker threads进行处理。


Netty 4.x学习(三):线程模型详解_Multiple Reactors


Netty的线程模型基于Multiple Reactors模式,借用了mainReactor和subReactor的结构,但是从代码里看来,它并没有Thread Pool这个东东。Netty的subReactor与worker thread是同一个线程,采用IO多路复用机制,可以使一个subReactor监听并处理多个channel的IO请求,我给称之为:「Single Thread with many Channel」。我根据代码整理出下面这种Netty线程模型图:


Netty 4.x学习(三):线程模型详解_Netty线程模型


上图中的parentGroup和childGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程池,childGroup线程池会被各个subReactor充分利用,parentGroup线程池则只是在bind某个端口后,获得其中一个线程作为mainReactor。上图我将subReactor和worker thread合并成了一个个的loop,具体的请求操作均在loop中完成,下文会对loop有个稍微详细的解释。另附Doug Lee大神的Reactor介绍:Scalable IO in Java

以上均是Nio情况下。Oio采用的是Thread per Channel机制,即每个连接均创建一个线程负责该连接的所有事宜。

3、EventLoop和EventExecutor的实现原理


EventLoopEventExecutor实现共有4个主要逻辑接口,EventLoopEventLoopGroupEventExecutorEventExecutorGroup,内部实现、继承的逻辑表示无法直视,有种擦边球的感觉。具体的类图如下:

Netty 4.x学习(三):线程模型详解_EventLoop和EventExecutor类图

3.1 EventLoopGroup类介绍:
主要方法是newChild,我理解为EventLoop的工厂类。**EventLoopGroup.newChild创建**EventLoop对象。OioEventLoopGroup除外,它没有实现newChild方法,调用父类的并创建ThreadPerChannelEventLoop对象。

3.2 EventLoop类介绍:
主要方法是run(),是整个Netty执行过程的逻辑代码实现,后面细说。

3.3 EventExecutorGroup类介绍:
线程池实现,主要成员是children数组,主要方法是next(),获得线程池中的一个线程,由子类调用。由于Oio采用的是Thread per Channel机制,所以没有实现前面两个。

3.4 EventExecutor类介绍:
Task的执行类,主要成员是taskQueue以及真正的运行线程对象executor,主要方法是taskQueue操作方法execute、takeTask、addTask等,以及doStartThread方法,后面细说。

全站即时通讯技术资料分类


[1] 网络编程基础资料:
TCP/IP详解 - 第11章·UDP:用户数据报协议
TCP/IP详解 - 第17章·TCP:传输控制协议
TCP/IP详解 - 第18章·TCP连接的建立与终止
TCP/IP详解 - 第21章·TCP的超时与重传
技术往事:改变世界的TCP/IP协议(珍贵多图、手机慎点)
通俗易懂-深入理解TCP协议(上):理论基础
通俗易懂-深入理解TCP协议(下):RTT、滑动窗口、拥塞处理
理论经典:TCP协议的3次握手与4次挥手过程详解
理论联系实际:Wireshark抓包分析TCP 3次握手、4次挥手过程
计算机网络通讯协议关系图(中文珍藏版)
UDP中一个包的大小最大能多大?
P2P技术详解(一):NAT详解——详细原理、P2P简介
P2P技术详解(二):P2P中的NAT穿越(打洞)方案详解
P2P技术详解(三):P2P技术之STUN、TURN、ICE详解
通俗易懂:快速理解P2P技术中的NAT穿透原理
高性能网络编程(一):单台服务器并发TCP连接数到底可以有多少
高性能网络编程(二):上一个10年,著名的C10K并发连接问题
高性能网络编程(三):下一个10年,是时候考虑C10M并发问题了
高性能网络编程(四):从C10K到C10M高性能网络应用的理论探索
不为人知的网络编程(一):浅析TCP协议中的疑难杂症(上篇)
不为人知的网络编程(二):浅析TCP协议中的疑难杂症(下篇)
不为人知的网络编程(三):关闭TCP连接时为什么会TIME_WAIT、CLOSE_WAIT
不为人知的网络编程(四):深入研究分析TCP的异常关闭
不为人知的网络编程(五):UDP的连接性和负载均衡
不为人知的网络编程(六):深入地理解UDP协议并用好它
网络编程懒人入门(一):快速理解网络通信协议(上篇)
网络编程懒人入门(二):快速理解网络通信协议(下篇)
网络编程懒人入门(三):快速理解TCP协议一篇就够
网络编程懒人入门(四):快速理解TCP和UDP的差异
Netty干货分享:京东京麦的生产级TCP网关技术实践总结
>> 更多同类文章 ……

[2] NIO异步网络编程资料:
Java新一代网络编程模型AIO原理及Linux系统AIO介绍
有关“为何选择Netty”的11个疑问及解答
开源NIO框架八卦——到底是先有MINA还是先有Netty?
选Netty还是Mina:深入研究与对比(一)
选Netty还是Mina:深入研究与对比(二)
NIO框架入门(一):服务端基于Netty4的UDP双向通信Demo演示
NIO框架入门(二):服务端基于MINA2的UDP双向通信Demo演示
NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战
NIO框架入门(四):Android与MINA2、Netty4的跨平台UDP双向通信实战
Netty 4.x学习(一):ByteBuf详解
Netty 4.x学习(二):Channel和Pipeline详解
Netty 4.x学习(三):线程模型详解
Apache Mina框架高级篇(一):IoFilter详解
Apache Mina框架高级篇(二):IoHandler详解
MINA2 线程原理总结(含简单测试实例)
Apache MINA2.0 开发指南(中文版)[附件下载]
MINA、Netty的源代码(在线阅读版)已整理发布
解决MINA数据传输中TCP的粘包、缺包问题(有源码)
解决Mina中多个同类型Filter实例共存的问题
实践总结:Netty3.x升级Netty4.x遇到的那些坑(线程篇)
实践总结:Netty3.x VS Netty4.x的线程模型
详解Netty的安全性:原理介绍、代码演示(上篇)
详解Netty的安全性:原理介绍、代码演示(下篇)
详解Netty的优雅退出机制和原理
NIO框架详解:Netty的高性能之道
Twitter:如何使用Netty 4来减少JVM的GC开销(译文)
绝对干货:基于Netty实现海量接入的推送服务技术要点
Netty干货分享:京东京麦的生产级TCP网关技术实践总结
>> 更多同类文章 ……

[3] 有关IM/推送的通信格式、协议的选择:
简述传输层协议TCP和UDP的区别
为什么QQ用的是UDP协议而不是TCP协议?
移动端即时通讯协议选择:UDP还是TCP?
如何选择即时通讯应用的数据传输格式
强列建议将Protobuf作为你的即时通讯应用数据传输格式
全方位评测:Protobuf性能到底有没有比JSON快5倍?
移动端IM开发需要面对的技术问题(含通信协议选择)
简述移动端IM开发的那些坑:架构设计、通信协议和客户端
理论联系实际:一套典型的IM通信协议设计详解
58到家实时消息系统的协议设计等技术实践分享
详解如何在NodeJS中使用Google的Protobuf
>> 更多同类文章 ……

[4] 有关IM/推送的心跳保活处理:
应用保活终极总结(一):Android6.0以下的双进程守护保活实践
应用保活终极总结(二):Android6.0及以上的保活实践(进程防杀篇)
应用保活终极总结(三):Android6.0及以上的保活实践(被杀复活篇)
Android进程保活详解:一篇文章解决你的所有疑问
Android端消息推送总结:实现原理、心跳保活、遇到的问题等
深入的聊聊Android消息推送这件小事
为何基于TCP协议的移动端IM仍然需要心跳保活机制?
微信团队原创分享:Android版微信后台保活实战分享(进程保活篇)
微信团队原创分享:Android版微信后台保活实战分享(网络保活篇)
移动端IM实践:实现Android版微信的智能心跳机制
移动端IM实践:WhatsApp、Line、微信的心跳策略分析
>> 更多同类文章 ……

[5] 有关WEB端即时通讯开发:
新手入门贴:史上最全Web端即时通讯技术原理详解
Web端即时通讯技术盘点:短轮询、Comet、Websocket、SSE
SSE技术详解:一种全新的HTML5服务器推送事件技术
Comet技术详解:基于HTTP长连接的Web端实时通信技术
新手快速入门:WebSocket简明教程
WebSocket详解(一):初步认识WebSocket技术
WebSocket详解(二):技术原理、代码演示和应用案例
WebSocket详解(三):深入WebSocket通信协议细节
socket.io实现消息推送的一点实践及思路
LinkedIn的Web端即时通讯实践:实现单机几十万条长连接
Web端即时通讯技术的发展与WebSocket、Socket.io的技术实践
Web端即时通讯安全:跨站点WebSocket劫持漏洞详解(含示例代码)
开源框架Pomelo实践:搭建Web端高性能分布式IM聊天服务器
使用WebSocket和SSE技术实现Web端消息推送
详解Web端通信方式的演进:从Ajax、JSONP 到 SSE、Websocket
>> 更多同类文章 ……

[6] 有关IM架构设计:
浅谈IM系统的架构设计
简述移动端IM开发的那些坑:架构设计、通信协议和客户端
一套海量在线用户的移动端IM架构设计实践分享(含详细图文)
一套原创分布式即时通讯(IM)系统理论架构方案
从零到卓越:京东客服即时通讯系统的技术架构演进历程
蘑菇街即时通讯/IM服务器开发之架构选择
腾讯QQ1.4亿在线用户的技术挑战和架构演进之路PPT
微信后台基于时间序的海量数据冷热分级架构设计实践
微信技术总监谈架构:微信之道——大道至简(演讲全文)
如何解读《微信技术总监谈架构:微信之道——大道至简》
快速裂变:见证微信强大后台架构从0到1的演进历程(一)
17年的实践:腾讯海量产品的技术方法论
移动端IM中大规模群消息的推送如何保证效率、实时性?
现代IM系统中聊天消息的同步和存储方案探讨
>> 更多同类文章 ……

[7] 有关IM安全的文章:
即时通讯安全篇(一):正确地理解和使用Android端加密算法
即时通讯安全篇(二):探讨组合加密算法在IM中的应用
即时通讯安全篇(三):常用加解密算法与通讯安全讲解
即时通讯安全篇(四):实例分析Android中密钥硬编码的风险
即时通讯安全篇(五):对称加密技术在Android平台上的应用实践
即时通讯安全篇(六):非对称加密技术的原理与应用实践
传输层安全协议SSL/TLS的Java平台实现简介和Demo演示
理论联系实际:一套典型的IM通信协议设计详解(含安全层设计)
微信新一代通信安全解决方案:基于TLS1.3的MMTLS详解
来自阿里OpenIM:打造安全可靠即时通讯服务的技术实践分享
简述实时音视频聊天中端到端加密(E2EE)的工作原理
移动端安全通信的利器——端到端加密(E2EE)技术详解
Web端即时通讯安全:跨站点WebSocket劫持漏洞详解(含示例代码)
通俗易懂:一篇掌握即时通讯的消息传输安全原理
>> 更多同类文章 ……

[8] 有关实时音视频开发:
专访微信视频技术负责人:微信实时视频聊天技术的演进
即时通讯音视频开发(一):视频编解码之理论概述
即时通讯音视频开发(二):视频编解码之数字视频介绍
即时通讯音视频开发(三):视频编解码之编码基础
即时通讯音视频开发(四):视频编解码之预测技术介绍
即时通讯音视频开发(五):认识主流视频编码技术H.264
即时通讯音视频开发(六):如何开始音频编解码技术的学习
即时通讯音视频开发(七):音频基础及编码原理入门
即时通讯音视频开发(八):常见的实时语音通讯编码标准
即时通讯音视频开发(九):实时语音通讯的回音及回音消除概述
即时通讯音视频开发(十):实时语音通讯的回音消除技术详解
即时通讯音视频开发(十一):实时语音通讯丢包补偿技术详解
即时通讯音视频开发(十二):多人实时音视频聊天架构探讨
即时通讯音视频开发(十三):实时视频编码H.264的特点与优势
即时通讯音视频开发(十四):实时音视频数据传输协议介绍
即时通讯音视频开发(十五):聊聊P2P与实时音视频的应用情况
即时通讯音视频开发(十六):移动端实时音视频开发的几个建议
即时通讯音视频开发(十七):视频编码H.264、VP8的前世今生
实时语音聊天中的音频处理与编码压缩技术简述
网易视频云技术分享:音频处理与压缩技术快速入门
学习RFC3550:RTP/RTCP实时传输协议基础知识
简述开源实时音视频技术WebRTC的优缺点
良心分享:WebRTC 零基础开发者教程(中文)
开源实时音视频技术WebRTC中RTP/RTCP数据传输协议的应用
基于RTMP数据传输协议的实时流媒体技术研究(论文全文)
声网架构师谈实时音视频云的实现难点(视频采访)
浅谈开发实时视频直播平台的技术要点
还在靠“喂喂喂”测试实时语音通话质量?本文教你科学的评测方法!
实现延迟低于500毫秒的1080P实时音视频直播的实践分享
移动端实时视频直播技术实践:如何做到实时秒开、流畅不卡
如何用最简单的方法测试你的实时音视频方案
技术揭秘:支持百万级粉丝互动的Facebook实时视频直播
简述实时音视频聊天中端到端加密(E2EE)的工作原理
移动端实时音视频直播技术详解(一):开篇
移动端实时音视频直播技术详解(二):采集
移动端实时音视频直播技术详解(三):处理
移动端实时音视频直播技术详解(四):编码和封装
移动端实时音视频直播技术详解(五):推流和传输
移动端实时音视频直播技术详解(六):延迟优化
理论联系实际:实现一个简单地基于HTML5的实时视频直播
IM实时音视频聊天时的回声消除技术详解
浅谈实时音视频直播中直接影响用户体验的几项关键技术指标
如何优化传输机制来实现实时音视频的超低延迟?
首次披露:快手是如何做到百万观众同场看直播仍能秒开且不卡顿的?
实时通信RTC技术栈之:视频编解码
开源实时音视频技术WebRTC在Windows下的简明编译教程
Android直播入门实践:动手搭建一套简单的直播系统
>> 更多同类文章 ……

[9] IM开发综合文章:
移动端IM中大规模群消息的推送如何保证效率、实时性?
移动端IM开发需要面对的技术问题
开发IM是自己设计协议用字节流好还是字符流好?
请问有人知道语音留言聊天的主流实现方式吗?
IM消息送达保证机制实现(一):保证在线实时消息的可靠投递
IM消息送达保证机制实现(二):保证离线消息的可靠投递
如何保证IM实时消息的“时序性”与“一致性”?
一个低成本确保IM消息时序的方法探讨
IM单聊和群聊中的在线状态同步应该用“推”还是“拉”?
IM群聊消息如此复杂,如何保证不丢不重?
谈谈移动端 IM 开发中登录请求的优化
移动端IM登录时拉取数据如何作到省流量?
浅谈移动端IM的多点登陆和消息漫游原理
完全自已开发的IM该如何设计“失败重试”机制?
通俗易懂:基于集群的移动端IM接入层负载均衡方案分享
微信对网络影响的技术试验及分析(论文全文)
即时通讯系统的原理、技术和应用(技术论文)
开源IM工程“蘑菇街TeamTalk”的现状:一场有始无终的开源秀
QQ音乐团队分享:Android中的图片压缩技术详解(上篇)
QQ音乐团队分享:Android中的图片压缩技术详解(下篇)
腾讯原创分享(一):如何大幅提升移动网络下手机QQ的图片传输速度和成功率
腾讯原创分享(二):如何大幅压缩移动网络下APP的流量消耗(上篇)
腾讯原创分享(二):如何大幅压缩移动网络下APP的流量消耗(下篇)
如约而至:微信自用的移动端IM网络层跨平台组件库Mars已正式开源
基于社交网络的Yelp是如何实现海量用户图片的无损压缩的?
>> 更多同类文章 ……

[10] 开源移动端IM技术框架资料:
开源移动端IM技术框架MobileIMSDK:快速入门
开源移动端IM技术框架MobileIMSDK:常见问题解答
开源移动端IM技术框架MobileIMSDK:压力测试报告
>> 更多同类文章 ……

[11] 有关推送技术的文章:
iOS的推送服务APNs详解:设计思路、技术原理及缺陷等
信鸽团队原创:一起走过 iOS10 上消息推送(APNS)的坑
Android端消息推送总结:实现原理、心跳保活、遇到的问题等
扫盲贴:认识MQTT通信协议
一个基于MQTT通信协议的完整Android推送Demo
IBM技术经理访谈:MQTT协议的制定历程、发展现状等
求教android消息推送:GCM、XMPP、MQTT三种方案的优劣
移动端实时消息推送技术浅析
扫盲贴:浅谈iOS和Android后台实时消息推送的原理和区别
绝对干货:基于Netty实现海量接入的推送服务技术要点
移动端IM实践:谷歌消息推送服务(GCM)研究(来自微信)
为何微信、QQ这样的IM工具不使用GCM服务推送消息?
极光推送系统大规模高并发架构的技术实践分享
从HTTP到MQTT:一个基于位置服务的APP数据通信实践概述
魅族2500万长连接的实时消息推送架构的技术实践分享
专访魅族架构师:海量长连接的实时消息推送系统的心得体会
深入的聊聊Android消息推送这件小事
基于WebSocket实现Hybrid移动应用的消息推送实践(含代码示例)
一个基于长连接的安全可扩展的订阅/推送服务实现思路
实践分享:如何构建一套高可用的移动端消息推送系统?
Go语言构建千万级在线的高并发消息推送系统实践(来自360公司)
腾讯信鸽技术分享:百亿级实时消息推送的实战经验
百万在线的美拍直播弹幕系统的实时推送技术实践之路
>> 更多同类文章 ……

[12] 更多即时通讯技术好文分类:
http://www.52im.net/forum.php?mod=collection&op=all

即时通讯网 - 即时通讯开发者社区! 来源: - 即时通讯开发者社区!

上一篇:Twitter:如何使用Netty 4来减少JVM的GC开销(译文)下一篇:Netty 4.x学习(一):ByteBuf详解

本帖已收录至以下技术专辑

推荐方案
评论 9
学习了,赞一个
签名: 春节快乐
写的真不错  虽然还没懂main sub 模式 感觉快到头了 (*^__^*)
线程模型有接触过,但是还没有头绪,希望通过这篇文章能有所启发。
现在正需要了解这一块,可惜看不太懂,哎
签名: 好想把妹!
Netty和Mina这样的框架,能实现服务端大负载,核心就在线程模型,值得去搞清楚它的原理。
是啊,最近在搞Netty,看到这文章不错,就转过来了,顺便给老大的论坛添砖加瓦啊
签名: 该会员没有填写今日想说内容.
这篇文章原作是支付宝的开发人员,研究的挺深入,文章真长,用不着的时候实在看不下去了。。。
本帖最后由 什么狗屁云 于 2016-02-25 15:59 编辑


原文链接在此:http://hongweiyi.com/2014/01/netty-4-x-thread-model/,感谢原作者。
写的非常详尽,借花献佛,希望对需要的人有用。
签名: 该会员没有填写今日想说内容.
本帖最后由 什么狗屁云 于 2016-02-25 15:57 编辑


接正文:

4、NioEventLoopGroup实现


这里以常用的NioEventLoopGroup为例。NioEventLoopGroup在Bootstrap初始化时作为参数传入构造方法,由于NioEventLoopGroup涉及的代码较多,就不大篇幅的贴代码了,只写流程性的文字或相应类和方法:
4.1 mainReactor:
1. Bootstrap.bind(port)
2. Bootstrap.initAndRegister()
2.1. Boostrap.init()

初始化Channel,配置Channel参数,以及Pipeline。其中初始化Pipeline中,需要插入ServerBootstrapAcceptor对象用作acceptor接收客户端连接请求,acceptor也是一种ChannelInboundHandlerAdapter。

p.addLast(new ChannelInitializer<Channel>() {
  @Override
  public void initChannel(Channel ch) throws Exception {
    ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
       currentChildAttrs));
  }
});

调用channel的unsafe对象注册selector,具体实现类为AbstractChannel$AbstractUnsafe.register。如下:

public final void register(final ChannelPromise promise) {
  if (eventLoop.inEventLoop()) {  // 是否在Channel的loop中
    register0(promise);
  } else {  // 不在
    try {
      eventLoop.execute(new Runnable() {  // EventLoop执行一个任务
        @Override
        public void run() {
          register0(promise);
        }
      });
    } catch (Throwable t) {
    // ...
    }
  }
}

eventLoop.execute(runnable);是比较重要的一个方法。在没有启动真正线程时,它会启动线程并将待执行任务放入执行队列里面。启动真正线程(startThread())会判断是否该线程已经启动,如果已经启动则会直接跳过,达到线程复用的目的。启动的线程,主要调用方法是NioEventLoop的run()方法,run()方法在下面有详细介绍:

public void execute(Runnable task) {
  if (task == null) {
    throw new NullPointerException("task");
  }

  boolean inEventLoop = inEventLoop();
  if (inEventLoop) {
    addTask(task);
  } else {
    startThread();  // 启动线程
    addTask(task);  // 添加任务队列

    // ...

  }

  if (!addTaskWakesUp) {
    wakeup(inEventLoop);
  }
}

4 group().register(channel)

将 channel 注册到下一个 EventLoop 中。


2.2. 接收连接请求

      由NioEventLoop.run()接收到请求:

3.1 AbstractNioMessageChannel$NioMessageUnsafe.read()

3.2 NioServerSocketChannel.doReadMessages()

获得childEventLoopGroup中的EventLoop,并依据该loop创建新的SocketChannel对象。


3.3 pipeline.fireChannelRead(readBuf.get(i));

readBuf.get(i)就是3.2中创建的SocketChannel对象。在2.2初始化Bootstrap的时候,已经将acceptor处理器插入pipeline中,所以理所当然,这个SocketChannel对象由acceptor处理器处理。


3.4 ServerBootstrapAcceptor$ServerBootstrapAcceptor.channelRead();

该方法流程与2.2、2.3类似,初始化子channel,并注册到相应的selector。注册的时候,也会调用eventLoop.execute用以执行注册任务,execute时,启动子线程。即启动了subReactor。


4.2 subReactor:
subReactor的流程较为简单,主体完全依赖于loop,用以执行read、write还有自定义的NioTask操作,就不深入了,直接跳过解释loop过程。

loop:

loop是我自己提出来的组件,仅是代表subReactor的主要运行逻辑。例子可以参考NioEventLoop.run()。

loop会不断循环一个过程:select -> processSelectedKeys(IO操作) -> runAllTasks(非IO操作),如下代码:

protected void run() {
  for (;;) {
    // ...
    try {
      if (hasTasks()) { // 如果队列中仍有任务
        selectNow();
      } else {
        select();
        // ...
      }

      // ...

      final long ioStartTime = System.nanoTime();  // 用以控制IO任务与非IO任务的运行时间比
      needsToSelectAgain = false;
      // IO任务
      if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
      } else {
        processSelectedKeysPlain(selector.selectedKeys());
      }
      final long ioTime = System.nanoTime() - ioStartTime;

      final int ioRatio = this.ioRatio;
      // 非IO任务
      runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

      if (isShuttingDown()) {
        closeAll();
        if (confirmShutdown()) {
          break;
        }
      }
    } catch (Throwable t) {
    // ...
    }
  }
}

就目前而言,基本上IO任务都会走processSelectedKeysOptimized方法,该方法即代表使用了优化的SelectedKeys。除非采用了比较特殊的JDK实现,基本都会走该方法。


1. selectedKeys在openSelector()方法中初始化,Netty通过反射修改了Selector的selectedKeys成员和publicSelectedKeys成员。替换成了自己的实现——SelectedSelectionKeySet。
2. 从OpenJDK 6/7的SelectorImpl中可以看到,selectedKeys和publicSeletedKeys均采用了HashSet实现。HashSet采用HashMap实现,插入需要计算Hash并解决Hash冲突并挂链,而SelectedSelectionKeySet实现使用了双数组,每次插入尾部,扩展策略为double,调用flip()则返回当前数组并切换到另外一个数据。
3. ByteBuf中去掉了flip,在这里是否也可以呢?


processSelectedKeysOptimized主要流程如下:

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
  processSelectedKey(k, (AbstractNioChannel) a);
} else {
  @SuppressWarnings("unchecked")
  NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  processSelectedKey(k, task);
}

在获得attachment后,判断是Channel呢还是其他,其他则是NioTask。找遍代码并没有发现Netty有注册NioTask的行为,同时也没发现NioTask的实现类。只有在NioEventLoop.register方法中有注册NioTask至selector的行为,便判断该行为是由用户调用,可以针对某个Channel注册自己的NioTask。这里就只讲第一个processSelectdKey(k, (AbstractNioChannel) a),但代码就不贴了。


和常规的NIO代码类似,processSelectdKey是判断SeletedKeys的readyOps,并做出相应的操作。操作均是unsafe做的。如read可以参考:AbstractNioByteChannel$NioByteUnsafe.read()。IO操作的流程大致都是:

  • 获得数据
  • 调用pipeline的方法,fireChannel***
  • 插入任务队列

执行完所有IO操作后,开始执行非IO任务(runAllTasks)。Netty会控制IO和非IO任务的比例,ioTime * (100 - ioRatio) / ioRatio,默认ioRatio为50。runAllTasks乃是父类SingleThreadExecutor的方法。方法主体很简单,将任务从TaskQueue拎出来,直接调用任务的run方法即可。

代码调用的是task.run(),而不是task.start()。即是单线程执行所有任务


protected boolean runAllTasks(long timeoutNanos) {
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}

// 控制时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

this.lastExecutionTime = lastExecutionTime;
return true;
}

5、总结


以上内容从设计和代码层面总结Netty线程模型的大致内容,中间有很多不成熟的思考与理解,请轻拍与指正。


看源码过程中是比较折磨人的。首先得了解你学习东西的业务价值是哪里?即你学了这个之后能用在哪里,只是不考虑场景仅仅为了看代码而看代码比较难以深入理解其内涵;其次,看代码一定一定得从逻辑、结构层面看,从细节层面看只会越陷越深,有种一叶障目不见泰山的感觉;最后,最好是能够将代码逻辑、结构画出来,或者整理出思维导图啥的,可以用以理清思路。

签名: 该会员没有填写今日想说内容.
打赏楼主 ×
使用微信打赏! 使用支付宝打赏!

返回顶部