Kaynağa Gözat

ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election) (flavio via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@761816 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 16 yıl önce
ebeveyn
işleme
c9e8662617

+ 3 - 1
CHANGES.txt

@@ -35,7 +35,9 @@ BUGFIXES:
   tickTime from config is lost, cannot start quorum (phunt via mahadev)
   tickTime from config is lost, cannot start quorum (phunt via mahadev)
 
 
   ZOOKEEPER-360. WeakHashMap in Bookie.java causes NPE (flavio via mahadev)
   ZOOKEEPER-360. WeakHashMap in Bookie.java causes NPE (flavio via mahadev)
-
+  
+  ZOOKEEPER-362. Issues with FLENewEpochTest. (fix bug in Fast leader election)
+(flavio via mahadev)
 
 
 IMPROVEMENTS:
 IMPROVEMENTS:
   ZOOKEEPER-308. improve the atomic broadcast performance 3x.
   ZOOKEEPER-308. improve the atomic broadcast performance 3x.

+ 1 - 0
src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java

@@ -562,6 +562,7 @@ public class FastLeaderElection implements Election {
                             n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
                             n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
                             ", " + n.state + ", " + n.sid);
                             ", " + n.state + ", " + n.sid);
                     if (n.epoch > logicalclock) {
                     if (n.epoch > logicalclock) {
+                        LOG.debug("Increasing logical clock: " + n.epoch);
                         logicalclock = n.epoch;
                         logicalclock = n.epoch;
                         recvset.clear();
                         recvset.clear();
                         if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid()))
                         if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid()))

+ 10 - 11
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -238,13 +238,8 @@ public class QuorumCnxManager {
                 SendWorker sw = senderWorkerMap.get(sid);
                 SendWorker sw = senderWorkerMap.get(sid);
 
 
                 LOG.info("Create new connection to server: " + sid);
                 LOG.info("Create new connection to server: " + sid);
-                //sw.connect();
                 s.socket().close();
                 s.socket().close();
-                if(sw != null) sw.finish();
-                SocketChannel channel = SocketChannel.open(self.quorumPeers.get(sid).electionAddr);
-                if (channel.isConnected()) {
-                    initiateConnection(channel, sid);
-                }
+                connectOne(sid);
                 
                 
             } catch (IOException e) {
             } catch (IOException e) {
                 LOG.info("Error when closing socket or trying to reopen connection: "
                 LOG.info("Error when closing socket or trying to reopen connection: "
@@ -329,7 +324,7 @@ public class QuorumCnxManager {
      *  @param sid  server id
      *  @param sid  server id
      */
      */
     
     
-    void connectOne(long sid){
+    synchronized void connectOne(long sid){
         if ((senderWorkerMap.get(sid) == null)) {
         if ((senderWorkerMap.get(sid) == null)) {
             SocketChannel channel;
             SocketChannel channel;
             try {
             try {
@@ -395,13 +390,13 @@ public class QuorumCnxManager {
      */
      */
     class Listener extends Thread {
     class Listener extends Thread {
 
 
-        ServerSocketChannel ss = null;
+        volatile ServerSocketChannel ss = null;
         /**
         /**
          * Sleeps on accept().
          * Sleeps on accept().
          */
          */
         @Override
         @Override
         public void run() {
         public void run() {
-            ServerSocketChannel ss = null;
+            //ss = null;
             try {
             try {
                 ss = ServerSocketChannel.open();
                 ss = ServerSocketChannel.open();
                 int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                 int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
@@ -421,13 +416,17 @@ public class QuorumCnxManager {
                     receiveConnection(client);
                     receiveConnection(client);
                 }
                 }
             } catch (IOException e) {
             } catch (IOException e) {
-                System.err.println("Listener.run: " + e.getMessage());
+                LOG.error("Listener.run: " + e.getMessage());
             }
             }
         }
         }
         
         
         void halt(){
         void halt(){
             try{
             try{
-                if((ss != null) && (ss.isOpen())) ss.close();
+                LOG.debug("Trying to close listener: " + ss);
+                if(ss != null)/* && (ss.isOpen()))*/{
+                    LOG.debug("Closing listener: " + self.getId());
+                    ss.close();
+                }
             } catch (IOException e){
             } catch (IOException e){
                 LOG.warn("Exception when shutting down listener: " + e);
                 LOG.warn("Exception when shutting down listener: " + e);
             }
             }

+ 13 - 4
src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java

@@ -47,7 +47,7 @@ public class FLENewEpochTest extends TestCase {
     volatile int [] round;
     volatile int [] round;
     
     
     Semaphore start0;
     Semaphore start0;
-    Semaphore finish3;
+    Semaphore finish3, finish0;
     
     
     @Override
     @Override
     public void setUp() throws Exception {
     public void setUp() throws Exception {
@@ -66,6 +66,7 @@ public class FLENewEpochTest extends TestCase {
         round[2] = 0;
         round[2] = 0;
         
         
         start0 = new Semaphore(0);
         start0 = new Semaphore(0);
+        finish0 = new Semaphore(0);
         finish3 = new Semaphore(0);
         finish3 = new Semaphore(0);
         
         
         LOG.info("SetUp " + getName());
         LOG.info("SetUp " + getName());
@@ -117,11 +118,18 @@ public class FLENewEpochTest extends TestCase {
             		switch(i){
             		switch(i){
             		case 0:
             		case 0:
             			LOG.info("First peer, do nothing, just join");
             			LOG.info("First peer, do nothing, just join");
-            			flag = false;
+            			if(finish0.tryAcquire(1000, java.util.concurrent.TimeUnit.MILLISECONDS)){
+            			//if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+            			    LOG.info("Setting flag to false");
+            			    flag = false;
+            			}
             			break;
             			break;
             		case 1:
             		case 1:
             			LOG.info("Second entering case");
             			LOG.info("Second entering case");
-            			if(round[1] != 0) flag = false;
+            			if(round[1] != 0){
+            			    finish0.release();
+            			    flag = false;
+            			}
             			else{
             			else{
             				finish3.acquire();
             				finish3.acquire();
             				start0.release();
             				start0.release();
@@ -167,7 +175,8 @@ public class FLENewEpochTest extends TestCase {
               thread.start();
               thread.start();
               threads.add(thread);
               threads.add(thread);
           }
           }
-          start0.acquire();
+          if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS))
+              fail("First leader election failed");
 
 
           QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
           QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
           peer.startLeaderElection();
           peer.startLeaderElection();