Browse Source

ZOOKEEPER-965. Need a multi-update command to allow multiple znodes to be updated safely

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1141746 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 14 years ago
parent
commit
0b3d1c6a7b
30 changed files with 2750 additions and 445 deletions
  1. 2 0
      CHANGES.txt
  2. 2 2
      src/c/Makefile.am
  3. 17 15
      src/c/include/proto.h
  4. 165 0
      src/c/include/zookeeper.h
  5. 404 312
      src/c/src/zookeeper.c
  6. 1 1
      src/c/tests/TestOperations.cc
  7. 1 1
      src/c/tests/ZKMocks.cc
  8. 22 0
      src/java/main/org/apache/zookeeper/KeeperException.java
  9. 166 0
      src/java/main/org/apache/zookeeper/MultiResponse.java
  10. 160 0
      src/java/main/org/apache/zookeeper/MultiTransactionRecord.java
  11. 306 0
      src/java/main/org/apache/zookeeper/Op.java
  12. 185 0
      src/java/main/org/apache/zookeeper/OpResult.java
  13. 60 0
      src/java/main/org/apache/zookeeper/Transaction.java
  14. 4 0
      src/java/main/org/apache/zookeeper/ZooDefs.java
  15. 65 42
      src/java/main/org/apache/zookeeper/ZooKeeper.java
  16. 94 5
      src/java/main/org/apache/zookeeper/server/DataTree.java
  17. 50 1
      src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
  18. 243 51
      src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
  19. 8 0
      src/java/main/org/apache/zookeeper/server/Request.java
  20. 1 1
      src/java/main/org/apache/zookeeper/server/RequestProcessor.java
  21. 2 0
      src/java/main/org/apache/zookeeper/server/TraceFormatter.java
  22. 6 6
      src/java/main/org/apache/zookeeper/server/package.html
  23. 1 0
      src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
  24. 6 0
      src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
  25. 72 0
      src/java/test/org/apache/zookeeper/MultiResponseTest.java
  26. 50 0
      src/java/test/org/apache/zookeeper/MultiTransactionRecordTest.java
  27. 114 0
      src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
  28. 289 0
      src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig
  29. 230 0
      src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java
  30. 24 8
      src/zookeeper.jute

+ 2 - 0
CHANGES.txt

@@ -325,6 +325,8 @@ IMPROVEMENTS:
   ZOOKEEPER-1074. zkServer.sh is missing nohup/sleep, which are necessary 
   for remote invocation. (phunt via mahadev)
 
+  ZOOKEEPER-965. Need a multi-update command to allow multiple znodes to be updated safely (Marshall McMullen and Ted Dunning via breed)
+
 NEW FEATURES:
   ZOOKEEPER-729. Java client API to recursively delete a subtree.
   (Kay Kay via henry)

+ 2 - 2
src/c/Makefile.am

@@ -7,7 +7,7 @@ AM_CXXFLAGS = -Wall $(USEIPV6)
 
 LIB_LDFLAGS = -no-undefined -version-info 2
 
-pkginclude_HEADERS = include/zookeeper.h include/zookeeper_version.h include/zookeeper_log.h include/recordio.h generated/zookeeper.jute.h
+pkginclude_HEADERS = include/zookeeper.h include/zookeeper_version.h include/zookeeper_log.h include/proto.h include/recordio.h generated/zookeeper.jute.h
 EXTRA_DIST=LICENSE
 
 HASHTABLE_SRC = src/hashtable/hashtable_itr.h src/hashtable/hashtable_itr.c \
@@ -75,7 +75,7 @@ TEST_SOURCES = tests/TestDriver.cc tests/LibCMocks.cc tests/LibCSymTable.cc \
     tests/TestClientRetry.cc \
     tests/TestOperations.cc tests/TestZookeeperInit.cc \
     tests/TestZookeeperClose.cc tests/TestClient.cc \
-    tests/TestWatchers.cc
+    tests/TestMulti.cc tests/TestWatchers.cc
 
 
 SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)

+ 17 - 15
src/c/include/proto.h

@@ -22,21 +22,23 @@
 extern "C" {
 #endif
 
-static const int NOTIFY_OP=0;
-static const int CREATE_OP=1;
-static const int DELETE_OP=2;
-static const int EXISTS_OP=3;
-static const int GETDATA_OP=4;
-static const int SETDATA_OP=5;
-static const int GETACL_OP=6;
-static const int SETACL_OP=7;
-static const int GETCHILDREN_OP=8;
-static const int SYNC_OP=9;
-static const int PING_OP=11;
-static const int GETCHILDREN2_OP=12;
-static const int CLOSE_OP=-11;
-static const int SETAUTH_OP=100;
-static const int SETWATCHES_OP=101;
+#define ZOO_NOTIFY_OP 0
+#define ZOO_CREATE_OP 1
+#define ZOO_DELETE_OP 2
+#define ZOO_EXISTS_OP 3
+#define ZOO_GETDATA_OP 4
+#define ZOO_SETDATA_OP 5
+#define ZOO_GETACL_OP 6
+#define ZOO_SETACL_OP 7
+#define ZOO_GETCHILDREN_OP 8
+#define ZOO_SYNC_OP 9
+#define ZOO_PING_OP 11
+#define ZOO_GETCHILDREN2_OP 12
+#define ZOO_CHECK_OP 13
+#define ZOO_MULTI_OP 14
+#define ZOO_CLOSE_OP -11
+#define ZOO_SETAUTH_OP 100
+#define ZOO_SETWATCHES_OP 101
 
 #ifdef __cplusplus
 }

+ 165 - 0
src/c/include/zookeeper.h

@@ -25,6 +25,7 @@
 #include <stdio.h>
 #include <ctype.h>
 
+#include "proto.h"
 #include "zookeeper_version.h"
 #include "recordio.h"
 #include "zookeeper.jute.h"
@@ -254,6 +255,138 @@ typedef struct {
     char passwd[16];
 } clientid_t;
 
+/**
+ * \brief zoo_op structure.
+ *
+ * This structure holds all the arguments necessary for one op as part
+ * of a containing multi_op via \ref zoo_multi or \ref zoo_amulti.
+ * This structure should be treated as opaque and initialized via 
+ * \ref zoo_create_op_init, \ref zoo_delete_op_init, \ref zoo_set_op_init
+ * and \ref zoo_check_op_init.
+ */
+typedef struct zoo_op {
+    int type;
+    union {
+        // CREATE
+        struct {
+            const char *path;
+            const char *data;
+            int datalen;
+	        char *buf;
+            int buflen;
+            const struct ACL_vector *acl;
+            int flags;
+        } create_op;
+
+        // DELETE 
+        struct {
+            const char *path;
+            int version;
+        } delete_op;
+        
+        // SET
+        struct {
+            const char *path;
+            const char *data;
+            int datalen;
+            int version;
+            struct Stat *stat;
+        } set_op;
+        
+        // CHECK
+        struct {
+            const char *path;
+            int version;
+        } check_op;
+    };
+} zoo_op_t;
+
+/**
+ * \brief zoo_create_op_init.
+ *
+ * This function initializes a zoo_op_t with the arguments for a ZOO_CREATE_OP.
+ *
+ * \param op A pointer to the zoo_op_t to be initialized.
+ * \param path The name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param value The data to be stored in the node.
+ * \param valuelen The number of bytes in data. To set the data to be NULL use
+ * value as NULL and valuelen as -1.
+ * \param acl The initial ACL of the node. The ACL must not be null or empty.
+ * \param flags this parameter can be set to 0 for normal create or an OR
+ *    of the Create Flags
+ * \param path_buffer Buffer which will be filled with the path of the
+ *    new node (this might be different than the supplied path
+ *    because of the ZOO_SEQUENCE flag).  The path string will always be
+ *    null-terminated. This parameter may be NULL if path_buffer_len = 0.
+ * \param path_buffer_len Size of path buffer; if the path of the new
+ *    node (including space for the null terminator) exceeds the buffer size,
+ *    the path string will be truncated to fit.  The actual path of the
+ *    new node in the server will not be affected by the truncation.
+ *    The path string will always be null-terminated.
+ */
+void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
+        int valuelen,  const struct ACL_vector *acl, int flags, 
+        char *path_buffer, int path_buffer_len);
+
+/**
+ * \brief zoo_delete_op_init.
+ *
+ * This function initializes a zoo_op_t with the arguments for a ZOO_DELETE_OP.
+ *
+ * \param op A pointer to the zoo_op_t to be initialized.
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param version the expected version of the node. The function will fail if the
+ *    actual version of the node does not match the expected version.
+ *  If -1 is used the version check will not take place. 
+ */
+void zoo_delete_op_init(zoo_op_t *op, const char *path, int version);
+
+/**
+ * \brief zoo_set_op_init.
+ *
+ * This function initializes an zoo_op_t with the arguments for a ZOO_SETDATA_OP.
+ *
+ * \param op A pointer to the zoo_op_t to be initialized.
+ * \param path the name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param buffer the buffer holding data to be written to the node.
+ * \param buflen the number of bytes from buffer to write. To set NULL as data 
+ * use buffer as NULL and buflen as -1.
+ * \param version the expected version of the node. The function will fail if 
+ * the actual version of the node does not match the expected version. If -1 is 
+ * used the version check will not take place. 
+ */
+void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer, 
+        int buflen, int version, struct Stat *stat);
+
+/**
+ * \brief zoo_check_op_init.
+ *
+ * This function initializes an zoo_op_t with the arguments for a ZOO_CHECK_OP.
+ *
+ * \param op A pointer to the zoo_op_t to be initialized.
+ * \param path The name of the node. Expressed as a file name with slashes 
+ * separating ancestors of the node.
+ * \param version the expected version of the node. The function will fail if the
+ *    actual version of the node does not match the expected version.
+ */
+void zoo_check_op_init(zoo_op_t *op, const char *path, int version);
+
+/**
+ * \brief zoo_op_result structure.
+ *
+ * This structure holds the result for an op submitted as part of a multi_op
+ * via \ref zoo_multi or \ref zoo_amulti.
+ */
+typedef struct zoo_op_result {
+    int err;
+    char *value;
+	int valuelen;
+    struct Stat *stat;
+} zoo_op_result_t; 
+
 /**
  * \brief signature of a watch function.
  * 
@@ -974,6 +1107,25 @@ ZOOAPI int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t comple
 ZOOAPI int zoo_aset_acl(zhandle_t *zh, const char *path, int version, 
         struct ACL_vector *acl, void_completion_t, const void *data);
 
+/**
+ * \brief atomically commits multiple zookeeper operations.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param count the number of operations
+ * \param ops an array of operations to commit
+ * \param results an array to hold the results of the operations
+ * \param completion the routine to invoke when the request completes. The completion
+ * will be triggered with any of the error codes that can that can be returned by the 
+ * ops supported by a multi op (see \ref zoo_acreate, \ref zoo_adelete, \ref zoo_aset).
+ * \param data the data that will be passed to the completion routine when
+ * the function completes.
+ * \return the return code for the function call. This can be any of the
+ * values that can be returned by the ops supported by a multi op (see
+ * \ref zoo_acreate, \ref zoo_adelete, \ref zoo_aset).
+ */
+ZOOAPI int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops, 
+        zoo_op_result_t *results, void_completion_t, const void *data);
+
 /**
  * \brief return an error string.
  * 
@@ -1404,6 +1556,19 @@ ZOOAPI int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
 ZOOAPI int zoo_set_acl(zhandle_t *zh, const char *path, int version,
                            const struct ACL_vector *acl);
 
+/**
+ * \brief atomically commits multiple zookeeper operations synchronously.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param count the number of operations
+ * \param ops an array of operations to commit
+ * \param results an array to hold the results of the operations
+ * \return the return code for the function call. This can be any of the
+ * values that can be returned by the ops supported by a multi op (see
+ * \ref zoo_acreate, \ref zoo_adelete, \ref zoo_aset).
+ */ 
+ZOOAPI int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results);
+
 #ifdef __cplusplus
 }
 #endif

