默认

Netty 4.x学习(三):线程模型详解

查看数: 76136 | 评论数: 9 | 收藏 6
关灯 | 提示:支持键盘翻页<-左 右->
    组图打开中,请稍候......
发布时间: 2016-02-25 13:43

正文摘要:

1、前言 前面两篇文章里已经说完了《Netty 4.x学习(一):ByteBuf详解》和《Netty 4.x学习(二):Channel和Pipeline详解》,这篇开始讲讲前面欠的债——线程模型(EventLoop和EventExecutor)。 2、Netty的线程 ...

评论

fcs_our2010 发表于 3 年前
学习了,赞一个
xiaobin 发表于 5 年前
写的真不错  虽然还没懂main sub 模式 感觉快到头了 (*^__^*)
l2937878 发表于 7 年前
线程模型有接触过,但是还没有头绪,希望通过这篇文章能有所启发。
不要·不要 发表于 8 年前
现在正需要了解这一块,可惜看不太懂,哎
冈阪日川 发表于 8 年前
Netty和Mina这样的框架,能实现服务端大负载,核心就在线程模型,值得去搞清楚它的原理。
什么狗屁云 发表于 8 年前
是啊,最近在搞Netty,看到这文章不错,就转过来了,顺便给老大的论坛添砖加瓦啊
JackJiang 发表于 8 年前
这篇文章原作是支付宝的开发人员,研究的挺深入,文章真长,用不着的时候实在看不下去了。。。
什么狗屁云 发表于 8 年前
本帖最后由 什么狗屁云 于 2016-02-25 15:59 编辑


原文链接在此:http://hongweiyi.com/2014/01/netty-4-x-thread-model/,感谢原作者。
写的非常详尽,借花献佛,希望对需要的人有用。
什么狗屁云 发表于 8 年前
本帖最后由 什么狗屁云 于 2016-02-25 15:57 编辑


接正文:

4、NioEventLoopGroup实现


这里以常用的NioEventLoopGroup为例。NioEventLoopGroup在Bootstrap初始化时作为参数传入构造方法,由于NioEventLoopGroup涉及的代码较多,就不大篇幅的贴代码了,只写流程性的文字或相应类和方法:
4.1 mainReactor:
1. Bootstrap.bind(port)
2. Bootstrap.initAndRegister()
2.1. Boostrap.init()

初始化Channel,配置Channel参数,以及Pipeline。其中初始化Pipeline中,需要插入ServerBootstrapAcceptor对象用作acceptor接收客户端连接请求,acceptor也是一种ChannelInboundHandlerAdapter。

p.addLast(new ChannelInitializer<Channel>() {
  @Override
  public void initChannel(Channel ch) throws Exception {
    ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
       currentChildAttrs));
  }
});

调用channel的unsafe对象注册selector,具体实现类为AbstractChannel$AbstractUnsafe.register。如下:

public final void register(final ChannelPromise promise) {
  if (eventLoop.inEventLoop()) {  // 是否在Channel的loop中
    register0(promise);
  } else {  // 不在
    try {
      eventLoop.execute(new Runnable() {  // EventLoop执行一个任务
        @Override
        public void run() {
          register0(promise);
        }
      });
    } catch (Throwable t) {
    // ...
    }
  }
}

eventLoop.execute(runnable);是比较重要的一个方法。在没有启动真正线程时,它会启动线程并将待执行任务放入执行队列里面。启动真正线程(startThread())会判断是否该线程已经启动,如果已经启动则会直接跳过,达到线程复用的目的。启动的线程,主要调用方法是NioEventLoop的run()方法,run()方法在下面有详细介绍:

public void execute(Runnable task) {
  if (task == null) {
    throw new NullPointerException("task");
  }

  boolean inEventLoop = inEventLoop();
  if (inEventLoop) {
    addTask(task);
  } else {
    startThread();  // 启动线程
    addTask(task);  // 添加任务队列

    // ...

  }

  if (!addTaskWakesUp) {
    wakeup(inEventLoop);
  }
}

4 group().register(channel)

将 channel 注册到下一个 EventLoop 中。


2.2. 接收连接请求

      由NioEventLoop.run()接收到请求:

3.1 AbstractNioMessageChannel$NioMessageUnsafe.read()

3.2 NioServerSocketChannel.doReadMessages()

获得childEventLoopGroup中的EventLoop,并依据该loop创建新的SocketChannel对象。


3.3 pipeline.fireChannelRead(readBuf.get(i));

readBuf.get(i)就是3.2中创建的SocketChannel对象。在2.2初始化Bootstrap的时候,已经将acceptor处理器插入pipeline中,所以理所当然,这个SocketChannel对象由acceptor处理器处理。


