百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

zookeeper的Leader选举源码解析

liuian 2025-03-04 13:06 39 浏览

作者:京东物流 梁吉超

zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构。

01Leader选举机制

Leader选举机制采用半数选举算法。

每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。

02Leader选举集群配置

1. 重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg

2. 修改zoo.cfg文件,修改值如下:

【plain】
zoo1.cfg文件内容:
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer


zoo2.cfg文件内容:
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer


zoo3.cfg文件内容:
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer


zoo4.cfg文件内容:
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer

3. server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识

  • participant默认参与选举标识,可不写. observer不参与选举

4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。

5. 启动三个zookeeper实例:

  • bin/zkServer.sh start conf/zoo1.cfg
  • bin/zkServer.sh start conf/zoo2.cfg
  • bin/zkServer.sh start conf/zoo3.cfg

6. 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举。

03Leader选举流程

图1 第一轮到第二轮投票流程

前提:

设定票据数据格式vote(sid,zxid,epoch)

  • sid是Server ID每台服务的唯一标识,是myid文件内容;
  • zxid是数据事务id号;
  • epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。

按照顺序启动sid=1,sid=2节点

第一轮投票:

1. sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;

2. sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;

3. sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);

4. sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;

5. 第一轮投票选举结束。

第二轮投票:

1. sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;

2. sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;

3. sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;

4. sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader。

这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。

3.1 Leader选举采用多层队列架构

zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。比如对某台机器发送不成功不会影响正常服务端的发送。

图2 多层队列上下关系交互流程图


04解析代码入口类

通过查看zkServer.sh文件内容找到服务启动类:

org.apache.zookeeper.server.quorum.QuorumPeerMain

05选举流程代码解析

图3 选举代码实现流程图

1. 加载配置文件QuorumPeerConfig.parse(path);

针对 Leader选举关键配置信息如下:

  • 读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。下面遇到myid变量为当前节点自己sid标识。
    • 设置peerType当前应用是否参与选举
  • new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.
【Java】
public QuorumMaj(Properties props) throws ConfigException {
        for (Entry entry : props.entrySet()) {
            String key = entry.getKey().toString();
            String value = entry.getValue().toString();
            //读取集群配置文件中的server.开头的应用实例配置信息
            if (key.startsWith("server.")) {
                int dot = key.indexOf('.');
                long sid = Long.parseLong(key.substring(dot + 1));
                QuorumServer qs = new QuorumServer(sid, value);
                allMembers.put(Long.valueOf(sid), qs);
                if (qs.type == LearnerType.PARTICIPANT)
//应用实例绑定的角色为PARTICIPANT意为参与选举
                    votingMembers.put(Long.valueOf(sid), qs);
                else {
                    //观察者成员
                    observingMembers.put(Long.valueOf(sid), qs);
                }
            } else if (key.equals("version")) {
                version = Long.parseLong(value, 16);
            }
        }
        //过半基数
        half = votingMembers.size() / 2;
    }

2.
QuorumPeerMain.runFromConfig(config) 启动服务;

3.
QuorumPeer.startLeaderElection() 开启选举服务;

  • 设置当前选票new Vote(sid,zxid,epoch)
【plain】
synchronized public void startLeaderElection(){
try {
           if (getPeerState() == ServerState.LOOKING) {
               //首轮:当前节点默认投票对象为自己
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }
//........
}
  • 创建选举管理类:QuorumCnxnManager;
  • 初始化recvQueue接收投票队列(第二层传输队列);
  • 初始化queueSendMap按sid发送投票队列(第二层传输队列);
  • 初始化senderWorkerMap发送投票工作线程容器,表示着与sid投票节点已连接;
  • 初始化选举监听线程类QuorumCnxnManager.Listener。
【Java】
//QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
                        final long mySid,
                        Map view,
                        QuorumAuthServer authServer,
                        QuorumAuthLearner authLearner,
                        int socketTimeout,
                        boolean listenOnAllIPs,
                        int quorumCnxnThreadsSize,
                        boolean quorumSaslAuthEnabled) {
    //接收投票队列(第二层传输队列)
    this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY);
    //按sid发送投票队列(第二层传输队列)
    this.queueSendMap = new ConcurrentHashMap>();
    //发送投票工作线程容器,表示着与sid投票节点已连接 
    this.senderWorkerMap = new ConcurrentHashMap();
    this.lastMessageSent = new ConcurrentHashMap();


    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }


    this.self = self;


    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;


    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);
    // Starts listener thread that waits for connection requests 
    //创建选举监听线程 接收选举投票请求
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();//启动选举监听线程
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
return le;}

