200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ消费者端消息列队六种负载均衡算法分析

RocketMQ消费者端消息列队六种负载均衡算法分析

时间:2023-01-03 17:54:35

相关推荐

RocketMQ消费者端消息列队六种负载均衡算法分析

在RocketMQ启动的时候会启动负载均衡线程,过程如下:

//DefaultMQPullConsumerImpl.start()mQClientFactory.start();//上面点进去 ->MQClientInstance.start(),rebalanceService继承了ServiceThread,//ServiceThread实现了Runnable接口this.rebalanceService.start();//继续下一层,MQClientInstance.doRebalance()找到下面impl.doRebalance();//..在一层层点进去,最后找到RebalanceImpl.rebalanceByTopic方法,找到AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

AllocateMessageQueueStrategy就是实现消费者消息队列负载均衡算法的接口。

该接口在rocketMq-4.3.0版本中有六种实现方法:

AllocateMessageQueueAveragely:平均算法AllocateMessageQueueAveragelyByCircle:环形平均算法AllocateMessageQueueByConfig:根据配置负载均衡算法AllocateMessageQueueByMachineRoom:根据机房负载均衡算法AllocateMessageQueueConsistentHash:一致性哈希负载均衡算法AllocateMachineRoomNearby:靠近机房策略

在客户端没有指定的情况下,RocketMQ默认使用AllocateMessageQueueAveragely平均算法。

一、 AllocateMessageQueueAveragely平均负载均衡算法

平均算法顾名思义就是取平均值,该方法四个参数,consumerGroup(消费者组名称)、

currentCID(当前消费者的id)、mqAll(当前topic下面所有的消息队列)、cidAll(当前消费者组下面所有的消费者id)。算法思想就是把算出平均值然后将连续的队列分配给每个消费者。假设队列大小是8(编号0-7),消费者数量3(编号0-2),分配结果就是:

消费者0:队列0,1,2;

消费者1:队列3,4,5;

消费者2:队列6,7。

下面具体来看代码:

@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// cidAll.size() = 3; mqAll.size()=8;mod=2;//index=0;averageSize=3;startIndex=0;range=3,result={0,1,2}//index = 1;averageSize=3;startIndex=3;range = 3;result={3,4,5}//index = 2;averageSize=2;startIndex=6;range = 2;result={6,7}int index = cidAll.indexOf(currentCID);//取模a除以b的余数int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}

参数校验的就不说了,index算出当前消费者的下标,mod是队列数量对消费者取模,算出来余数(在不能平均分的情况下,前面mod个消费者多一个队列)。averageSize就是算出当前消费者可以分配多少个消息队列。startIndex消息队列开始的下标。

二、AllocateMessageQueueAveragelyByCircle环形平均分配算法

环形分配就可以看成:所有消费者围成一个环,然后循环这个环分配队列。AllocateMessageQueueAveragely方法平均分配的是连续的队列,环形分配的就是间隔的队列。核心代码就一个for循环,也很好理解。假设mq8个,消费者3个,分配后的结果就是{0,3,6},{1,4,7},{2,5}

public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {private final Logger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 假设:cidAll.size() = 3; mqAll.size()=8;index=0{0,3,6},index=1{1,4,7},index=2{2,5},转圈算法int index = cidAll.indexOf(currentCID);for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {result.add(mqAll.get(i));}}return result;}@Overridepublic String getName() {return "AVG_BY_CIRCLE";}}

三、AllocateMessageQueueByMachineRoom机房分配算法

机房分配现根据MQ中的brokerName找出有效的机房信息(也就是消息队列)。然后在评分,这个算法的逻辑是先算出平均值和余数,它和AllocateMessageQueueAveragely平均算法的不同在于,它是先给每个消费者分配mod(平均值个数)个消息队列,然后余数在从头开始一个个分配,假设mq有8个,消费者3个,那么平均值mod = 2,余数2,分配方式就是每个消费者先分配两个mq,{0,1},{2,3},{4,5},然后余数2个在从头开始分配,最后就是{0,1,6},{2,3,7},{4,5}。

