Browse Source

ZOOKEEPER-407. address all findbugs warnings in org.apache.zookeeper.server.quorum.** packages.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@776826 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 16 years ago
parent
commit
e936eceb9e

+ 2 - 0
CHANGES.txt

@@ -80,6 +80,8 @@ BUGFIXES:
   
   ZOOKEEPER-403. cleanup javac compiler warnings. (flavio via breed)
 
+  ZOOKEEPER-407. address all findbugs warnings in org.apache.zookeeper.server.quorum.** packages. (flavio via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-308. improve the atomic broadcast performance 3x.
   (breed via mahadev)

+ 9 - 1
src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java

@@ -18,14 +18,18 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import org.apache.log4j.Logger;
+
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 
+
 /**
  * This is a very simple RequestProcessor that simply forwards a request from a
  * previous stage to the leader as an ACK.
  */
 class AckRequestProcessor implements RequestProcessor {
+    private static final Logger LOG = Logger.getLogger(AckRequestProcessor.class);
     Leader leader;
 
     AckRequestProcessor(Leader leader) {
@@ -36,7 +40,11 @@ class AckRequestProcessor implements RequestProcessor {
      * Forward the request as an ACK to the leader
      */
     public void processRequest(Request request) {
-        leader.processAck(leader.self.getId(), request.zxid, null);
+        QuorumPeer self = leader.self;
+        if(self != null)
+            leader.processAck(self.getId(), request.zxid, null);
+        else
+            LOG.error("Null QuorumPeer");
     }
 
     public void shutdown() {

+ 33 - 36
src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java

@@ -27,6 +27,8 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
 import java.util.concurrent.TimeUnit;
 import java.util.Random;
 
@@ -38,6 +40,7 @@ import org.apache.zookeeper.server.quorum.Vote;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 
+
 public class AuthFastLeaderElection implements Election {
     private static final Logger LOG = Logger.getLogger(AuthFastLeaderElection.class);
 
@@ -192,8 +195,8 @@ public class AuthFastLeaderElection implements Election {
         long lastEpoch;
         LinkedBlockingQueue<Long> acksqueue;
         HashMap<Long, Long> challengeMap;
-        HashMap<Long, Long> challengeMutex;
-        HashMap<Long, Long> ackMutex;
+        HashMap<Long, Semaphore> challengeMutex;
+        HashMap<Long, Semaphore> ackMutex;
         HashMap<InetSocketAddress, HashMap<Long, Long>> addrChallengeMap;
 
         class WorkerReceiver implements Runnable {
@@ -208,18 +211,20 @@ public class AuthFastLeaderElection implements Election {
 
             boolean saveChallenge(long tag, long challenge) {
 
-                Long l = challengeMutex.get(tag);
+                //Long l = challengeMutex.get(tag);
+                Semaphore s = challengeMutex.get(tag);
+                if (s != null) {
+                        synchronized (challengeMap) {
+                            challengeMap.put(tag, challenge);
+                            challengeMutex.remove(tag);
+                        }
 
-                synchronized (challengeMap) {
-                    challengeMap.put(tag, challenge);
-                    challengeMutex.remove(tag);
-                }
-
-                if (l != null) {
-                    synchronized(l){
-                        l.notify();
-                    }
+                
+                        s.release();
+                } else {
+                    LOG.error("No challenge mutex object");
                 }
+                
 
                 return true;
             }
@@ -343,12 +348,12 @@ public class AuthFastLeaderElection implements Election {
                     // Upon reception of an ack message, remove it from the
                     // queue
                     case 3:
-                        Long l = ackMutex.get(tag);
-                        if (l != null) {
-                            synchronized(l){
-                                l.notify();
-                            }
-                        }
+                        Semaphore s = ackMutex.get(tag);
+                        
+                        if(s != null)
+                            s.release();
+                        else LOG.error("Empty ack semaphore");
+                        
                         acksqueue.offer(tag);
 
                         if (authEnabled) {
@@ -568,10 +573,11 @@ public class AuthFastLeaderElection implements Election {
                                     double timeout = ackWait
                                             * java.lang.Math.pow(2, attempts);
 
-                                    Long l = Long.valueOf(m.tag);
-                                    synchronized (l) {
-                                        challengeMutex.put(m.tag, l);
-                                        l.wait((long) timeout);
+                                    //Long l = new Long(m.tag);
+                                    Semaphore s = new Semaphore(0);
+                                    synchronized (s) {
+                                        challengeMutex.put(m.tag, s);
+                                        s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
                                         myChallenge = challengeMap
                                                 .containsKey(m.tag);
                                     }
@@ -596,13 +602,11 @@ public class AuthFastLeaderElection implements Election {
                             }
                             mySocket.send(requestPacket);
                             try {
-                                Long l = Long.valueOf(m.tag);
+                                Semaphore s = new Semaphore(0);
                                 double timeout = ackWait
                                         * java.lang.Math.pow(10, attempts);
-                                synchronized (l) {
-                                    ackMutex.put(m.tag, l);
-                                    l.wait((int) timeout);
-                                }
+                                ackMutex.put(m.tag, s);
+                                s.tryAcquire((int) timeout, TimeUnit.MILLISECONDS);
                             } catch (InterruptedException e) {
                                 LOG.warn("Ack exception: ", e);
                             }
@@ -694,8 +698,8 @@ public class AuthFastLeaderElection implements Election {
             mySocket = s;
             acksqueue = new LinkedBlockingQueue<Long>();
             challengeMap = new HashMap<Long, Long>();
-            challengeMutex = new HashMap<Long, Long>();
-            ackMutex = new HashMap<Long, Long>();
+            challengeMutex = new HashMap<Long, Semaphore>();
+            ackMutex = new HashMap<Long, Semaphore>();
             addrChallengeMap = new HashMap<InetSocketAddress, HashMap<Long, Long>>();
             lastProposedLeader = 0;
             lastProposedZxid = 0;
@@ -759,14 +763,10 @@ public class AuthFastLeaderElection implements Election {
 
     private void leaveInstance() {
         logicalclock++;
-        // sendqueue.clear();
-        // recvqueue.clear();
     }
 
     private void sendNotifications() {
         for (QuorumServer server : self.quorumPeers.values()) {
-            //InetSocketAddress saddr = new InetSocketAddress(server.addr
-            //        .getAddress(), port);
 
             ToSend notmsg = new ToSend(ToSend.mType.notification,
                     AuthFastLeaderElection.sequencer++, proposedLeader,
@@ -906,9 +906,6 @@ public class AuthFastLeaderElection implements Election {
                                         (proposedLeader == self.getId()) ? 
                                          ServerState.LEADING :
                                          ServerState.FOLLOWING);
-                                // if (self.state == ServerState.FOLLOWING) {
-                                // Thread.sleep(100);
-                                // }
     
                                 leaveInstance();
                                 return new Vote(proposedLeader, proposedZxid);

+ 2 - 3
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -58,7 +58,6 @@ public class CommitProcessor extends Thread implements RequestProcessor {
         super("CommitProcessor:" + id);
         this.nextProcessor = nextProcessor;
         this.matchSyncs = matchSyncs;
-        start();
     }
 
     volatile boolean finished = false;
@@ -141,8 +140,8 @@ public class CommitProcessor extends Thread implements RequestProcessor {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("Unexpected exception causing exit", e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted exception while waiting", e);
         }
         LOG.info("CommitProcessor exited loop!");
     }

+ 1 - 0
src/java/main/org/apache/zookeeper/server/quorum/Follower.java

@@ -228,6 +228,7 @@ public class Follower {
                         LOG.fatal("Got unexpected packet from leader "
                                 + qp.getType() + " exiting ... " );
                         System.exit(13);
+
                     }
                     zk.dataTree.lastProcessedZxid = newLeaderZxid;
                 }

+ 0 - 1
src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java

@@ -79,7 +79,6 @@ public class FollowerHandler extends Thread {
         this.sock = sock;
         this.leader = leader;
         leader.addFollowerHandler(this);
-        start();
     }
     
     @Override

+ 0 - 1
src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java

@@ -48,7 +48,6 @@ public class FollowerRequestProcessor extends Thread implements
         super("FollowerRequestProcessor:" + zks.getServerId());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
-        start();
     }
 
     @Override

+ 2 - 0
src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -84,7 +84,9 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
         commitProcessor = new CommitProcessor(finalProcessor,
                 Long.toString(getServerId()), true);
+        commitProcessor.start();
         firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
+        ((FollowerRequestProcessor) firstProcessor).start();
         syncProcessor = new SyncRequestProcessor(this,
                 new SendAckRequestProcessor(getFollower()));
         syncProcessor.start();

+ 12 - 4
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -220,7 +220,8 @@ public class Leader {
                         Socket s = ss.accept();
                         s.setSoTimeout(self.tickTime * self.syncLimit);
                         s.setTcpNoDelay(nodelay);
-                        new FollowerHandler(s, Leader.this);
+                        FollowerHandler fh = new FollowerHandler(s, Leader.this);
+                        fh.start();
                     } catch (SocketException e) {
                         if (stop) {
                             LOG.info("exception while shutting down acceptor: "
@@ -262,7 +263,11 @@ public class Leader {
             epoch++;
             zk.setZxid(epoch << 32L);
             zk.dataTree.lastProcessedZxid = zk.getZxid();
-            lastProposed = zk.getZxid();
+            
+            synchronized(this){
+                lastProposed = zk.getZxid();
+            }
+            
             newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                     null, null);
             if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
@@ -547,7 +552,9 @@ public class Leader {
      * @param zxid
      */
     public void commit(long zxid) {
-        lastCommitted = zxid;
+        synchronized(this){
+            lastCommitted = zxid;
+        }
         QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
         sendPacket(qp);
     }
@@ -657,8 +664,9 @@ public class Leader {
         }
         synchronized (forwardingFollowers) {
             forwardingFollowers.add(handler);
-            return lastProposed;
         }
+        
+        return lastProposed;
     }
 
 }

+ 1 - 1
src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java

@@ -134,7 +134,7 @@ public class LeaderElection implements Election  {
                 s = new DatagramSocket();
                 s.setSoTimeout(200);
             } catch (SocketException e1) {
-                e1.printStackTrace();
+                LOG.error("Socket exception when creating socket for leader election", e1);
                 System.exit(4);
             }
             DatagramPacket requestPacket = new DatagramPacket(requestBytes,

+ 1 - 0
src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -63,6 +63,7 @@ public class LeaderZooKeeperServer extends ZooKeeperServer {
                 finalProcessor, getLeader().toBeApplied);
         commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                 Long.toString(getServerId()), false);
+        commitProcessor.start();
         RequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                 commitProcessor);
         firstProcessor = new PrepRequestProcessor(this, proposalProcessor);

+ 1 - 1
src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java

@@ -28,7 +28,7 @@ import org.apache.zookeeper.server.SyncRequestProcessor;
  */
 public class ProposalRequestProcessor implements RequestProcessor {
     LeaderZooKeeperServer zks;
-
+    
     RequestProcessor nextProcessor;
 
     SyncRequestProcessor syncProcessor;

+ 32 - 15
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -123,7 +123,6 @@ public class QuorumCnxManager {
 
         // Starts listener thread that waits for connection requests 
         listener = new Listener();
-        listener.start();
     }
 
     /**
@@ -162,7 +161,10 @@ public class QuorumCnxManager {
         	
         	if (senderWorkerMap
         			.containsKey(sid)) {
-        		senderWorkerMap.get(sid).finish();
+        	    SendWorker vsw = senderWorkerMap.get(sid);
+        	    if(vsw != null)
+        	        vsw.finish();
+        	    else LOG.error("No SendWorker for this identifier (" + sid + ")");
         	}
 
         	if (!queueSendMap.containsKey(sid)) {
@@ -226,7 +228,10 @@ public class QuorumCnxManager {
         	sw.setRecv(rw);
 
         	if (senderWorkerMap.containsKey(sid)) {
-        		senderWorkerMap.get(sid).finish();
+        	    SendWorker vsw = senderWorkerMap.get(sid);
+        	    if(vsw != null)
+        	        vsw.finish();
+        	    else LOG.error("No SendWorker for this identifier (" + sid + ")");
         	}
                     
         	senderWorkerMap.put(sid, sw);
@@ -267,15 +272,21 @@ public class QuorumCnxManager {
                  * Start a new connection if doesn't have one already.
                  */
                 if (!queueSendMap.containsKey(sid)) {
-                    queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
-                            CAPACITY));
-                    queueSendMap.get(sid).put(b);
+                    ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
+                            CAPACITY);
+                    queueSendMap.put(sid, bq);
+                    bq.put(b);
 
                 } else {
-                    if (queueSendMap.get(sid).remainingCapacity() == 0) {
-                        queueSendMap.get(sid).take();
+                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+                    if(bq != null){
+                        if (bq.remainingCapacity() == 0) {
+                            bq.take();
+                        }
+                        bq.put(b);
+                    } else {
+                        LOG.error("No queue for server " + sid);
                     }
-                    queueSendMap.get(sid).put(b);
                 } 
                 
                 connectOne(sid);
@@ -443,7 +454,13 @@ public class QuorumCnxManager {
 
                 ByteBuffer b = null;
                 try {
-                    b = queueSendMap.get(sid).take();
+                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);                    
+                    if(bq != null) 
+                        b = bq.take();
+                    else {
+                        LOG.error("No queue of incoming messages for server " + sid);
+                        this.finish();
+                    }
                 } catch (InterruptedException e) {
                     LOG.warn("Interrupted while waiting for message on queue",
                             e);
@@ -472,11 +489,11 @@ public class QuorumCnxManager {
                         recvWorker = null;
                     
                         senderWorkerMap.remove(sid);
-                    
-                        if (queueSendMap.get(sid)
-                                    .size() == 0)
-                            queueSendMap.get(sid)
-                                    .offer(b);
+                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+                        if(bq != null){
+                            if (bq.size() == 0)
+                                bq.offer(b);
+                        } else LOG.error("No queue for server " + sid);
                     }
                 }
             }

+ 9 - 3
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -313,7 +313,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
 
     ResponderThread responder;
     
-    public void stopLeaderElection() {
+    synchronized public void stopLeaderElection() {
         responder.running = false;
         responder.interrupt();
     }
@@ -401,8 +401,14 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
             le = new AuthFastLeaderElection(this, true);
             break;
         case 3:
-            le = new FastLeaderElection(this,
-                        new QuorumCnxManager(this));
+            QuorumCnxManager mng = new QuorumCnxManager(this);
+            QuorumCnxManager.Listener listener = mng.listener;
+            if(listener != null){
+                listener.start();
+                le = new FastLeaderElection(this,mng);
+            } else {
+                LOG.error("Null listener when initializing cnx manager");
+            }
             break;
         default:
             assert false;

+ 23 - 0
src/java/test/config/findbugsExcludeFile.xml

@@ -33,6 +33,29 @@
   </Match>
 
 
+   <!-- If we cannot open a socket to elect a leader, then we should
+            simply exit -->
+   <Match>
+     <Class name="org.apache.zookeeper.server.quorum.LeaderElection" />
+       <Method name="lookForLeader" />
+       <Bug pattern="DM_EXIT" />
+   </Match>
+
+   <!-- Committing out of order is an unrecoverable error, so we should
+              really exit  -->
+   <Match>
+     <Class name="org.apache.zookeeper.server.quorum.FollowerZooKeeperServer" />
+       <Method name="commit" />
+       <Bug pattern="DM_EXIT" />
+     </Match>
+
+   <!-- Two unrecoverable errors while following the leader  -->
+   <Match>
+     <Class name="org.apache.zookeeper.server.quorum.Follower" />
+       <Method name="followLeader" />
+       <Bug pattern="DM_EXIT" />
+   </Match>
+
   <Match>
     <Package name="org.apache.jute.compiler.generated" />
   </Match>