Bläddra i källkod

ZOOKEEPER-3421: Better insight into Observer connections

Author: Brian Nixon <nixon@fb.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Michael Han <hanm@apache.org>

Closes #978 from enixon/observer-cmds-and-metrics
Brian Nixon 6 år sedan
förälder
incheckning
02c203fa48

+ 10 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -179,6 +179,11 @@ public final class ServerMetrics {
 
         COMMIT_PROCESS_TIME = metricsContext.getSummary("commit_process_time", DetailLevel.BASIC);
 
+        /**
+         * Observer Master processing metrics.
+         */
+        OM_PROPOSAL_PROCESS_TIME = metricsContext.getSummary("om_proposal_process_time_ms", DetailLevel.ADVANCED);
+        OM_COMMIT_PROCESS_TIME = metricsContext.getSummary("om_commit_process_time_ms", DetailLevel.ADVANCED);
 
         /**
          * Time spent by the final processor. This is tracked in the commit processor.
@@ -382,6 +387,11 @@ public final class ServerMetrics {
 
     public final Summary COMMIT_PROCESS_TIME;
 
+    /**
+     * Observer Master processing metrics.
+     */
+    public final Summary OM_PROPOSAL_PROCESS_TIME;
+    public final Summary OM_COMMIT_PROCESS_TIME;
 
     /**
      * Time spent by the final processor. This is tracked in the commit processor.

+ 64 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java

@@ -38,6 +38,8 @@ import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.persistence.SnapshotInfo;
+import org.apache.zookeeper.server.quorum.Follower;
+import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -126,11 +128,13 @@ public class Commands {
         registerCommand(new LastSnapshotCommand());
         registerCommand(new LeaderCommand());
         registerCommand(new MonitorCommand());
+        registerCommand(new ObserverCnxnStatResetCommand());
         registerCommand(new RuokCommand());
         registerCommand(new SetTraceMaskCommand());
         registerCommand(new SrvrCommand());
         registerCommand(new StatCommand());
         registerCommand(new StatResetCommand());
+        registerCommand(new SyncedObserverConsCommand());
         registerCommand(new WatchCommand());
         registerCommand(new WatchesByPathCommand());
         registerCommand(new WatchSummaryCommand());
@@ -381,6 +385,28 @@ public class Commands {
 
         }}
 
+    /**
+     * Reset all observer connection statistics.
+     */
+    public static class ObserverCnxnStatResetCommand extends CommandBase {
+        public ObserverCnxnStatResetCommand() {
+            super(Arrays.asList("observer_connection_stat_reset", "orst"));
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = initializeResponse();
+            if (zkServer instanceof LeaderZooKeeperServer) {
+                Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader();
+                leader.resetObserverConnectionStats();
+            } else if (zkServer instanceof FollowerZooKeeperServer) {
+                Follower follower = ((FollowerZooKeeperServer) zkServer).getFollower();
+                follower.resetObserverConnectionStats();
+            }
+            return response;
+        }
+    }
+
     /**
      * No-op command, check if the server is running
      */
@@ -496,6 +522,44 @@ public class Commands {
         }
     }
 
+    /**
+     * Information on observer connections to server. Returned Map contains:
+     *   - "synced_observers": Integer (leader/follower only)
+     *   - "observers": list of observer learner handler info objects (leader/follower only)
+     * @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo()
+     */
+    public static class SyncedObserverConsCommand extends CommandBase {
+        public SyncedObserverConsCommand() {
+            super(Arrays.asList("observers", "obsr"));
+        }
+
+        @Override
+        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+
+            CommandResponse response = initializeResponse();
+
+            if (zkServer instanceof LeaderZooKeeperServer) {
+                Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader();
+
+                response.put("synced_observers", leader.getObservingLearners().size());
+                response.put("observers", leader.getObservingLearnersInfo());
+                return response;
+            } else if (zkServer instanceof FollowerZooKeeperServer) {
+                Follower follower = ((FollowerZooKeeperServer) zkServer).getFollower();
+                Integer syncedObservers = follower.getSyncedObserverSize();
+                if (syncedObservers != null) {
+                    response.put("synced_observers", syncedObservers);
+                    response.put("observers", follower.getSyncedObserversInfo());
+                    return response;
+                }
+            }
+
+            response.put("synced_observers", 0);
+            response.put("observers", Collections.emptySet());
+            return response;
+        }
+    }
+
     /**
      * Watch information aggregated by session. Returned Map contains:
      *   - "session_id_to_watched_paths": Map&lt;Long, Set&lt;String&gt;&gt; session ID -&gt; watched paths

+ 19 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

@@ -19,8 +19,9 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
 
 import org.apache.jute.Record;
 import org.apache.zookeeper.ZooDefs.OpCode;
@@ -174,14 +175,18 @@ public class Follower extends Learner{
                 }
             }
             if (om != null) {
+                final long startTime = Time.currentElapsedTime();
                 om.proposalReceived(qp);
+                ServerMetrics.getMetrics().OM_PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
             }
             break;
         case Leader.COMMIT:
             ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
             fzk.commit(qp.getZxid());
             if (om != null) {
+                final long startTime = Time.currentElapsedTime();
                 om.proposalCommitted(qp.getZxid());
+                ServerMetrics.getMetrics().OM_COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - startTime);
             }
             break;
             
@@ -251,6 +256,19 @@ public class Follower extends Learner{
         return  om == null ? null : om.getNumActiveObservers();
     }
 
+    public Iterable<Map<String, Object>> getSyncedObserversInfo() {
+        if (om != null && om.getNumActiveObservers() > 0) {
+            return om.getActiveObservers();
+        }
+        return Collections.emptySet();
+    }
+
+    public void resetObserverConnectionStats() {
+        if (om != null && om.getNumActiveObservers() > 0) {
+            om.resetObserverConnectionStats();
+        }
+    }
+
     @Override
     public void shutdown() {    
         LOG.info("shutdown called", new Exception("shutdown Follower"));

+ 18 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -204,6 +204,24 @@ public class Leader implements LearnerMaster {
         }
     }
 
+    public Iterable<Map<String, Object>> getObservingLearnersInfo() {
+        Set<Map<String,Object>> info = new HashSet<>();
+        synchronized (observingLearners) {
+            for (LearnerHandler lh: observingLearners) {
+                info.add(lh.getLearnerHandlerInfo());
+            }
+        }
+        return info;
+    }
+
+    public void resetObserverConnectionStats() {
+        synchronized (observingLearners) {
+            for (LearnerHandler lh : observingLearners) {
+                lh.resetObserverConnectionStats();
+            }
+        }
+    }
+
     // Pending sync requests. Must access under 'this' lock.
     private final Map<Long,List<LearnerSyncRequest>> pendingSyncs =
         new HashMap<Long,List<LearnerSyncRequest>>();

+ 56 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -23,13 +23,18 @@ import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.lang.System;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
@@ -99,6 +104,24 @@ public class LearnerHandler extends ZooKeeperThread {
      */
     final LinkedBlockingQueue<QuorumPacket> queuedPackets =
         new LinkedBlockingQueue<QuorumPacket>();
+    private final AtomicLong queuedPacketsSize = new AtomicLong();
+
+    protected final AtomicLong packetsReceived = new AtomicLong();
+    protected final AtomicLong packetsSent = new AtomicLong();
+
+    protected final AtomicLong requestsReceived = new AtomicLong();
+
+    protected volatile long lastZxid = -1;
+
+    public synchronized long getLastZxid() {
+        return lastZxid;
+    }
+
+    protected final Date established = new Date();
+
+    public Date getEstablished() {
+        return (Date)established.clone();
+    }
 
     /**
      * Marker packets would be added to quorum packet queue after every
@@ -297,6 +320,7 @@ public class LearnerHandler extends ZooKeeperThread {
                     continue;
                 }
 
+                queuedPacketsSize.addAndGet(-packetSize(p));
                 if (p == proposalOfDeath) {
                     // Packet of death!
                     break;
@@ -310,7 +334,13 @@ public class LearnerHandler extends ZooKeeperThread {
                 if (LOG.isTraceEnabled()) {
                     ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                 }
+
+                // Log the zxid of the last request, if it is a valid zxid.
+                if (p.getZxid() > 0) {
+                    lastZxid = p.getZxid();
+                }
                 oa.writeRecord(p, "packet");
+                packetsSent.incrementAndGet();
             } catch (IOException e) {
                 if (!sock.isClosed()) {
                     LOG.warn("Unexpected exception at " + this, e);
@@ -602,6 +632,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 }
                 tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();
 
+                packetsReceived.incrementAndGet();
 
                 ByteBuffer bb;
                 long sessionId;
@@ -647,6 +678,7 @@ public class LearnerHandler extends ZooKeeperThread {
                     }
                     si.setOwner(this);
                     learnerMaster.submitLearnerRequest(si);
+                    requestsReceived.incrementAndGet();
                     break;
                 default:
                     LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
@@ -1025,6 +1057,7 @@ public class LearnerHandler extends ZooKeeperThread {
                 packetCounter.getAndIncrement() % markerPacketInterval == 0) {
             queuedPackets.add(new MarkerQuorumPacket(System.nanoTime()));
         }
+        queuedPacketsSize.addAndGet(packetSize(p));
     }
 
     static long packetSize(QuorumPacket p) {
@@ -1041,6 +1074,29 @@ public class LearnerHandler extends ZooKeeperThread {
         return isAlive() && learnerMaster.getCurrentTick() <= tickOfNextAckDeadline;
     }
 
+    public synchronized Map<String, Object> getLearnerHandlerInfo() {
+        Map<String, Object> info = new LinkedHashMap<>(9);
+        info.put("remote_socket_address", getRemoteAddress());
+        info.put("sid", getSid());
+        info.put("established", getEstablished());
+        info.put("queued_packets", queuedPackets.size());
+        info.put("queued_packets_size", queuedPacketsSize.get());
+        info.put("packets_received", packetsReceived.longValue());
+        info.put("packets_sent", packetsSent.longValue());
+        info.put("requests", requestsReceived.longValue());
+        info.put("last_zxid", getLastZxid());
+
+        return info;
+    }
+
+    public synchronized void resetObserverConnectionStats() {
+        packetsReceived.set(0);
+        packetsSent.set(0);
+        requestsReceived.set(0);
+
+        lastZxid = -1;
+    }
+
     /**
      * For testing, return packet queue
      * @return

+ 16 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverMaster.java

@@ -33,6 +33,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -510,6 +512,20 @@ public class ObserverMaster implements LearnerMaster, Runnable {
         return activeObservers.size();
     }
 
+    public Iterable<Map<String, Object>> getActiveObservers() {
+        Set<Map<String,Object>> info = new HashSet<>();
+        for (LearnerHandler lh:activeObservers) {
+            info.add(lh.getLearnerHandlerInfo());
+        }
+        return info;
+    }
+
+    public void resetObserverConnectionStats() {
+        for (LearnerHandler lh:activeObservers) {
+            lh.resetObserverConnectionStats();
+        }
+    }
+
     int getPktsSizeLimit() {
         return pktsSizeLimit;
     }

+ 12 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java

@@ -119,6 +119,18 @@ public class CommandsTest extends ClientBase {
                 );
     }
 
+    @Test
+    public void testObservers() throws IOException, InterruptedException {
+        testCommand("observers",
+                new Field("synced_observers", Integer.class),
+                new Field("observers", Iterable.class));
+    }
+
+    @Test
+    public void testObserverConnectionStatReset() throws IOException, InterruptedException {
+        testCommand("observer_connection_stat_reset");
+    }
+
     @Test
     public void testConnectionStatReset() throws IOException, InterruptedException {
         testCommand("connection_stat_reset");