Browse Source

ZOOKEEPER-3459: Add admin command to display synced state of peer

Author: Brian Nixon <nixon@fb.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Norbert Kalmar <nkalmar@apache.org>

Closes #1012 from enixon/cmd-sync-state
Brian Nixon 6 years ago
parent
commit
cc900a3b05

+ 35 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java

@@ -44,6 +44,7 @@ import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,6 +139,7 @@ public class Commands {
         registerCommand(new WatchCommand());
         registerCommand(new WatchesByPathCommand());
         registerCommand(new WatchSummaryCommand());
+        registerCommand(new ZabStateCommand());
         registerCommand(new SystemPropertiesCommand());
         registerCommand(new InitialConfigurationCommand());
     }
@@ -616,6 +618,39 @@ public class Commands {
         }
     }
 
+    /**
+     * Returns the current phase of Zab protocol that peer is running.
+     * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
+     */
+    public static class ZabStateCommand extends CommandBase {
+        public ZabStateCommand() {
+            super(Arrays.asList("zabstate"), false);
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = initializeResponse();
+            if (zkServer instanceof QuorumZooKeeperServer) {
+                QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
+                QuorumPeer.ZabState zabState = peer.getZabState();
+                QuorumVerifier qv = peer.getQuorumVerifier();
+
+                QuorumPeer.QuorumServer voter = qv.getVotingMembers().get(peer.getId());
+                boolean voting = (
+                        voter != null
+                                && voter.addr.equals(peer.getQuorumAddress())
+                                && voter.electionAddr.equals(peer.getElectionAddress())
+                );
+                response.put("voting", voting);
+                response.put("zabstate", zabState.name().toLowerCase());
+            } else {
+                response.put("voting", false);
+                response.put("zabstate", "");
+            }
+            return response ;
+        }
+    }
+
     /**
      * All defined system properties.
      */

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

@@ -78,6 +78,7 @@ public class Follower extends Learner{
         self.end_fle = 0;
         fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
         try {
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer leaderServer = findLeader();
             try {
                 connectToLeader(leaderServer.addr, leaderServer.hostname);
@@ -95,7 +96,9 @@ public class Follower extends Learner{
                 long startTime = Time.currentElapsedTime();
                 try {
                     self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
+                    self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                     syncWithLeader(newEpochZxid);
+                    self.setZabState(QuorumPeer.ZabState.BROADCAST);
                 } finally {
                     long syncTime = Time.currentElapsedTime() - startTime;
                     ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -546,6 +546,7 @@ public class Leader implements LearnerMaster {
         zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
 
         try {
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             self.tick.set(0);
             zk.loadData();
 
@@ -616,6 +617,7 @@ public class Leader implements LearnerMaster {
              waitForEpochAck(self.getId(), leaderStateSummary);
              self.setCurrentEpoch(epoch);
             self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
+            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
 
              try {
                  waitForNewLeaderAck(self.getId(), zk.getZxid());
@@ -667,6 +669,7 @@ public class Leader implements LearnerMaster {
                 self.setZooKeeperServer(zk);
             }
 
+            self.setZabState(QuorumPeer.ZabState.BROADCAST);
             self.adminServer.setZooKeeperServer(zk);
 
             // Everything is a go, simply start counting the ticks

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -409,9 +409,11 @@ public class Learner {
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
                 LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                 snapshotNeeded = false;
             }
             else if (qp.getType() == Leader.SNAP) {
+                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                 LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                 // The leader is going to dump the database
                 // db is clear as part of deserializeSnapshot()
@@ -434,6 +436,7 @@ public class Learner {
                 syncSnapshot = true;
             } else if (qp.getType() == Leader.TRUNC) {
                 //we need to truncate the log to the lastzxid of the leader
+                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                 LOG.warn("Truncating log to get in sync with the leader 0x"
                         + Long.toHexString(qp.getZxid()));
                 boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
@@ -592,6 +595,7 @@ public class Learner {
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
         sock.setSoTimeout(self.tickTime * self.syncLimit);
+        self.setSyncMode(QuorumPeer.SyncMode.NONE);
         zk.startup();
         /*
          * Update the election vote here to ensure that all members of the

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java

@@ -105,6 +105,7 @@ public class Observer extends Learner{
         zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
 
         try {
+            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
             QuorumServer master = findLearnerMaster();
             try {
                 connectToLeader(master.addr, master.hostname);
@@ -114,7 +115,9 @@ public class Observer extends Learner{
                 }
 
                 self.setLeaderAddressAndId(master.addr, master.getId());
+                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                 syncWithLeader(newLeaderZxid);
+                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning() && nextLearnerMaster.get() == null) {
                     readPacket(qp);

+ 52 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -417,6 +417,22 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         LOOKING, FOLLOWING, LEADING, OBSERVING;
     }
 
+    /**
+     * (Used for monitoring) shows the current phase of
+     * Zab protocol that peer is running.
+     */
+    public enum ZabState {
+        ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST;
+    }
+
+    /**
+     * (Used for monitoring) When peer is in synchronization phase, this shows
+     * which synchronization mechanism is being used
+     */
+    public enum SyncMode {
+        NONE, DIFF, SNAP, TRUNC;
+    }
+
     /*
      * A peer can either be participating, which implies that it is willing to
      * both vote in instances of consensus and to elect or become a Leader, or
@@ -754,6 +770,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     private ServerState state = ServerState.LOOKING;
 
+    private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
+    private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
     private AtomicReference<String> leaderAddress = new AtomicReference<String>("");
     private AtomicLong leaderId = new AtomicLong(-1);
 
@@ -763,9 +781,30 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         state = newState;
         if (newState == ServerState.LOOKING) {
             setLeaderAddressAndId(null, -1);
+            setZabState(ZabState.ELECTION);
+        } else {
+            LOG.info("Peer state changed: {}", getDetailedPeerState());
         }
     }
 
+    public void setZabState(ZabState zabState) {
+        this.zabState.set(zabState);
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    public void setSyncMode(SyncMode syncMode) {
+        this.syncMode.set(syncMode);
+        LOG.info("Peer state changed: {}", getDetailedPeerState());
+    }
+
+    public ZabState getZabState() {
+        return zabState.get();
+    }
+
+    public SyncMode getSyncMode() {
+        return syncMode.get();
+    }
+
     public void setLeaderAddressAndId(InetSocketAddress addr, long newId) {
         if (addr != null) {
             leaderAddress.set(addr.getHostString());
@@ -783,6 +822,19 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         return leaderId.get();
     }
 
+    public String getDetailedPeerState() {
+        final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase());
+        final ZabState zabState = getZabState();
+        if (!ZabState.ELECTION.equals(zabState)) {
+            sb.append(" - ").append(zabState.toString().toLowerCase());
+        }
+        final SyncMode syncMode = getSyncMode();
+        if (!SyncMode.NONE.equals(syncMode)) {
+            sb.append(" - ").append(syncMode.toString().toLowerCase());
+        }
+        return sb.toString();
+    }
+
     public synchronized void reconfigFlagSet(){
        reconfigFlag = true;
     }

+ 6 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java

@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
+import java.util.function.BiConsumer;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -215,4 +216,9 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
         rootContext.unregisterGauge("quorum_size");
     }
 
+    @Override
+    public void dumpMonitorValues(BiConsumer<String, Object> response) {
+        super.dumpMonitorValues(response);
+        response.accept("peer_state", self.getDetailedPeerState());
+    }
 }