3.4 ServerBootstrapAcceptor$ServerBootstrapAcceptor.channelRead();

该方法流程与2.2、2.3类似,初始化子channel,并注册到相应的selector。注册的时候,也会调用eventLoop.execute用以执行注册任务,execute时,启动子线程。即启动了subReactor。


4.2 subReactor:
subReactor的流程较为简单,主体完全依赖于loop,用以执行read、write还有自定义的NioTask操作,就不深入了,直接跳过解释loop过程。

loop:

loop是我自己提出来的组件,仅是代表subReactor的主要运行逻辑。例子可以参考NioEventLoop.run()。

loop会不断循环一个过程:select -> processSelectedKeys(IO操作) -> runAllTasks(非IO操作),如下代码:

protected void run() {
  for (;;) {
    // ...
    try {
      if (hasTasks()) { // 如果队列中仍有任务
        selectNow();
      } else {
        select();
        // ...
      }

      // ...

      final long ioStartTime = System.nanoTime();  // 用以控制IO任务与非IO任务的运行时间比
      needsToSelectAgain = false;
      // IO任务
      if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
      } else {
        processSelectedKeysPlain(selector.selectedKeys());
      }
      final long ioTime = System.nanoTime() - ioStartTime;

      final int ioRatio = this.ioRatio;
      // 非IO任务
      runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

      if (isShuttingDown()) {
        closeAll();
        if (confirmShutdown()) {
          break;
        }
      }
    } catch (Throwable t) {
    // ...
    }
  }
}

就目前而言,基本上IO任务都会走processSelectedKeysOptimized方法,该方法即代表使用了优化的SelectedKeys。除非采用了比较特殊的JDK实现,基本都会走该方法。


1. selectedKeys在openSelector()方法中初始化,Netty通过反射修改了Selector的selectedKeys成员和publicSelectedKeys成员。替换成了自己的实现——SelectedSelectionKeySet。
2. 从OpenJDK 6/7的SelectorImpl中可以看到,selectedKeys和publicSeletedKeys均采用了HashSet实现。HashSet采用HashMap实现,插入需要计算Hash并解决Hash冲突并挂链,而SelectedSelectionKeySet实现使用了双数组,每次插入尾部,扩展策略为double,调用flip()则返回当前数组并切换到另外一个数据。
3. ByteBuf中去掉了flip,在这里是否也可以呢?


processSelectedKeysOptimized主要流程如下:

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
  processSelectedKey(k, (AbstractNioChannel) a);
} else {
  @SuppressWarnings("unchecked")
  NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  processSelectedKey(k, task);
}

在获得attachment后,判断是Channel呢还是其他,其他则是NioTask。找遍代码并没有发现Netty有注册NioTask的行为,同时也没发现NioTask的实现类。只有在NioEventLoop.register方法中有注册NioTask至selector的行为,便判断该行为是由用户调用,可以针对某个Channel注册自己的NioTask。这里就只讲第一个processSelectdKey(k, (AbstractNioChannel) a),但代码就不贴了。


和常规的NIO代码类似,processSelectdKey是判断SeletedKeys的readyOps,并做出相应的操作。操作均是unsafe做的。如read可以参考:AbstractNioByteChannel$NioByteUnsafe.read()。IO操作的流程大致都是:

  • 获得数据
  • 调用pipeline的方法,fireChannel***
  • 插入任务队列

执行完所有IO操作后,开始执行非IO任务(runAllTasks)。Netty会控制IO和非IO任务的比例,ioTime * (100 - ioRatio) / ioRatio,默认ioRatio为50。runAllTasks乃是父类SingleThreadExecutor的方法。方法主体很简单,将任务从TaskQueue拎出来,直接调用任务的run方法即可。

代码调用的是task.run(),而不是task.start()。即是单线程执行所有任务


protected boolean runAllTasks(long timeoutNanos) {
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}

// 控制时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

this.lastExecutionTime = lastExecutionTime;
return true;
}

5、总结


以上内容从设计和代码层面总结Netty线程模型的大致内容,中间有很多不成熟的思考与理解,请轻拍与指正。


看源码过程中是比较折磨人的。首先得了解你学习东西的业务价值是哪里?即你学了这个之后能用在哪里,只是不考虑场景仅仅为了看代码而看代码比较难以深入理解其内涵;其次,看代码一定一定得从逻辑、结构层面看,从细节层面看只会越陷越深,有种一叶障目不见泰山的感觉;最后,最好是能够将代码逻辑、结构画出来,或者整理出思维导图啥的,可以用以理清思路。

返回顶部