File diff suppressed because it is too large
+ 404 - 312
src/c/src/zookeeper.c


+ 1 - 1
src/c/tests/TestOperations.cc

@@ -220,7 +220,7 @@ public:
         PingCountingServer():pingCount_(0){}
         // called when a client request is received
         virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
-           if(rh.type==PING_OP){
+           if(rh.type==ZOO_PING_OP){
                pingCount_++;
            }
         }

+ 1 - 1
src/c/tests/ZKMocks.cc

@@ -487,7 +487,7 @@ void ZookeeperServer::notifyBufferSent(const std::string& buffer){
         onMessageReceived(rh,ia);
     }
     close_buffer_iarchive(&ia);
-    if(rh.type==CLOSE_OP){
+    if(rh.type==ZOO_CLOSE_OP){
         ++closeSent;
         return; // no reply for close requests
     }

+ 22 - 0
src/java/main/org/apache/zookeeper/KeeperException.java

@@ -18,12 +18,21 @@
 
 package org.apache.zookeeper;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 @SuppressWarnings("serial")
 public abstract class KeeperException extends Exception {
+    /**
+     * All multi-requests that result in an exception retain the results
+     * here so that it is possible to examine the problems in the catch
+     * scope.  Non-multi requests will get a null if they try to access
+     * these results.
+     */
+    private List<OpResult> results;
 
     /**
      * All non-specific keeper exceptions should be constructed via
@@ -465,6 +474,19 @@ public abstract class KeeperException extends Exception {
         return "KeeperErrorCode = " + getCodeMessage(code) + " for " + path;
     }
 
+    void setMultiResults(List<OpResult> results) {
+        this.results = results;
+    }
+
+    /**
+     * If this exception was thrown by a multi-request then the (partial) results
+     * and error codes can be retrieved using this getter.
+     * @return A copy of the list of results from the operations in the multi-request.
+     */
+    public List<OpResult> getResults() {
+        return results != null ? new ArrayList<OpResult>(results) : null;
+    }
+
     /**
      *  @see Code#APIERROR
      */

+ 166 - 0
src/java/main/org/apache/zookeeper/MultiResponse.java

@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.proto.CreateResponse;
+import org.apache.zookeeper.proto.MultiHeader;
+import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.ErrorResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Handles the response from a multi request.  Such a response consists of
+ * a sequence of responses each prefixed by a MultiResponse that indicates
+ * the type of the response.  The end of the list is indicated by a MultiHeader
+ * with a negative type.  Each individual response is in the same format as
+ * with the corresponding operation in the original request list.
+ */
+public class MultiResponse implements Record, Iterable<OpResult> {
+    private List<OpResult> results = new ArrayList<OpResult>();
+
+    public void add(OpResult x) {
+        results.add(x);
+    }
+
+    @Override
+    public Iterator<OpResult> iterator() {
+        return results.iterator();
+    }
+
+    public int size() {
+        return results.size();
+    }
+
+    @Override
+    public void serialize(OutputArchive archive, String tag) throws IOException {
+        archive.startRecord(this, tag);
+
+        int index = 0;
+        for (OpResult result : results) {
+            int err = result.getType() == ZooDefs.OpCode.error ? ((OpResult.ErrorResult)result).getErr() : 0;
+
+            new MultiHeader(result.getType(), false, err).serialize(archive, tag);
+
+            switch (result.getType()) {
+                case ZooDefs.OpCode.create:
+                    new CreateResponse(((OpResult.CreateResult) result).getPath()).serialize(archive, tag);
+                    break;
+                case ZooDefs.OpCode.delete:
+                case ZooDefs.OpCode.check:
+                    break;
+                case ZooDefs.OpCode.setData:
+                    new SetDataResponse(((OpResult.SetDataResult) result).getStat()).serialize(archive, tag);
+                    break;
+                case ZooDefs.OpCode.error:
+                    new ErrorResponse(((OpResult.ErrorResult) result).getErr()).serialize(archive, tag);
+                    break;
+                default:
+                    throw new IOException("Invalid type " + result.getType() + " in MultiResponse");
+            }
+        }
+        new MultiHeader(-1, true, -1).serialize(archive, tag);
+        archive.endRecord(this, tag);
+    }
+
+    @Override
+    public void deserialize(InputArchive archive, String tag) throws IOException {
+        results = new ArrayList<OpResult>();
+
+        archive.startRecord(tag);
+        MultiHeader h = new MultiHeader();
+        h.deserialize(archive, tag);
+        while (!h.getDone()) {
+            switch (h.getType()) {
+                case ZooDefs.OpCode.create:
+                    CreateResponse cr = new CreateResponse();
+                    cr.deserialize(archive, tag);
+                    results.add(new OpResult.CreateResult(cr.getPath()));
+                    break;
+
+                case ZooDefs.OpCode.delete:
+                    results.add(new OpResult.DeleteResult());
+                    break;
+
+                case ZooDefs.OpCode.setData:
+                    SetDataResponse sdr = new SetDataResponse();
+                    sdr.deserialize(archive, tag);
+                    results.add(new OpResult.SetDataResult(sdr.getStat()));
+                    break;
+
+                case ZooDefs.OpCode.check:
+                    results.add(new OpResult.CheckResult());
+                    break;
+
+                case ZooDefs.OpCode.error:
+                    //FIXME: need way to more cleanly serialize/deserialize exceptions
+                    ErrorResponse er = new ErrorResponse();
+                    er.deserialize(archive, tag);
+                    results.add(new OpResult.ErrorResult(er.getErr()));
+                    break;
+
+                default:
+                    throw new IOException("Invalid type " + h.getType() + " in MultiResponse");
+            }
+            h.deserialize(archive, tag);
+        }
+        archive.endRecord(tag);
+    }
+
+    public List<OpResult> getResultList() {
+        return results;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof MultiResponse)) return false;
+
+        MultiResponse other = (MultiResponse) o;
+
+        if (results != null) {
+            Iterator<OpResult> i = other.results.iterator();
+            for (OpResult result : results) {
+                if (i.hasNext()) {
+                    if (!result.equals(i.next())) {
+                        return false;
+                    }
+                } else {
+                    return false;
+                }
+            }
+            return !i.hasNext();
+        }
+        else return other.results == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = results.size();
+        for (OpResult result : results) {
+            hash = (hash * 35) + result.hashCode();
+        }
+        return hash;
+    }
+}

+ 160 - 0
src/java/main/org/apache/zookeeper/MultiTransactionRecord.java

