200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ的底层通信模块remoting 源码解析

RocketMQ的底层通信模块remoting 源码解析

时间:2022-11-19 03:17:42

相关推荐

RocketMQ的底层通信模块remoting 源码解析

一、前言

remoting是RocketMQ的底层通信模块,RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析,来说明remoting如何实现高性能通信的。

二、Remoting 通信模块结构

remoting 的网络通信是基于 Netty 实现,模块中类的继承关系如下:

可见通信的类继承自类RemotingService,RemotingService的定义如下:

public interface RemotingService {// 服务启动void start();// 服务停止void shutdown();//注册RPC钩子函数void registerRPCHook(RPCHook rpcHook);}

RemotingServer:继承自RemotingService,定义了服务端的接口

public interface RemotingServer extends RemotingService {/*** 注册处理器* @param requestCode 请求码* @param processor处理器* @param executor线程池* 这三者是绑定关系:* 根据请求的code 找到处理对应请求的处理器与线程池 并完成业务处理。*/void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);/*** 注册缺省处理器* @param processor 缺省处理器* @param executor 线程池*/void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);// 获取服务端口int localListenPort();/*** 根据 请求码 获取 处理器和线程池* @param requestCode 请求码* @return*/Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);/*** 同步调用* @param channel 通信通道* @param request 业务请求对象* @param timeoutMillis 超时时间* @return 响应结果封装*/RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,RemotingTimeoutException;/*** 异步调用* @param channel 通信通道* @param request 业务请求对象* @param timeoutMillis 超时时间* @param invokeCallback 响应结果回调对象*/void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;/*** 单向调用 (不关注返回结果)* @param channel 通信通道* @param request 业务请求对象* @param timeoutMillis 超时时间*/void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,RemotingSendRequestException;}

从上面的代码可以看出,RemotingServer的主要功能是注册请求协议处理器、请求调用方法。

NettyRemotingServer:服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);// Netty服务端启动器private final ServerBootstrap serverBootstrap;// worker组private final EventLoopGroup eventLoopGroupSelector;// boss组 private final EventLoopGroup eventLoopGroupBoss;// Netty服务端配置信息类private final NettyServerConfig nettyServerConfig;// 公共线程池 (在注册协议处理器的时候,若未给处理器指定线程池,那么就是用该公共线程池)private final ExecutorService publicExecutor;// Netty Channel 特殊状态监听器private final ChannelEventListener channelEventListener;// 定时器 (功能: 扫描 responseTable表,将过期的responseFuture移除)private final Timer timer = new Timer("ServerHouseKeepingService", true);// 用于在pipeline指定handler中 执行任务的线程池private DefaultEventExecutorGroup defaultEventExecutorGroup;// 服务端绑定的端口private int port = 0;private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";private static final String TLS_HANDLER_NAME = "sslHandler";private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";// 用于处理 SSL 握手连接的处理器private HandshakeHandler handshakeHandler;// 协议编码 处理器private NettyEncoder encoder;// 连接管理 处理器private NettyConnectManageHandler connectionManageHandler;// 核心业务 处理器private NettyServerHandler serverHandler;// 参数1: nettyServerConfig Netty服务端配置信息// 参数2: channelEventListener channel特殊状态监听器public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {// 调用父类 就是通过 Semaphore 设置请求并发限制// 1. 设置 单行请求的并发限制// 2. 设置 异步请求的并发限制super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();tyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;// 创建公共线程池 publicExecutor 线程数量为:4int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSL连接的相关方法 (不在本篇的分析范围内)loadSslContext();}}

NettyRemotingServer当中重要的参数:

父类的属性semaphoreOneway, **semaphoreAsync ** 用来控制请求并发量的serverBootstrapNetty服务器启动器nettyServerConfigNetty服务器配置信息channelEventListenerNetty Channel状态监听器eventLoopGroupSelectorworker组eventLoopGroupBossboss组

NettyRemotingServer的启动

// 启动Netty 服务器@Overridepublic void start() {// Netty pipeline中的指定 handler 采用该线程池执行this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 初始化 处理器 handler// 1. handshakeHandler SSL连接// 2. encoder 编码器// 3. connectionManageHandler 连接管理器处理器// 4. serverHandler 核心业务处理器prepareSharableHandlers();// 下面就是 Netty 创建服务端启动器的固定流程 ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端ServerSocketChannel 类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 设置客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true)// 设置 接收缓冲区 和 发送缓冲区的 大小.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(tyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroupch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池 是 PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 服务器 绑定端口ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 条件成立: channel状态监听器不为空, 则创建 网络异常事件执行器if (this.channelEventListener != null) {tyEventExecutor.start();}// 提交定时任务,每一秒 执行一次// 扫描 responseTable 表, 将过期的 responseFuture 移除this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}

上述代码 基本上就是 模板Netty创建服务端的代码,主要做了如下几件事:

启动Netty服务器开启 channel状态监听线程开启 扫描responseFuture的定时任务

通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,使用多个线程池来处理数据

1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。

RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。

2、拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),

3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。

4、而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

NettyRemotingAbstract:抽象类NettyRemotingAbstractNettyRemotingServer的父类,主要定义了请求并发量、控制响应对象和各种请求处理函数。

public abstract class NettyRemotingAbstract {// 控制 单向请求的 并发量protected final Semaphore semaphoreOneway;// 控制 异步请求的 并发量protected final Semaphore semaphoreAsync;// 响应对象映射表 (key: opaque value:responseFuture)protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =new ConcurrentHashMap<Integer, ResponseFuture>(256);// 请求处理器映射表 (key: requestCode value:(processor,executor) )protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);// Netty事件监听线程池protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();// 默认的请求处理器对 包含(processor,executor) protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;// SSL相关protected volatile SslContext sslContext;// 扩展钩子protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。