瀏覽代碼

ZOOKEEPER-907. Spurious "KeeperErrorCode = Session moved" messages

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@1031051 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 14 年之前
父節點
當前提交
dad9d5791b

+ 2 - 0
CHANGES.txt

@@ -145,6 +145,8 @@ BUGFIXES:
   ZOOKEEPER-898. C Client might not cleanup correctly during close 
   (jared cantwell via mahadev)
 
+  ZOOKEEPER-907. Spurious "KeeperErrorCode = Session moved" messages (vishal k via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

+ 21 - 0
src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java

@@ -56,6 +56,8 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String RETENTION_SECS = "retention_secs";
     protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
     protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
+    protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
+    protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
 
     // these are the derived attributes
     protected ByteString myRegionByteString = null;
@@ -244,6 +246,20 @@ public class ServerConfiguration extends AbstractConfiguration {
         return conf.getInt(MESSAGES_CONSUMED_THREAD_RUN_INTERVAL, 60000);
     }
 
+    // This parameter is used when Bookkeeper is the persistence store
+    // and indicates what the ensemble size is (i.e. how many bookie
+    // servers to stripe the ledger entries across).
+    public int getBkEnsembleSize() {
+        return conf.getInt(BK_ENSEMBLE_SIZE, 3);
+    }
+
+    // This parameter is used when Bookkeeper is the persistence store
+    // and indicates what the quorum size is (i.e. how many redundant
+    // copies of each ledger entry are written).
+    public int getBkQuorumSize() {
+        return conf.getInt(BK_QUORUM_SIZE, 2);
+    }
+
     /*
      * Is this a valid configuration that we can run with? This code might grow
      * over time.
@@ -262,6 +278,11 @@ public class ServerConfiguration extends AbstractConfiguration {
                     throw new ConfigurationException("Region defined does not have required SSL port: " + hubString);
             }
         }
+        // Validate that the Bookkeeper ensemble size >= quorum size.
+        if (getBkEnsembleSize() < getBkQuorumSize()) {
+            throw new ConfigurationException("BK ensemble size (" + getBkEnsembleSize()
+                    + ") is less than the quorum size (" + getBkQuorumSize() + ")");
+        }
 
         // add other checks here
     }

+ 1 - 4
src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java

@@ -608,10 +608,7 @@ public class BookkeeperPersistenceManager implements PersistenceManagerWithRange
          *            the same when we try to write
          */
         private void openNewTopicLedger(final int expectedVersionOfLedgersNode, final TopicInfo topicInfo) {
-            final int ENSEMBLE_SIZE = 3;
-            final int QUORUM_SIZE = 2;
-
-            bk.asyncCreateLedger(ENSEMBLE_SIZE, QUORUM_SIZE, DigestType.CRC32, passwd,
+            bk.asyncCreateLedger(cfg.getBkEnsembleSize(), cfg.getBkQuorumSize(), DigestType.CRC32, passwd,
                     new SafeAsynBKCallback.CreateCallback() {
                         boolean processed = false;
 

+ 3 - 0
src/java/main/org/apache/zookeeper/ZooKeeperMain.java

@@ -782,6 +782,9 @@ public class ZooKeeperMain {
             }
         } else if (cmd.equals("close")) {
                 zk.close();            
+        } else if (cmd.equals("sync") && args.length >= 2) {
+            path = args[1];
+            zk.sync(path, new AsyncCallback.VoidCallback() { public void processResult(int rc, String path, Object ctx) { System.out.println("Sync returned " + rc); } }, null );
         } else if (cmd.equals("addauth") && args.length >=2 ) {
             byte[] b = null;
             if (args.length >= 3)

+ 5 - 5
src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -449,14 +449,14 @@ public class LearnerHandler extends Thread {
                     cxid = bb.getInt();
                     type = bb.getInt();
                     bb = bb.slice();
+                    Request si;
                     if(type == OpCode.sync){
-                     	leader.zk.submitRequest(new LearnerSyncRequest(this, sessionId, cxid, type, bb,
-                                qp.getAuthinfo()));
+                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                     } else {
-                        Request si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
-                        si.setOwner(this);
-                        leader.zk.submitRequest(si);
+                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                     }
+                    si.setOwner(this);
+                    leader.zk.submitRequest(si);
                     break;
                 default:
                 }

+ 14 - 1
src/java/test/org/apache/zookeeper/test/QuorumTest.java

@@ -186,7 +186,6 @@ public class QuorumTest extends QuorumBase {
      * @throws KeeperException
      */
     @Test
-    @Ignore
     public void testSessionMoved() throws Exception {
         String hostPorts[] = qb.hostPort.split(",");
         DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
@@ -206,6 +205,20 @@ public class QuorumTest extends QuorumBase {
                     zk.getSessionId(),
                     zk.getSessionPasswd());
             zknew.setData("/", new byte[1], -1);
+            final int result[] = new int[1];
+            result[0] = Integer.MAX_VALUE;
+            zknew.sync("/", new AsyncCallback.VoidCallback() {
+                    public void processResult(int rc, String path, Object ctx) {
+                        synchronized(result) { result[0] = rc; result.notify(); }
+                    }
+                }, null);
+            synchronized(result) {
+                if(result[0] == Integer.MAX_VALUE) {
+                    result.wait(5000);
+                }
+            }
+            LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
+            Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
             try {
                 zk.setData("/", new byte[1], -1);
                 Assert.fail("Should have lost the connection");

+ 14 - 1
src/java/test/org/apache/zookeeper/test/SessionTest.java

@@ -307,7 +307,6 @@ public class SessionTest extends ZKTestCase {
      * @throws KeeperException
      */
     @Test
-    @Ignore
     public void testSessionMove() throws Exception {
         String hostPorts[] = HOSTPORT.split(",");
         DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
@@ -324,6 +323,20 @@ public class SessionTest extends ZKTestCase {
                     new MyWatcher(Integer.toString(i+1)),
                     zk.getSessionId(),
                     zk.getSessionPasswd());
+            final int result[] = new int[1];
+            result[0] = Integer.MAX_VALUE;
+            zknew.sync("/", new AsyncCallback.VoidCallback() {
+                    public void processResult(int rc, String path, Object ctx) {
+                        synchronized(result) { result[0] = rc; result.notify(); }
+                    }
+                }, null);
+            synchronized(result) {
+                if(result[0] == Integer.MAX_VALUE) {
+                    result.wait(5000);
+                }
+            }
+            LOG.info(hostPorts[(i+1)%hostPorts.length] + " Sync returned " + result[0]);
+            Assert.assertTrue(result[0] == KeeperException.Code.OK.intValue());
             zknew.setData("/", new byte[1], -1);
             try {
                 zk.setData("/", new byte[1], -1);