@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.proto.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Encodes a composite transaction.  In the wire format, each transaction
+ * consists of a single MultiHeader followed by the appropriate request.
+ * Each of these MultiHeaders has a type which indicates
+ * the type of the following transaction or a negative number if no more transactions
+ * are included.
+ */
+public class MultiTransactionRecord implements Record, Iterable<Op> {
+    private List<Op> ops = new ArrayList<Op>();
+
+    public MultiTransactionRecord() {
+    }
+
+    public MultiTransactionRecord(Iterable<Op> ops) {
+        for (Op op : ops) {
+            add(op);
+        }
+    }
+
+    @Override
+    public Iterator<Op> iterator() {
+        return ops.iterator() ;
+    }
+
+    public void add(Op op) {
+        ops.add(op);
+    }
+
+    public int size() {
+        return ops.size();
+    }
+
+    @Override
+    public void serialize(OutputArchive archive, String tag) throws IOException {
+        archive.startRecord(this, tag);
+        int index = 0 ;
+        for (Op op : ops) {
+            MultiHeader h = new MultiHeader(op.getType(), false, -1);
+            h.serialize(archive, tag);
+            switch (op.getType()) {
+               case ZooDefs.OpCode.create:
+                    op.toRequestRecord().serialize(archive, tag);
+                    break;
+                case ZooDefs.OpCode.delete:
+                    op.toRequestRecord().serialize(archive, tag);
+                    break;
+                case ZooDefs.OpCode.setData:
+                    op.toRequestRecord().serialize(archive, tag);
+                    break;
+                case ZooDefs.OpCode.check:
+                    op.toRequestRecord().serialize(archive, tag);
+                    break;
+                default:
+                    throw new IOException("Invalid type of op");
+            }
+        }
+        new MultiHeader(-1, true, -1).serialize(archive, tag);
+        archive.endRecord(this, tag);
+    }
+
+    @Override
+    public void deserialize(InputArchive archive, String tag) throws IOException {
+        archive.startRecord(tag);
+        MultiHeader h = new MultiHeader();
+        h.deserialize(archive, tag);
+
+        while (!h.getDone()) {
+            switch (h.getType()) {
+               case ZooDefs.OpCode.create:
+                    CreateRequest cr = new CreateRequest();
+                    cr.deserialize(archive, tag);
+                    add(Op.create(cr.getPath(), cr.getData(), cr.getAcl(), cr.getFlags()));
+                    break;
+                case ZooDefs.OpCode.delete:
+                    DeleteRequest dr = new DeleteRequest();
+                    dr.deserialize(archive, tag);
+                    add(Op.delete(dr.getPath(), dr.getVersion()));
+                    break;
+                case ZooDefs.OpCode.setData:
+                    SetDataRequest sdr = new SetDataRequest();
+                    sdr.deserialize(archive, tag);
+                    add(Op.setData(sdr.getPath(), sdr.getData(), sdr.getVersion()));
+                    break;
+                case ZooDefs.OpCode.check:
+                    CheckVersionRequest cvr = new CheckVersionRequest();
+                    cvr.deserialize(archive, tag);
+                    add(Op.check(cvr.getPath(), cvr.getVersion()));
+                    break;
+                default:
+                    throw new IOException("Invalid type of op");
+            }
+            h.deserialize(archive, tag);
+        }
+        archive.endRecord(tag);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof MultiTransactionRecord)) return false;
+
+        MultiTransactionRecord that = (MultiTransactionRecord) o;
+
+        if (ops != null) {
+            Iterator<Op> other = that.ops.iterator();
+            for (Op op : ops) {
+                boolean hasMoreData = other.hasNext();
+                if (!hasMoreData) {
+                    return false;
+                }
+                Op otherOp = other.next();
+                if (!op.equals(otherOp)) {
+                    return false;
+                }
+            }
+            return !other.hasNext();
+        } else {
+            return that.ops == null;
+        }
+
+    }
+
+    @Override
+    public int hashCode() {
+        int h = 1023;
+        for (Op op : ops) {
+            h = h * 25 + op.hashCode();
+        }
+        return h;
+    }
+}

+ 306 - 0
src/java/main/org/apache/zookeeper/Op.java

@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.jute.Record;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.proto.CheckVersionRequest;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Represents a single operation in a multi-operation transaction.  Each operation can be a create, update
+ * or delete or can just be a version check.
+ *
+ * Sub-classes of Op each represent each detailed type but should not normally be referenced except via
+ * the provided factory methods.
+ *
+ * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode)
+ * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode, org.apache.zookeeper.AsyncCallback.StringCallback, Object)
+ * @see ZooKeeper#delete(String, int)
+ * @see ZooKeeper#setData(String, byte[], int)
+ */
+public abstract class Op {
+    private int type;
+    private String path;
+
+    // prevent untyped construction
+    private Op(int type, String path) {
+        this.type = type;
+        this.path = path;
+    }
+
+    /**
+     * Constructs a create operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode)
+     * @see CreateMode#fromFlag(int)
+     *
+     * @param path
+     *                the path for the node
+     * @param data
+     *                the initial data for the node
+     * @param acl
+     *                the acl for the node
+     * @param flags
+     *                specifying whether the node to be created is ephemeral
+     *                and/or sequential but using the integer encoding.
+     */
+    public static Op create(String path, byte[] data, List<ACL> acl, int flags) {
+        return new Create(path, data, acl, flags);
+    }
+
+    /**
+     * Constructs a create operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode)
+     *
+     * @param path
+     *                the path for the node
+     * @param data
+     *                the initial data for the node
+     * @param acl
+     *                the acl for the node
+     * @param createMode
+     *                specifying whether the node to be created is ephemeral
+     *                and/or sequential
+     */
+    public static Op create(String path, byte[] data, List<ACL> acl, CreateMode createMode) {
+        return new Create(path, data, acl, createMode);
+    }
+
+    /**
+     * Constructs a delete operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#delete(String, int)
+     *
+     * @param path
+     *                the path of the node to be deleted.
+     * @param version
+     *                the expected node version.
+     */
+    public static Op delete(String path, int version) {
+        return new Delete(path, version);
+    }
+
+    /**
+     * Constructs an update operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#setData(String, byte[], int)
+     *
+     * @param path
+     *                the path of the node
+     * @param data
+     *                the data to set
+     * @param version
+     *                the expected matching version
+     */
+    public static Op setData(String path, byte[] data, int version) {
+        return new SetData(path, data, version);
+    }
+
+
+    /**
+     * Constructs an version check operation.  Arguments are as for the ZooKeeper.setData method except that
+     * no data is provided since no update is intended.  The purpose for this is to allow read-modify-write
+     * operations that apply to multiple znodes, but where some of the znodes are involved only in the read,
+     * not the write.  A similar effect could be achieved by writing the same data back, but that leads to
+     * way more version updates than are necessary and more writing in general.
+     *
+     * @param path
+     *                the path of the node
+     * @param version
+     *                the expected matching version
+     * @return
+     */
+    public static Op check(String path, int version) {
+        return new Check(path, version);
+    }
+
+    /**
+     * Gets the integer type code for an Op.  This code should be as from ZooDefs.OpCode
+     * @see ZooDefs.OpCode
+     * @return  The type code.
+     */
+    public int getType() {
+        return type;
+    }
+
+    /**
+     * Gets the path for an Op.
+     * @return  The path.
+     */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Encodes an op for wire transmission.
+     * @return An appropriate Record structure.
+     */
+    public abstract Record toRequestRecord() ;
+
+    //////////////////
+    // these internal classes are public, but should not generally be referenced.
+    //
+    public static class Create extends Op {
+        private byte[] data;
+        private List<ACL> acl;
+        private int flags;
+
+        private Create(String path, byte[] data, List<ACL> acl, int flags) {
+            super(ZooDefs.OpCode.create, path);
+            this.data = data;
+            this.acl = acl;
+            this.flags = flags;
+        }
+
+        private Create(String path, byte[] data, List<ACL> acl, CreateMode createMode) {
+            super(ZooDefs.OpCode.create, path);
+            this.data = data;
+            this.acl = acl;
+            this.flags = createMode.toFlag();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Create)) return false;
+
+            Create op = (Create) o;
+
+            boolean aclEquals = true;
+            Iterator<ACL> i = op.acl.iterator();
+            for (ACL acl : op.acl) {
+                boolean hasMoreData = i.hasNext();
+                if (!hasMoreData) {
+                    aclEquals = false;
+                    break;
+                }
+                ACL otherAcl = i.next();
+                if (!acl.equals(otherAcl)) {
+                    aclEquals = false;
+                    break;
+                }
+            }
+            return !i.hasNext() && getType() == op.getType() && Arrays.equals(data, op.data) && flags == op.flags && aclEquals;
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + Arrays.hashCode(data);
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new CreateRequest(getPath(), data, acl, flags);
+        }
+    }
+
+    public static class Delete extends Op {
+        private int version;
+
+        private Delete(String path, int version) {
+            super(ZooDefs.OpCode.delete, path);
+            this.version = version;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Delete)) return false;
+
+            Delete op = (Delete) o;
+
+            return getType() == op.getType() && version == op.version 
+                   && getPath().equals(op.getPath());
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + version;
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new DeleteRequest(getPath(), version);
+        }
+    }
+
+    public static class SetData extends Op {
+        private byte[] data;
+        private int version;
+
+        private SetData(String path, byte[] data, int version) {
+            super(ZooDefs.OpCode.setData, path);
+            this.data = data;
+            this.version = version;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof SetData)) return false;
+
+            SetData op = (SetData) o;
+
+            return getType() == op.getType() && version == op.version 
+                   && getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + Arrays.hashCode(data) + version;
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new SetDataRequest(getPath(), data, version);
+        }
+    }
+
+    public static class Check extends Op {
+        private int version;
+
+        private Check(String path, int version) {
+            super(ZooDefs.OpCode.check, path);
+            this.version = version;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Check)) return false;
+
+            Check op = (Check) o;
+
+            return getType() == op.getType() && getPath().equals(op.getPath()) && version == op.version;
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + version;
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new CheckVersionRequest(getPath(), version);
+        }
+    }
+}

+ 185 - 0
src/java/main/org/apache/zookeeper/OpResult.java

@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Encodes the result of a single part of a multiple operation commit.
+ */
+public class OpResult {
+    private int type;
+
+    private OpResult(int type) {
+        this.type = type;
+    }
+
+    /**
+     * Encodes the return type as from ZooDefs.OpCode.  Can be used
+     * to dispatch to the correct cast needed for getting the desired
+     * additional result data.
+     * @see ZooDefs.OpCode
+     * @return an integer identifying what kind of operation this result came from.
+     */
+    public int getType() {
+        return type;
+    }
+
+    /**
+     * A result from a create operation.  This kind of result allows the
+     * path to be retrieved since the create might have been a sequential
+     * create.
+     */
+    public static class CreateResult extends OpResult {
+        private String path;
+
+        public CreateResult(String path) {
+            super(ZooDefs.OpCode.create);
+            this.path = path;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof OpResult)) return false;
+
+            CreateResult other = (CreateResult) o;
+            return getType() == other.getType() && path.equals(other.path);
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() * 35 + path.hashCode();
+        }
+    }
+
+    /**
+     * A result from a delete operation.  No special values are available.
+     */
+    public static class DeleteResult extends OpResult {
+        public DeleteResult() {
+            super(ZooDefs.OpCode.delete);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof OpResult)) return false;
+
+            OpResult opResult = (OpResult) o;
+            return getType() == opResult.getType();
+        }
+
+        @Override
+        public int hashCode() {
+            return getType();
+        }
+    }
+
+    /**
+     * A result from a setData operation.  This kind of result provides access
+     * to the Stat structure from the update.
+     */
+    public static class SetDataResult extends OpResult {
+        private Stat stat;
+
+        public SetDataResult(Stat stat) {
+            super(ZooDefs.OpCode.setData);
+            this.stat = stat;
+        }
+
+        public Stat getStat() {
+            return stat;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof OpResult)) return false;
+
+            SetDataResult other = (SetDataResult) o;
+            return getType() == other.getType() && stat.getMzxid() == other.stat.getMzxid();
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) (getType() * 35 + stat.getMzxid());
+        }
+    }
+
+    /**
+     * A result from a version check operation.  No special values are available.
+     */
+    public static class CheckResult extends OpResult {
+        public CheckResult() {
+            super(ZooDefs.OpCode.check);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof OpResult)) return false;
+
+            CheckResult other = (CheckResult) o;
+            return getType() == other.getType();
+        }
+
+        @Override
+        public int hashCode() {
+            return getType();
+        }
+    }
+
+    /**
+     * An error result from any kind of operation.  The point of error results
+     * is that they contain an error code which helps understand what happened.
+     * @see KeeperException.Code
+     *
+     */
+    public static class ErrorResult extends OpResult {
+        private int err;
+
+        public ErrorResult(int err) {
+            super(ZooDefs.OpCode.error);
+            this.err = err;
+        }
+
+        public int getErr() {
+            return err;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof OpResult)) return false;
+
+            ErrorResult other = (ErrorResult) o;
+            return getType() == other.getType() && err == other.getErr();
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() * 35 + err;
+        }
+    }
+}

