Explorar el Código

ZOOKEEPER-483. ZK fataled on me, and ugly

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@802995 13f79535-47bb-0310-9956-ffa450edef68
Patrick D. Hunt hace 15 años
padre
commit
b12aad6381

+ 3 - 2
CHANGES.txt

@@ -57,11 +57,12 @@ BUGFIXES:
 
   ZOOKEEPER-501. CnxManagerTest failed on hudson. (flavio via mahadev)
   
-  ZOOKEEPER-499. electionAlg should default to FLE (3) - regression (phunt via
-  mahadev) 
+  ZOOKEEPER-499. electionAlg should default to FLE (3) - regression
+  (phunt via mahadev) 
 
   ZOOKEEPER-477. zkCleanup.sh is flaky (fernando via mahadev)
 
+  ZOOKEEPER-483. ZK fataled on me, and ugly (breed via phunt)
 
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to

+ 22 - 2
src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java

@@ -43,13 +43,33 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
             try {
                 follower.writePacket(qp, false);
             } catch (IOException e) {
-                LOG.warn("Ignoring unexpected exception during packet send", e);
+                LOG.warn("Closing connection to leader, exception during packet send", e);
+                try {
+                    if (!follower.sock.isClosed()) {
+                        follower.sock.close();
+                    }
+                } catch (IOException e1) {
+                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
+                    LOG.debug("Ignoring error closing the connection", e1);
+                }
             }
         }
     }
     
     public void flush() throws IOException {
-        follower.writePacket(null, true);
+        try {
+            follower.writePacket(null, true);
+        } catch(IOException e) {
+            LOG.warn("Closing connection to leader, exception during packet send", e);
+            try {
+                if (!follower.sock.isClosed()) {
+                    follower.sock.close();
+                }
+            } catch (IOException e1) {
+                    // Nothing to do, we are shutting things down, so an exception here is irrelevant
+                    LOG.debug("Ignoring error closing the connection", e1);
+            }
+        }
     }
 
     public void shutdown() {

+ 40 - 0
src/java/test/org/apache/zookeeper/test/QuorumTest.java

@@ -23,12 +23,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.Before;
 import org.junit.Test;
@@ -93,6 +97,42 @@ public class QuorumTest extends QuorumBase {
     {
         ct.testClientWithWatcherObj();
     }
+    volatile int counter = 0;
+    volatile int errors = 0;
+    @Test
+    public void testLeaderShutdown() throws IOException, InterruptedException, KeeperException {
+        ZooKeeper zk = new DisconnectableZooKeeper(qb.hostPort, ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+            public void process(WatchedEvent event) {
+        }});
+        zk.create("/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/blah/blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Leader leader = qb.s1.leader;
+        if (leader == null) leader = qb.s2.leader;
+        if (leader == null) leader = qb.s3.leader;
+        if (leader == null) leader = qb.s4.leader;
+        if (leader == null) leader = qb.s5.leader;
+        assertNotNull(leader);
+        for(int i = 0; i < 10000; i++) {
+            zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    counter++;
+                    if (rc != 0) {
+                        errors++;
+                    }
+                }
+            }, null);
+        }
+        ArrayList<FollowerHandler> fhs = new ArrayList<FollowerHandler>(leader.forwardingFollowers);
+        for(FollowerHandler f: fhs) {
+            f.sock.shutdownInput();
+        }
+        while(counter + errors < 10000) {
+            Thread.sleep(200);
+        }
+        assertTrue("We should have had some errors", errors != 0);
+        zk.close();
+    }
     @Test
     public void testMultipleWatcherObjs() throws IOException,
             InterruptedException, KeeperException