Просмотр исходного кода

ZOOKEEPER-1572. Add an async (Java) interface for multi request (Sijie Guo via camille)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1441922 13f79535-47bb-0310-9956-ffa450edef68
Camille Fournier 12 лет назад
Родитель
Сommit
c35f9b0303

+ 2 - 0
CHANGES.txt

@@ -9,6 +9,8 @@ Backward compatible changes:
 NEW FEATURES:
   ZOOKEEPER-1355. Add zk.updateServerList(newServerList) (Alex Shraer, Marshall McMullen via fpj)
 
+  ZOOKEEPER-1572. Add an async (Java) interface for multi request (Sijie Guo via camille)
+  
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString

+ 5 - 0
src/java/main/org/apache/zookeeper/AsyncCallback.java

@@ -59,4 +59,9 @@ public interface AsyncCallback {
     interface VoidCallback extends AsyncCallback {
         public void processResult(int rc, String path, Object ctx);
     }
+
+    interface MultiCallback extends AsyncCallback {
+        public void processResult(int rc, String path, Object ctx,
+                List<OpResult> opResults);
+    }
 }

+ 21 - 2
src/java/main/org/apache/zookeeper/ClientCnxn.java

@@ -46,9 +46,11 @@ import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
 import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.OpResult.ErrorResult;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -614,8 +616,25 @@ public class ClientCnxn {
                                     .substring(chrootPath.length())), rsp.getStat());
                       } else {
                           cb.processResult(rc, clientPath, p.ctx, null, null);
-                      }
-                  } else if (p.cb instanceof VoidCallback) {
+                      }                   
+                  } else if (p.response instanceof MultiResponse) {
+                	  MultiCallback cb = (MultiCallback) p.cb;
+                	  MultiResponse rsp = (MultiResponse) p.response;
+                	  if (rc == 0) {
+                		  List<OpResult> results = rsp.getResultList();
+                		  int newRc = rc;
+                		  for (OpResult result : results) {
+                			  if (result instanceof ErrorResult
+                					  && KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result)
+                					  .getErr())) {
+                				  break;
+                			  }
+                		  }
+                		  cb.processResult(newRc, clientPath, p.ctx, results);
+                	  } else {
+                		  cb.processResult(rc, clientPath, p.ctx, null);
+                	  }
+                  }  else if (p.cb instanceof VoidCallback) {
                       VoidCallback cb = (VoidCallback) p.cb;
                       cb.processResult(rc, clientPath, p.ctx);
                   }

+ 5 - 0
src/java/main/org/apache/zookeeper/Transaction.java

@@ -17,6 +17,7 @@
 
 package org.apache.zookeeper;
 
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.data.ACL;
 import java.util.ArrayList;
 import java.util.List;
@@ -60,4 +61,8 @@ public class Transaction {
     public List<OpResult> commit() throws InterruptedException, KeeperException {
         return zk.multi(ops);
     }
+
+    public void commit(MultiCallback cb, Object ctx) {
+        zk.multi(ops, cb, ctx);
+    }
 }

+ 22 - 2
src/java/main/org/apache/zookeeper/ZooKeeper.java

@@ -1079,14 +1079,27 @@ public class ZooKeeper {
      * @since 3.4.0
      */
     public List<OpResult> multi(Iterable<Op> ops) throws InterruptedException, KeeperException {
+        return multiInternal(generateMultiTransaction(ops));
+    }
+
+    /**
+     * The asynchronous version of multi.
+     *
+     * @see #multi(Iterable)
+     */
+    public void multi(Iterable<Op> ops, MultiCallback cb, Object ctx) {
+        multiInternal(generateMultiTransaction(ops), cb, ctx);
+    }
+
+    private MultiTransactionRecord generateMultiTransaction(Iterable<Op> ops) {
         // reconstructing transaction with the chroot prefix
         List<Op> transaction = new ArrayList<Op>();
         for (Op op : ops) {
             transaction.add(withRootPrefix(op));
         }
-        return multiInternal(new MultiTransactionRecord(transaction));
+        return new MultiTransactionRecord(transaction);
     }
-    
+
     private Op withRootPrefix(Op op) {
         if (null != op.getPath()) {
             final String serverPath = prependChroot(op.getPath());
@@ -1097,6 +1110,13 @@ public class ZooKeeper {
         return op;
     }
 
+    protected void multiInternal(MultiTransactionRecord request, MultiCallback cb, Object ctx) {
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.multi);
+        MultiResponse response = new MultiResponse();
+        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, null, null, ctx, null);
+    }
+
     protected List<OpResult> multiInternal(MultiTransactionRecord request)
         throws InterruptedException, KeeperException {
         RequestHeader h = new RequestHeader();

+ 163 - 54
src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java

@@ -21,12 +21,14 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -46,18 +48,103 @@ import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 public class MultiTransactionTest extends ClientBase {
     private static final Logger LOG = Logger.getLogger(MultiTransactionTest.class);
     private ZooKeeper zk;
     private ZooKeeper zk_chroot;
 
+    private final boolean useAsync;
+
+    public MultiTransactionTest(boolean useAsync) {
+        this.useAsync = useAsync;
+    }
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] {
+            { false }, { true },
+        });
+    }
+
     @Before
     public void setUp() throws Exception {
         SyncRequestProcessor.setSnapCount(150);
         super.setUp();
         zk = createClient();
     }
