Browse Source

ZOOKEEPER-410. address all findbugs warnings in client/server classes.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@776806 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 16 năm trước cách đây
mục cha
commit
cff0aedb18

+ 2 - 0
CHANGES.txt

@@ -76,6 +76,8 @@ BUGFIXES:
 
   ZOOKEEPER-405. nullpointer exception in zookeeper java shell. (mahadev via breed)
 
+  ZOOKEEPER-410. address all findbugs warnings in client/server classes. (phunt via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-308. improve the atomic broadcast performance 3x.
   (breed via mahadev)

+ 26 - 5
src/java/main/org/apache/zookeeper/ClientCnxn.java

@@ -240,6 +240,20 @@ public class ClientCnxn {
     }
 
 
+    /**
+     * Creates a connection object. The actual network connect doesn't get
+     * established until needed. The start() instance method must be called
+     * subsequent to construction.
+     *
+     * @param hosts
+     *                a comma separated list of hosts that can be connected to.
+     * @param sessionTimeout
+     *                the timeout for connections.
+     * @param zooKeeper
+     *                the zookeeper object that this connection is related to.
+     * @param watcher watcher for this connection
+     * @throws IOException
+     */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
             ClientWatchManager watcher)
         throws IOException
@@ -249,7 +263,8 @@ public class ClientCnxn {
 
     /**
      * Creates a connection object. The actual network connect doesn't get
-     * established until needed.
+     * established until needed. The start() instance method must be called
+     * subsequent to construction.
      *
      * @param hosts
      *                a comma separated list of hosts that can be connected to.
@@ -257,12 +272,15 @@ public class ClientCnxn {
      *                the timeout for connections.
      * @param zooKeeper
      *                the zookeeper object that this connection is related to.
-     * @throws KeeperException
+     * @param watcher watcher for this connection
+     * @param sessionId session id if re-establishing session
+     * @param sessionPasswd session passwd if re-establishing session
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
             ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
-        throws IOException {
+        throws IOException
+    {
         this.zooKeeper = zooKeeper;
         this.watcher = watcher;
         this.sessionId = sessionId;
@@ -286,10 +304,13 @@ public class ClientCnxn {
         Collections.shuffle(serverAddrs);
         sendThread = new SendThread();
         eventThread = new EventThread();
+    }
+
+    public void start() {
         sendThread.start();
         eventThread.start();
     }
-    
+
     Object eventOfDeath = new Object();
 
     final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
@@ -298,7 +319,7 @@ public class ClientCnxn {
         }
     };
 
-    private class WatcherSetEventPair {
+    private static class WatcherSetEventPair {
         private final Set<Watcher> watchers;
         private final WatchedEvent event;
         

+ 2 - 0
src/java/main/org/apache/zookeeper/ZooKeeper.java

@@ -344,6 +344,7 @@ public class ZooKeeper {
 
         watchManager.defaultWatcher = watcher;
         cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager);
+        cnxn.start();
     }
 
     /**
@@ -388,6 +389,7 @@ public class ZooKeeper {
         watchManager.defaultWatcher = watcher;
         cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager,
                 sessionId, sessionPasswd);
+        cnxn.start();
     }
 
     /**

+ 1 - 0
src/java/main/org/apache/zookeeper/ZooKeeperMain.java

@@ -447,6 +447,7 @@ public class ZooKeeperMain {
             try {
                 children = zk.getChildren(quotaPath, false);
             } catch(KeeperException.NoNodeException ne) {
+                LOG.debug("child removed during quota check", ne);
                 return;
             }
             if (children.size() == 0) {

+ 5 - 5
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -22,9 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.log4j.Logger;
-
 import org.apache.jute.Record;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.KeeperException.Code;
@@ -48,6 +47,7 @@ import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
 
@@ -83,12 +83,12 @@ public class FinalRequestProcessor implements RequestProcessor {
         synchronized (zks.outstandingChanges) {
             while (!zks.outstandingChanges.isEmpty()
                     && zks.outstandingChanges.get(0).zxid <= request.zxid) {
-                if (zks.outstandingChanges.get(0).zxid < request.zxid) {
+                ChangeRecord cr = zks.outstandingChanges.remove(0);
+                if (cr.zxid < request.zxid) {
                     LOG.warn("Zxid outstanding "
-                            + zks.outstandingChanges.get(0).zxid
+                            + cr.zxid
                             + " is less than current " + request.zxid);
                 }
-                ZooKeeperServer.ChangeRecord cr = zks.outstandingChanges.remove(0);
                 if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                     zks.outstandingChangesForPath.remove(cr.path);
                 }

+ 32 - 13
src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -87,6 +87,10 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
 
         int outstandingLimit = 1;
 
+        /** Create the factory, startup(zks) must be called subsequently.
+         * @param port listener port
+         * @throws IOException
+         */
         public Factory(int port) throws IOException {
             super("NIOServerCxn.Factory:" + port);
             setDaemon(true);
@@ -95,11 +99,20 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             ss.socket().bind(new InetSocketAddress(port));
             ss.configureBlocking(false);
             ss.register(selector, SelectionKey.OP_ACCEPT);
-            start();
         }
 
-        public void startup(ZooKeeperServer zks) throws IOException,
-                InterruptedException {
+        @Override
+        public void start() {
+            // ensure thread is started once and only once
+            if (getState() == Thread.State.NEW) {
+                super.start();
+            }
+        }
+
+        public void startup(ZooKeeperServer zks)
+            throws IOException, InterruptedException
+        {
+            start();
             zks.startup();
             setZooKeeperServer(zks);
         }
@@ -482,15 +495,17 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
         }
         if (h.getXid() >= 0) {
             synchronized (this) {
-                outstandingRequests++;
-                // check throttling
-                if (zk.getInProcess() > factory.outstandingLimit) {
-                    LOG.debug("Throttling recv " + zk.getInProcess());
-                    disableRecv();
-                    // following lines should not be needed since we are already
-                    // reading
-                    // } else {
-                    // enableRecv();
+                synchronized (this.factory) {
+                    outstandingRequests++;
+                    // check throttling
+                    if (zk.getInProcess() > factory.outstandingLimit) {
+                        LOG.debug("Throttling recv " + zk.getInProcess());
+                        disableRecv();
+                        // following lines should not be needed since we are
+                        // already reading
+                        // } else {
+                        // enableRecv();
+                    }
                 }
             }
         }
@@ -934,7 +949,11 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
          * The number of requests that have been submitted but not yet responded to.
          */
         public long getOutstandingRequests() {
-            return outstandingRequests;
+            synchronized (NIOServerCnxn.this) {
+                synchronized (NIOServerCnxn.this.factory) {
+                    return outstandingRequests;
+                }
+            }
         }
 
         public long getPacketsReceived() {

+ 0 - 2
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -86,8 +86,6 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
         super("ProcessThread:" + zks.getClientPort());
         this.nextProcessor = nextProcessor;
         this.zks = zks;
-
-        start();
     }
 
     @Override

+ 6 - 8
src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java

@@ -23,10 +23,10 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
-
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -68,8 +68,6 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     }
 
     static class SessionSet {
-        long expireTime;
-
         HashSet<Session> sessions = new HashSet<Session>();
     }
 
@@ -81,7 +79,9 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     }
 
     public SessionTrackerImpl(SessionExpirer expirer,
-            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime, long sid) {
+            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
+            long sid)
+    {
         super("SessionTracker");
         this.expirer = expirer;
         this.expirationInterval = tickTime;
@@ -89,10 +89,9 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         nextExpirationTime = roundToInterval(System.currentTimeMillis());
         this.serverId = sid;
         this.nextSessionId = initializeNextSession(sid);
-        for (long id : sessionsWithTimeout.keySet()) {
-            addSession(id, sessionsWithTimeout.get(id));
+        for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
+            addSession(e.getKey(), e.getValue());
         }
-        start();
     }
 
     volatile boolean running = true;
@@ -164,7 +163,6 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         set = sessionSets.get(s.tickTime);
         if (set == null) {
             set = new SessionSet();
-            set.expireTime = expireTime;
             sessionSets.put(expireTime, set);
         }
         set.sessions.add(s);

+ 0 - 1
src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -60,7 +60,6 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
         super("SyncThread:" + zks.getServerId());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
-        start();
     }
 
     @Override

+ 9 - 3
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -200,8 +200,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         };
         sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
         dataTree = treeBuilder.build();
-        long zxid = txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener);
-        this.hzxid = zxid;
+        setZxid(txnLogFactory.restore(dataTree,sessionsWithTimeouts,listener));
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
         for (long session : dataTree.getSessions()) {
@@ -283,7 +282,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     /**
      * This should be called from a synchronized block on this!
      */
-    public long getZxid() {
+    synchronized public long getZxid() {
         return hzxid;
     }
 
@@ -291,6 +290,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return ++hzxid;
     }
 
+    synchronized public void setZxid(long zxid) {
+        hzxid = zxid;
+    }
+
     long getTime() {
         return System.currentTimeMillis();
     }
@@ -371,12 +374,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
         RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                 finalProcessor);
+        ((SyncRequestProcessor)syncProcessor).start();
         firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+        ((PrepRequestProcessor)firstProcessor).start();
     }
 
     protected void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts,
                 tickTime, 1);
+        ((SessionTrackerImpl)sessionTracker).start();
     }
 
     public boolean isRunning() {

+ 1 - 2
src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java

@@ -94,9 +94,8 @@ public class ZooKeeperServerMain {
                    File(config.dataLogDir), new File(config.dataDir));
             zkServer.setTxnLogFactory(ftxn);
             zkServer.setTickTime(config.tickTime);
-            zkServer.startup();
             cnxnFactory = new NIOServerCnxn.Factory(config.clientPort);
-            cnxnFactory.setZooKeeperServer(zkServer);
+            cnxnFactory.startup(zkServer);
             cnxnFactory.join();
             if (zkServer.isRunning()) {
                 zkServer.shutdown();

+ 1 - 0
src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -87,6 +87,7 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
         firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
         syncProcessor = new SyncRequestProcessor(this,
                 new SendAckRequestProcessor(getFollower()));
+        syncProcessor.start();
     }
 
     @Override

+ 2 - 4
src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -66,6 +66,7 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
         RequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                 commitProcessor);
         firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
+        ((PrepRequestProcessor)firstProcessor).start();
     }
 
     @Override
@@ -77,6 +78,7 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
     protected void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, sessionsWithTimeouts,
                 tickTime, self.getId());
+        ((SessionTrackerImpl)sessionTracker).start();
     }
 
 
@@ -84,10 +86,6 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
         return sessionTracker.touchSession(sess, to);
     }
 
-    public void setZxid(long zxid) {
-        hzxid = zxid;
-    }
-
     @Override
     protected void registerJMX() {
         // register with JMX

+ 1 - 0
src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java

@@ -39,6 +39,7 @@ public class ProposalRequestProcessor implements RequestProcessor {
         this.nextProcessor = nextProcessor;
         AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
         syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
+        syncProcessor.start();
     }
 
     public void processRequest(Request request) {

+ 1 - 0
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -306,6 +306,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     
     @Override
     public synchronized void start() {
+        cnxnFactory.start();
         startLeaderElection();
         super.start();
     }

+ 38 - 3
src/java/test/config/findbugsExcludeFile.xml

@@ -1,5 +1,40 @@
 <FindBugsFilter>
-     <Match>
-       <Package name="org.apache.jute.compiler.generated" />
-     </Match>
+  <!-- Allow command line utilities, which follow pattern *Main.java, to call
+       system exit -->
+  <Match>
+    <Class name="~org\.apache\.zookeeper\..*Main" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+
+  <!-- This is too complicated to resolve/ingrained into the architecture
+       In particular we want to make sure we exit if this occurs
+       Also notice logged as fatal error -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.SyncRequestProcessor" />
+    <Method name="run" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+
+  <!-- In particular we want to make sure we exit if this occurs, unrecoverable.
+       Also notice logged as fatal error -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.ZooKeeperServer" />
+    <Method name="takeSnapshot" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+
+
+  <!-- We want to catch all exceptions and cleanup, regardless of source
+       (incl runtime) -->
+  <Match>
+    <Class name="org.apache.zookeeper.ClientCnxn$SendThread" />
+    <Method name="run" />
+    <Bug pattern="REC_CATCH_EXCEPTION" />
+  </Match>
+
+
+  <Match>
+    <Package name="org.apache.jute.compiler.generated" />
+  </Match>
+
 </FindBugsFilter>