Forráskód Böngészése

ZOOKEEPER-822. Leader election taking a long time to complete

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@1005123 13f79535-47bb-0310-9956-ffa450edef68
Patrick D. Hunt 14 éve
szülő
commit
4720482978

+ 3 - 0
CHANGES.txt

@@ -107,6 +107,9 @@ BUGFIXES:
   ZOOKEEPER-844. handle auth failure in java client
   (Camille Fournier via phunt)
 
+  ZOOKEEPER-822. Leader election taking a long time to complete
+  (Vishal K via phunt)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

+ 17 - 0
src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml

@@ -919,6 +919,23 @@ server.3=zoo3:2888:3888</programlisting>
               </para>
             </listitem>
           </varlistentry>
+          
+          <varlistentry>
+            <term>cnxTimeout</term>
+
+            <listitem>
+              <para>(Java system property: zookeeper.<emphasis
+              role="bold">cnxTimeout</emphasis>)</para>
+
+              <para>Sets the timeout value, in milliseconds, for opening connections for leader election notifications. 
+              Only applicable if you are using electionAlg 3. 
+              </para>
+
+              <note>
+                <para>Default value is 5 seconds (5000 milliseconds).</para>
+              </note>
+            </listitem>
+          </varlistentry>
         </variablelist>
         <para></para>
       </section>

+ 57 - 17
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
 
 /**
  * This class implements a connection manager for leader election using TCP. It
- * maintains one coonection for every pair of servers. The tricky part is to
+ * maintains one connection for every pair of servers. The tricky part is to
  * guarantee that there is exactly one connection for every pair of servers that
  * are operating correctly and that can communicate over the network.
  * 
@@ -74,6 +74,12 @@ public class QuorumCnxManager {
     
     private long observerCounter = -1;
     
+    /*
+     * Connection timeout value in milliseconds (overridable via the zookeeper.cnxTimeout system property)
+     */
+    
+    private int cnxTO = 5000;
+    
     /*
      * Local IP address
      */
@@ -118,6 +124,11 @@ public class QuorumCnxManager {
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
         this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
         
+        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
+        if(cnxToValue != null){
+            this.cnxTO = new Integer(cnxToValue); 
+        }
+        
         this.self = self;
 
         // Starts listener thread that waits for connection requests 
@@ -131,9 +142,12 @@ public class QuorumCnxManager {
      */
     public void testInitiateConnection(long sid) throws Exception {
         SocketChannel channel;
-        LOG.debug("Opening channel to server "  + sid);
-        channel = SocketChannel
-                .open(self.getVotingView().get(sid).electionAddr);
+        if(LOG.isDebugEnabled()){
+            LOG.debug("Opening channel to server "  + sid);
+        }
+        
+        channel = SocketChannel.open();
+        channel.socket().connect(self.getVotingView().get(sid).electionAddr, cnxTO);
         channel.socket().setTcpNoDelay(true);
         initiateConnection(channel, sid);
     }
@@ -173,11 +187,11 @@ public class QuorumCnxManager {
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
-            senderWorkerMap.put(sid, sw);
             
             if(vsw != null)
                 vsw.finish();
-
+            
+            senderWorkerMap.put(sid, sw);
             if (!queueSendMap.containsKey(sid)) {
                 queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                         CAPACITY));
@@ -258,11 +272,12 @@ public class QuorumCnxManager {
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
-            senderWorkerMap.put(sid, sw);
             
             if(vsw != null)
                 vsw.finish();
-
+            
+            senderWorkerMap.put(sid, sw);
+            
             if (!queueSendMap.containsKey(sid)) {
                 queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                         CAPACITY));
@@ -343,9 +358,12 @@ public class QuorumCnxManager {
             }
             try {
                 SocketChannel channel;
-                LOG.debug("Opening channel to server "  + sid);
-                channel = SocketChannel
-                        .open(self.getView().get(sid).electionAddr);
+                if(LOG.isDebugEnabled()){
+                    LOG.debug("Opening channel to server "  + sid);
+                }
+                
+                channel = SocketChannel.open();
+                channel.socket().connect(self.getView().get(sid).electionAddr, cnxTO);                
                 channel.socket().setTcpNoDelay(true);
                 initiateConnection(channel, sid);
             } catch (UnresolvedAddressException e) {
@@ -520,10 +538,19 @@ public class QuorumCnxManager {
         }
                 
         synchronized boolean finish() {
+            if(LOG.isDebugEnabled()){
+                LOG.debug("Calling finish");
+            }
+            
+            if(!running){
+                /*
+                 * Avoids running finish() twice. 
+                 */
+                return running;
+            }
+            
             running = false;
-
-            LOG.debug("Calling finish");
-            this.interrupt();
+            
             try{
                 channel.close();
             } catch (IOException e) {
@@ -534,6 +561,10 @@ public class QuorumCnxManager {
             this.interrupt();
             if (recvWorker != null)
                 recvWorker.finish();
+            
+            if(LOG.isDebugEnabled()){
+                LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
+            }
             senderWorkerMap.remove(sid);
             return running;
         }
@@ -585,7 +616,8 @@ public class QuorumCnxManager {
                     }
                 }
             } catch (Exception e) {
-                LOG.warn("Exception when using channel: " + sid, e);
+                LOG.warn("Exception when using channel: for id " + sid + " my id = " + 
+                        self.getId() + " error = " + e);
             }
             this.finish();
             LOG.warn("Send worker leaving thread");
@@ -612,7 +644,14 @@ public class QuorumCnxManager {
          * @return boolean  Value of variable running
          */
         synchronized boolean finish() {
-            running = false;
+            if(!running){
+                /*
+                 * Avoids running finish() twice. 
+                 */
+                return running;
+            }
+            running = false;            
+
             this.interrupt();
             return running;
         }
@@ -657,7 +696,8 @@ public class QuorumCnxManager {
                 }
 
             } catch (Exception e) {
-                LOG.warn("Connection broken: ", e);
+                LOG.warn("Connection broken for id " + sid + ", my id = " + 
+                        self.getId() + ", error = " + e);
             } finally {
                 try{
                     channel.socket().close();

+ 33 - 2
src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

@@ -22,6 +22,7 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
@@ -161,8 +162,38 @@ public class CnxManagerTest extends ZKTestCase {
             if(thread.failed)
                 Assert.fail("Did not receive expected message");
         }
+        
     }
+    @Test
+    public void testCnxManagerTimeout() throws Exception {
+        Random rand = new Random();
+        byte b = (byte) rand.nextInt();
+        int deadPort = PortAssignment.unique();
+        String deadAddress = new String("10.1.1." + b);
+            
+        LOG.info("This is the dead address I'm trying: " + deadAddress);
+            
+        peers.put(Long.valueOf(2),
+                new QuorumServer(2,
+                        new InetSocketAddress(deadAddress, deadPort),
+                        new InetSocketAddress(deadAddress, PortAssignment.unique())));
+        tmpdir[2] = ClientBase.createTmpDir();
+        port[2] = deadPort;
+            
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
 
-
-
+        long begin = System.currentTimeMillis();
+        cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+        long end = System.currentTimeMillis();
+            
+        if((end - begin) > 6000) Assert.fail("Waited more than necessary");
+        
+    }       
 }