浏览代码

ZOOKEEPER-3253: client should not send requests with cxid=-4, -2, or -1

- Add test for cxid rollover to 1
- Modify ClientCnxn.SendThread.getXid() to increment from MAX to 1.

Author: Samuel Just <sjustsalesforce.com>

Reviewers: phuntapache.org

Closes #787 from athanatos/forupstream/ZOOKEEPER-3253

Change-Id: Ib3d111170bb086d6982f2cf0ee5cf8afd5157588
(cherry picked from commit e10c93a590cc1b73eebad48d18cfcbceb3ec0d4d)

Includes backport of createConnection testability refactor
from 9f82798415351a20136ceb1640b1781723e51cc1.

Signed-off-by: Samuel Just <sjustsalesforce.com>

Author: Samuel Just <sjust@salesforce.com>

Reviewers: phunt@apache.org

Closes #844 from athanatos/forupstream/ZOOKEEPER-3235-3.4
Samuel Just 6 年之前
父节点
当前提交
6604f36d93

+ 8 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -1383,7 +1383,8 @@ public class ClientCnxn {
         }
     }
 
-    private int xid = 1;
+    // @VisibleForTesting
+    protected int xid = 1;
 
     // @VisibleForTesting
     volatile States state = States.NOT_CONNECTED;
@@ -1393,6 +1394,12 @@ public class ClientCnxn {
      * the server. Thus, getXid() must be public.
      */
     synchronized public int getXid() {
+        // Avoid negative cxid values.  In particular, cxid values of -4, -2, and -1 are special and
+        // must not be used for requests -- see SendThread.readResponse.
+        // Skip from MAX to 1.
+        if (xid == Integer.MAX_VALUE) {
+            xid = 1;
+        }
         return xid++;
     }
 

+ 9 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java

@@ -448,12 +448,20 @@ public class ZooKeeper {
                 connectString);
         HostProvider hostProvider = new StaticHostProvider(
                 connectStringParser.getServerAddresses());
-        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
+        cnxn = createConnection(connectStringParser.getChrootPath(),
                 hostProvider, sessionTimeout, this, watchManager,
                 getClientCnxnSocket(), canBeReadOnly);
         cnxn.start();
     }
 
+    // @VisibleForTesting
+    protected ClientCnxn createConnection(String chrootPath,
+            HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+            boolean canBeReadOnly) throws IOException {
+        return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
+                watchManager, clientCnxnSocket, canBeReadOnly);
+    }
     /**
      * To create a ZooKeeper client object, the application needs to pass a
      * connection string containing a comma separated list of host:port pairs,

+ 34 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java

@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jute.Record;
+import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 
@@ -34,6 +35,39 @@ public class TestableZooKeeper extends ZooKeeper {
             Watcher watcher) throws IOException {
         super(host, sessionTimeout, watcher);
     }
+
+    class TestableClientCnxn extends ClientCnxn {
+        TestableClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
+                throws IOException {
+            super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
+                clientCnxnSocket, 0, new byte[16], canBeReadOnly);
+        }
+
+        void setXid(int newXid) {
+            xid = newXid;
+        }
+
+        int checkXid() {
+            return xid;
+        }
+    }
+
+    protected ClientCnxn createConnection(String chrootPath,
+            HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+            boolean canBeReadOnly) throws IOException {
+        return new TestableClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
+                watcher, clientCnxnSocket, canBeReadOnly);
+    }
+
+    public void setXid(int xid) {
+        ((TestableClientCnxn)cnxn).setXid(xid);
+    }
+
+    public int checkXid() {
+        return ((TestableClientCnxn)cnxn).checkXid();
+    }
     
     @Override
     public List<String> getChildWatches() {

+ 35 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java

@@ -26,6 +26,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -802,4 +804,37 @@ public class ClientTest extends ClientBase {
         Assert.assertTrue("failed to disconnect",
                 clientDisconnected.await(5000, TimeUnit.MILLISECONDS));
     }
+
+    @Test
+    public void testCXidRollover() throws Exception {
+        TestableZooKeeper zk = null;
+        try {
+            zk = createClient();
+            zk.setXid(Integer.MAX_VALUE - 10);
+
+            zk.create("/testnode", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+            for (int i = 0; i < 20; ++i) {
+                final CountDownLatch latch = new CountDownLatch(1);
+                final AtomicInteger rc = new AtomicInteger(0);
+                zk.setData("/testnode", "".getBytes(), -1,
+                    new AsyncCallback.StatCallback() {
+                        @Override
+                        public void processResult(int retcode, String path, Object ctx, Stat stat) {
+                            rc.set(retcode);
+                            latch.countDown();
+                        }
+                    }, null);
+                Assert.assertTrue("setData should complete within 5s",
+                    latch.await(zk.getSessionTimeout(), TimeUnit.MILLISECONDS));
+                Assert.assertEquals("setData should have succeeded", Code.OK.intValue(), rc.get());
+            }
+            zk.delete("/testnode", -1);
+            Assert.assertTrue("xid should be positive", zk.checkXid() > 0);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
 }