4. 开启选举监听线程
QuorumCnxnManager.Listener;

  • 创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap
  • sid>self.sid才可以连接过来。
【Java】
//上面的listener.start()执行后,选择此方法
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    while((!shutdown) && (numRetries < 3)){
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) {
                int port = self.getElectionAddress().getPort();
                addr = new InetSocketAddress(port);
            } else {
                // Resolve hostname for this server in case the
                // underlying ip address has changed.
                self.recreateSocketAddresses(self.getId());
                addr = self.getElectionAddress();
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(addr.toString());
            ss.bind(addr);
            while (!shutdown) {
                client = ss.accept();
                setSockOpts(client);
                LOG.info("Received connection request "
                        + client.getRemoteSocketAddress());
                // Receive and handle the connection request
                // asynchronously if the quorum sasl authentication is
                // enabled. This is required because sasl server
                // authentication process may take few seconds to finish,
                // this may delay next peer connection requests.
                if (quorumSaslAuthEnabled) {
                    receiveConnectionAsync(client);
                } else {
//接收连接信息
                    receiveConnection(client);
                }
                numRetries = 0;
            }
        } catch (IOException e) {
            if (shutdown) {
                break;
            }
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                    "Ignoring exception", ie);
            }
            closeSocket(client);
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.getElectionAddress());
    } else if (ss != null) {
        // Clean up for shutdown.
        try {
            ss.close();
        } catch (IOException ie) {
            // Don't log an error for shutdown.
            LOG.debug("Error closing server socket", ie);
        }
    }
}


//代码执行路径:receiveConnection()->handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
//...省略
     if (sid < self.getId()) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }


            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: {}", sid);
            closeSocket(sock);


            if (electionAddr != null) {
                connectOne(sid, electionAddr);
            } else {
                connectOne(sid);
            }


        } else { // Otherwise start worker threads to receive data.
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);


            SendWorker vsw = senderWorkerMap.get(sid);


            if (vsw != null) {
                vsw.finish();
            }
  //存储连接信息
            senderWorkerMap.put(sid, sw);


            queueSendMap.putIfAbsent(sid,
                    new ArrayBlockingQueue(SEND_CAPACITY));


            sw.start();
            rw.start();
     }
}

5. 创建FastLeaderElection快速选举服务;

  • 初始选票发送队列sendqueue(第一层队列)
  • 初始选票接收队列recvqueue(第一层队列)
  • 创建线程WorkerSender
  • 创建线程WorkerReceiver
【Java】
//FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    //发送队列sendqueue(第一层队列)
    sendqueue = new LinkedBlockingQueue();
    //接收队列recvqueue(第一层队列)
    recvqueue = new LinkedBlockingQueue();
    this.messenger = new Messenger(manager);
}
//new Messenger(manager)
Messenger(QuorumCnxManager manager) {
    //创建线程WorkerSender
    this.ws = new WorkerSender(manager);


    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    //创建线程WorkerReceiver
    this.wr = new WorkerReceiver(manager);


    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

6. 开启WorkerSender和WorkerReceiver线程。

WorkerSender线程自旋获取sendqueue第一层队列元素

  • sendqueue队列元素内容为相关选票信息详见ToSend类;
  • 首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;
  • 不相同将sendqueue队列元素转储到queueSendMap第二层传输队列中。
【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{
//...
  public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;
  //将投票信息发送出去
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
  }
}
//QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     */
    if (this.mySid == sid) {
         b.position(0);
         addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
         /*
          * Start a new connection if doesn't have one already.
          */
         ArrayBlockingQueue bq = new ArrayBlockingQueue(
            SEND_CAPACITY);
         ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq);
         //转储到queueSendMap第二层传输队列中
         if (oldq != null) {
             addToSendQueue(oldq, b);
         } else {
             addToSendQueue(bq, b);
         }
         connectOne(sid);     
    }
}

WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。

