200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 【Netty】原理分析:ChannelHandlerContext

【Netty】原理分析:ChannelHandlerContext

时间:2018-12-16 22:39:03

相关推荐

【Netty】原理分析:ChannelHandlerContext

上面一篇文章介绍了 ChannelPipeline,它维护了一个有序的 ChannelHandler 列表。当 ChannelHandler 加入到 ChannelPipeline 的时候,会创建一个对应的 ChannelHandlerContext 并绑定,ChannelPipeline 实际维护的是和 ChannelHandlerContext 的关系,例如在 DefaultChannelPipeline:

// public class DefaultChannelPipeline implements ChannelPipeline {//...final AbstractChannelHandlerContext head; // 头结点final AbstractChannelHandlerContext tail; // 尾结点}

DefaultChannelPipeline 会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用。而 AbstractChannelHandlerContext 中维护了 next 和 prev 指针:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {//...volatile AbstractChannelHandlerContext next; // 前驱节点volatile AbstractChannelHandlerContext prev; // 后置节点}

这样 ChannelHandlerContext 之间形成了双向链表。

1.ChannelHandlerContext 方法

我们来看一下 ChannelHandlerContext 的继承关系:

它继承了 AttributeMap 用于存储信息,实现了 ChannelInboundInvoker 和 ChannelOutboundInvoker 可进行事件的传播。

PS:ChannelPipeline 和 ChannelHandlerContext 都同时继承了 ChannelInboundInvoker 和 ChannelOutboundInvoker接口

1.AttributeMap 接口的方法

public interface AttributeMap {<T> Attribute<T> attr(AttributeKey<T> key);<T> boolean hasAttr(AttributeKey<T> key);}

2.ChannelInboundInvoker 接口的方法

public interface ChannelInboundInvoker {// 触发对下一个 ChannellnboundHandler 上的 channelRegistered() 方法的调用ChannelInboundInvoker fireChannelRegistered();// 触发对下一个 ChannellnboundHandler 上的 channelUnregistered() 方法的调用ChannelInboundInvoker fireChannelUnregistered();// 触发对下一个 ChannellnboundHandler 上的 channelActive() 方法(已连接)的调用ChannelInboundInvoker fireChannelActive();// 触发对下一个 ChannellnboundHandler 上的 channellnactive() 方法(已关闭)的调用ChannelInboundInvoker fireChannelInactive();// 触发对下一个 ChannellnboundHandler 上的 exceptionCaught (Throwable)方法的调用ChannelInboundInvoker fireExceptionCaught(Throwable cause);// 触发对下一个 ChannellnboundHandler 上的 userEventTriggered (Object evt)方法的调用ChannelInboundInvoker fireUserEventTriggered(Object event);// 触发对下一个 ChannellnboundHandler 上的 channelRead() 方法(已接收的消息)的调用ChannelInboundInvoker fireChannelRead(Object msg);// 触发对下一个 ChannellnboundHandler 上的 channelReadComplete() 方法的调用ChannelInboundInvoker fireChannelReadComplete();// 触发对下一个 ChannellnboundHandler 上的 channelWritabilityChanged() 方法的调用ChannelInboundInvoker fireChannelWritabilityChanged();}

3.ChannelOutboundInvoker 接口的方法

public interface ChannelOutboundInvoker {// 绑定到给定的 SocketAddress,并返回 ChannelFutureChannelFuture bind(SocketAddress localAddress);// 连接给定的 SocketAddress,并返回 ChannelFutureChannelFuture connect(SocketAddress remoteAddress);// 从之前分配的 EventExecutor 注销,并返回 ChannelFutureChannelFuture deregister();// 从远程节点断开,并返回 ChannelFutureChannelFuture disconnect(ChannelPromise promise);// 将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件// 并在最后一个消息被读取完成后,通知 ChannelInboundHandler 的 channelReadComplete 方法ChannelOutboundInvoker read();// 将写到一个临时队列中ChannelFuture write(Object msg);// 将临时队列中的消息写到 Socket 缓冲区ChannelOutboundInvoker flush();// write + flush = 直接写到 Socket 缓冲区ChannelFuture writeAndFlush(Object msg);//...还有一些不常用的方法就不列出来了}

PS:关于 write() 和 flush() 的源码分析可以参考这篇文章…

4.ChannelHandlerContext 自己的方法

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {// 获取绑定这个实例的 ChannelChannel channel();// 获取调度事件的 EventExecutorEventExecutor executor();// 获取这个实例的唯一名称String name();// 获取绑定这个实例的 ChannelHandlerChannelHandler handler();// 如果所关联的 ChannelHandler 已被从 ChannelPipeline 中移除,则返回 trueboolean isRemoved();// 获取这个实例所关联的 ChannelPipelineChannelPipeline pipeline();// 获取和这这个实例相关联的 Channel 所配置的 ByteBufAllocatorByteBufAllocator alloc();}

注:在我们平时编写 ChannelHandler 要将数据写到 Socket 中时,有两种方案:

ctx.channel().writeAndFlush,将从 Pipeline 的尾部开始往前找 OutboundHandlerctx.writeAndFlush会从当前 handler 往前找 OutboundHandler

2.ChannelHandlerContext 子类

来看 ChannelHandlerContext 继承类图:

1.AbstractChannelHandlerContext

是 ChannelHandlerContext 的一个抽象实现

定义了链表的关键 – next、prev 指针

定义了很多 ChannelHandlerContext 节点状态

实现了上面列出的所有方法,包括父接口 AttributeMap、ChannelInboundInvoker 、ChannelOutboundInvoker 及 ChannelHandlerContext 自己的方法(见下图)

注:由于 ChannelContextHandler 将 AttributeMap 和 ChannelInboundInvoker 接口的方法 @Override 了,所以我们在上图看到 attr() 和 fireXX() 都显示的是实现自 ChannelContextHandler 。

2.DefaultChannelHandlerContext

是 ChannelHandlerContext 的默认实现类,不过主要功能都在AbstractChannelHandlerContext 中已经实现好了,DefaultChannelHandlerContext 非常简单

3.HeadContext

是 ChannelPipeline 中的头节点,是一个比较特殊的节点,

它是 ChannelHandlerContext 的实现类,因此是一个 ChannelPipeline 内部链表节点它实现了入站出站接口 ChannelOutboundHandler 和 ChannelInboundHandler,因此是一个双向处理器。

HeadContext 里面有很多方法。通过内部持有的 unsafe 对象来做具体的读、写、连接、绑定端口等IO事件,功能上看 HeadContext 会将事件传播到下一个入站处理器。

4.TailContext

是 ChannelPipeline 中的尾节点,也是一个比较特殊的节点

它是 ChannelHandlerContext 的实现类,因此是一个 ChannelPipeline 内部链表节点它实现了 ChannelInboundHandler,因此是一个入站事件处理器,可处理入站事件

不过 TailContext 继承自ChannelInboundHandler 的很多入站方法都是空方法。TailContext 大部分情况下是什么都不做,有几个方法会将未处理的异常打印 Warn 日志。

3.事件传播方法 – fireXX() 源码分析

事件传播方法继承自 ChannelInboundInvoker 接口,在 AbstractChannelHandlerContext 抽象类中被实现。我们下面来分析一下 fireChannelRead() 方法的源码:

@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this;}

/*** 1.向后查找,如果 ChannelHandlerContext 为 inbound 的类型 就返回*/private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while ((ctx.executionMask & mask) == 0);return ctx;}/*** 2.触发 ChannelPipeline 中后面一个 ChannelInboundHandler 的 channelRead 方法被调用*/static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 获取 ChannelHandlerContext 的 EventExecutorEventExecutor executor = next.executor();// 如果EventExecutor线程在EventLoop线程中,就直接调用if (executor.inEventLoop()) {// 核心!!next.invokeChannelRead(m);// 反之则递交给executor执行} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}}/*** 3.调用下一个入站处理器的 channelRead 方法,将消息传递过去*/private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {// 获取handler对象(ChannelInboundHandler)并且执行该对象的 channelRead 方法......((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {// 通知 Inbound 事件的传播,发生异常notifyHandlerException(t);}} else {fireChannelRead(msg);}}

实际上其他的 fireXX() 方法也是类似的,基本思想就是找到下一个 ChannelHandlerContext 节点然后调用其内部的ChannelHandler 对象对应的方法,其他类似方法就不一一分析了。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。