浏览代码

ZOOKEEPER-3253: client should not send requests with cxid=-4, -2, or -1

- Add test for cxid rollover to 1
- Modify ClientCnxn.SendThread.getXid() to increment from MAX to 1.

Author: Samuel Just <sjust@salesforce.com>

Reviewers: phunt@apache.org

Closes #787 from athanatos/forupstream/ZOOKEEPER-3253

Change-Id: Ib3d111170bb086d6982f2cf0ee5cf8afd5157588
Samuel Just 6 年之前
父节点
当前提交
e10c93a590

+ 8 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -1513,7 +1513,8 @@ public class ClientCnxn {
         }
     }
 
-    private int xid = 1;
+    // @VisibleForTesting
+    protected int xid = 1;
 
     // @VisibleForTesting
     volatile States state = States.NOT_CONNECTED;
@@ -1523,6 +1524,12 @@ public class ClientCnxn {
      * the server. Thus, getXid() must be public.
      */
     synchronized public int getXid() {
+        // Avoid negative cxid values.  In particular, cxid values of -4, -2, and -1 are special and
+        // must not be used for requests -- see SendThread.readResponse.
+        // Skip from MAX to 1.
+        if (xid == Integer.MAX_VALUE) {
+            xid = 1;
+        }
         return xid++;
     }
 

+ 34 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/TestableZooKeeper.java

@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.jute.Record;
 import org.apache.zookeeper.admin.ZooKeeperAdmin;
+import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 
@@ -35,6 +36,39 @@ public class TestableZooKeeper extends ZooKeeperAdmin {
             Watcher watcher) throws IOException {
         super(host, sessionTimeout, watcher);
     }
+
+    class TestableClientCnxn extends ClientCnxn {
+        TestableClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
+                throws IOException {
+            super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
+                clientCnxnSocket, 0, new byte[16], canBeReadOnly);
+        }
+
+        void setXid(int newXid) {
+            xid = newXid;
+        }
+
+        int checkXid() {
+            return xid;
+        }
+    }
+
+    protected ClientCnxn createConnection(String chrootPath,
+            HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+            boolean canBeReadOnly) throws IOException {
+        return new TestableClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
+                watcher, clientCnxnSocket, canBeReadOnly);
+    }
+
+    public void setXid(int xid) {
+        ((TestableClientCnxn)cnxn).setXid(xid);
+    }
+
+    public int checkXid() {
+        return ((TestableClientCnxn)cnxn).checkXid();
+    }
     
     @Override
     public List<String> getChildWatches() {

+ 35 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java

@@ -26,6 +26,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -868,4 +870,37 @@ public class ClientTest extends ClientBase {
 
         Assert.assertFalse(zooKeeper.getState().isAlive());
     }
+
+    @Test
+    public void testCXidRollover() throws Exception {
+        TestableZooKeeper zk = null;
+        try {
+            zk = createClient();
+            zk.setXid(Integer.MAX_VALUE - 10);
+
+            zk.create("/testnode", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+            for (int i = 0; i < 20; ++i) {
+                final CountDownLatch latch = new CountDownLatch(1);
+                final AtomicInteger rc = new AtomicInteger(0);
+                zk.setData("/testnode", "".getBytes(), -1,
+                    new AsyncCallback.StatCallback() {
+                        @Override
+                        public void processResult(int retcode, String path, Object ctx, Stat stat) {
+                            rc.set(retcode);
+                            latch.countDown();
+                        }
+                    }, null);
+                Assert.assertTrue("setData should complete within 5s",
+                    latch.await(zk.getSessionTimeout(), TimeUnit.MILLISECONDS));
+                Assert.assertEquals("setData should have succeeded", Code.OK.intValue(), rc.get());
+            }
+            zk.delete("/testnode", -1);
+            Assert.assertTrue("xid should be positive", zk.checkXid() > 0);
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
 }