【Java】
//WorkerReceiver
public void run() {
    Message response;
    while (!stop) {
      // Sleeps on receive
      try {
          //自旋获取recvQueue第二层传输队列元素
          response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
          if(response == null) continue;
          // The current protocol and two previous generations all send at least 28 bytes
          if (response.buffer.capacity() < 28) {
              LOG.error("Got a short response: " + response.buffer.capacity());
              continue;
          }
          //...
  if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
         //第二层传输队列元素转存到recvqueue第一层队列中
         recvqueue.offer(n);
         //...
      }
    }
//...
}


06选举核心逻辑

1. 启动线程QuorumPeer

开始Leader选举投票makeLEStrategy().lookForLeader();

sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。sendqueue队列由WorkerSender线程处理。

【plain】
//QuorunPeer.run
//...
try {
   reconfigFlagClear();
    if (shuttingDownLE) {
       shuttingDownLE = false;
       startLeaderElection();
       }
    //makeLEStrategy().lookForLeader() 发送投票
    setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
    setPeerState(ServerState.LOOKING);
}  
//...
//FastLeaderElection.lookLeader
public Vote lookForLeader() throws InterruptedException {
//...
  //向其他应用发送投票
sendNotifications();
//...
}


private void sendNotifications() {
    //获取应用节点
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        //储存投票信息
        sendqueue.offer(notmsg);
    }
}


class WorkerSender extends ZooKeeperThread {
    //...
    public void run() {
    while (!stop) {
        try {
//提取已储存的投票信息
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;


            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
  }
//...
}

自旋recvqueue队列元素获取投票过来的选票信息:

【Java】
public Vote lookForLeader() throws InterruptedException {
//...
/*
 * Loop in which we exchange notifications until we find a leader
 */
while ((self.getPeerState() == ServerState.LOOKING) &&
        (!stop)){
    /*
     * Remove next notification from queue, times out after 2 times
     * the termination time
     */
    //提取投递过来的选票信息
    Notification n = recvqueue.poll(notTimeout,
            TimeUnit.MILLISECONDS);
/*
 * Sends more notifications if haven't received enough.
 * Otherwise processes new notification.
 */
if(n == null){
    if(manager.haveDelivered()){
        //已全部连接成功,并且前一轮投票都完成,需要再次发起投票
        sendNotifications();
    } else {
        //如果未收到选票信息,manager.contentAll()自动连接其它socket节点
        manager.connectAll();
    }
    /*
     * Exponential backoff
     */
    int tmpTimeOut = notTimeout*2;
    notTimeout = (tmpTimeOut < maxNotificationInterval?
            tmpTimeOut : maxNotificationInterval);
    LOG.info("Notification time out: " + notTimeout);
         }
     //....
    }
  //...
}
【Java】
//manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)


private boolean startConnection(Socket sock, Long sid)
        throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Use BufferedOutputStream to reduce the number of IP packets. This is
        // important for x-DC scenarios.
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);


        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();


        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }


    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }


    // If lost the challenge, then drop the new connection
    //保证集群中所有节点之间只有一个通道连接
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);


        SendWorker vsw = senderWorkerMap.get(sid);


        if(vsw != null)
            vsw.finish();


        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(
                SEND_CAPACITY));


        sw.start();
        rw.start();


        return true;


    }
    return false;
}

如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap中。对应第2步中的sid

图4 节点之间连接方式

【Java】


public Vote lookForLeader() throws InterruptedException {
//...
    if (n.electionEpoch > logicalclock.get()) {
        //当前选举周期小于选票周期,重置recvset选票池
        //大于当前周期更新当前选票信息,再次发送投票
        logicalclock.set(n.electionEpoch);
        recvset.clear();
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
            updateProposal(getInitId(),
                    getInitLastLoggedZxid(),
                    getPeerEpoch());
        }
        sendNotifications();
    } else if (n.electionEpoch < logicalclock.get()) {
        if(LOG.isDebugEnabled()){
            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                    + Long.toHexString(n.electionEpoch)
                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
        }
        break;
    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)) {//相同选举周期
        //接收的选票与当前选票PK成功后,替换当前选票
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        sendNotifications();
    }
//...


}

