0%

我们回到ServerBootstrap的init方法,之前介绍过Attribute的设置,那么Attribute的具体设置是怎样的呢:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    void init(Channel channel) throws Exception {
<!-- more -->
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());//channel的实例是NioServerSocketChannel
}
}

我们在NioServerSocketChannel里边并没有找到attr方法,那么看一下NioServerSocketChannel层次关系:
这里写图片描述
在DefaultAttributeMap找到:

1
2
3
4
5
6
7
public class DefaultAttributeMap implements AttributeMap {
private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes;
public <T> Attribute<T> attr(AttributeKey<T> key) {
....略
}
....略
}

attr(AttributeKey key)方法在DefaultAttributeMap 里边。
在NioServerSocketChannel还有一个 private final ServerSocketChannelConfig config;这样的一个生命,ServerSocketChannelConfig 用来设置ServerSocketChannel的一些配置信息的,可以看到ServerSocketChannelConfig 和Attribute共同完成了channel的配置。
类似:

1
2
3
4
class Channel{
private final ServerSocketChannelConfig config;
private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes;
}

Channel与ChannelHandlerContext作用域分析
首先我们要知道netty4.0和netty4.1是不同的,4.1对ChannelHandlerContext.attr(..) 和 Channel.attr(..)的改进:
http://netty.io/wiki/new-and-noteworthy-in-4.1.html

ChannelHandlerContext.attr(..) == Channel.attr(..)
Both Channel and ChannelHandlerContext implement the interface AttributeMap to enable a user to attach one or more user-defined attributes to them. What sometimes made a user confused was that a Channel and a ChannelHandlerContext had its own storage for the user-defined attributes. For example, even if you put an attribute ‘KEY_X’ via Channel.attr(KEY_X).set(valueX), you will never find it via ChannelHandlerContext.attr(KEY_X).get() and vice versa. This behavior is not only confusing but also is waste of memory.
Channel 和ChannelHandlerContext都 实现了AttributeMap 用来设置用户自定义的属性。有时候Channel 和ChannelHandlerContext 都有自己的一套用户定义的属性(相互之间没有任何关系,即Channel 有自己的map,ChannelHandlerContext 也有自己的map)让用户感到非常困惑,比如我们使用 Channel.attr(KEY_X).set(valueX)设置一个key和value,但是没法通过ChannelHandlerContext.attr(KEY_X).get()方式获得,而且这样还浪费内存。
To address this issue, we decided to keep only one map per Channel internally. AttributeMap always uses AttributeKey as its key. AttributeKey ensures uniqueness between each key, and thus there’s no point of having more than one attribute map per Channel. As long as a user defines its own AttributeKey as a private static final field of his or her ChannelHandler, there will be no risk of duplicate keys.
为了解决这个问题,我们决定在每个Channel 内部只维护一个map,AttributeMap 永远使用AttributeKey 作为他的key,AttributeKey 保证在所有key之中是唯一的,这样就没有必要每个Channel定义多个属性,这样每个用户在ChannelHandler里边定义私有的静态属性的key(AttributeKey )就没有重复键的问题。

我们看一下实际的代码是怎么写的:
Channel的实现类NioServerSocketChannel的父类DefaultAttributeMap的attr方法:
这里写图片描述

ChannelHandlerContext 实现类AbstractChannelHandlerContext的attr方法:

1
2
3
4
5
6
7
8
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
...略
public <T> Attribute<T> attr(AttributeKey<T> key) {
return channel().attr(key);//直接使用的是channel的attr属性
}
...略
}

可以看到ChannelHandlerContext 和Channel用的都是Channel的attr,即:ChannelHandlerContext.attr(..) == Channel.attr(..)
到此为止netty的2大模块(初始化和注册)的初始流程解析完毕。接下来说一下注册流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);//初始化
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);//注册
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

还是之前的init代码入口,上一节我们介绍了ChannelOption和AttributeKey,本次我们说下Channel与ChannelHandler及ChannelHandlerContext之间的关系分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<!-- more -->
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

先从ChannelInitializer,我们看下它的doc:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

