Browse Source

ZOOKEEPER-3385: Add admin command to display leader

Author: Brian Nixon <nixon@fb.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Michael Han <hanm@apache.org>

Closes #939 from enixon/cmd-leader
Brian Nixon 6 years ago
parent
commit
f63a831d67

+ 25 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java

@@ -126,6 +126,7 @@ public class Commands {
         registerCommand(new GetTraceMaskCommand());
         registerCommand(new IsroCommand());
         registerCommand(new LastSnapshotCommand());
+        registerCommand(new LeaderCommand());
         registerCommand(new MonitorCommand());
         registerCommand(new RuokCommand());
         registerCommand(new SetTraceMaskCommand());
@@ -318,6 +319,30 @@ public class Commands {
         }
     }
 
+    /**
+     * Returns the leader status of this instance and the leader host string.
+     */
+    public static class LeaderCommand extends CommandBase {
+        public LeaderCommand() {
+            super(Arrays.asList("leader", "lead"));
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = initializeResponse();
+            if (zkServer instanceof QuorumZooKeeperServer) {
+                response.put("is_leader", zkServer instanceof LeaderZooKeeperServer);
+                QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
+                response.put("leader_id", peer.getLeaderId());
+                String leaderAddress = peer.getLeaderAddress();
+                response.put("leader_ip", leaderAddress != null ? leaderAddress : "");
+            } else {
+                response.put("error", "server is not initialized");
+            }
+            return response;
+        }
+    }
+
     /**
      * Some useful info for monitoring. Returned map contains:
      *   - "version": String

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

@@ -93,6 +93,7 @@ public class Follower extends Learner{
                 }
                 long startTime = Time.currentElapsedTime();
                 try {
+                    self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
                     syncWithLeader(newEpochZxid);
                 } finally {
                     long syncTime = Time.currentElapsedTime() - startTime;

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -581,6 +581,7 @@ public class Leader implements LearnerMaster {
 
              waitForEpochAck(self.getId(), leaderStateSummary);
              self.setCurrentEpoch(epoch);
+            self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
 
              try {
                  waitForNewLeaderAck(self.getId(), zk.getZxid());

+ 5 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java

@@ -109,9 +109,11 @@ public class Observer extends Learner{
             try {
                 connectToLeader(master.addr, master.hostname);
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
-                if (self.isReconfigStateChange())
-                   throw new Exception("learned about role change");
- 
+                if (self.isReconfigStateChange()) {
+                    throw new Exception("learned about role change");
+                }
+
+                self.setLeaderAddressAndId(master.addr, master.getId());
                 syncWithLeader(newLeaderZxid);
                 QuorumPacket qp = new QuorumPacket();
                 while (this.isRunning() && nextLearnerMaster.get() == null) {

+ 28 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -41,6 +41,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.sasl.SaslException;
@@ -746,12 +747,36 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     }
 
     private ServerState state = ServerState.LOOKING;
-    
+
+    private AtomicReference<String> leaderAddress = new AtomicReference<String>("");
+    private AtomicLong leaderId = new AtomicLong(-1);
+
     private boolean reconfigFlag = false; // indicates that a reconfig just committed
 
-    public synchronized void setPeerState(ServerState newState){
-        state=newState;
+    public synchronized void setPeerState(ServerState newState) {
+        state = newState;
+        if (newState == ServerState.LOOKING) {
+            setLeaderAddressAndId(null, -1);
+        }
     }
+
+    public void setLeaderAddressAndId(InetSocketAddress addr, long newId) {
+        if (addr != null) {
+            leaderAddress.set(addr.getHostString());
+        } else {
+            leaderAddress.set(null);
+        }
+        leaderId.set(newId);
+    }
+
+    public String getLeaderAddress() {
+        return leaderAddress.get();
+    }
+
+    public long getLeaderId() {
+        return leaderId.get();
+    }
+
     public synchronized void reconfigFlagSet(){
        reconfigFlag = true;
     }