在上代码中,自旋从recvqueue队列中获取到选票信息。开始进行选举:

  • 判断当前选票和接收过来的选票周期是否一致
  • 大于当前周期更新当前选票信息,再次发送投票
  • 周期相等:当前选票信息和接收的选票信息进行PK
【Java】
//接收的选票与当前选票PK
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }


        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId)))));
  }

在上述代码中的totalOrderPredicate方法逻辑如下:

  • 竞选周期大于当前周期为true
  • 竞选周期相等,竞选zxid大于当前zxid为true
  • 竞选周期相等,竞选zxid等于当前zxid,竞选sid大于当前sid为true
  • 经过上述条件判断为true将当前选票信息替换为竞选成功的选票,同时再次将新的选票投出去。
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
   //存储节点对应的选票信息
    // key:选票来源sid  value:选票推举的Leader sid
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));


    //半数选举开始
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // Verify if there is any change in the proposed leader
        while((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null){
            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)){
                recvqueue.put(n);
                break;
            }
        }
        /*WorkerSender
         * This predicate is true once we don't read any new
         * relevant message from the reception queue
         */
        if (n == null) {
            //已选举出leader 更新当前节点是否为leader 
            self.setPeerState((proposedLeader == self.getId()) ?
                    ServerState.LEADING: learningState());


            Vote endVote = new Vote(proposedLeader,
                    proposedZxid, proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
//...
}
/**
     * Termination predicate. Given a set of votes, determines if have
     * sufficient to declare the end of the election round.
     *
     * @param votes
     *            Set of votes
     * @param vote
     *            Identifier of the vote received last  PK后的选票
     */
private boolean termPredicate(HashMap votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    //votes 来源于recvset 存储各个节点推举出来的选票信息
    for (Map.Entry entry : votes.entrySet()) {
//选举出的sid和其它节点选择的sid相同存储到voteSet变量中。
        if (vote.equals(entry.getValue())) {
//保存推举出来的sid
            voteSet.addAck(entry.getKey());
        }
    }
    //判断选举出来的选票数量是否过半
    return voteSet.hasAllQuorums();
}
//QuorumMaj#containsQuorum
public boolean containsQuorum(Set ackSet) {
    return (ackSet.size() > half);
   }

在上述代码中:recvset是存储每个sid推举的选票信息。

第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1);

第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。

最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个。因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。

  • setPeerState更新当前节点角色;
  • proposedLeader选举出来的sid和自己sid相等,设置为Leader;
  • 上述条件不相等,设置为Follower或Observing;
  • 更新currentVote当前选票为Leader的选票vote(2,0,1)。


07总结

通过对Leader选举源码的解析,可以了解到:

1. 多个应用节点之间网络通信采用BIO方式进行相互投票,同时保证每个节点之间只使用一个通道,减少网络资源的消耗,足以见得在BIO分布式中间件开发中的技术重要性。

2. 基于BIO的基础上,灵活运用多线程和内存消息队列完好实现多层队列架构,每层队列由不同的线程分工协作,提高快速选举性能目的。

3. 为BIO在多线程技术上的实践带来了宝贵的经验。

相关推荐

驱动网卡(怎么从新驱动网卡)
驱动网卡(怎么从新驱动网卡)

网卡一般是指为电脑主机提供有线无线网络功能的适配器。而网卡驱动指的就是电脑连接识别这些网卡型号的桥梁。网卡只有打上了网卡驱动才能正常使用。并不是说所有的网卡一插到电脑上面就能进行数据传输了,他都需要里面芯片组的驱动文件才能支持他进行数据传输...

2026-01-30 00:37 liuian

win10更新助手装系统(微软win10更新助手)

1、点击首页“系统升级”的按钮,给出弹框,告诉用户需要上传IMEI码才能使用升级服务。同时给出同意和取消按钮。华为手机助手2、点击同意,则进入到“系统升级”功能华为手机助手华为手机助手3、在检测界面,...

windows11专业版密钥最新(windows11专业版激活码永久)

 Windows11专业版的正版密钥,我们是对windows的激活所必备的工具。该密钥我们可以通过微软商城或者通过计算机的硬件供应商去购买获得。获得了windows11专业版的正版密钥后,我...

手机删过的软件恢复(手机删除过的软件怎么恢复)
手机删过的软件恢复(手机删除过的软件怎么恢复)