/**
* A special {@link ChannelInboundHandler} which offers an easy way to initialize a {@link Channel} once it was
* registered to its {@link EventLoop}.
* 一个特殊的ChannelInboundHandler,提供了简单的方式来初始化Channel,通过注册到EventLoop里边来实现的。
* Implementations are most often used in the context of {@link Bootstrap#handler(ChannelHandler)} ,
* {@link ServerBootstrap#handler(ChannelHandler)} and {@link ServerBootstrap#childHandler(ChannelHandler)} to
* setup the {@link ChannelPipeline} of a {@link Channel}.
* 具体实现经常使用在Bootstrap#handler(ChannelHandler)、ServerBootstrap#handler(ChannelHandler)、
* ServerBootstrap#childHandler(ChannelHandler)等来初始化Channel的ChannelPipeline
* <pre>
* 使用举例:
* public class MyChannelInitializer extends {@link ChannelInitializer} {
* public void initChannel({@link Channel} channel) {
* channel.pipeline().addLast("myHandler", new MyHandler());
* }
* }
*
* {@link ServerBootstrap} bootstrap = ...;
* ...
* bootstrap.childHandler(new MyChannelInitializer());
* ...
* </pre>
* Be aware that this class is marked as {@link Sharable} and so the implementation must be safe to be re-used.
*注意这个类是标记为Sharable的,所以在实现的时候必须是线程安全的。
* @param <C> A sub-type of {@link Channel}
*/
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

}

程序调用p.addLast(new ChannelInitializer() ……这样的方式把ChannelInitializer放到ChannelPipeline当中,那么ChannelPipeline.addLast()的逻辑是怎样的呢?

1
2
3
4
5
6
/**
* Inserts {@link ChannelHandler}s at the last position of this pipeline.
* 在pipeline的最后一个位置插入一个ChannelHandler
* @param handlers the handlers to insert last
*/
ChannelPipeline addLast(ChannelHandler... handlers);

查看ChannelPipeline 的实现类DefaultChannelPipeline:

1
2
3
4
5
6
7
public class DefaultChannelPipeline implements ChannelPipeline {
...略
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
...略

调用了addLast(null, handlers),第一个参数为null:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
即executor为null
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
//循环遍历,调用addLast(executor, null, h)方法,第二个参数也为null
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}

return this;
}

进入最终的方法,其中group为null,name为null:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;//首先声明一个AbstractChannelHandlerContext
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

上文提到AbstractChannelHandlerContext ,那么看下AbstractChannelHandlerContext 的doc是怎么一个东西:

1
2
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {

进入他的接口ChannelHandlerContext:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

/**
* Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline}
* and other handlers. Among other things a handler can notify the next {@link ChannelHandler} in the
* {@link ChannelPipeline} as well as modify the {@link ChannelPipeline} it belongs to dynamically.
* 使ChannelHandler和它的ChannelPipeline以及其他的处理器之间进行交互,可以通知ChannelPipeline里的下一个ChannelHandler,
* 以及动态的修改它属的ChannelPipeline
* <h3>Notify</h3>
* 通知
* You can notify the closest handler in the same {@link ChannelPipeline} by calling one of the various methods
* provided here.
*你可以通过调用各种方法来通知ChannelPipeline里边最近的一个handler
* Please refer to {@link ChannelPipeline} to understand how an event flows.
* 请参考ChannelPipeline来理解事件的过程。
* <h3>Modifying a pipeline</h3>
* 修改一个pipeline
* You can get the {@link ChannelPipeline} your handler belongs to by calling
* {@link #pipeline()}. A non-trivial application could insert, remove, or
* replace handlers in the pipeline dynamically at runtime.
*你可以调用所属处理器的pipeline()方法得到ChannelPipeline,一个应用可以在pipeline 里边动态的插入,删除或者替换处理器。
* <h3>Retrieving for later use</h3>
* 获取为了以后使用
* You can keep the {@link ChannelHandlerContext} for later use, such as
* triggering an event outside the handler methods, even from a different thread.
* 你可以持有ChannelHandlerContext为了后续使用,比如在handler 方法之外触发一个事件,甚至是不同的线程。
* <pre>
* public class MyHandler extends {@link ChannelDuplexHandler} {
*
* <b>private {@link ChannelHandlerContext} ctx;</b>
*
* public void beforeAdd({@link ChannelHandlerContext} ctx) {
* <b>this.ctx = ctx;</b>//提前获得ChannelHandlerContext
* }
*
* public void login(String username, password) {
* ctx.write(new LoginMessage(username, password));//之后的业务逻辑再去使用
* }
* ...
* }
* </pre>
*
* <h3>Storing stateful information</h3>
* 存储状态信息
* {@link #attr(AttributeKey)} allow you to
* store and access stateful information that is related with a handler and its
* context. Please refer to {@link ChannelHandler} to learn various recommended
* ways to manage stateful information.
* AttributeKey允许你存储和它有关联的handler 以及它的上下文的状态信息,可以参考ChannelHandler学习不同的方式来管理状态信息
* <h3>A handler can have more than one context</h3>
* 一个handler 可以有多个上下文
* Please note that a {@link ChannelHandler} instance can be added to more than
* one {@link ChannelPipeline}. It means a single {@link ChannelHandler}
* instance can have more than one {@link ChannelHandlerContext} and therefore
* the single instance can be invoked with different
* {@link ChannelHandlerContext}s if it is added to one or more
* {@link ChannelPipeline}s more than once.
* 注意,一个ChannelHandler可以被添加多次在一个ChannelPipeline里边,意味着一个单独的ChannelHandler实例可以有多个
* ChannelHandlerContext以及因此一个单独的实例可以被多个ChannelHandlerContext多次调用,如果ChannelHandler实例被添加了多次。
* <p>
* For example, the following handler will have as many independent {@link AttributeKey}s
* as how many times it is added to pipelines, regardless if it is added to the
* same pipeline multiple times or added to different pipelines multiple times:
* <pre>
* public class FactorialHandler extends {@link ChannelInboundHandlerAdapter} {
*
* private final {@link AttributeKey}&lt;{@link Integer}&gt; counter = {@link AttributeKey}.valueOf("counter");
*
* // This handler will receive a sequence of increasing integers starting
* // from 1.
* {@code @Override}
* public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
* Integer a = ctx.attr(counter).get();
*
* if (a == null) {
* a = 1;
* }
*
* attr.set(a * (Integer) msg);
* }
* }
*
* // Different context objects are given to "f1", "f2", "f3", and "f4" even if
* // they refer to the same handler instance. Because the FactorialHandler
* // stores its state in a context object (using an {@link AttributeKey}), the factorial is
* // calculated correctly 4 times once the two pipelines (p1 and p2) are active.
* 给出"f1", "f2", "f3", and "f4"不同的上下文对象,但是他们来自同一个实例,因为FactorialHandler存储了他们的状态在上下文对象里边
* (使用AttributeKey),当处于活动状态的factorial ,factorial 被计算了四次在2个pipelines (p1 和 p2)中。
* FactorialHandler fh = new FactorialHandler();
*
* {@link ChannelPipeline} p1 = {@link Channels}.pipeline();
* p1.addLast("f1", fh);
* p1.addLast("f2", fh);
*
* {@link ChannelPipeline} p2 = {@link Channels}.pipeline();
* p2.addLast("f3", fh);
* p2.addLast("f4", fh);
* </pre>
*
* <h3>Additional resources worth reading</h3>
* <p>
* Please refer to the {@link ChannelHandler}, and
* {@link ChannelPipeline} to find out more about inbound and outbound operations,
* what fundamental differences they have, how they flow in a pipeline, and how to handle
* the operation in your application.
* 请参考ChannelHandler和ChannelPipeline来找出更多的关于出栈和入栈的操作、他们之间最基本的不同、怎样在pipeline流动,怎么使用在应用当中
*/
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

前边讲了EventLoopGroup的一些知识,在netty的架构这块我们使用一种bossGroup加workerGroup的方式,bossGroup只负责请求的转发,workerGroup是具体的数据处理,其实netty整个框架使用的是Reactor(响应器)的设计模式。这方面知名的大佬就是Doug Lea,Java.util.current包的很多线程的API和工具都出自大佬之手。
这里写图片描述

Read more »