+
+    static class MultiResult {
+        int rc;
+        List<OpResult> results;
+        boolean finished = false;
+    }
+
+    private List<OpResult> multi(ZooKeeper zk, Iterable<Op> ops)
+    throws KeeperException, InterruptedException {
+        if (useAsync) {
+            final MultiResult res = new MultiResult();
+            zk.multi(ops, new MultiCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx,
+                                          List<OpResult> opResults) {
+                    synchronized (res) {
+                        res.rc = rc;
+                        res.results = opResults;
+                        res.finished = true;
+                        res.notifyAll();
+                    }
+                }
+            }, null);
+            synchronized (res) {
+                while (!res.finished) {
+                    res.wait();
+                }
+            }
+            if (KeeperException.Code.OK.intValue() != res.rc) {
+                KeeperException ke = KeeperException.create(KeeperException.Code.get(res.rc));
+                throw ke;
+            }
+            return res.results;
+        } else {
+            return zk.multi(ops);
+        }
+    }
+
+    private List<OpResult> commit(Transaction txn)
+    throws KeeperException, InterruptedException {
+        if (useAsync) {
+            final MultiResult res = new MultiResult();
+            txn.commit(new MultiCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx,
+                                          List<OpResult> opResults) {
+                    synchronized (res) {
+                        res.rc = rc;
+                        res.results = opResults;
+                        res.finished = true;
+                        res.notifyAll();
+                    }
+                }
+            }, null);
+            synchronized (res) {
+                while (!res.finished) {
+                    res.wait();
+                }
+            }
+            if (KeeperException.Code.OK.intValue() != res.rc) {
+                KeeperException ke = KeeperException.create(KeeperException.Code.get(res.rc));
+                throw ke;
+            }
+            return res.results;
+        } else {
+            return txn.commit();
+        }
+    }
     
     @Test
     public void testChRootCreateDelete() throws Exception {
@@ -67,7 +154,7 @@ public class MultiTransactionTest extends ClientBase {
         zk_chroot = createClient(this.hostPort + chRoot);
         Op createChild = Op.create("/myid", new byte[0],
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk_chroot.multi(Arrays.asList(createChild));
+        multi(zk_chroot, Arrays.asList(createChild));
         
         Assert.assertNotNull("zNode is not created under chroot:" + chRoot, zk
                 .exists(chRoot + "/myid", false));
@@ -78,7 +165,7 @@ public class MultiTransactionTest extends ClientBase {
         
         // Deleting child using chRoot client.
         Op deleteChild = Op.delete("/myid", 0);
-        zk_chroot.multi(Arrays.asList(deleteChild));
+        multi(zk_chroot, Arrays.asList(deleteChild));
         Assert.assertNull("zNode exists under chroot:" + chRoot, zk.exists(
                 chRoot + "/myid", false));
         Assert.assertNull("zNode exists under chroot:" + chRoot, zk_chroot
@@ -100,7 +187,7 @@ public class MultiTransactionTest extends ClientBase {
             ops.add(Op.setData(names[i], names[i].getBytes(), 0));
         }
 
-        zk_chroot.multi(ops) ;
+        multi(zk_chroot, ops) ;
 
         for (int i = 0; i < names.length; i++) {
             Assert.assertArrayEquals("zNode data not matching", names[i]
@@ -124,7 +211,7 @@ public class MultiTransactionTest extends ClientBase {
         for (int i = 0; i < names.length; i++) {
             ops.add(Op.check(names[i], 0));
         }
-        zk_chroot.multi(ops) ;
+        multi(zk_chroot, ops) ;
     }
 
     @Test
@@ -139,7 +226,7 @@ public class MultiTransactionTest extends ClientBase {
                 CreateMode.PERSISTENT);
         transaction.check(childPath, 0);
         transaction.setData(childPath, childPath.getBytes(), 0);
-        transaction.commit();
+        commit(transaction);
 
         Assert.assertNotNull("zNode is not created under chroot:" + chRoot, zk
                 .exists(chRoot + childPath, false));
@@ -153,7 +240,7 @@ public class MultiTransactionTest extends ClientBase {
         transaction = zk_chroot.transaction();
         // Deleting child using chRoot client.
         transaction.delete(childPath, 1);
-        transaction.commit();
+        commit(transaction);
 
         Assert.assertNull("chroot:" + chRoot + " exists after delete", zk
                 .exists(chRoot + "/myid", false));
@@ -167,14 +254,14 @@ public class MultiTransactionTest extends ClientBase {
         String chRoot = "/appsX";
         Op createChRoot = Op.create(chRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT);
-        zk.multi(Arrays.asList(createChRoot));
+        multi(zk, Arrays.asList(createChRoot));
         return chRoot;
     }
 
 
     @Test
     public void testCreate() throws Exception {
-        zk.multi(Arrays.asList(
+        multi(zk, Arrays.asList(
                 Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                 Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                 Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
@@ -187,7 +274,7 @@ public class MultiTransactionTest extends ClientBase {
     @Test
     public void testCreateDelete() throws Exception {
 
-        zk.multi(Arrays.asList(
+        multi(zk, Arrays.asList(
                 Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                 Op.delete("/multi", 0)
                 ));
@@ -200,7 +287,7 @@ public class MultiTransactionTest extends ClientBase {
     public void testInvalidVersion() throws Exception {
 
         try {
-            zk.multi(Arrays.asList(
+            multi(zk, Arrays.asList(
                     Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                     Op.delete("/multi", 1)
             ));
@@ -213,7 +300,7 @@ public class MultiTransactionTest extends ClientBase {
     @Test
     public void testNestedCreate() throws Exception {
 
-        zk.multi(Arrays.asList(
+        multi(zk, Arrays.asList(
                 /* Create */
                 Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                 Op.create("/multi/a", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
@@ -242,7 +329,7 @@ public class MultiTransactionTest extends ClientBase {
             ops.add(Op.setData(names[i], names[i].getBytes(), 0));
         }
 
-        zk.multi(ops) ;
+        multi(zk, ops) ;
 
         for (int i = 0; i < names.length; i++) {
             Assert.assertArrayEquals(names[i].getBytes(), zk.getData(names[i], false, null));
@@ -255,7 +342,7 @@ public class MultiTransactionTest extends ClientBase {
         Assert.assertNull(zk.exists("/multi", null));
 
         try {
-            zk.multi(Arrays.asList(
+            multi(zk, Arrays.asList(
                     Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                     Op.setData("/multi", "X".getBytes(), 0),
                     Op.setData("/multi", "Y".getBytes(), 0)
@@ -269,7 +356,7 @@ public class MultiTransactionTest extends ClientBase {
         Assert.assertNull(zk.exists("/multi", null));
 
         //Updating version solves conflict -- order matters
-        zk.multi(Arrays.asList(
+        multi(zk, Arrays.asList(
                 Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                 Op.setData("/multi", "X".getBytes(), 0),
                 Op.setData("/multi", "Y".getBytes(), 1)
@@ -283,7 +370,7 @@ public class MultiTransactionTest extends ClientBase {
 
         /* Delete of a node folowed by an update of the (now) deleted node */
         try {
-            zk.multi(Arrays.asList(
+            multi(zk, Arrays.asList(
                 Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                 Op.delete("/multi", 0),
                 Op.setData("/multi", "Y".getBytes(), 0)
@@ -300,24 +387,53 @@ public class MultiTransactionTest extends ClientBase {
     @Test
     public void testGetResults() throws Exception {
         /* Delete of a node folowed by an update of the (now) deleted node */
-        try {
-            zk.multi(Arrays.asList(
-                    Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
-                    Op.delete("/multi", 0),
-                    Op.setData("/multi", "Y".getBytes(), 0),
-                    Op.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
-            ));
-            Assert.fail("/multi should have been deleted so setData should have failed");
-        } catch (KeeperException e) {
-            // '/multi' should never have been created as entire op should fail
+        Iterable<Op> ops = Arrays.asList(
+                Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.delete("/multi", 0),
+                Op.setData("/multi", "Y".getBytes(), 0),
+                Op.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+        );
+        List<OpResult> results = null;
+        if (useAsync) {
+            final MultiResult res = new MultiResult();
+            zk.multi(ops, new MultiCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx,
+                                          List<OpResult> opResults) {
+                    synchronized (res) {
+                        res.rc = rc;
+                        res.results = opResults;
+                        res.finished = true;
+                        res.notifyAll();
+                    }
+                }
+            }, null);
+            synchronized (res) {
+                while (!res.finished) {
+                    res.wait();
+                }
+            }
+            Assert.assertFalse("/multi should have been deleted so setData should have failed",
+                               KeeperException.Code.OK.intValue() == res.rc);
             Assert.assertNull(zk.exists("/multi", null));
+            results = res.results;
+        } else {
+            try {
+                zk.multi(ops);
+                Assert.fail("/multi should have been deleted so setData should have failed");
+            } catch (KeeperException e) {
+                // '/multi' should never have been created as entire op should fail
+                Assert.assertNull(zk.exists("/multi", null));
+                results = e.getResults();
+            }
+        }
 
-            for (OpResult r : e.getResults()) {
-                LOG.info("RESULT==> " + r);
-                if (r instanceof ErrorResult) {
-                    ErrorResult er = (ErrorResult) r;
-                    LOG.info("ERROR RESULT: " + er + " ERR=>" + KeeperException.Code.get(er.getErr()));
-                }
+        Assert.assertNotNull(results);
+        for (OpResult r : results) {
+            LOG.info("RESULT==> " + r);
+            if (r instanceof ErrorResult) {
+                ErrorResult er = (ErrorResult) r;
+                LOG.info("ERROR RESULT: " + er + " ERR=>" + KeeperException.Code.get(er.getErr()));
             }
         }
     }
@@ -377,7 +493,7 @@ public class MultiTransactionTest extends ClientBase {
     public void testWatchesTriggered() throws KeeperException, InterruptedException {
         HasTriggeredWatcher watcher = new HasTriggeredWatcher();
         zk.getChildren("/", watcher);
-        zk.multi(Arrays.asList(
+        multi(zk, Arrays.asList(
                 Op.create("/t", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                 Op.delete("/t", -1)
         ));
@@ -389,7 +505,7 @@ public class MultiTransactionTest extends ClientBase {
         HasTriggeredWatcher watcher = new HasTriggeredWatcher();
         zk.getChildren("/", watcher);
         try {
-            zk.multi(Arrays.asList(
+            multi(zk, Arrays.asList(
                     Op.create("/t", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                     Op.delete("/nonexisting", -1)
             ));
@@ -407,11 +523,10 @@ public class MultiTransactionTest extends ClientBase {
     
     @Test
     public void testTransactionBuilder() throws Exception {
-        List<OpResult> results = zk.transaction()
+        List<OpResult> results = commit(zk.transaction()
                 .create("/t1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
                 .create("/t1/child", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
-                .create("/t2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
-                .commit();
+                .create("/t2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
         assertEquals(3, results.size());
         for (OpResult r : results) {
             CreateResult c = (CreateResult)r;
@@ -422,11 +537,10 @@ public class MultiTransactionTest extends ClientBase {
         assertNotNull(zk.exists("/t1/child", false));
         assertNotNull(zk.exists("/t2", false));
         
-        results = zk.transaction()
+        results = commit(zk.transaction()
                 .check("/t1", 0)
                 .check("/t1/child", 0)
-                .check("/t2", 0)
-                .commit();
+                .check("/t2", 0));
         assertEquals(3, results.size());
         for (OpResult r : results) {
             CheckResult c = (CheckResult)r;
@@ -434,46 +548,41 @@ public class MultiTransactionTest extends ClientBase {
         }
         
         try {
-            results = zk.transaction()
+            results = commit(zk.transaction()
                     .check("/t1", 0)
                     .check("/t1/child", 0)
-                    .check("/t2", 1)
-                    .commit();
+                    .check("/t2", 1));
             fail();
         } catch (KeeperException.BadVersionException e) {
             // expected
         }
         
-        results = zk.transaction()
+        results = commit(zk.transaction()
                 .check("/t1", 0)
-                .setData("/t1", new byte[0], 0)
-                .commit();
+                .setData("/t1", new byte[0], 0));
         assertEquals(2, results.size());
         for (OpResult r : results) {
             assertNotNull(r.toString());
         }
 
         try {
-            results = zk.transaction()
+            results = commit(zk.transaction()
                     .check("/t1", 1)
-                    .setData("/t1", new byte[0], 2)
-                    .commit();
+                    .setData("/t1", new byte[0], 2));
             fail();
         } catch (KeeperException.BadVersionException e) {
             // expected
         }
         
-        results = zk.transaction()
+        results = commit(zk.transaction()
                 .check("/t1", 1)
                 .check("/t1/child", 0)
-                .check("/t2", 0)
-                .commit();
+                .check("/t2", 0));
         assertEquals(3, results.size());
 
-        results = zk.transaction()
+        results = commit(zk.transaction()
                 .delete("/t2", -1)
-                .delete("/t1/child", -1)
-                .commit();
+                .delete("/t1/child", -1));
         assertEquals(2, results.size());
         for (OpResult r : results) {
             DeleteResult d = (DeleteResult)r;