200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > zookeeper(四)领导者选举

zookeeper(四)领导者选举

时间:2023-10-06 04:27:08

相关推荐

zookeeper(四)领导者选举

先上图:

首先看到这个图肯定就会很懵,确实比较多,体系结构也比较复杂,目前但是如果是根据源码调试加上这篇博客应该问题就不是很大。

我们首先从判定为集群开始说起:即从org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig开始。这里重点说一下quorumPeer.start();方法。

@Overridepublic synchronized void start() {//加载事务和快照。其实就是恢复的意思。主要就是把数据加载到内存,拿到纪元号之类的loadDataBase();//就是启动thread线程,跟单机的那个线程做的一样的事cnxnFactory.start();//领导者选举。(重要)///johnvwan/p/9546909.html 这篇博客讲得很好。startLeaderElection();//使用选举算法选出领导并开始同步。super.start();}

前面2个不说,就跟单机启动一模一样。重点是第三个和第四个。

跟踪到这个方法中:org.apache.zookeeper.server.quorum.QuorumPeer#startLeaderElection

synchronized public void startLeaderElection() {try {//参数:1、myid。2.最新的zxid。3、当前的纪元号//首先投自己一票。在投票箱中写上自己的信息。currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());} catch(IOException e) {RuntimeException re = new RuntimeException(e.getMessage());re.setStackTrace(e.getStackTrace());throw re;}//getView().values()就是获取到配置文件中的所有以server.开头的信息。包括observerfor (QuorumServer p : getView().values()) {if (p.id == myid) {//如果id号是自己//为什么要有这个判断,因为你不但要告诉其它服务器投的谁,你还要告诉其它服务器是你投的myQuorumAddr = p.addr;break;}}if (myQuorumAddr == null) {throw new RuntimeException("My id " + myid + " not in the peer list");}if (electionType == 0) {try {udpSocket = new DatagramSocket(myQuorumAddr.getPort());responder = new ResponderThread();responder.start();} catch (SocketException e) {throw new RuntimeException(e);}}//创建选举算法,默认是3.electionAlg可以更改。//此外还有0,1,2.但是现在一般都不推荐了。所以一般不会动//步骤如下://1、初始化QuorumCnxManager//2、初始化QuorumCnxManager.Listener//3、运行QuorumCnxManager.Listener//4、运行QuorumCnxManager//5、返回FastLeaderElection对象//流程如下://1.把自己的投票放入queueSendMap中:用于发送自己的投票信息。首先肯定是投自己。//2.queueSendMap中发送给其它服务器,如果是自己这一台就直接放到recvQueue中,如果是其它服务器就通过socket发送投票信息。//3.不断的检测recvQueue中的投票信息,如果在某一时刻recvQueue中过半的服务器(通过sid标识),那个被投票的那个人就设置为leader,其它就设置为follower。//服务器连接方式id大的去连接id小的,不允许小的连接大的。this.electionAlg = createElectionAlgorithm(electionType);}

这个方法的整体逻辑已经在方法注释中体现出来了。最复杂的是哪个方法呢?看注释就知道是createElectionAlgorithm这个方法了。

protected Election createElectionAlgorithm(int electionAlgorithm){Election le=null;//TODO: use a factory rather than a switchswitch (electionAlgorithm) {case 0:le = new LeaderElection(this);break;case 1:le = new AuthFastLeaderElection(this);break;case 2:le = new AuthFastLeaderElection(this, true);break;case 3://创建一个QuorumCnxManager,里面存在几个重要的属性。//ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap://ConcurrentHashMap<Long, SendWorker> senderWorkerMap:就是用来记录其他服务器id以及对应的SendWorker的//ArrayBlockingQueue<Message> recvQueue:保存选票//QuorumCnxManager.Listenerqcm = createCnxnManager();//姑且认为就是一个监听器。因为是一个线程。QuorumCnxManager.Listener listener = qcm.listener;if(listener != null){listener.start();//启动监听器//这里面比较复杂le = new FastLeaderElection(this, qcm);} else {LOG.error("Null listener when initializing cnx manager");}break;default:assert false;}return le;}

由于0 1 2都是不建议使用了,所以咱们就直接看3的情况。当然这里面哪里很复杂呢?当然是le = new FastLeaderElection(this, qcm);这个了。这个也是今天的重点,快速领导者选举,会很复杂,我们一步一步来。

FastLeaderElection初始化

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){this.stop = false;this.manager = manager;starter(self, manager);}

干了什么呢?就是初始化的赋值,然后进入starter方法。好像也没什么,我们看看这个方法。

starter(self, manager)

private void starter(QuorumPeer self, QuorumCnxManager manager) {this.self = self;proposedLeader = -1;proposedZxid = -1;sendqueue = new LinkedBlockingQueue<ToSend>();recvqueue = new LinkedBlockingQueue<Notification>();this.messenger = new Messenger(manager);}

前面3行就是初始化的。重点是后面3行。看起来好像也没有什么,但是意义非常重大。你现在不需要记住什么,就只需要记住一点,初始化了两个queue。sendqueue 和recvqueue ,用处顾名思义。牢记,后面能用上。

this.messenger = new Messenger(manager)

Messenger(QuorumCnxManager manager) {this.ws = new WorkerSender(manager);Thread t = new Thread(this.ws,"WorkerSender[myid=" + self.getId() + "]");t.setDaemon(true);t.start();this.wr = new WorkerReceiver(manager);t = new Thread(this.wr,"WorkerReceiver[myid=" + self.getId() + "]");t.setDaemon(true);t.start();}

看起也简单。new WorkerSender和new WorkerReceiver而已,然后就开子线程启动了。那么现在开始涉及到多线程了。我们先看WorkerSender,再看WorkerReceiver。

WorkerSender

public void run() {while (!stop) {try {//这里取了是把所有的参与者都放进去了。什么时候放进去的呢?// 通过sendNotifications函数放进去的。ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);if(m == null) continue;//开始处理process(m);} catch (InterruptedException e) {break;}}LOG.info("WorkerSender is down");}

sendqueue有印象没有,前面初始化提到过哦。这个是取方法。那么肯定就有放的方法。从哪里放的呢?当然你看到了我的注释。但是啥时候调用的这个方法?神奇不?

当然调试就知道啦!

一直往前推就能追溯到这个方法:org.apache.zookeeper.server.quorum.QuorumPeer#run,这个方式是什么时候启动的呢?第一个代码段的super.start();方法启动的。我们看这段代码:

public void run() {setName("QuorumPeer" + "[myid=" + getId() + "]" +cnxnFactory.getLocalAddress());System.out.println("当前线程:" + Thread.currentThread().getName());LOG.debug("Starting quorum peer");//这个try的内容看不懂要干啥?try {jmxQuorumBean = new QuorumBean(this);MBeanRegistry.getInstance().register(jmxQuorumBean, null);for(QuorumServer s: getView().values()){ZKMBeanInfo p;if (getId() == s.id) {p = jmxLocalPeerBean = new LocalPeerBean(this);try {MBeanRegistry.getInstance().register(p, jmxQuorumBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);jmxLocalPeerBean = null;}} else {p = new RemotePeerBean(s);try {MBeanRegistry.getInstance().register(p, jmxQuorumBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);}}}} catch (Exception e) {LOG.warn("Failed to register with JMX", e);jmxQuorumBean = null;}try {/** Main loop*///这里我有一个误区,我以为break是直接跳出了while循环,但实际是只是跳出了switch,还是基础不够扎实。break不但用于跳出循环,也用于跳出switch。//测试文件:com.xq.test.TestWhilewhile (running) {switch (getPeerState()) {case LOOKING:LOG.info("LOOKING");if (Boolean.getBoolean("readonlymode.enabled")) {//如果开启了只读LOG.info("Attempting to start ReadOnlyZooKeeperServer");// Create read-only server but don't start it immediatelyfinal ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder(),this.zkDb);// Instead of starting roZk immediately, wait some grace// period before we decide we're partitioned.//// Thread is used here because otherwise it would require// changes in each of election strategy classes which is// unnecessary code coupling.Thread roZkMgr = new Thread() {public void run() {try {// lower-bound grace period to 2 secssleep(Math.max(2000, tickTime));if (ServerState.LOOKING.equals(getPeerState())) {roZk.startup();}} catch (InterruptedException e) {LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");} catch (Exception e) {LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);}}};try {roZkMgr.start();setBCVote(null);setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception",e);setPeerState(ServerState.LOOKING);} finally {// If the thread is in the the grace period, interrupt// to come out of waiting.roZkMgr.interrupt();roZk.shutdown();}} else {//没有开启只读try {setBCVote(null);//不断更新投票,直到选出leadersetCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) {LOG.warn("Unexpected exception", e);setPeerState(ServerState.LOOKING);}}break;case OBSERVING:try {LOG.info("OBSERVING");setObserver(makeObserver(logFactory));observer.observeLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e ); } finally {observer.shutdown();setObserver(null);setPeerState(ServerState.LOOKING);}break;case FOLLOWING:try {LOG.info("FOLLOWING");//产出一个适合follower的类setFollower(makeFollower(logFactory));follower.followLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {follower.shutdown();setFollower(null);setPeerState(ServerState.LOOKING);}break;case LEADING:LOG.info("LEADING");try {//产出一个适合leader的类setLeader(makeLeader(logFactory));//这个只是选择过半能接受的epoch,假设有更新的zxid没有启动//在这个确定epoch的时候它启动了,那么这个leader会直接丢失多的数据吗?看代码好像是的leader.lead();setLeader(null);} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {if (leader != null) {leader.shutdown("Forcing shutdown");setLeader(null);}setPeerState(ServerState.LOOKING);}break;}}} finally {LOG.warn("QuorumPeer main thread exited");try {MBeanRegistry.getInstance().unregisterAll();} catch (Exception e) {LOG.warn("Failed to unregister with JMX", e);}jmxQuorumBean = null;jmxLocalPeerBean = null;}}

从长度和注释就知道这是一块非常核心的代码。因为它就是真正的选举核心以及选举完了之后的角色分配。这段代码实在是非常复杂,建议结合下面的图理解。

为了便于理解再用文字补充一下:

我们所了解到的WorkerSender线程是从sendqueue取的数据。而我们现在就是在看是那里放的数据?放的应该是什么数据?这个应该理解一下就能想到是你投票的数据了。关于多线程这块需要有一定的基础才方便理解。

现在我们回到WorkerSender线程里面,再来回顾一下这段代码:

public void run() {while (!stop) {try {//这里取了是把所有的参与者都放进去了。什么时候放进去的呢?// 通过sendNotifications函数放进去的。ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);if(m == null) continue;//开始处理process(m);} catch (InterruptedException e) {break;}}LOG.info("WorkerSender is down");}

取出来之后重点就是process(m);处理了。

void process(ToSend m) {ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader,m.zxid, m.electionEpoch, m.peerEpoch);//构造除byte的数组发送消息manager.toSend(m.sid, requestBuffer);}

主要是看怎么发送的,定位到 manager.toSend(m.sid, requestBuffer)。

public void toSend(Long sid, ByteBuffer b) {/** If sending message to myself, then simply enqueue it (loopback).*/if (this.mySid == sid) {//如果是本机的话b.position(0);//直接添加到recvQueue中addToRecvQueue(new Message(b.duplicate(), sid));/** Otherwise send to the corresponding thread to send.*/} else {//不是本机的话/** Start a new connection if doesn't have one already.*/ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);//putIfAbsent 如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回nullArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);if (bqExisting != null) {//如果不是null,说明已经存在了。证明已经投过一票了。就把b加入到bqExisting中addToSendQueue(bqExisting, b);} else {//如果为null,说明目前为止还没有投过。就把b加入到bq中。为啥添加到不同的队列中?addToSendQueue(bq, b);}connectOne(sid);}}

这段代码有细节没懂,不过不是很影响大局。

对照着图下一个应该是SendWorker线程了。这个是啥时候被造出来的?

这个时候分为两种情况。

1.当前就一台启动了,因为目前就一台,所以不需要像其它服务器发消息,所以这个时候并不会造出这个线程,而如果第二台起来了,它才会造这个线程。下面给个调用链。它是怎么实现的?就是不断的探测能否与其它服务器通信,只有能通信才造线程。

org.apache.zookeeper.server.quorum.QuorumPeer#run—>org.apache.zookeeper.server.quorum.QuorumPeer#setCurrentVote–>org.apache.zookeeper.server.quorum.QuorumCnxManager#connectAll–>org.apache.zookeeper.server.quorum.QuorumCnxManager#connectOne–>org.apache.zookeeper.server.quorum.QuorumCnxManager#initiateConnection–>org.apache.zookeeper.server.quorum.QuorumCnxManager#startConnection

2.当前不是第一个启动的,即直接就能与其它服务器通信。

这个又需要追溯到前面了。QuorumCnxManager.Listener listener = qcm.listener;这个在本页面搜一下。在启动listener之后。在这里面就已经造好了SendWorker和RecvWorker线程。这个又是怎么实现的?因为一启动就打开了端口,此时另外一台在向这边发数据,所以是被迫造的线程!

org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener#run–>org.apache.zookeeper.server.quorum.QuorumCnxManager#receiveConnection–>org.apache.zookeeper.server.quorum.QuorumCnxManager#handleConnection

RecvWorker线程也是在这个时候造出来的。

WorkerReceiver也比较类似。虽然细节不一样,但是大体流程也应该清楚了。

主要是要对投票的逻辑图熟悉。在看这些稍微好点。

为了完整性再来看看接收流程。就从前面的WorkerReceiver开始。

Messenger(QuorumCnxManager manager) {this.ws = new WorkerSender(manager);Thread t = new Thread(this.ws,"WorkerSender[myid=" + self.getId() + "]");t.setDaemon(true);t.start();this.wr = new WorkerReceiver(manager);t = new Thread(this.wr,"WorkerReceiver[myid=" + self.getId() + "]");t.setDaemon(true);t.start();}

这个线程对应的run方法也是非常复杂,因为按照常理,我们每接到一个更新的投票就要做统计。统计什么呢?统计是否过半了,以及对其它情况的处理。

因为代码太多了。我们先对照流程图来看。

从流程图上可以看到的是WorkerReceiver线程是从recvQueue中取数据,并放到recvqueue中。也就是说WorkerReceiver做了一个中间转接的过程。知道的大体的情况我们再来看详细的代码。

public void run() {Message response;while (!stop) {// Sleeps on receivetry{//从recvQueue中取出消息。response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);if(response == null) continue;/** If it is from an observer, respond right away.* Note that the following predicate assumes that* if a server is not a follower, then it must be* an observer. If we ever have any other type of* learner in the future, we'll have to change the* way we check for observers.*///判断能否投票,因为observer是不能投票的,所以先验证是不是参与者if(!validVoter(response.sid)){//如果不是参与者(如观察者)Vote current = self.getCurrentVote();ToSend notmsg = new ToSend(ToSend.mType.notification,current.getId(),current.getZxid(),logicalclock.get(),self.getPeerState(),response.sid,current.getPeerEpoch());//直接把接收到的消息发出去。sendqueue.offer(notmsg);} else {//如果是参与者// Receive new messageif (LOG.isDebugEnabled()) {LOG.debug("Receive new notification message. My id = "+ self.getId());}/** We check for 28 bytes for backward compatibility*///兼容性检查if (response.buffer.capacity() < 28) {LOG.error("Got a short response: "+ response.buffer.capacity());continue;}boolean backCompatibility = (response.buffer.capacity() == 28);//为啥要清空?response.buffer.clear();// Instantiate Notification and set its attributesNotification n = new Notification();// State of peer that sent this messageQuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;switch (response.buffer.getInt()) {case 0:ackstate = QuorumPeer.ServerState.LOOKING;break;case 1:ackstate = QuorumPeer.ServerState.FOLLOWING;break;case 2:ackstate = QuorumPeer.ServerState.LEADING;break;case 3:ackstate = QuorumPeer.ServerState.OBSERVING;break;default:continue;}n.leader = response.buffer.getLong();n.zxid = response.buffer.getLong();n.electionEpoch = response.buffer.getLong();n.state = ackstate;n.sid = response.sid;if(!backCompatibility){n.peerEpoch = response.buffer.getLong();} else {if(LOG.isInfoEnabled()){LOG.info("Backward compatibility mode, server id=" + n.sid);}n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);}/** Version added in 3.4.6*/n.version = (response.buffer.remaining() >= 4) ? response.buffer.getInt() : 0x0;/** Print notification info*/if(LOG.isInfoEnabled()){printNotification(n);}/** If this server is looking, then send proposed leader*/if(self.getPeerState() == QuorumPeer.ServerState.LOOKING) {//如果自己也在寻找leader的话recvqueue.offer(n);//就放到recvqueue中/** Send a notification back if the peer that sent this* message is also looking and its logical clock is* lagging behind.*///如果发送此消息的对方也正在LOOKING,并且其logical clock落后,则发送通知。if((ackstate == QuorumPeer.ServerState.LOOKING)&& (n.electionEpoch < logicalclock.get())){Vote v = getVote();ToSend notmsg = new ToSend(ToSend.mType.notification,v.getId(),v.getZxid(),logicalclock.get(),self.getPeerState(),response.sid,v.getPeerEpoch());sendqueue.offer(notmsg);}} else {//如果自己并不是寻找leader的话,则将自己认为的leader发送给对方。/** If this server is not looking, but the one that sent the ack* is looking, then send back what it believes to be the leader.*/Vote current = self.getCurrentVote();System.out.println("当前认可的sid:"+current.getId());if(ackstate == QuorumPeer.ServerState.LOOKING){if(LOG.isDebugEnabled()){LOG.debug("Sending new notification. My id = " +self.getId() + " recipient=" +response.sid + " zxid=0x" +Long.toHexString(current.getZxid()) +" leader=" + current.getId());}ToSend notmsg;if(n.version > 0x0) {notmsg = new ToSend(ToSend.mType.notification,current.getId(),current.getZxid(),current.getElectionEpoch(),self.getPeerState(),response.sid,current.getPeerEpoch());} else {Vote bcVote = self.getBCVote();notmsg = new ToSend(ToSend.mType.notification,bcVote.getId(),bcVote.getZxid(),bcVote.getElectionEpoch(),self.getPeerState(),response.sid,bcVote.getPeerEpoch());}sendqueue.offer(notmsg);}}}} catch (InterruptedException e) {System.out.println("Interrupted Exception while waiting for new message" +e.toString());}}LOG.info("WorkerReceiver is down");}}

提一提上述代码的关键细节:

1.manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS)方法怎么取的

public Message pollRecvQueue(long timeout, TimeUnit unit)throws InterruptedException {return recvQueue.poll(timeout, unit);}

2.放入recvqueue中

if(self.getPeerState() == QuorumPeer.ServerState.LOOKING) {//如果自己也在寻找leader的话recvqueue.offer(n);//就放到recvqueue中/** Send a notification back if the peer that sent this* message is also looking and its logical clock is* lagging behind.*///如果发送此消息的对方也正在LOOKING,并且其logical clock落后,则发送通知。if((ackstate == QuorumPeer.ServerState.LOOKING)&& (n.electionEpoch < logicalclock.get())){Vote v = getVote();ToSend notmsg = new ToSend(ToSend.mType.notification,v.getId(),v.getZxid(),logicalclock.get(),self.getPeerState(),response.sid,v.getPeerEpoch());sendqueue.offer(notmsg);}}

至于RecvWorker线程就自己取调试,比较简单。

最后我们来说说它的验证。这个也非常复杂了。看代码。

org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader这个方法是在什么时候调用的呢?就是org.apache.zookeeper.server.quorum.QuorumPeer#run这个线程里面。这里面就描述了找leader的流程。

//领导者选举public Vote lookForLeader() throws InterruptedException {try {self.jmxLeaderElectionBean = new LeaderElectionBean();MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);} catch (Exception e) {LOG.warn("Failed to register with JMX", e);self.jmxLeaderElectionBean = null;}if (self.start_fle == 0) {self.start_fle = Time.currentElapsedTime();}try {//初始化一个投票的mapHashMap<Long, Vote> recvset = new HashMap<Long, Vote>();HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();int notTimeout = finalizeWait;synchronized(this){logicalclock.incrementAndGet();//更新选票,这个时候一直投的自己updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());}LOG.info("New election. My id = " + self.getId() +", proposed zxid=0x" + Long.toHexString(proposedZxid));//向其它服务器发送选票sendNotifications();/** Loop in which we exchange notifications until we find a leader*///不断获取其他服务器的投票信息,直到选出Leaderwhile ((self.getPeerState() == ServerState.LOOKING) &&(!stop)) {/** Remove next notification from queue, times out after 2 times* the termination time*/Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);/** Sends more notifications if haven't received enough.* Otherwise processes new notification.*/if(n == null){//如果取到的为null,说明没有取到消息。if(manager.haveDelivered()){sendNotifications();} else {manager.connectAll();}/** Exponential backoff*/int tmpTimeOut = notTimeout*2;notTimeout = (tmpTimeOut < maxNotificationInterval?tmpTimeOut : maxNotificationInterval);LOG.info("Notification time out: " + notTimeout);}else if(validVoter(n.sid) && validVoter(n.leader)) {/** Only proceed if the vote comes from a replica in the* voting view for a replica in the voting view.*///处理投票switch (n.state) {case LOOKING:// If notification > current, replace and send messages out//如果参加选举的纪元(Epoch)>当前逻辑的纪元,说明参加选举的肯定比现在选举的更新,更有说服力//所以当前就把纪元更新一下。清空自己得到的投票(比如唐代的尚方宝剑肯定不能拿到清朝使用,一个道理)。//然后就投那个参加选举的纪元。if (n.electionEpoch > logicalclock.get()) {logicalclock.set(n.electionEpoch);recvset.clear();if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);} else {updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());}//通知其它服务器sendNotifications();} else if (n.electionEpoch < logicalclock.get()) {//如果比当前小,证明参加选举的已经过时,就直接忽略这个投票信息。if(LOG.isDebugEnabled()){LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"+ Long.toHexString(n.electionEpoch)+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));}break;} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) {//这个时候epoch肯定一致了。所以就来比较一下zxid,谁得zxid越新就投给谁。//如果连zxid都一致,那就看sid(即serverid),这个肯定是能区分的,所以就确定了投票。//但是如果自己跟自己比,就没有意义了,所以自己就不进。updateProposal(n.leader, n.zxid, n.peerEpoch);sendNotifications();}if(LOG.isDebugEnabled()){LOG.debug("Adding vote: from=" + n.sid +", proposed leader=" + n.leader +", proposed zxid=0x" + Long.toHexString(n.zxid) +", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));}recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));///判断是否投票结束,就是判断是否有过半的服务器投的同一个。if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) {// Verify if there is any change in the proposed leader//验证投票中过半的投票是否有变动while((n = recvqueue.poll(finalizeWait,TimeUnit.MILLISECONDS)) != null){if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)){recvqueue.put(n);break;}}/** This predicate is true once we don't read any new* relevant message from the reception queue*///上面的while一直从recvqueue取,直到取为空,并且还要符合过半机制才结束,所以这里的n基本就是null//然后就分配各自的角色。if (n == null) {self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState());Vote endVote = new Vote(proposedLeader,proposedZxid,logicalclock.get(),proposedEpoch);leaveInstance(endVote);return endVote;}}break;case OBSERVING:LOG.debug("Notification from observer: " + n.sid);break;case FOLLOWING:case LEADING:/** Consider all notifications from the same epoch* together.*/if(n.electionEpoch == logicalclock.get()){recvset.put(n.sid, new Vote(n.leader,n.zxid,n.electionEpoch,n.peerEpoch));if(ooePredicate(recvset, outofelection, n)) {self.setPeerState((n.leader == self.getId()) ?ServerState.LEADING: learningState());Vote endVote = new Vote(n.leader,n.zxid,n.electionEpoch,n.peerEpoch);leaveInstance(endVote);return endVote;}}/** Before joining an established ensemble, verify* a majority is following the same leader.*/outofelection.put(n.sid, new Vote(n.version,n.leader,n.zxid,n.electionEpoch,n.peerEpoch,n.state));if(ooePredicate(outofelection, outofelection, n)) {synchronized(this){logicalclock.set(n.electionEpoch);self.setPeerState((n.leader == self.getId()) ?ServerState.LEADING: learningState());}Vote endVote = new Vote(n.leader,n.zxid,n.electionEpoch,n.peerEpoch);leaveInstance(endVote);return endVote;}break;default:LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",n.state, n.sid);break;}} else {if (!validVoter(n.leader)) {LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);}if (!validVoter(n.sid)) {LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);}}}return null;} finally {try {if(self.jmxLeaderElectionBean != null){MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);}} catch (Exception e) {LOG.warn("Failed to unregister with JMX", e);}self.jmxLeaderElectionBean = null;LOG.debug("Number of connection processing threads: {}",manager.getConnectionThreadCount());}}

看看上面代码中的totalOrderPredicate方法。为什么要说这个方法呢?因为这个方法多次出现,所以非常重要,这个方法就是拿来判断别的服务器是否比你更好。判断逻辑就是:Epoch,Zxid,myid依次比较,直到比较出一个大小出来。

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));if(self.getQuorumVerifier().getWeight(newId) == 0){return false;}/** We return true if one of the following three cases hold:* 1- New epoch is higher* 2- New epoch is the same as current epoch, but new zxid is higher* 3- New epoch is the same as current epoch, new zxid is the same* as current zxid, but server id is higher.*/return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) &&((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));}

那么哪里判断是结束投票呢?org.apache.zookeeper.server.quorum.FastLeaderElection#termPredicate这个方法中。

protected boolean termPredicate(HashMap<Long, Vote> votes,Vote vote) {HashSet<Long> set = new HashSet<Long>();/** First make the views consistent. Sometimes peers will have* different zxids for a server depending on timing.*/for (Map.Entry<Long,Vote> entry : votes.entrySet()) {if (vote.equals(entry.getValue())){set.add(entry.getKey());}}//这里的containsQuorum具体在下面就是它的实现return self.getQuorumVerifier().containsQuorum(set);}

public boolean containsQuorum(Set<Long> set){return (set.size() > half);}

分配完后就要确定各自的角色了,值得一提的是它并不是过半马上就设置了,而是做了严谨的判断。

它确定好有leader产生后就会再次判断,是否还是这样的结果。而且会取完recvqueue的所有数据后确定是这个了才会分配角色。recvqueue取完就表示一件事情,其它服务器没有发送投票了,证明他们也选举了leader。所以在这个时候确定leader就能完全判断了。

有人可能会问万一服务器选出来的leader不同呢?

这个问题不存在的,首先过半验证就是说有一半以上的服务器选的同一个,那么想两个服务器选出不一样的leader,那么发送的数据永远不可能存在另一个过半的服务器(当然你非要说数据被篡改了之类的就不用说了,凡事无绝对),即不可能存在这种情况,而且必须是recvqueue空了后才设置角色的,说明基本要所有服务器确认了leader才能为空。所以选出来的一定是同一个leader!!

后面再说选举完了的流程,这篇先到这里,够长了。后面主要了解,设置角色之后各自需要做什么工作,发送一个请求的流程是怎样的?一致性是怎么保证的等等。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。