+ 60 - 0
src/java/main/org/apache/zookeeper/Transaction.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+/**
+ * Provides a builder style interface for doing multiple updates.  This is
+ * really just a thin layer on top of Zookeeper.multi().
+ */
+public class Transaction {
+    private ZooKeeper zk;
+    private MultiTransactionRecord request = new MultiTransactionRecord();
+
+    protected Transaction(ZooKeeper zk) {
+        this.zk = zk;
+    }
+
+    public Transaction create(final String path, byte data[], List<ACL> acl,
+                              CreateMode createMode) {
+        request.add(Op.create(path, data, acl, createMode.toFlag()));
+        return this;
+    }
+
+    public Transaction delete(final String path, int version) {
+        request.add(Op.delete(path, version));
+        return this;
+    }
+
+    public Transaction check(String path, int version) {
+        request.add(Op.check(path, version));
+        return this;
+    }
+
+    public Transaction setData(final String path, byte data[], int version) {
+        request.add(Op.setData(path, data, version));
+        return this;
+    }
+
+    public List<OpResult> commit() throws InterruptedException, KeeperException {
+        return zk.multiInternal(request);
+    }
+}

+ 4 - 0
src/java/main/org/apache/zookeeper/ZooDefs.java

@@ -50,6 +50,10 @@ public class ZooDefs {
 
         public final int getChildren2 = 12;
 
+        public final int check = 13;
+
+        public final int multi = 14;
+
         public final int auth = 100;
 
         public final int setWatches = 101;

+ 65 - 42
src/java/main/org/apache/zookeeper/ZooKeeper.java

@@ -18,53 +18,22 @@
 
 package org.apache.zookeeper;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.AsyncCallback.ACLCallback;
-import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.AsyncCallback.*;
+import org.apache.zookeeper.OpResult.ErrorResult;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.StaticHostProvider;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.proto.CreateResponse;
-import org.apache.zookeeper.proto.DeleteRequest;
-import org.apache.zookeeper.proto.ExistsRequest;
-import org.apache.zookeeper.proto.GetACLRequest;
-import org.apache.zookeeper.proto.GetACLResponse;
-import org.apache.zookeeper.proto.GetChildren2Request;
-import org.apache.zookeeper.proto.GetChildren2Response;
-import org.apache.zookeeper.proto.GetChildrenRequest;
-import org.apache.zookeeper.proto.GetChildrenResponse;
-import org.apache.zookeeper.proto.GetDataRequest;
-import org.apache.zookeeper.proto.GetDataResponse;
-import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.proto.RequestHeader;
-import org.apache.zookeeper.proto.SetACLRequest;
-import org.apache.zookeeper.proto.SetACLResponse;
-import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.zookeeper.proto.SetDataResponse;
-import org.apache.zookeeper.proto.SyncRequest;
-import org.apache.zookeeper.proto.SyncResponse;
+import org.apache.zookeeper.proto.*;
 import org.apache.zookeeper.server.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.*;
 
 /**
  * This is the main class of ZooKeeper client library. To use a ZooKeeper
@@ -885,7 +854,61 @@ public class ZooKeeper {
         }
     }
 
-    
+    /**
+     * Executes multiple Zookeeper operations or none of them.  On success, a list of results is returned.
+     * On failure, only a single exception is returned.  If you want more details, it may be preferable to
+     * use the alternative form of this method that lets you pass a list into which individual results are
+     * placed so that you can zero in on exactly which operation failed and why.
+     * <p>
+     * The maximum allowable size of all of the data arrays in all of the setData operations in this single
+     * request is 1 MB (1,048,576 bytes).
+     * Requests larger than this will cause a KeeperExecption to be thrown.
+     * @param ops  An iterable that contains the operations to be done.  These should be created using the
+     * factory methods on Op.
+     * @see Op
+     * @return A list of results.
+     * @throws InterruptedException  If the operation was interrupted.  The operation may or may not have succeeded, but
+     * will not have partially succeeded if this exception is thrown.
+     * @throws KeeperException If the operation could not be completed due to some error in doing one of the specified
+     * ops.
+     */
+    public List<OpResult> multi(Iterable<Op> ops) throws InterruptedException, KeeperException {
+        return multiInternal(new MultiTransactionRecord(ops));
+    }
+
+    protected List<OpResult> multiInternal(MultiTransactionRecord request)
+        throws InterruptedException, KeeperException {
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.multi);
+        MultiResponse response = new MultiResponse();
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()));
+        }
+
+        List<OpResult> results = response.getResultList();
+        
+        ErrorResult fatalError = null;
+        for (OpResult result : results) {
+            if (result instanceof ErrorResult && ((ErrorResult)result).getErr() != KeeperException.Code.OK.intValue()) {
+                fatalError = (ErrorResult) result;
+                break;
+            }
+        }
+
+        if (fatalError != null) {
+            KeeperException ex = KeeperException.create(KeeperException.Code.get(fatalError.getErr()));
+            ex.setMultiResults(results);
+            throw ex;
+        }
+
+        return results;
+    }
+
+    public Transaction transaction() {
+        return new Transaction(this);
+    }
+
     /**
      * Recursively delete the node with the given path. 
      * <p>
@@ -1247,7 +1270,7 @@ public class ZooKeeper {
      * thrown if the given version does not match the node's version.
      * <p>
      * The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
-     * Arrays larger than this will cause a KeeperExecption to be thrown.
+     * Arrays larger than this will cause a KeeperException to be thrown.
      *
      * @param path
      *                the path of the node

+ 94 - 5
src/java/main/org/apache/zookeeper/server/DataTree.java

@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,6 +30,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import java.nio.ByteBuffer;
+
 import org.apache.jute.Index;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
@@ -56,8 +59,16 @@ import org.apache.zookeeper.txn.DeleteTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.Txn;
+import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetACLRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
 /**
  * This class maintains the tree data structure. It doesn't have any networking
  * or client connection code in it so that it can be tested in a stand alone
@@ -736,6 +747,8 @@ public class DataTree {
 
         public Stat stat;
 
+        public List<ProcessTxnResult> multiResult;
+        
         /**
          * Equality is defined as the clientId and the cxid being the same. This
          * allows us to use hash tables to track completion of transactions.
@@ -766,7 +779,8 @@ public class DataTree {
 
     public volatile long lastProcessedZxid = 0;
 
-    public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
+    public ProcessTxnResult processTxn(TxnHeader header, Record txn)
+    {
         ProcessTxnResult rc = new ProcessTxnResult();
 
         String debug = "";
@@ -776,6 +790,7 @@ public class DataTree {
             rc.zxid = header.getZxid();
             rc.type = header.getType();
             rc.err = 0;
+            rc.multiResult = null;
             if (rc.zxid > lastProcessedZxid) {
                 lastProcessedZxid = rc.zxid;
             }
@@ -800,15 +815,16 @@ public class DataTree {
                     break;
                 case OpCode.setData:
                     SetDataTxn setDataTxn = (SetDataTxn) txn;
-                    debug = "Set data for  transaction for "
-                            + setDataTxn.getPath();
+                    debug = "Set data transaction for "
+                            + setDataTxn.getPath()
+                            + " to new value=" + Arrays.toString(setDataTxn.getData());
                     rc.stat = setData(setDataTxn.getPath(), setDataTxn
                             .getData(), setDataTxn.getVersion(), header
                             .getZxid(), header.getTime());
                     break;
                 case OpCode.setACL:
                     SetACLTxn setACLTxn = (SetACLTxn) txn;
-                    debug = "Set ACL for  transaction for "
+                    debug = "Set ACL transaction for "
                             + setACLTxn.getPath();
                     rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
                             setACLTxn.getVersion());
@@ -820,10 +836,83 @@ public class DataTree {
                     ErrorTxn errTxn = (ErrorTxn) txn;
                     rc.err = errTxn.getErr();
                     break;
+                case OpCode.check:
+                    CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
+                    debug = "Check Version transaction for "
+                            + checkTxn.getPath() 
+                            + " and version="
+                            + checkTxn.getVersion();
+                    rc.path = checkTxn.getPath();
+                    break;
+                case OpCode.multi:
+                    MultiTxn multiTxn = (MultiTxn) txn ;
+                    List<Txn> txns = multiTxn.getTxns();
+                    debug = "Multi transaction with " + txns.size() + " operations";
+                    rc.multiResult = new ArrayList<ProcessTxnResult>();
+                    boolean failed = false;
+                    for (Txn subtxn : txns) {
+                        if (subtxn.getType() == OpCode.error) {
+                            failed = true;
+                            break;
+                        }
+                    }
+
+                    boolean post_failed = false;
+                    for (Txn subtxn : txns) {
+                        ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
+                        Record record = null;
+                        switch (subtxn.getType()) {
+                            case OpCode.create:
+                                record = new CreateTxn();
+                                break;
+                            case OpCode.delete:
+                                record = new DeleteTxn();
+                                break;
+                            case OpCode.setData:
+                                record = new SetDataTxn();
+                                break;
+                            case OpCode.error:
+                                record = new ErrorTxn();
+                                post_failed = true;
+                                break;
+                            case OpCode.check:
+                                record = new CheckVersionTxn();
+                                break;
+                            default:
+                                throw new IOException("Invalid type of op: " + subtxn.getType());
+                        }
+                        assert(record != null);
+
+                        ZooKeeperServer.byteBuffer2Record(bb, record);
+                       
+                        if (failed && subtxn.getType() != OpCode.error){
+                            int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() 
+                                                 : Code.OK.intValue();
+
+                            subtxn.setType(OpCode.error);
+                            record = new ErrorTxn(ec);
+                        }
+
+                        if (failed) {
+                            assert(subtxn.getType() == OpCode.error) ;
+                        }
+
+                        TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),
+                                                         header.getZxid(), header.getTime(), 
+                                                         subtxn.getType());
+                        ProcessTxnResult subRc = processTxn(subHdr, record);
+                        rc.multiResult.add(subRc);
+                        if (subRc.err != 0 && rc.err == 0) {
+                            rc.err = subRc.err ;
+                        }
+                    }
+                    break;
             }
         } catch (KeeperException e) {
-             LOG.debug("Failed: " + debug, e);
+             LOG.warn("Failed: " + debug, e);
              rc.err = e.code().intValue();
+        } catch (IOException e) {
+            LOG.warn("Failed:" + debug, e);
         }
         return rc;
     }

+ 50 - 1
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -26,6 +26,7 @@ import org.apache.jute.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MultiResponse;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.SessionMovedException;
@@ -54,6 +55,15 @@ import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
 
+import org.apache.zookeeper.MultiTransactionRecord;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.OpResult.CheckResult;
+import org.apache.zookeeper.OpResult.CreateResult;
+import org.apache.zookeeper.OpResult.DeleteResult;
+import org.apache.zookeeper.OpResult.SetDataResult;
+import org.apache.zookeeper.OpResult.ErrorResult;
+
 /**
  * This Request processor actually applies any transaction associated with a
  * request and services any queries. It is always at the end of a
@@ -151,7 +161,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
 
             KeeperException ke = request.getException();
-            if (ke != null) {
+            if (ke != null && request.type != OpCode.multi) {
                 throw ke;
             }
 
@@ -180,6 +190,39 @@ public class FinalRequestProcessor implements RequestProcessor {
                 zks.finishSessionInit(request.cnxn, true);
                 return;
             }
+            case OpCode.multi: {
+                lastOp = "MULT";
+                rsp = new MultiResponse() ;
+
+                for (ProcessTxnResult subTxnResult : rc.multiResult) {
+
+                    OpResult subResult ;
+
+                    switch (subTxnResult.type) {
+                        case OpCode.check:
+                            subResult = new CheckResult();
+                            break;
+                        case OpCode.create:
+                            subResult = new CreateResult(subTxnResult.path);
+                            break;
+                        case OpCode.delete:
+                            subResult = new DeleteResult();
+                            break;
+                        case OpCode.setData:
+                            subResult = new SetDataResult(subTxnResult.stat);
+                            break;
+                        case OpCode.error:
+                            subResult = new ErrorResult(subTxnResult.err) ;
+                            break;
+                        default:
+                            throw new IOException("Invalid type of op");
+                    }
+
+                    ((MultiResponse)rsp).add(subResult);
+                }
+
+                break;
+            }
             case OpCode.create: {
                 lastOp = "CREA";
                 rsp = new CreateResponse(rc.path);
@@ -217,6 +260,12 @@ public class FinalRequestProcessor implements RequestProcessor {
                 rsp = new SyncResponse(syncRequest.getPath());
                 break;
             }
+            case OpCode.check: {
+                lastOp = "CHEC";
+                rsp = new SetDataResponse(rc.stat);
+                err = Code.get(rc.err);
+                break;
+            }
             case OpCode.exists: {
                 lastOp = "EXIS";
                 // TODO we need to figure out the security requirement for this!

+ 243 - 51
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -18,19 +18,27 @@
 
 package org.apache.zookeeper.server;
 
+import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.Record;
+import org.apache.jute.BinaryOutputArchive;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MultiTransactionRecord;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs.OpCode;
@@ -42,6 +50,7 @@ import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.SetACLRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.proto.CheckVersionRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.server.auth.AuthenticationProvider;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
@@ -51,6 +60,9 @@ import org.apache.zookeeper.txn.DeleteTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.Txn;
+import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -161,6 +173,78 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
         }
     }
 
+    /**
+     * Grab current pending change records for each op in a multi-op.
+     * 
+     * This is used inside MultiOp error code path to rollback in the event
+     * of a failed multi-op.
+     *
+     * @param multiRequest
+     */
+    HashMap<String, ChangeRecord> getPendingChanges(MultiTransactionRecord multiRequest) {
+    	HashMap<String, ChangeRecord> pendingChangeRecords = new HashMap<String, ChangeRecord>();
+    	
+        for(Op op: multiRequest) {
+    		String path = op.getPath();
+
+    		try {
+    			ChangeRecord cr = getRecordForPath(path);
+    			if (cr != null) {
+    				pendingChangeRecords.put(path, cr);
+    			}
+    		} catch (KeeperException.NoNodeException e) {
+    			// ignore this one
+    		}
+    	}
+        
+        return pendingChangeRecords;
+    }
+
+    /**
+     * Rollback pending changes records from a failed multi-op.
+     *
+     * If a multi-op fails, we can't leave any invalid change records we created
+     * around. We also need to restore their prior value (if any) if their prior
+     * value is still valid.
+     *
+     * @param zxid
+     * @param pendingChangeRecords
+     */
+    void rollbackPendingChanges(long zxid, HashMap<String, ChangeRecord>pendingChangeRecords) {
+
+        synchronized (zks.outstandingChanges) {
+            // Grab a list iterator starting at the END of the list so we can iterate in reverse
+            ListIterator<ChangeRecord> iter = zks.outstandingChanges.listIterator(zks.outstandingChanges.size());
+            while (iter.hasPrevious()) {
+                ChangeRecord c = iter.previous();
+                if (c.zxid == zxid) {
+                    iter.remove();
+                    zks.outstandingChangesForPath.remove(c.path);
+                } else {
+                    break;
+                }
+            }
+           
+            boolean empty = zks.outstandingChanges.isEmpty();
+            long firstZxid = 0;
+            if (!empty) {
+                firstZxid = zks.outstandingChanges.get(0).zxid;
+            }
+
+            Iterator<ChangeRecord> priorIter = pendingChangeRecords.values().iterator();
+            while (priorIter.hasNext()) {
+                ChangeRecord c = priorIter.next();
+                 
+                /* Don't apply any prior change records less than firstZxid */
+                if (!empty && (c.zxid < firstZxid)) {
+                    continue;
+                }
+
+                zks.outstandingChangesForPath.put(c.path, c);
+            }
+        }
+    }
+
     static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
             List<Id> ids) throws KeeperException.NoAuthException {
         if (skipACL) {
@@ -200,23 +284,20 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
      * This method will be called inside the ProcessRequestThread, which is a
      * singleton, so there will be a single thread calling this code.
      *
+     * @param type
+     * @param zxid
      * @param request
+     * @param record
      */
     @SuppressWarnings("unchecked")
-    protected void pRequest(Request request) {
-        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
-        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
-        TxnHeader txnHeader = null;
-        Record txn = null;
-        try {
-            switch (request.type) {
+    protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException {
+        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
+                                    zks.getTime(), type);
+
+        switch (type) {
             case OpCode.create:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.create);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                CreateRequest createRequest = new CreateRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        createRequest);
+                CreateRequest createRequest = (CreateRequest)record;
                 String path = createRequest.getPath();
                 int lastSlash = path.lastIndexOf('/');
                 if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
@@ -257,28 +338,24 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                     throw new KeeperException.NoChildrenForEphemeralsException(path);
                 }
                 int newCversion = parentRecord.stat.getCversion()+1;
-                txn = new CreateTxn(path, createRequest.getData(),
+                request.txn = new CreateTxn(path, createRequest.getData(),
                         createRequest.getAcl(),
                         createMode.isEphemeral(), newCversion);
                 StatPersisted s = new StatPersisted();
                 if (createMode.isEphemeral()) {
                     s.setEphemeralOwner(request.sessionId);
                 }
-                parentRecord = parentRecord.duplicate(txnHeader.getZxid());
+                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                 parentRecord.childCount++;
                 parentRecord.stat.setCversion(newCversion);
                 addChangeRecord(parentRecord);
-                addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path, s,
+                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
                         0, createRequest.getAcl()));
 
                 break;
             case OpCode.delete:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.delete);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                DeleteRequest deleteRequest = new DeleteRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        deleteRequest);
