/** * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get * processed for later selection during the event loop. *EventLoopGroup 首先是一个接口,继承了EventExecutorGroup ,主要的功能是在时间循环对Channel的注册, */ public interface EventLoopGroup extends EventExecutorGroup { /** * Return the next {@link EventLoop} to use * 一个EventLoopGroup 有多个EventLoop ,地方法得到下一个EventLoop */ @Override EventLoop next();
/** * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture} * will get notified once the registration was complete. * 将参数channel 注册到EventLoop当中,然后注册完毕之后会异步的ChannelFuture 返回到ChannelFuture 当中。 */ ChannelFuture register(Channel channel);
/** * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed * {@link ChannelFuture} will get notified once the registration was complete and also will get returned. * 也是讲channel注册到EventLoop当中,当时我们发现 参数是ChannelPromise 类型的,不是Channel 类型的,那只有一种可能, * ChannelPromise 里边包含Channel 的引用,后续会展开讲解。 */ ChannelFuture register(ChannelPromise promise);
/** * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture} * will get notified once the registration was complete and also will get returned. * * @deprecated Use {@link #register(ChannelPromise)} instead. * 废弃的注册,在 ChannelFuture register(ChannelPromise promise);方法当中ChannelPromise 已经包含了Channel 的引用,那么这个 * 方法把Channel 也作为参数,是一种功能上的重复,因此被Deprecated,不推荐使用。 */ @Deprecated ChannelFuture register(Channel channel, ChannelPromise promise); }
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override Channel channel(); ...略 ... }
找一个ChannelPromise 的实现类:
1 2 3 4 5 6 7
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;内部的Channel引用。 private long checkpoint; ... ...略 }
接着EventLoopGroup 继承接口EventExecutorGroup:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use * via its {@link #next()} method. Besides this, it is also responsible for handling their * life-cycle and allows shutting them down in a global fashion. * */ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> { ... ...中间方法略 ... /** * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}. * EventExecutorGroup内部管理了EventExecutor 。 */ EventExecutor next();
透过EventLoopGroup 和EventExecutorGroup我们知道他们都有自己的EventLoop和EventExecutor 回到:EventLoopGroup bossGroup = new NioEventLoopGroup();这行代码,NioEventLoopGroup()可以传递参数比如new NioEventLoopGroup(1);代表有一个线程接受连接。 进入NioEventLoopGroup(Int nThreads)的构造器:
1 2 3
public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null);//第二个参数传null }
public class Test { public static void main(String[] args) { int result = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); System.out.println(result); } }
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads]; //循环对MultithreadEventExecutorGroup的数组EventExecutor初始化,根据指定的线程数量。 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); }
for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } };
for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }