当前位置:首页>热门 > >正文

世界观察:【Netty源码分析】03 客户端接入流程

  • 2023-03-28 14:15:38来源:腾讯云

Netty服务端启动完成,这时候客户端连接就可以接入进来了,下面我们就来分析下客户端连接接入的流程。

之前分析过NioEventLoop线程启动方法是startThread(),由于这个方法里面的逻辑比较复杂,并没有展开,这一节就是从这个方法开始分析。


(资料图片仅供参考)

startThread

private void startThread() {    if (state == ST_NOT_STARTED) {        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {            try {                doStartThread();            } catch (Throwable cause) {                STATE_UPDATER.set(this, ST_NOT_STARTED);                PlatformDependent.throwException(cause);            }        }    }}

这个方法主要主要完成2件事:

利用casNioEventLoop的状态由ST_NOT_STARTED修改成ST_STARTED,即表示NioEventLoop线程启动;执行doStartThread()方法;

doStartThread()方法看着比较复杂,核心逻辑如下,向线程池执行器executor提交一个任务,而这个线程池执行器类型是ThreadPerTaskExecutor,即每次执行任务都会创建一个新线程,而且这个任务是无限循环的:事件轮询selector.select()、事件处理processSelectedKeys()和任务队列处理runAllTasks(),这样NioEventLoop就和具体的Thread线程进行了关联:

