🔥🔥 SegmentFault D-Day Online 开源开放与新技术创新,快来报名 >>>
rocketMQ通信模块
Rocketmq的通信层是基于通信框架netty 4.0.21.Final之上做了简单的协议封装,基本的类图如下:
通讯模块是怎么进行的消息传输的
先来看看服务器端启动做了什么:
netty服务器启动,监听在8888;netty设置了一个心跳检测器IdleStateHandler,读写超时时间为120s,在120s后都没有读写操作将会触发相应事件。启动一个线程,获取阻塞队列eventQueue的元素,当netty channel通道发生CONNECT, CLOSE,IDLE,EXCEPTION事件时,队列会被放入事件对象启动一个定时器Timer,每个1s执行一次,扫描ResponseFuture,超时没有获取结果的会被移除掉
客户端跟服务器端差不多。
rocketmq提供了三种通信方式:
一、invokeSyncImpl 同步调用(主要实现参见NettyRemotingAbstract.invokeSyncImpl
)
同步调用是指客户端发起远程调用后,当前线程会被阻塞,直到服务器端返回结果或发生超时异常,我们在发送消息时需要同步知道消息发送成功还是失败,一般使用这种方式。
我们知道,netty是异步基于事件驱动的,当我们使用netty向远程服务器发送消息是通过channel.writeAndFlush
方法,此方法是异步的,那我们如何同步的获取服务器的返回结果呢?这里的做法是在向服务器发送消息时设置一个唯一的序列号,本地会通过上下文保存一个ResponseFuture
对象在Map
中,key
就是这个唯一的序列号,value
就是这个ResponseFuture
对象,ResponseFuture
对象会设置一个CountDownLatch
,每当发送完消息后,就会调用CountDownLatch
的await
方法挂起当前线程;当服务器返回结果时也会携带之前客户端传递过去的唯一序列号,这样就可以找到ResponseFuture
对象,再调用CountDownLatch
的countDown
方法,此时客户端之前挂起的线程就会苏醒过来,完成一次同步调用。
二、invokeAsyncImpl异步调用(主要实现参见NettyRemotingAbstract.invokeAsyncImpl
)
客户端发起远程调用前会先设置一个InvokeCallback
类,当然也是设置在ResponseFuture
对象中,调用结束后不会等待结果,当服务器返回时也是跟同步调用一样会在新的线程里面先找到ResponseFuture
,然后执行回调接口也就是InvokeCallback
的operationComplete
方法。如果服务器返回结果超时,也会进行回调,客户端可以根据相关的状态来执行相关逻辑。
异步调用不会阻塞线程,调用后会立即返回,调用结果会在异步线程里面执行回调来获取,使用Async需要控制好节奏,不能发送的太快以防止压垮服务器端。所以在invokeAsyncImpl方法里面设置了一个信号量,默认是64个,只有获取到许可的请求才能真正发起远程调用。
三、invokeOnewayImpl 单向调用(主要实现参见NettyRemotingAbstract.invokeOnewayImpl
)
客户端发送请求后不会等待服务端返回的结果,并且会忽略服务端的处理结果;当前线程调用完毕,调用方并不关心服务器端的处理结果,也不会被阻塞,跟异步调用一样需要控制好节奏以防压垮服务器端。在invokeOnewayImpl方法里面也设置了一个信号量,默认是256个,只有获取到许可的请求才能真正发起远程调用。
三种通信方式的对比
传输中的协议
RemotingCommand是rocketMQ消息传输的媒介,所有的消息都会包装成RemotingCommand来进行传输。而这个对象会在netty传输之前进行编码,消息接收到进行解码。
RemotingCommand是由头部(header)和消息体(body)组成,消息发送的时候,头部和消息体会分开进行编码。那么RemotingCommand是如何组成的呢?
RemotingCommand的核心字段:
public class RemotingCommand{private int code;private LanguageCode language = LanguageCode.JAVA;private int version = 0;private int opaque = requestId.getAndIncrement();private int flag = 0;private String remark;private HashMap<String, String> extFields;private transient CommandCustomHeader customHeader;private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;private transient byte[] body;}
头部(header)
请求头接收方和发起方的含义略有不同,下面的表格详细的说明:
头信息里面还包括了CommandCustomHeader的自定义的一些头信息,会被通过反射的方式放在extFields字段里面
消息体
消息体是直接变为byte数组,由客户端自己序列化,这两部分后一起放入netty传输的ByteBuffer中,一起传输到接收端
报文格式与序列化
length:表示整个数据包的长度 占4个字节header length:表示header的长度(高一位字节表示序列化类型,低三位字节表示长度)
headerData的序列化有两种方式:
json:使用fastjson进行序列化自定义:使用bytebuffer自定义序列化
Netty服务器端在启动时设置了TCP参数的含义
SO_BACKLOG:1024
指定全连接队列数,linux系统在文件/proc/sys/net/core/somaxconn
指定,默认128;
还有一个半连接队列数,linux在文件/proc/sys/net/ipv4/tcp_max_syn_backlog
指定
SO_REUSEADDR:true
重用处于time_wait状态下的连接
SO_KEEPALIVE:false
保活机制
TCP_NODELAY:true
关闭Nagle算法,Nagle算法可以降低网络里小包的数量,从而提升网络性能,关闭可以提高实时性
SO_SNDBUF:65535
发送缓存区大小
SO_RCVBUF:65535
接受缓存区大小
SO_RCVLOWAT:接收缓存水位线
SO_SNDLOWAT:发送缓存水位线
它们一般被I/O复用系统调用用来判断socket是否可读或可写。当TCP接收缓冲区中可读数据的总数大于其低水位标记时,I/O复用系统调用将通知应用程序可以从对应的socket上读取数据;当TCP发送缓冲区中的空闲空间(可以写入数据的空间)大于其低水位标记时,I/O复用系统调用将通知应用程序可以往对应的socket上写入数据
在netty中好像没有看到有设置这两个参数
CONNECT_TIMEOUT_MILLIS:3000
连接超时时间