public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {private Set<String> consumeridcs;@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();int currentIndex = cidAll.indexOf(currentCID);if (currentIndex < 0) {return result;}List<MessageQueue> premqAll = new ArrayList<MessageQueue>();//根据brokerName解析出所有有效的机房信息(有效mq)for (MessageQueue mq : mqAll) {String[] temp = mq.getBrokerName().split("@");if (temp.length == 2 && consumeridcs.contains(temp[0])) {premqAll.add(mq);}}//平均结果int mod = premqAll.size() / cidAll.size();// 平均之后的余数int rem = premqAll.size() % cidAll.size();int startIndex = mod * currentIndex;int endIndex = startIndex + mod;//所有消费者先平均分配,多的余数在依次分配,例如:队列8,消费者3,{0,1,6},{2,3,7},{4,5}for (int i = startIndex; i < endIndex; i++) {result.add(mqAll.get(i));}if (rem > currentIndex) {result.add(premqAll.get(currentIndex + mod * cidAll.size()));}return result;}@Overridepublic String getName() {return "MACHINE_ROOM";}public Set<String> getConsumeridcs() {return consumeridcs;}public void setConsumeridcs(Set<String> consumeridcs) {this.consumeridcs = consumeridcs;}

四、AllocateMessageQueueConsistentHash一致性哈希负载均衡算法

这个算法算是这几种中最复杂的一个吧。一致性哈希负载均衡的目的是要保证:相同的请求尽可能落在同一个服务器上。为什么是说尽可能?因为服务器会发生上下线,在少数服务器变化的时候不应该影响大多数的请求。再讲本节算法前,先简单的介绍一下一致性哈希算法,大家如果想有一个更深入的理解,可以去网上搜索更多相关资料。

普通hash算法存在的问题

普通hash算法我们可以简单理解为对key值进行hash之后对服务器取模,也就是hash(key) % n.这个时候如果我们的一台服务器宕机了,或者需要新增一台服务器,那么我们的n值就会变更,这样就会导致我们所有的请求都会变更。举个简单的例子,我们有个redis集群,部署了4台服务器,如果我们将key1使用随机存储,那么我们找key1的时候可能就需要遍历4服务器,效率差。在换种方式,对key1哈希操作后取模,将它定位到一台服务器上,这样在查找key1的时候我们就可以很快的定位到一台服务器上。可是这样还有种问题就是之前所说的如果我们redis集群增加了一台服务器,或者有一台服务器宕机了要从集群中去除。这样再通过hash算出了值就发生了变化。短时间发生缓存雪崩。

一致性hash算法

核心点:

哈希环。刚才的hash算法是对服务器取模,一致性哈希算法使用的是对2^32取模,

就是一致性哈希将整个hash空间组织成了一个圆环,0-2^32-1.物理节点:将服务器(ip+端口)进行hash,映射成环上的一个节点。当请求到来时,根据请求的key,hash映射到环上,顺时针选取最近的一个服务器进行请求。虚拟节点:当环上的服务器较少的时候,会出现分配不均匀的情况,即大量的请求落在同一台服务器上。为了避免这种情况,就引入了虚拟节点,比如通过添加后缀的方式给物理节点克隆出三个虚拟节点,如果两台物理节点,都克隆三个虚拟节点,那么环上就一共有8个节点。只是被克隆的虚拟节点最后还是会定位到实际物理节点上,但是可以有效的分摊请求。

一致性哈希相对于普通hash,优点在于映射到环上的其请求,是发送到环上离他最近的一个服务器,如果我们一台服务器宕机或者新增一台服务器,那么影响的请求只有这台服务器和前一个服务器节点之间的请求,其他的并不会影响。

接下来,我们再来看看RocketMq是如何实现一致性哈希负载均衡算法

//核心代码Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));}//ConsistentHashRouter关键类final ConsistentHashRouter<ClientNode> router; //for building hash ringif (customHashFunction != null) {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);} else {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);}List<MessageQueue> results = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) {ClientNode clientNode = router.routeNode(mq.toString());if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}}return results;private static class ClientNode implements Node {private final String clientID;public ClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;}}

上面代码在ConsistentHashRouter这个类中构建了哈希环,算法的主要实现都是在这个类中实现的。先来看这个类的构造方法。构造方法中pNodes表示物理节点;vNodeCount表示虚拟节点个数,默认十个;HashFunction表示哈希算法接口,可以自己实现,默认使用MD5实现哈希算法。addNode方法将物理节点和虚拟节点映射到哈希环上。

public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {if (hashFunction == null) {throw new NullPointerException("Hash Function is null");}this.hashFunction = hashFunction;if (pNodes != null) {for (T pNode : pNodes) {addNode(pNode, vNodeCount);}}}

哈希环的构建

这里构建哈希环是通过TreeMap来实现的。

private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();

将物理节点、虚拟节点放入treeMap里。通过treeMap的tailMap、firstKey()等方法来获取请求映射对应的节点。后面在细讲。

addNode方法的实现:

public void addNode(T pNode, int vNodeCount) {if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);//获取环上对应物理节点已经有的虚拟节点个数int existingReplicas = getExistingReplicas(pNode);for (int i = 0; i < vNodeCount; i++) {VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);ring.put(hashFunction.hash(vNode.getKey()), vNode);}}public int getExistingReplicas(T pNode) {int replicas = 0;//循环ring,找出物理节点pNode在环上有多少虚拟节点for (VirtualNode<T> vNode : ring.values()) {if (vNode.isVirtualNodeOf(pNode)) {replicas++;}}return replicas;}

addNode先通过getExistingReplicas方法找出该pNode物理节点在环上已经存在的虚拟节点的个数。然后循环创建新的虚拟节点,下标依次+1。再将新的虚拟节点加入哈希环。哈希环构建好了,再回去继续看我们的主流程。

接着循环所有mq,通过router.routeNode方法找到mq映射的物理节点。

ClientNode clientNode = router.routeNode(mq.toString());//ConsistentHashRouter.routeNodepublic T routeNode(String objectKey) {if (ring.isEmpty()) {return null;}Long hashVal = hashFunction.hash(objectKey);//tailMap方法找到大于等于hashVal映射的集合SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();return ring.get(nodeHashVal).getPhysicalNode();}

将mq.toString哈希后,得出hashVal,使用TreeMap.tailMap方法得出map中key值大于等于hashVal的集合映射,就是得出了key值在hashVal之后的所有值。接着判断后面的map集合是不是空的,如果不是空的,就取它的第一个值(firstKey),也就是hashVal映射在环上的顺时针最近的节点。如果是空的,就说明该hashVal后面已经没有节点了,因为是环,所以取ring.first(),取环上的第一个节点。然后返回该节点对应的物理节点。

回到主流程,拿到物理节点后,和当前请求客户端id进行比较,是同一个,就把mq分配给他

if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}

五、AllocateMessageQueueByConfig通过配置负载均衡

这个没啥好说的,自定义配置。

六、AllocateMachineRoomNearby靠近机房策略

休息一下

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。