- 服务器角色
- 服务器状态
- 几个属性
- 源码分析
- FastLeaderElection
- 票据比较
- 投票结果 确认
Leader选举中涉及的概念 服务器角色
- leader 处理读写请求
- follower 参与leader选举,只能处理读请求并可将事务请求转交给leader来处理
- observer 不参与leader选举,只处理读请求
- LOOKING 服务器正处在选举状态 说明集群还在投票选举 还没有选出leader
- LEADING 服务器处于leading状态,当前server角色是leader
- FOLLOWING 服务器处于following状态,表示当前server的角色是follower
- OBSERVING 服务器处于observing状态,当前server角色是observer
myid
这个代表服务器ID,其值例如1、2、3等,在$ZK_HOME/data/myid文件中配置。myid值越大 在leader选举过程中的权重越大
zxid
最近一次处理成功的事务ID,zxid越大 说明数据越新,在leader选举过程中的权重越大
例如下面例子中的 mZxid
[zk: localhost:2181(CONNECTED) 3] create /wojiushiwo 123 Created /wojiushiwo [zk: localhost:2181(CONNECTED) 7] stat /wojiushiwo cZxid = 0x1400000002 ctime = Wed Aug 03 10:55:58 CST 2022 mZxid = 0x1400000002 mtime = Wed Aug 03 10:55:58 CST 2022 pZxid = 0x1400000002 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 0 [zk: localhost:2181(CONNECTED) 8] set /wojiushiwo 1234 cZxid = 0x1400000002 ctime = Wed Aug 03 10:55:58 CST 2022 mZxid = 0x1400000003 mtime = Wed Aug 03 10:56:59 CST 2022 pZxid = 0x1400000002 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 0
epoch
代表投票轮数,从0开始,每投完一次票 投票轮数+1。一般而言 同一轮投票的epoch是相同的,当然也有不同的情况,那就是有节点宕机后重新参与leader选举。
源码分析以zookeeper3.6.1版本为例 分析下Leader选举过程
zookeeper3.6.1 Leader选举算法的实现类为FastLeaderElection
对流程图一些调用进行分析
1、QuorumPeer是个线程实现类,其run方法中有leader选举的操作
2、WorkerSender#run 将当前节点票据从sendqueue队列取出 广播给集群中的节点
3、WorkerReceiver#run 收到集群中其他节点的票据 将其放到recvqueue队列 用于PK投票
FastLeaderElection// 与集群中其他节点进行通信 QuorumCnxManager manager; //发送票据的队列 LinkedBlockingQueuesendqueue; //接受票据的队列 LinkedBlockingQueue recvqueue; //当前节点 QuorumPeer self; Messenger messenger; //逻辑时钟 代表选举轮数 AtomicLong logicalclock = new AtomicLong(); //数据id 对应myid long proposedLeader; //事务id long proposedZxid; //选举轮数 long proposedEpoch; volatile boolean stop; private SyncedLearnerTracker leadingVoteSet;
public Vote lookForLeader() throws InterruptedException { //省略无关代码 self.start_fle = Time.currentElapsedTime(); try { Map票据比较recvset = new HashMap (); int notTimeout = minNotificationInterval; synchronized (this) { //逻辑时钟+1 表示开启新一轮投票 logicalclock.incrementAndGet(); //更新当前节点的票据 //getInitId 从myid中取值 //getInitLastLoggedZxid 获得上一次成功执行的事务id // getPeerEpoch 从currentEpoch文件中取值 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } //省略日志 //发送票据 这里是自己给自己发 先投自己一票 sendNotifications(); SyncedLearnerTracker voteSet; //当集群刚启动时,集群节点状态是LOOKING 正忙着leader选举 while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) { //前面说过 recvqueue队列存储其他节点广播过来的票据 //这里 取出其他节点的票据 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); //当没有取到票据时 if (n == null) { //如果queueSendMap集合为空 说明票据都发送出去了 if (manager.haveDelivered()) { //发送一次票据 //笔者不太理解这里,为什么要再发一次票据 而且这票据还是当前节点的票据,前面不是发过了吗? sendNotifications(); } else { //如果queueSendMap集合中还有数据 说明还有票据没有发送出去 可能集群连接断开了 需要重新连接 manager.connectAll(); } int tmpTimeOut = notTimeout * 2; notTimeout = Math.min(tmpTimeOut, maxNotificationInterval); LOG.info("Notification time out: {}", notTimeout); } else if (validVoter(n.sid) && validVoter(n.leader)) { //当取到了票据后 ,这里else if中的逻辑判断是:当前节点的myid是否属于集群节点 switch (n.state) { //集群启动时 只有节点状态是LOOKING的 才有资格参与选举 case LOOKING: if (getInitLastLoggedZxid() == -1) { LOG.debug("Ignoring notification as our zxid is -1"); break; } if (n.zxid == -1) { LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid); break; } //当接收到的选票 轮次高于当前节点选票轮次 说明当前节点"落后了" if (n.electionEpoch > logicalclock.get()) { System.out.println(">"); //重新设置当前节点选票轮次 logicalclock.set(n.electionEpoch); //并清空接受到的票据集合(改朝换代了 从头开始) recvset.clear(); //比较当前票据与其他节点的票据 比出高低后 重新为当前票据赋值 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } //然后重新发送最新票据 sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { //当接收到的选票 轮次低于当前节点选票轮次 说明别的那个节点"落后了",选票直接丢弃不管了 //省略日志 break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { //当前节点与别的节点在同一个选举轮次 大多数情况下 会走到这个循环里 //比出高低后 重新为当前票据赋值 updateProposal(n.leader, n.zxid, n.peerEpoch); //然后重新发送最新票据 sendNotifications(); } //省略日志 //记录接收到的票据 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //构造 SyncedLearnerTracker 查看收到的票据中 与当前机器票据一样的票据 并存储在SyncedLearnerTracker里 //统计选票 voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch)); //判断投票结果 如果当前节点获取了一半以上的投票 if (voteSet.hasAllQuorums()) { //如果在finalizeWait时间后再没有投票出现 则认为本轮选举结束 会设置LEADER //如果依然有投票出现 即下面n不为null,则再次执行上面的大while循环 再次比较票据、PK等 while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { recvqueue.put(n); break; } } if (n == null) { //统计的选票中 当前节点票据超过总节点数量票据的一半 则当前节点是LEADER 状态是LEADING setPeerState(proposedLeader, voteSet); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); //清空接受到的票据集合 leaveInstance(endVote); return endVote; } } break; //省略无关代码 }
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { //省略无关代码 return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
票据PK的比较流程:
1、选举轮数较大的直接获胜
2、选举轮数相同的,事务id较大的 直接获胜
3、选举轮数、事务id都相同的,myid值较大的 获胜
投票结果 确认zk 有两种leader选举衡量方式:基于权重和基于投票数,默认基于投票数
public QuorumMaj(MapallMembers) { this.allMembers = allMembers; for (QuorumServer qs : allMembers.values()) { //参与投票的节点 if (qs.type == LearnerType.PARTICIPANT) { votingMembers.put(Long.valueOf(qs.id), qs); } else { observingMembers.put(Long.valueOf(qs.id), qs); } } //half=参与投票节点数的一半 half = votingMembers.size() / 2; }
public boolean containsQuorum(SetackSet) { //ackSet 是 投给当前节点的节点数 return (ackSet.size() > half); }
针对前面的代码分析基础,这里简述下3个节点的zk集群启动时选主流程
zk节点1 myid=1、zk节点2 myid=2、zk节点3 myid=3
集群启动时 假设其投票轮数是一致的,并且由于数据同步的存在 其zxid也是一致的
1、节点1启动后 先给自己投一票
2、节点2启动后 先给自己投一票,并且与节点1 交换票据。
- 节点1 收到节点2的票据后 由于自己的myid较小 pk失败 更新自己的票据(投节点2一票)广播票据
- 节点2 收到节点1的票据后 由于自己的myid较大 完胜 广播票据。
- 此时节点2 获得两票 > (3/2=1) 因此节点2 晋升为leader,节点1成为follower
3、节点3启动后 先给自己投一票,并且与节点1、节点2交换票据。虽然节点3的myid比较大,但是节点2已经是leader了,因此节点3也就变成follower了。