+                DeleteRequest deleteRequest = (DeleteRequest)record;
                 path = deleteRequest.getPath();
                 lastSlash = path.lastIndexOf('/');
                 if (lastSlash == -1 || path.indexOf('\0') != -1
@@ -297,20 +374,16 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 if (nodeRecord.childCount > 0) {
                     throw new KeeperException.NotEmptyException(path);
                 }
-                txn = new DeleteTxn(path);
-                parentRecord = parentRecord.duplicate(txnHeader.getZxid());
+                request.txn = new DeleteTxn(path);
+                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                 parentRecord.childCount--;
                 addChangeRecord(parentRecord);
-                addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path,
+                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
                         null, -1, null));
                 break;
             case OpCode.setData:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.setData);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                SetDataRequest setDataRequest = new SetDataRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        setDataRequest);
+                SetDataRequest setDataRequest = (SetDataRequest)record;
                 path = setDataRequest.getPath();
                 nodeRecord = getRecordForPath(path);
                 checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
@@ -321,18 +394,14 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                     throw new KeeperException.BadVersionException(path);
                 }
                 version = currentVersion + 1;
-                txn = new SetDataTxn(path, setDataRequest.getData(), version);
-                nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
+                request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
+                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                 nodeRecord.stat.setVersion(version);
                 addChangeRecord(nodeRecord);
                 break;
             case OpCode.setACL:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.setACL);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                SetACLRequest setAclRequest = new SetACLRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        setAclRequest);