private void doStartThread() {    assert thread == null;    //executor线程执行器,类型是:ThreadPerTaskExecutor,即每次执行任务都会创建一个新线程    executor.execute(new Runnable() {        @Override        public void run() {            //将executor线程执行器创建的线程:FastThreadLocalThread保存到EventLoop的全局变量中,相当于thread和EventLoop的绑定            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }            boolean success = false;            updateLastExecutionTime();            try {                //然后调用EventLoop中的run方法进行启动                SingleThreadEventExecutor.this.run();                success = true;            } catch (Throwable t) {                logger.warn("Unexpected exception from an event executor: ", t);            }        }    });}

该方法大致完成2件事:

thread = Thread.currentThread();:将executor线程池分配的线程保存起来,这样就完成了NioEventLoopThread线程的关联;SingleThreadEventExecutor.this.run():具体实现在NioEventLoop.run()方法,所以,startThread()核心就是分配一个线程运行NioEventLoop.run()方法。
protected void run() {    for (;;) {        try {            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                case SelectStrategy.CONTINUE:// 默认实现下,不存在这个情况                    continue;                case SelectStrategy.BUSY_WAIT:                case SelectStrategy.SELECT:                    //selector.select轮询io事件                    select(wakenUp.getAndSet(false));                    if (wakenUp.get()) {                        selector.wakeup();                    }                default:                }            } catch (IOException e) {                rebuildSelector0();                handleLoopException(e);                continue;            }            cancelledKeys = 0;            needsToSelectAgain = false;            final int ioRatio = this.ioRatio;            if (ioRatio == 100) {               try {                    // 处理 Channel 感兴趣的就绪 IO 事件                    processSelectedKeys();                } finally {                    // 运行所有普通任务和定时任务,不限制时间                    runAllTasks();                }            } else {                final long ioStartTime = System.nanoTime();                try {                    // 处理IO事件                    processSelectedKeys();                } finally {                    // 运行所有普通任务和定时任务,限制时间                    final long ioTime = System.nanoTime() - ioStartTime;                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            }        } catch (Throwable t) {            handleLoopException(t);        }        //  EventLoop 优雅关闭        try {            if (isShuttingDown()) {                closeAll();                if (confirmShutdown()) {                    return;                }            }        } catch (Throwable t) {            handleLoopException(t);        }    }}

该方法主要完成三件事:

select(wakenUp.getAndSet(false)):主要执行selector.select()方法进行事件轮询processSelectedKeys():如果轮询到事件,会在这里进行处理runAllTasks():处理任务队列和定时任务队列中的任务

下面我们就分别来分析下这三个方法。

select

private void select(boolean oldWakenUp) throws IOException {    Selector selector = this.selector;    try {        int selectCnt = 0;//计数器置0        long currentTimeNanos = System.nanoTime();        /**         * selectDeadLineNanos是select()方法运行的截止时间         *         * currentTimeNanos:可以看成当前时间         * delayNanos(currentTimeNanos):获取间隔时间,这里分为两种情况:         * 1、netty里面定时任务队列scheduledTaskQueue是按照延迟时间从小到大进行排序,如果定时任务队列中有任务,         * 则只需要获取到第一个任务的启动时间 - 当前时间 = select()方法可以运行的时间间隔,即:select()方法要在第一个定时任务执行之前退出,这样才能去执行定时任务         * 2、如果定时任务队列没有任务,则delayNanos(currentTimeNanos)返回1秒对应的时间间隔         *         */        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);        for (;;) {            //计算超时时间            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;            /**             * timeoutMillis <= 0表示当前已经超时了,不能继续向下执行select()方法了,需要立即退出select方法,在退出前还有个判断:selectCnt == 0             * selectCnt == 0表示第一次进入循环,则执行下Selector.selectNow()检出准备好的网络IO事件,该方法不会阻塞,             */            if (timeoutMillis <= 0) {                if (selectCnt == 0) {                    selector.selectNow();                    selectCnt = 1;                }                break;            }            /**             * 如果没有超时,但是通过hasTasks()判断到taskQueue任务队列中有需要执行的任务,这时也需要退出select()方法             * 1、利用cas将wakeUp值由false变成true,wakeUp=true表示线程处于唤醒状态,可以执行任务,进入select()方法前会把wakeUp设置成false             * 表示线程处于select()方法阻塞中,不能处理任务队列中的任务,这时只要处理Selector.select()             * 2、退出前执行一次:selector.selectNow()             */            if (hasTasks() && wakenUp.compareAndSet(false, true)) {                //有任务,进行一次非阻塞式的select                selector.selectNow();                selectCnt = 1;                break;            }            //调用select方法,阻塞时间为上面算出的最近一个将要超时的定时任务时间            /**             * 未超时,任务队列中也没有需要执行的任务,这时就可以放心的执行Selector.select()方法了,这里带上之前计算出的超时时间             * 如果之前计算时存在定时任务,则保证在第一个定时任务启动前唤醒即可,没有定时任务则默认超时1秒             */            int selectedKeys = selector.select(timeoutMillis);            //轮询次数+1            selectCnt ++;            /**             * 发生如下几种情况,select()方法都需要退出:             * 1、selectedKeys != 0:表示轮询到IO事件             * 2、oldWakenUp:这个是入参,值为false,是在select()方法中控制是否需要退出,默认是没有使用到的,没有意义             * 3、wakenUp.get():进入select()方法之前,wakeUp被设置成false,如果这里为true,表示已有外部线程对线程进行唤醒操作,             *      一般就是addTask()添加新任务时会触发唤醒,然后及时去执行taskQueue中的任务             * 4、hasTasks() || hasScheduledTasks():判断任务队列和定时任务队列是否有任务需要执行             */            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                break;            }            //线程中断响应:如果线程被中断,计数器置1,break退出for循环,则退出select()检测            if (Thread.interrupted()) {                if (logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely because " +                            "Thread.currentThread().interrupt() was called. Use " +                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");                }                selectCnt = 1;                break;            }            /**             * 正常情况下:time >= currentTimeNanos + TimeUnit.MILLISECONDS.toNanos(timeoutMillis)             * 但是,jdk nio中存在一个bug,selector.select(timeoutMillis)在没有IO事件触发时并不会等待超时而是立即返回,造成空轮询             *             * 下面就是Netty解决空轮询问题             * 1、if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)             *      表示selector.select(timeoutMillis)经过超时后才被唤醒,属于正常情况,把selectCnt重置成1             * 2、如果不是,表示可能发生空轮询selectCnt不会被重置成1,for循环一次selectCnt就会被累加1次;             * 3、等到 selectCnt > 门限值,默认是512,可以通过io.netty.selectorAutoRebuildThreshold参数设置,             *      则判断真正发生了nio空循环bug,则重建Selector替换掉当前这个出问题的Selector             */            long time = System.nanoTime();            //判断执行了一次阻塞式select后,当前时间和开始时间是否大于超时时间。(大于是很正常的,小于的话,说明没有执行发生了空轮询)            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                selectCnt = 1;             } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                selector = selectRebuildSelector(selectCnt);                selectCnt = 1;                break;            }            currentTimeNanos = time;        }    } catch (CancelledKeyException e) {        if (logger.isDebugEnabled()) {            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                        selector, e);        }    }}

select()方法代码看着很复杂,其核心思想理解再来分析就比较简单的。select()主要是用来执行selector.select()IO事件进行轮询,作为server,这里就是轮询OP_ACCEPT事件,看是否有客户端接入进来。但是NioEventLoop是单线程处理模式,不可能让线程一直处理selector.select(),还有轮询到的事件以及任务队列中任务等等都需要使用这个线程进行处理,所以,上面一大堆代码都是用来判断什么时候退出select()方法的,总结下退出逻辑主要分为如下几种情况:

在执行selector.select()方法之前,计算出一个超时时间,超时时间默认是1秒,如果定时任务队列有任务,则取出第一个任务(按顺序存放),保证在该定时任务执行之前退出select()方法即可;如果超时就退出,退出前判断是否是第一次进入for循环,如果是在退出之前调用一次无阻塞的selector.selectNow()轮询下判断任务队列taskQueue中是否有任务,如果有则将wakenUp利用cas设置成true,执行下无阻塞的selector.selectNow()轮询后退出select()方法如果上面情况都不存在,开始执行阻塞selector.select(timeoutMillis)轮询,并将之前计算的超时时间带上;selector.select(timeoutMillis)执行完成后,继续判断是否需要退出select()方法,发生如下任一情况则要退出:轮询到IO事件,则需要退出select()方法去处理事件外部线程对对线程执行过唤醒操作,比如addTask()等操作需要唤醒线程执行队列任务,才能及时去执行taskQueue中的任务:进入select()方法之前,wakeUp被设置成false,如果这里为true,表示已有外部线程对线程进行唤醒操作任务队列taskQueue或定时任务队列scheduledTaskQueue中有需要处理的任务,这时需要退出select()方法,转去执行任务

processSelectedKeys

private void processSelectedKeys() {    //selectedKeys != null表示已对Selector进行优化过,替换掉Selector内部的selectedKeys,正常情况下进入这个流程    if (selectedKeys != null) {        processSelectedKeysOptimized();    } else {        processSelectedKeysPlain(selector.selectedKeys());    } }

processSelectedKeys()主要是对selector.select()方法轮询到的事件进行处理,作为server,如果轮询到OP_ACCEPT,就表示有客户端接入进来了,那我们就跟踪下这个方法,看接入进来的客户端处理流程。

Netty是对Selector进行了优化,将selectedKeysSet实现替换成了数组实现,提升性能,所以,这里一般走的是processSelectedKeysOptimized()这个流程:

private void processSelectedKeysOptimized() {    for (int i = 0; i < selectedKeys.size; ++i) {        final SelectionKey k = selectedKeys.keys[i];        selectedKeys.keys[i] = null;        //k.attachment()获取到的就是NioServerSocketChannel        final Object a = k.attachment();        if (a instanceof AbstractNioChannel) {//一般是走这个分支流程            processSelectedKey(k, (AbstractNioChannel) a);        } else {            @SuppressWarnings("unchecked")            NioTask task = (NioTask) a;            processSelectedKey(k, task);        }        if (needsToSelectAgain) {            selectedKeys.reset(i + 1);            selectAgain();            i = -1;        }    }}

这里关键一点是Object a = k.attachment();,之前分析过向selector注册时把NioServerSocketChannel作为attachment添加进去,所以,这里取出来的就是NioServerSocketChannel对象。processSelectedKey()方法通过if判断事件类型进行处理,server端这里肯定是OP_ACCEPT

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

具体的处理逻辑交由Unsafe对象进行处理:

public void read() {    assert eventLoop().inEventLoop();    final ChannelConfig config = config();    final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.reset(config);    boolean closed = false;    Throwable exception = null;    try {        try {            do {                //doReadMessages()读出来一个客户端连接的Channel                int localRead = doReadMessages(readBuf);                if (localRead == 0) {                    break;                }                if (localRead < 0) {                    closed = true;                    break;                }                allocHandle.incMessagesRead(localRead);            } while (allocHandle.continueReading());        } catch (Throwable t) {            exception = t;        }        int size = readBuf.size();        for (int i = 0; i < size; i ++) {            readPending = false;            pipeline.fireChannelRead(readBuf.get(i));        }        readBuf.clear();        allocHandle.readComplete();        pipeline.fireChannelReadComplete();        if (exception != null) {            closed = closeOnReadError(exception);            pipeline.fireExceptionCaught(exception);        }        if (closed) {            inputShutdown = true;            if (isOpen()) {                close(voidPromise());            }        }    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

这个类主要完成2件事:

doReadMessages(readBuf):调用serverSocketChannel.accept()接收到客户端连接socketChannel,并封装成Netty中类型:NioSocketChannel,然后放入到readBuf集合中;pipeline.fireChannelRead(readBuf.get(i));:将读入的客户端连接作为参数,即NioSocketChannel对象,通过pipeline触发channelRead事件进行handler间传播,注意这里的pipelineNioServerSocketChannel中的,即server端的。最终会进入到ServerBootstrapAcceptor#channelRead方法中进行处理。
public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    child.pipeline().addLast(childHandler);    setChannelOptions(child, childOptions, logger);    for (Entry, Object> e: childAttrs) {        child.attr((AttributeKey) e.getKey()).set(e.getValue());    }    try {        //childGroup.register(child):会给客户端连接进来的Channel从线程池中获取一个EventLoop绑定给Channel        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

这个方法主要完成3件事:

child.pipeline().addLast(childHandler):向NioSocketChannel中添加ServerBootstrap.childHandler(new TestServerInitializer()),后面通过触发handlerAdded()时回调initChannel()实现向pipeline添加handler;设置optionattr信息;childGroup.register(child):将客户端连接NioSocketChannel注册到NioEventLoop实例上,基本和之前分析NioServerSocketChannel注册逻辑一致,这个过程中会触发三个事件:handlerAddedchannelRegisteredchannelActive,之前NioServerSocketChannel注册时只能触发前两个,绑定端口后才能触发第三个事件,客户端连接不存在端口绑定问题,所以这里会直接触发channelActive。和NioServerSocketChannel一样,真正向selector注册感兴趣事件就是在channelActive触发这里:
public void channelActive(ChannelHandlerContext ctx) {    //触发channelActive事件传播    ctx.fireChannelActive();    //向selector注册真正关注的事件    readIfIsAutoRead();}

channelActive和之前分析NioServerSocketChannel的处理逻辑一致,就不再分析。

总结

分析到这里,基本搞清楚了客户端接入的处理流程,现在再次总结下:

NioServerSocketChannel绑定的NioEventLoop不停轮询OP_ACCEPT,触发后通过调用java api获取到ServerSocket,然后包装成NioSocketChannel;然后触发channelRead事件传播,然后会进入server pipeline中非常重要的一个handlerServerBootstrapAcceptor,连接处理器专门处理客户端连接;在ServerBootstrapAcceptor#channelRead()方法中,完成NioSocketChannel的设置:optionattrhandler添加等;最重要的是将channel注册到NioEventLoop上,注册过程中会触发三种事件:handlerAddedchannelRegisteredchannelActive,和之前分析server channel注册过程一样,最终在channelActive这里向selector注册真正感兴趣IO事件,整个流程全部完成。

标签:

延伸阅读

推荐阅读

世界观察:【Netty源码分析】03 客户端接入流程

Netty服务端启动完成,这时候客户端连接就可以接入进来了,下面我们就来分析下客户端连接接入的流程。

拉斯提弗列罗马之家酒店

1、拉斯提弗列罗马之家酒店位于罗马。本文到此分享完毕,希望对大家有所帮助。

飞天诚信3月28日快速反弹

以下是飞天诚信在北京时间3月28日11:16分盘口异动快照:3月28日,飞天诚信盘中快速反弹,5分钟内涨幅超过2%,截至11点16分,报12 4元,成交772

简讯:深圳福田区的专科植发医院种植发际线一般多少钱啊?

很多人认为植发比较好的医院都在福田区,虽然说的太过夸张,但是福田区的植发实力已经影响力,大家应该也能略知一二了吧!目前深圳福田区比好

当前速讯:海南购房必看——《南海佳园》养老优势不足介绍分析

海南购房必看,《南海佳园》养老优势不足介绍分析,海口买房攻略2023,南海佳园楼盘详情,南海佳园楼盘地址,2023南海佳园全面解读,海南陈霞

观热点:【机构调研记录】大成基金调研百亚股份、伟星股份等3只个股(附名单)

个股亮点:国内一次性卫生用品行业的知名综合性企业;公司“好之”品牌为婴儿纸尿裤品牌,定位于中高端产品。

热塑性|全球短讯

热塑性和热固性是针对塑料制品而言 塑料制品大致可以分为两大类,一类叫做热塑性塑料:即加热后,塑料制品会随温度升高逐渐软化

当前通讯!临安市安全教育平台作业_临安市安全教育平台

1、您好,您的问题似乎表意不明,完善一下您的问题吧。2、  如果回答有不完善的地方,或不明白的地方,可以追问  希望采纳

【速看料】三元环境拟向银行申请500万贷款 由关联方滕德海夫妇、辜筱菊夫妇提供信用担保

挖贝网3月27日,三元环境(833347)近日发布公告,四川三元环境治理股份有限公司(以下简称“公司”)取得中信银行股份有限公司成都分行提...

河南内黄:漫天花海竞芳菲 乡村旅游热起来|热闻

随着天气回暖,河南省内黄县东庄镇千亩梨花、豆公镇万亩桃花迎春绽放,吸引了众多游人和文艺爱好者前来踏青赏花,拍照打卡。记者:杨静摄制:

全球百事通!在网上做什么可以赚钱呢_在网上做什么可以赚钱

1、淘宝开店的最具体步骤:一、开店之前:A.注册会员名:名字很重要,有特色的名字更能让别人注意你,记住你;怎样才算是

万向钱潮股东户数减少444户,户均持股16.52万元

万向钱潮最新股东户数8 95万户,高于行业平均水平。公司户均持有流通股份3 08万股;户均流通市值16 52万元。

天天观焦点:天津灵活就业人员社保2023年缴费标准 最新是多少钱一个月?

天津灵活就业人员社保缴费基数2023最新是多少钱一个月?下面跟社保网小编一起来看看具体详情吧!天津灵活就业人员社保缴费基数目前是多少?答

创新药研发“山高路险”,药企如何探路降低市场风险?

创新药研发“山高路险”,药企如何探路降低市场风险?,医药,药品,科学,药企,创新药,阿斯利康,新药研发,山高路险

手汗症怎么治疗最好_手汗症怎么治疗-视点

1、您好!  治疗方法包括收敛剂、止汗剂、镇静剂、抗胆碱能药物、催眠疗法、心理疗法、电子透入疗法、针灸等,除抗胆碱能药物

化解房企风险要对症下药 头条焦点

化解房企风险要对症下药-中央经济工作会议指出,有效防范化解重大经济金融风险。今年《政府工作报告》提出,有效防范化解优质头部房企风险,改

为做房企优等生,中交地产get了这些技能!-世界热讯

为做房企优等生,中交地产get了这些技能!,央企,绿城,置业,优等生,中交地产,中交集团,保利地产

毛姆:抱歉,侦探小说才是我想要的 世界速看料

毛姆:抱歉,侦探小说才是我想要的

银河证券:金价进入上涨周期将使黄金股形成戴维斯双击

银河证券3月27日研报指出,3月美国银行业危机或将扭转美联储的货币政策,美联储加息周期结束在即,并可能在下半年开启新一轮

每日热闻!会计从月初到月末的整个流程_某同学家的电子式电能表上个月月初月末

1、解:(1)本月消耗的电能:W=12614 6kw•h-12519 6kw•h=95kw•h.(2)∵“220V”表示

thaad

1、末段高空区域防御系统,英语:TerminalHighAltitudeAreaDefense,缩写:THAAD

天天快资讯:长兴茶园春意正浓 又见农户采茶忙

在浙江省湖州市的产茶大县长兴,为了抢时间,经验丰富的采茶工们每天要站9小时,弯腰上万次采摘春茶。天刚蒙蒙亮,在陈双双的指挥下,100多名

英汉传媒话语修辞对比研究_关于英汉传媒话语修辞对比研究的简介

1、《英汉传媒话语修辞对比研究》是2007年10月郑州大学出版社出版的图书,作者是胡曙中。本文关于英汉传媒话语修辞对

2023数博会|贵阳市大数据公司赴京开展数字经济招商和数博会招展

近日,贵阳市大数据产业有限公司、2023数博会展览组赴京开展数字经济招商和数博会招展工作。工作组先后与中关村产业技术联盟

大兴机场:国际航线新增伦敦大阪等通航点

从3月26日起,大兴机场正式启动2023年夏航季航班计划,本航季将持续至10月28日结束,共计217天。换季后,新一批国际航线会根据国家政策及市场

看到uzi退役时提到IG,笑笑孙亚龙忍不住了:其实那时候我们很强

前言:看到uzi退役时提到IG,笑笑孙亚龙忍不住了,提醒粉丝注意:uzi最喜爱的那场比赛里,其实我们很强。uzi最满意的比赛在uzi退役之后,许多

二维码支付是什么意思_视点

二维码支付是一种基于账户体系搭起来的新一代无线支付方案。在该支付方案下,商家可把账号、商品价格等交易信息汇编成一个二维码,并印刷在各

我国首座深远海浮式风电平台将离开广东珠海前往海南文昌海域 天天时快讯

今天(26日)上午,我国第一座深远海浮式风电平台——“海油观澜号”将离开广东珠海福陆码头,前往海南的文昌海域,进行海上安装和调试。“海...

金粒宴是真是假_金粒宴

1、日本首创的一种变态食物。2、也是继女体盛后日本人民的强烈少女崇拜情结的又一大产物。3、传说中的金粒餐据说是日本上

回复恶意差评经典回复_恶意差评经典回复 滚动

1、01知道对方已经给自己差评,不要回避,请根据实际情况先给对方回评。2、02双方都评价后,买家的差评将出现在自己的信誉

猜您喜欢

Copyright ©  2015-2022 中公服装网版权所有  备案号:沪ICP备2022005074号-18   联系邮箱:5855973@qq.com