操作步骤:1、首先,我们需要先打开手机。然后在许多图标中找到带有[文件管理]文本的图标,然后单击“文件管理”进入页面。2、进入页面后,我们将在顶部看到一行文本:手机,最新信息,文档,视频,图片,音乐,收藏,最后是我们正在寻找的[更多],单击...

2026-01-29 23:55 liuian

一键ghost手动备份系统步骤(一键ghost 备份)

  步骤1、首先把装有一键GHOST装系统的U盘插在电脑上,然后打开电脑马上按F2或DEL键入BIOS界面,然后就选择BOOT打USDHDD模式选择好,然后按F10键保存,电脑就会马上重启。  步骤...

怎么创建局域网(怎么创建局域网打游戏)

  1、购买路由器一台。进入路由器把dhcp功能打开  2、购买一台交换机。从路由器lan端口拉出一条网线查到交换机的任意一个端口上。  3、两台以上电脑。从交换机任意端口拉出网线插到电脑上(电脑设置...

精灵驱动器官方下载(精灵驱动手机版下载)

是的。驱动精灵是一款集驱动管理和硬件检测于一体的、专业级的驱动管理和维护工具。驱动精灵为用户提供驱动备份、恢复、安装、删除、在线更新等实用功能。1、全新驱动精灵2012引擎,大幅提升硬件和驱动辨识能力...

一键还原系统步骤(一键还原系统有哪些)

1、首先需要下载安装一下Windows一键还原程序,在安装程序窗口中,点击“下一步”,弹出“用户许可协议”窗口,选择“我同意该许可协议的条款”,并点击“下一步”。  2、在弹出的“准备安装”窗口中,可...

电脑加速器哪个好(电脑加速器哪款好)

我认为pp加速器最好用,飞速土豆太懒,急速酷六根本不工作。pp加速器什么网页都加速,太任劳任怨了!以上是个人观点,具体性能请自己试。ps:我家电脑性能很好。迅游加速盒子是可以加速电脑的。因为有过之...

任何u盘都可以做启动盘吗(u盘必须做成启动盘才能装系统吗)

是的,需要注意,U盘的大小要在4G以上,最好是8G以上,因为启动盘里面需要装系统,内存小的话,不能用来安装系统。内存卡或者U盘或者移动硬盘都可以用来做启动盘安装系统。普通的U盘就可以,不过最好U盘...

u盘怎么恢复文件(u盘文件恢复的方法)

开360安全卫士,点击上面的“功能大全”。点击文件恢复然后点击“数据”下的“文件恢复”功能。选择驱动接着选择需要恢复的驱动,选择接入的U盘。点击开始扫描选好就点击中间的“开始扫描”,开始扫描U盘数据。...

系统虚拟内存太低怎么办(系统虚拟内存占用过高什么原因)

1.检查系统虚拟内存使用情况,如果发现有大量的空闲内存,可以尝试释放一些不必要的进程,以释放内存空间。2.如果系统虚拟内存使用率较高,可以尝试增加系统虚拟内存的大小,以便更多的应用程序可以使用更多...

剪贴板权限设置方法(剪贴板访问权限)
剪贴板权限设置方法(剪贴板访问权限)

1、首先打开iphone手机,触碰并按住单词或图像直到显示选择选项。2、其次,然后选取“拷贝”或“剪贴板”。3、勾选需要的“权限”,最后选择开启,即可完成苹果剪贴板权限设置。仅参考1.打开苹果手机设置按钮,点击【通用】。2.点击【键盘】,再...

2026-01-29 21:37 liuian

平板系统重装大师(平板重装win系统)

如果你的平板开不了机,但可以连接上电脑,那就能好办,楼主下载安装个平板刷机王到你的个人电脑上,然后连接你的平板,平板刷机王会自动识别你的平板,平板刷机王上有你平板的我刷机包,楼主点击下载一个,下载完成...

联想官网售后服务网点(联想官网售后服务热线)

联想3c服务中心是联想旗下的官方售后,是基于互联网O2O模式开发的全新服务平台。可以为终端用户提供多品牌手机、电脑以及其他3C类产品的维修、保养和保险服务。根据客户需求层次,联想服务针对个人及家庭客户...