+                SetACLRequest setAclRequest = (SetACLRequest)record;
                 path = setAclRequest.getPath();
                 if (!fixupACL(request.authInfo, setAclRequest.getAcl())) {
                     throw new KeeperException.InvalidACLException(path);
@@ -346,24 +415,20 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                     throw new KeeperException.BadVersionException(path);
                 }
                 version = currentVersion + 1;
-                txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
-                nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
+                request.txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
+                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                 nodeRecord.stat.setAversion(version);
                 addChangeRecord(nodeRecord);
                 break;
             case OpCode.createSession:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.createSession);
                 request.request.rewind();
                 int to = request.request.getInt();
-                txn = new CreateSessionTxn(to);
+                request.txn = new CreateSessionTxn(to);
                 request.request.rewind();
                 zks.sessionTracker.addSession(request.sessionId, to);
                 zks.setOwner(request.sessionId, request.getOwner());
                 break;
             case OpCode.closeSession:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.closeSession);
                 // We don't want to do this check since the session expiration thread
                 // queues up this operation without being the session owner.
                 // this request is the last of the session so it should be ok
@@ -380,13 +445,142 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                         }
                     }
                     for (String path2Delete : es) {
-                        addChangeRecord(new ChangeRecord(txnHeader.getZxid(),
+                        addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                 path2Delete, null, 0, null));
                     }
                 }
                 LOG.info("Processed session termination for sessionid: 0x"
                         + Long.toHexString(request.sessionId));
                 break;
+            case OpCode.check:
+                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+                CheckVersionRequest checkVersionRequest = (CheckVersionRequest)record;
+                path = checkVersionRequest.getPath();
+                nodeRecord = getRecordForPath(path);
+                checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ,
+                        request.authInfo);
+                version = checkVersionRequest.getVersion();
+                currentVersion = nodeRecord.stat.getVersion();
+                if (version != -1 && version != currentVersion) {
+                    throw new KeeperException.BadVersionException(path);
+                }
+                version = currentVersion + 1;
+                request.txn = new CheckVersionTxn(path, version);
+                break;
+        }
+    }
+
+    /**
+     * This method will be called inside the ProcessRequestThread, which is a
+     * singleton, so there will be a single thread calling this code.
+     *
+     * @param request
+     */
+    @SuppressWarnings("unchecked")
+    protected void pRequest(Request request) {
+        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
+        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
+        request.hdr = null;
+        request.txn = null;
+        
+        try {
+            switch (request.type) {
+                case OpCode.create:
+                CreateRequest createRequest = new CreateRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, createRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest);
+                break;
+            case OpCode.delete:
+                DeleteRequest deleteRequest = new DeleteRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, deleteRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
+                break;
+            case OpCode.setData:
+                SetDataRequest setDataRequest = new SetDataRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, setDataRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
+                break;
+            case OpCode.setACL:
+                SetACLRequest setAclRequest = new SetACLRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, setAclRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
+                break;
+            case OpCode.check:
+                CheckVersionRequest checkRequest = new CheckVersionRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, checkRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
+                break;
+            case OpCode.multi:
+                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
+                ZooKeeperServer.byteBuffer2Record(request.request, multiRequest);
+                List<Txn> txns = new ArrayList<Txn>();
+
+                //Each op in a multi-op must have the same zxid!
+                long zxid = zks.getNextZxid();
+                KeeperException ke = null;
+
+                //Store off current pending change records in case we need to rollback
+                HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
+
+                int index = 0;
+                for(Op op: multiRequest) {
+                    Record subrequest = op.toRequestRecord() ;
+
+                    /* If we've already failed one of the ops, don't bother
+                     * trying the rest as we know it's going to fail and it
+                     * would be confusing in the logfiles.
+                     */
+                    if (ke != null) {
+                        request.hdr.setType(OpCode.error);
+                        request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
+                    } 
+                    
+                    /* Prep the request and convert to a Txn */
+                    else {
+                        try {
+                            pRequest2Txn(op.getType(), zxid, request, subrequest);
+                        } catch (KeeperException e) {
+                            if (ke == null) {
+                                ke = e;
+                            }
+                            request.hdr.setType(OpCode.error);
+                            request.txn = new ErrorTxn(e.code().intValue());
+                            LOG.error(">>>> Got user-level KeeperException when processing "
+                                    + request.toString()
+                                    + " Error Path:" + e.getPath()
+                                    + " Error:" + e.getMessage());
+                            LOG.error(">>>> ABORTING remaing MultiOp ops");
+                            request.setException(e);
+
+                            /* Rollback change records from failed multi-op */
+                            rollbackPendingChanges(zxid, pendingChanges);
+                        }
+                    }
+
+                    //FIXME: I don't want to have to serialize it here and then
+                    //       immediately deserialize in next processor. But I'm 
+                    //       not sure how else to get the txn stored into our list.
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                    request.txn.serialize(boa, "request") ;
+                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+                    txns.add(new Txn(request.hdr.getType(), bb.array()));
+                    index++;
+                }
+
+                request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type);
+                request.txn = new MultiTxn(txns);
+                
+                break;
+
+            //create/close session don't require request record
+            case OpCode.createSession:
+            case OpCode.closeSession:
+                pRequest2Txn(request.type, zks.getNextZxid(), request, null);
+                break;
+ 
+            //All the rest don't need to create a Txn - just verify session
             case OpCode.sync:
             case OpCode.exists:
             case OpCode.getData:
@@ -400,9 +594,9 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 break;
             }
         } catch (KeeperException e) {
-            if (txnHeader != null) {
-                txnHeader.setType(OpCode.error);
-                txn = new ErrorTxn(e.code().intValue());
+            if (request.hdr != null) {
+                request.hdr.setType(OpCode.error);
+                request.txn = new ErrorTxn(e.code().intValue());
             }
             LOG.info("Got user-level KeeperException when processing "
                     + request.toString()
@@ -426,13 +620,11 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
             }
 
             LOG.error("Dumping request buffer: 0x" + sb.toString());
-            if (txnHeader != null) {
-                txnHeader.setType(OpCode.error);
-                txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
+            if (request.hdr != null) {
+                request.hdr.setType(OpCode.error);
+                request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
             }
         }
-        request.hdr = txnHeader;
-        request.txn = txn;
         request.zxid = zks.getZxid();
         nextProcessor.processRequest(request);
     }

+ 8 - 0
src/java/main/org/apache/zookeeper/server/Request.java

@@ -106,6 +106,8 @@ public class Request {
         case OpCode.createSession:
         case OpCode.exists:
         case OpCode.getData:
+        case OpCode.check:
+        case OpCode.multi:
         case OpCode.setData:
         case OpCode.sync:
         case OpCode.getACL:
@@ -136,6 +138,8 @@ public class Request {
         case OpCode.delete:
         case OpCode.setACL:
         case OpCode.setData:
+        case OpCode.check:
+        case OpCode.multi:
             return true;
         default:
             return false;
@@ -156,6 +160,10 @@ public class Request {
             return "exists";
         case OpCode.getData:
             return "getData";
+        case OpCode.check:
+            return "check";
+        case OpCode.multi:
+            return "multi";
         case OpCode.setData:
             return "setData";
         case OpCode.sync:

+ 1 - 1
src/java/main/org/apache/zookeeper/server/RequestProcessor.java

@@ -21,7 +21,7 @@ package org.apache.zookeeper.server;
 /**
  * RequestProcessors are chained together to process transactions. Requests are
  * always processed in order. The standalone server, follower, and leader all
- * have slightly different RequestProcessors changed together.
+ * have slightly different RequestProcessors chained together.
  * 
  * Requests always move forward through the chain of RequestProcessors. Requests
  * are passed to a RequestProcessor through processRequest(). Generally method

+ 2 - 0
src/java/main/org/apache/zookeeper/server/TraceFormatter.java

@@ -43,6 +43,8 @@ public class TraceFormatter {
             return "getDate";
         case OpCode.setData:
             return "setData";
+        case OpCode.multi:
+            return "multi";
         case OpCode.getACL:
             return "getACL";
         case OpCode.setACL:

+ 6 - 6
src/java/main/org/apache/zookeeper/server/package.html

@@ -30,12 +30,12 @@ ZooKeeper maintains a order when processing requests:
 </ul>
 <p>
 We will explain the three aspects of ZooKeeperServer: request processing, data
-structure maintence, and session tracking.
+structure maintenance, and session tracking.
 
 <h2>Request processing</h2>
 
 Requests are received by the ServerCnxn. Demarshalling of a request is
-done by ClientRequestHandler. After a request has be Demarshalled,
+done by ClientRequestHandler. After a request has been demarshalled,
 ClientRequestHandler invokes the relevant method in ZooKeeper and marshals
 the result.
 <p>
@@ -51,10 +51,10 @@ txnQueue of SyncThread via queueItem. When the transaction has been synced to
 disk, its callback will be invoked which will cause the request processing to
 be completed.
 
-<h2>Data structure maintence</h2>
+<h2>Data structure maintenance</h2>
 
 ZooKeeper data is stored in-memory. Each znode is stored in a DataNode object.
-This object is accessed through a hashtable that maps paths to DataNodes.
+This object is accessed through a hash table that maps paths to DataNodes.
 DataNodes also organize themselves into a tree. This tree is only used for
 serializing nodes.
 <p>
@@ -66,7 +66,7 @@ at any point, we need to be careful of partial records.
 <p>
 We address the above problems by
 <ul>
-<li>Preallocating 1M chunks of file space. This allows us to append to the
+<li>Pre-allocating 1M chunks of file space. This allows us to append to the
 file without causing seeks to update file size. It also means that we need
 to check for the end of the log by looking for a zero length transaction
 rather than simply end of file.
@@ -84,7 +84,7 @@ and l as the sequence of transactions that are applied to the tree between
 the time the snapshot begins and the time the snapshot completes, we write
 to disk T+l' where l' is a subset of the transactions in l. While we do not
 have a way of figuring out which transactions make up l', it doesn't really
-matter. T+l'+l = T+l since the transactions we log are idepotent (applying
+matter. T+l'+l = T+l since the transactions we log are idempotent (applying
 the transaction multiple times has the same result as applying the transaction
 once). So when we restore the snapshot we also play all transactions in the log
 that occur after the snapshot was begun. We can easily figure out where to

+ 1 - 0
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -123,6 +123,7 @@ public class CommitProcessor extends Thread implements RequestProcessor {
                         case OpCode.create:
                         case OpCode.delete:
                         case OpCode.setData:
+                        case OpCode.multi:
                         case OpCode.setACL:
                         case OpCode.createSession:
                         case OpCode.closeSession:

+ 6 - 0
src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java

@@ -43,6 +43,7 @@ import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.txn.MultiTxn;
 
 public class SerializeUtils {
     private static final Logger LOG = LoggerFactory.getLogger(SerializeUtils.class);
@@ -78,6 +79,11 @@ public class SerializeUtils {
         case OpCode.error:
             txn = new ErrorTxn();
             break;
+        case OpCode.multi:
+            txn = new MultiTxn();
+            break;
+        default:
+            throw new IOException("Unsupported Txn with type=%d" + hdr.getType());
         }
         if (txn != null) {
             try {

+ 72 - 0
src/java/test/org/apache/zookeeper/MultiResponseTest.java

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import junit.framework.TestCase;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class MultiResponseTest extends TestCase {
+    public void testRoundTrip() throws IOException {
+        MultiResponse response = new MultiResponse();
+
+        response.add(new OpResult.CheckResult());
+        response.add(new OpResult.CreateResult("foo-bar"));
+        response.add(new OpResult.DeleteResult());
+
+        Stat s = new Stat();
+        s.setCzxid(546);
+        response.add(new OpResult.SetDataResult(s));
+
+        MultiResponse decodedResponse = codeDecode(response);
+
+        assertEquals(response, decodedResponse);
+        assertEquals(response.hashCode(), decodedResponse.hashCode());
+    }
+
+    @Test
+    public void testEmptyRoundTrip() throws IOException {
+        MultiResponse result = new MultiResponse();
+        MultiResponse decodedResult = codeDecode(result);
+
+        assertEquals(result, decodedResult);
+        assertEquals(result.hashCode(), decodedResult.hashCode());
+    }
+
+    private MultiResponse codeDecode(MultiResponse request) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        request.serialize(boa, "result");
+        baos.close();
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        bb.rewind();
+
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
+        MultiResponse decodedRequest = new MultiResponse();
+        decodedRequest.deserialize(bia, "result");
+        return decodedRequest;
+    }
+
+}

+ 50 - 0
src/java/test/org/apache/zookeeper/MultiTransactionRecordTest.java

@@ -0,0 +1,50 @@
+package org.apache.zookeeper;
+
+import junit.framework.TestCase;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class MultiTransactionRecordTest extends TestCase {
+    @Test
+    public void testRoundTrip() throws IOException {
+        MultiTransactionRecord request = new MultiTransactionRecord();
+        request.add(Op.check("check", 1));
+        request.add(Op.create("create", "create data".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, ZooDefs.Perms.ALL));
+        request.add(Op.delete("delete", 17));
+        request.add(Op.setData("setData", "set data".getBytes(), 19));
+
+        MultiTransactionRecord decodedRequest = codeDecode(request);
+
+        assertEquals(request, decodedRequest);
+        assertEquals(request.hashCode(), decodedRequest.hashCode());
+    }
+
+    @Test
+    public void testEmptyRoundTrip() throws IOException {
+        MultiTransactionRecord request = new MultiTransactionRecord();
+        MultiTransactionRecord decodedRequest = codeDecode(request);
+
+        assertEquals(request, decodedRequest);
+        assertEquals(request.hashCode(), decodedRequest.hashCode());
+    }
+
+    private MultiTransactionRecord codeDecode(MultiTransactionRecord request) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        request.serialize(boa, "request");
+        baos.close();
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        bb.rewind();
+
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
+        MultiTransactionRecord decodedRequest = new MultiTransactionRecord();
+        decodedRequest.deserialize(bia, "request");
+        return decodedRequest;
+    }
+}

+ 114 - 0
src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java

@@ -0,0 +1,114 @@
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.jute.InputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Learner;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LearnerTest extends ZKTestCase {
+	class SimpleLearnerZooKeeperServer extends LearnerZooKeeperServer {
+		boolean startupCalled;
+		
+		public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
+			super(ftsl, 2000, 2000, 2000, null, new ZKDatabase(ftsl), null);
+		}
+		Learner learner;
+		@Override
+		public Learner getLearner() {
+			return learner;
+		}
+		
+		@Override
+		public void startup() {
+			startupCalled = true;
+		}
+	}
+	class SimpleLearner extends Learner {
+		SimpleLearner(FileTxnSnapLog ftsl) throws IOException {
+			self = new QuorumPeer();
+			zk = new SimpleLearnerZooKeeperServer(ftsl);
+			((SimpleLearnerZooKeeperServer)zk).learner = this;
+		}
+	}
+	static private void recursiveDelete(File dir) {
+		if (dir == null || !dir.exists()) {
+			return;
+		}
+		if (!dir.isDirectory()) {
+			dir.delete();
+		}
+		for(File child: dir.listFiles()) {
+			recursiveDelete(child);
+		}
+	}
+	@Test
+	public void syncTest() throws Exception {
+		File tmpFile = File.createTempFile("test", ".dir");
+		tmpFile.delete();
+		try {
+			FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile);
+			SimpleLearner sl = new SimpleLearner(ftsl);
+			long startZxid = sl.zk.getLastProcessedZxid();
+			
+			// Set up bogus streams
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos);
+			sl.leaderOs = BinaryOutputArchive.getArchive(new ByteArrayOutputStream());
+			
+			// make streams and socket do something innocuous
+			sl.bufferedOutput = new BufferedOutputStream(System.out);
+			sl.sock = new Socket();
+			
+			// fake messages from the server
+			QuorumPacket qp = new QuorumPacket(Leader.SNAP, 0, null, null);
+			oa.writeRecord(qp, null);
+			sl.zk.getZKDatabase().serializeSnapshot(oa);
+			oa.writeString("BenWasHere", "signature");
+			TxnHeader hdr = new TxnHeader(0, 0, 0, 0, ZooDefs.OpCode.create);
+			CreateTxn txn = new CreateTxn("/foo", new byte[0], new ArrayList<ACL>(), false, sl.zk.getZKDatabase().getNode("/").stat.getCversion());
+	        ByteArrayOutputStream tbaos = new ByteArrayOutputStream();
+	        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(tbaos);
+	        hdr.serialize(boa, "hdr");
+	        txn.serialize(boa, "txn");
+	        tbaos.close();
+			qp = new QuorumPacket(Leader.PROPOSAL, 1, tbaos.toByteArray(), null);
+			oa.writeRecord(qp, null);
+
+			// setup the messages to be streamed to follower
+			sl.leaderIs = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray()));
+			
+			try {
+				sl.syncWithLeader(3);
+			} catch(EOFException e) {}
+			
+			sl.zk.shutdown();
+			sl = new SimpleLearner(ftsl);
+			Assert.assertEquals(startZxid, sl.zk.getLastProcessedZxid());
+		} finally {
+			recursiveDelete(tmpFile);
+		}
+	}
+}

+ 289 - 0
src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig

@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.io.*;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CnxManagerTest extends ZKTestCase {
+    protected static final Logger LOG = Logger.getLogger(FLENewEpochTest.class);
+    protected static final int THRESHOLD = 4;
+
+    int count;
+    HashMap<Long,QuorumServer> peers;
+    File tmpdir[];
+    int port[];
+
+    @Before
+    public void setUp() throws Exception {
+
+        this.count = 3;
+        this.peers = new HashMap<Long,QuorumServer>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+
+        for(int i = 0; i < count; i++) {
+            int clientport = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                    new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = clientport;
+        }
+    }
+
+    ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+        byte requestBytes[] = new byte[28];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(epoch);
+
+        return requestBuffer;
+    }
+
+    class CnxManagerThread extends Thread {
+
+        boolean failed;
+        CnxManagerThread(){
+            failed = false;
+        }
+
+        public void run(){
+            try {
+                QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
+                QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+                QuorumCnxManager.Listener listener = cnxManager.listener;
+                if(listener != null){
+                    listener.start();
+                } else {
+                    LOG.error("Null listener when initializing cnx manager");
+                }
+
+                long sid = 1;
+                cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
+
+                Message m = null;
+                int numRetries = 1;
+                while((m == null) && (numRetries++ <= THRESHOLD)){
+                    m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                    if(m == null) cnxManager.connectAll();
+                }
+
+                if(numRetries > THRESHOLD){
+                    failed = true;
+                    return;
+                }
+
+                cnxManager.testInitiateConnection(sid);
+
+                m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                if(m == null){
+                    failed = true;
+                    return;
+                }
+            } catch (Exception e) {
+                LOG.error("Exception while running mock thread", e);
+                Assert.fail("Unexpected exception");
+            }
+        }
+    }
+
+    @Test
+    public void testCnxManager() throws Exception {
+        CnxManagerThread thread = new CnxManagerThread();
+
+        thread.start();
+
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        cnxManager.toSend(new Long(0), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+
+        Message m = null;
+        int numRetries = 1;
+        while((m == null) && (numRetries++ <= THRESHOLD)){
+            m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+            if(m == null) cnxManager.connectAll();
+        }
+
+        Assert.assertTrue("Exceeded number of retries", numRetries <= THRESHOLD);
+
+        thread.join(5000);
+        if (thread.isAlive()) {
+            Assert.fail("Thread didn't join");
+        } else {
+            if(thread.failed)
+                Assert.fail("Did not receive expected message");
+        }
+        
+    }
+
+    @Test
+    public void testCnxManagerTimeout() throws Exception {
+        Random rand = new Random();
+        byte b = (byte) rand.nextInt();
+        int deadPort = PortAssignment.unique();
+        String deadAddress = new String("10.1.1." + b);
+            
+        LOG.info("This is the dead address I'm trying: " + deadAddress);
+            
+        peers.put(Long.valueOf(2),
+                new QuorumServer(2,
+                        new InetSocketAddress(deadAddress, deadPort),
+                        new InetSocketAddress(deadAddress, PortAssignment.unique())));
+        tmpdir[2] = ClientBase.createTmpDir();
+        port[2] = deadPort;
+            
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        long begin = System.currentTimeMillis();
+        cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+        long end = System.currentTimeMillis();
+            
+        if((end - begin) > 6000) Assert.fail("Waited more than necessary");
+        
+    }       
+    
+    /**
+     * Tests a bug in QuorumCnxManager that causes a spin lock
+     * when a negative value is sent. This test checks if the 
+     * connection is being closed upon a message with negative
+     * length.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCnxManagerSpinLock() throws Exception {               
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+        InetSocketAddress addr = new InetSocketAddress(port);
+        
+        Thread.sleep(1000);
+        
+        SocketChannel sc = SocketChannel.open();
+        sc.socket().connect(peers.get(new Long(1)).electionAddr, 5000);
+        
+        /*
+         * Write id first then negative length.
+         */
+        byte[] msgBytes = new byte[8];
+        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+        msgBuffer.putLong(new Long(2));
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        msgBuffer = ByteBuffer.wrap(new byte[4]);
+        msgBuffer.putInt(-20);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        Thread.sleep(1000);
+        
+        try{
+            /*
+             * Write a number of times until it
+             * detects that the socket is broken.
+             */
+            for(int i = 0; i < 100; i++){
+                msgBuffer.position(0);
+                sc.write(msgBuffer);
+            }
+            Assert.fail("Socket has not been closed");
+        } catch (Exception e) {
+            LOG.info("Socket has been closed as expected");
+        }
+    }
+
+    /*
+     * Test if a receiveConnection is able to timeout on socket errors
+     */
+    @Test
+    public void testSocketTimeout() throws Exception {
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2000, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+        InetSocketAddress addr = new InetSocketAddress(port);
+        Thread.sleep(1000);
+        
+        Socket sock = new Socket();
+        sock.connect(peers.get(new Long(1)).electionAddr, 5000);
+        long begin = System.currentTimeMillis();
+        // Read without sending data. Verify timeout.
+        cnxManager.receiveConnection(sock);
+        long end = System.currentTimeMillis();
+        if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary");
+    }
+}

+ 230 - 0
src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java

@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.OpResult.ErrorResult;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.zookeeper.data.Stat;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+public class MultiTransactionTest extends ZKTestCase implements Watcher {
+    private static final Logger LOG = Logger.getLogger(MultiTransactionTest.class);
+
+    private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
+
+    private ZooKeeper zk;
+    private ServerCnxnFactory serverFactory;
+
+    @Override
+    public void process(WatchedEvent event) {
+        // ignore
+    }
+
+    @Before
+    public void setupZk() throws Exception {
+        File tmpDir = ClientBase.createTmpDir();
+        ClientBase.setupTestEnv();
+        ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
+        SyncRequestProcessor.setSnapCount(150);
+        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+        serverFactory = ServerCnxnFactory.createFactory(PORT, -1);
+        serverFactory.startup(zks);
+        LOG.info("starting up the zookeeper server .. waiting");
+        Assert.assertTrue("waiting for server being up",
+                ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+        zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
+    }
+
+    @After
+    public void shutdownServer() throws Exception {
+        zk.close();
+        serverFactory.shutdown();
+    }
+
+    @Test
+    public void testCreate() throws Exception {
+        List<OpResult> results = new ArrayList<OpResult>();
+
+        results = zk.multi(Arrays.asList(
+                Op.create("/multi0", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.create("/multi1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.create("/multi2", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+                ));
+        zk.getData("/multi0", false, null);
+        zk.getData("/multi1", false, null);
+        zk.getData("/multi2", false, null);
+    }
+    
+    @Test
+    public void testCreateDelete() throws Exception {
+
+        zk.multi(Arrays.asList(
+                Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.delete("/multi", 0)
+                ));
+
+        // '/multi' should have been deleted
+        Assert.assertNull(zk.exists("/multi", null));
+    }
+
+    @Test
+    public void testInvalidVersion() throws Exception {
+
+        try {
+            zk.multi(Arrays.asList(
+                    Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                    Op.delete("/multi", 1)
+            ));
+            Assert.fail("delete /multi should have failed");
+        } catch (KeeperException e) {
+            /* PASS */
+        }
+    }
+
+    @Test
+    public void testNestedCreate() throws Exception {
+
+        zk.multi(Arrays.asList(
+                /* Create */
+                Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.create("/multi/a", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.create("/multi/a/1", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+
+                /* Delete */
+                Op.delete("/multi/a/1", 0),
+                Op.delete("/multi/a", 0),
+                Op.delete("/multi", 0)
+                ));
+
+        //Verify tree deleted
+        Assert.assertNull(zk.exists("/multi/a/1", null));
+        Assert.assertNull(zk.exists("/multi/a", null));
+        Assert.assertNull(zk.exists("/multi", null));
+    }
+
+    @Test
+    public void testSetData() throws Exception {
+
+        String[] names = {"/multi0", "/multi1", "/multi2"};
+        List<Op> ops = new ArrayList<Op>();
+
+        for (int i = 0; i < names.length; i++) {
+            ops.add(Op.create(names[i], new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+            ops.add(Op.setData(names[i], names[i].getBytes(), 0));
+        }
+
+        zk.multi(ops) ;
+
+        for (int i = 0; i < names.length; i++) {
+            Assert.assertArrayEquals(names[i].getBytes(), zk.getData(names[i], false, null));
+        }
+    }
+
+    @Test
+    public void testUpdateConflict() throws Exception {
+    
+        Assert.assertNull(zk.exists("/multi", null));
+        
+        try {
+            zk.multi(Arrays.asList(
+                    Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                    Op.setData("/multi", "X".getBytes(), 0),
+                    Op.setData("/multi", "Y".getBytes(), 0)
+                    ));
+            Assert.fail("Should have thrown a KeeperException for invalid version");
+        } catch (KeeperException e) {
+            //PASS
+            LOG.error("STACKTRACE: " + e);
+        }
+
+        Assert.assertNull(zk.exists("/multi", null));
+
+        //Updating version solves conflict -- order matters
+        zk.multi(Arrays.asList(
+                Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.setData("/multi", "X".getBytes(), 0),
+                Op.setData("/multi", "Y".getBytes(), 1)
+                ));
+
+        Assert.assertArrayEquals(zk.getData("/multi", false, null), "Y".getBytes());
+    }
+
+    @Test
+    public void TestDeleteUpdateConflict() throws Exception {
+
+        /* Delete of a node folowed by an update of the (now) deleted node */
+        try {
+            zk.multi(Arrays.asList(
+                Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                Op.delete("/multi", 0),
+                Op.setData("/multi", "Y".getBytes(), 0)
+                ));
+            Assert.fail("/multi should have been deleted so setData should have failed");
+        } catch (KeeperException e) {
+            /* PASS */
+        }
+
+        // '/multi' should never have been created as entire op should fail
+        Assert.assertNull(zk.exists("/multi", null)) ;
+    }
+
+    @Test
+    public void TestGetResults() throws Exception {
+        /* Delete of a node folowed by an update of the (now) deleted node */
+        try {
+            zk.multi(Arrays.asList(
+                    Op.create("/multi", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                    Op.delete("/multi", 0),
+                    Op.setData("/multi", "Y".getBytes(), 0),
+                    Op.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+            ));
+            Assert.fail("/multi should have been deleted so setData should have failed");
+        } catch (KeeperException e) {
+            // '/multi' should never have been created as entire op should fail
+            Assert.assertNull(zk.exists("/multi", null));
+
+            for (OpResult r : e.getResults()) {
+                LOG.info("RESULT==> " + r);
+                if (r instanceof ErrorResult) {
+                    ErrorResult er = (ErrorResult) r;
+                    LOG.info("ERROR RESULT: " + er + " ERR=>" + KeeperException.Code.get(er.getErr()));
+                }
+            }
+        }
+    }
+
+
+
+}

+ 24 - 8
src/zookeeper.jute

@@ -21,7 +21,7 @@ module org.apache.zookeeper.data {
         ustring scheme;
         ustring id;
     }
-   class ACL {
+    class ACL {
         int perms;
         Id id;
     }
@@ -66,12 +66,6 @@ module org.apache.zookeeper.data {
 }
 
 module org.apache.zookeeper.proto {
-    class op_result_t {
-        int rc;
-        int op;
-        buffer response;
-    }
-
     class ConnectRequest {
         int protocolVersion;
         long lastZxidSeen;
@@ -95,6 +89,11 @@ module org.apache.zookeeper.proto {
         int xid;
         int type;
     }
+    class MultiHeader {
+        int type;
+        boolean done;
+        int err;
+    }
     class AuthPacket {
         int type;
         ustring scheme;
@@ -135,6 +134,10 @@ module org.apache.zookeeper.proto {
         ustring path;
         boolean watch;
     }
+    class CheckVersionRequest {
+        ustring path;
+        int version;
+    }
     class GetMaxChildrenRequest {
         ustring path;
     }
@@ -167,7 +170,9 @@ module org.apache.zookeeper.proto {
         int state; // state of the Keeper client runtime
         ustring path;
     }
-
+    class ErrorResponse {
+        int err;
+    }
     class CreateResponse {
         ustring path;
     }
@@ -245,6 +250,10 @@ module org.apache.zookeeper.txn {
         buffer data;
         int version;
     }
+    class CheckVersionTxn {
+        ustring path;
+        int version;
+    }
     class SetACLTxn {
         ustring path;
         vector<org.apache.zookeeper.data.ACL> acl;
@@ -260,4 +269,11 @@ module org.apache.zookeeper.txn {
     class ErrorTxn {
         int err;
     }
+    class Txn {
+        int type;
+        buffer data;
+    }
+    class MultiTxn {
+        vector<org.apache.zookeeper.txn.Txn> txns;
+    }
 }

Some files were not shown because too many files changed in this diff