Jelajahi Sumber

ZOOKEEPER-1046: Creating a new sequential node results in a ZNODEEXISTS error

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1125581 13f79535-47bb-0310-9956-ffa450edef68
Camille Fournier 14 tahun lalu
induk
melakukan
a4414196b3

+ 2 - 0
CHANGES.txt

@@ -214,6 +214,8 @@ BUGFIXES:
   ZOOKEEPER-1059. stat command isses on non-existing node causes NPE. (Bhallamudi Kamesh via mahadev)
   
   ZOOKEEPER-1058. fix typo in opToString for getData. (camille)
+  
+  ZOOKEEPER-1046. Creating a new sequential node results in a ZNODEEXISTS error. (Vishal K via camille) 
 
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 

+ 33 - 6
src/java/main/org/apache/zookeeper/server/DataTree.java

@@ -764,17 +764,18 @@ public class DataTree {
                 case OpCode.create:
                     CreateTxn createTxn = (CreateTxn) txn;
                     debug = "Create transaction for " + createTxn.getPath();
+                    rc.path = createTxn.getPath();
                     createNode(
                             createTxn.getPath(),
                             createTxn.getData(),
                             createTxn.getAcl(),
                             createTxn.getEphemeral() ? header.getClientId() : 0,
                             header.getZxid(), header.getTime());
-                    rc.path = createTxn.getPath();
                     break;
                 case OpCode.delete:
                     DeleteTxn deleteTxn = (DeleteTxn) txn;
                     debug = "Delete transaction for " + deleteTxn.getPath();
+                    rc.path = deleteTxn.getPath();
                     deleteNode(deleteTxn.getPath(), header.getZxid());
                     break;
                 case OpCode.setData:
@@ -801,11 +802,8 @@ public class DataTree {
                     break;
             }
         } catch (KeeperException e) {
-            // These are expected errors since we take a lazy snapshot
-            if (initialized
-                    || (e.code() != Code.NONODE && e.code() != Code.NODEEXISTS)) {
-                LOG.warn("Failed:" + debug, e);
-            }
+             LOG.debug("Failed: " + debug, e);
+             rc.err = e.code().intValue();
         }
         return rc;
     }
@@ -1192,4 +1190,33 @@ public class DataTree {
             }
         }
     }
+
+     /**
+      * If the znode for the specified path is found, then this method
+      * increments the cversion and sets its pzxid to the zxid passed
+      * in the second argument. A NoNodeException is thrown if the znode is
+      * not found.
+      *
+      * @param path
+      *     Full path to the znode whose cversion needs to be incremented.
+      *     A "/" at the end of the path is ignored.
+      * @param zxid
+      *     Value to be assigned to pzxid
+      * @throws KeeperException.NoNodeException
+      *     If znode not found.
+      **/
+    public void incrementCversion(String path, long zxid)
+        throws KeeperException.NoNodeException {
+        if (path.endsWith("/")) {
+           path = path.substring(0, path.length() - 1);
+        }
+        DataNode node = nodes.get(path);
+        if (node == null) {
+            throw new KeeperException.NoNodeException(path);
+        }
+        synchronized (node) {
+            node.stat.setCversion(node.stat.getCversion() + 1);
+            node.stat.setPzxid(zxid);
+        }
+    }
 }

+ 53 - 7
src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -34,6 +34,11 @@ import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This is a helper class 
@@ -142,7 +147,12 @@ public class FileTxnSnapLog {
             } else {
                 highestZxid = hdr.getZxid();
             }
-            processTransaction(hdr,dt,sessions, itr.getTxn());
+            try {
+                processTransaction(hdr,dt,sessions, itr.getTxn());
+            } catch(KeeperException.NoNodeException e) {
+               throw new IOException("Failed to process transaction type: " +
+                     hdr.getType() + " error: " + e.getMessage());
+            }
             listener.onTxnLoaded(hdr, itr.getTxn());
             if (!itr.next()) 
                 break;
@@ -157,8 +167,10 @@ public class FileTxnSnapLog {
      * @param sessions the sessions to be restored
      * @param txn the transaction to be applied
      */
-    private void processTransaction(TxnHeader hdr,DataTree dt,
-            Map<Long, Integer> sessions, Record txn){
+    public void processTransaction(TxnHeader hdr,DataTree dt,
+            Map<Long, Integer> sessions, Record txn)
+        throws KeeperException.NoNodeException {
+        ProcessTxnResult rc;
         switch (hdr.getType()) {
         case OpCode.createSession:
             sessions.put(hdr.getClientId(),
@@ -171,7 +183,7 @@ public class FileTxnSnapLog {
                                 + ((CreateSessionTxn) txn).getTimeOut());
             }
             // give dataTree a chance to sync its lastProcessedZxid
-            dt.processTxn(hdr, txn);
+            rc = dt.processTxn(hdr, txn);
             break;
         case OpCode.closeSession:
             sessions.remove(hdr.getClientId());
@@ -180,11 +192,45 @@ public class FileTxnSnapLog {
                         "playLog --- close session in log: "
                                 + Long.toHexString(hdr.getClientId()));
             }
-            dt.processTxn(hdr, txn);
+            rc = dt.processTxn(hdr, txn);
             break;
         default:
-            dt.processTxn(hdr, txn);
-        }        
+            rc = dt.processTxn(hdr, txn);
+        }
+
+        /**
+         * Snapshots are taken lazily. It can happen that the child
+         * znodes of a parent are modified (deleted or created) after the parent
+         * is serialized. Therefore, while replaying logs during restore, a
+         * delete/create might fail because the node was already
+         * deleted/created.
+         *
+         * After seeing this failure, we should increment
+         * the cversion of the parent znode since the parent was serialized
+         * before its children.
+         *
+         * Note, such failures on DT should be seen only during
+         * restore.
+         */
+        if ((hdr.getType() == OpCode.delete &&
+                 rc.err == Code.NONODE.intValue()) ||
+            (hdr.getType() == OpCode.create &&
+                rc.err == Code.NODEEXISTS.intValue())) {
+            LOG.debug("Failed Txn: " + hdr.getType() + " path:" +
+                  rc.path + " err: " + rc.err);
+            int lastSlash = rc.path.lastIndexOf('/');
+            String parentName = rc.path.substring(0, lastSlash);
+            try {
+                dt.incrementCversion(parentName, hdr.getZxid());
+            } catch (KeeperException.NoNodeException e) {
+                LOG.error("Failed to increment parent cversion for: " +
+                      parentName, e);
+                throw e;
+            }
+        } else if (rc.err != Code.OK.intValue()) {
+            LOG.debug("Ignoring processTxn failure hdr: " + hdr.getType() +
+                  " : error: " + rc.err);
+        }
     }
     
     /**

+ 18 - 0
src/java/test/org/apache/zookeeper/test/DataTreeTest.java

@@ -29,6 +29,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.zookeeper.server.DataNode;
 
 public class DataTreeTest extends ZKTestCase {
     protected static final Logger LOG = LoggerFactory.getLogger(DataTreeTest.class);
@@ -62,4 +63,21 @@ public class DataTreeTest extends ZKTestCase {
         Assert.assertFalse("Root node watch not triggered",!watcher.fired);
     }
 
+    /**
+     * For ZOOKEEPER-1046 test if cversion is getting incremented correctly.
+     */
+    @Test
+    public void testIncrementCversion() throws Exception {
+        dt.createNode("/test", new byte[0], null, 0, 1, 1);
+        DataNode zk = dt.getNode("/test");
+        long prevCversion = zk.stat.getCversion();
+        long prevPzxid = zk.stat.getPzxid();
+        dt.incrementCversion("/test/",  prevPzxid + 1);
+        long newCversion = zk.stat.getCversion();
+        long newPzxid = zk.stat.getPzxid();
+        Assert.assertTrue("<cversion, pzxid> verification failed. Expected: <" +
+                (prevCversion + 1) + ", " + (prevPzxid + 1) + ">, found: <" +
+                newCversion + ", " + newPzxid + ">",
+                (newCversion == prevCversion + 1 && newPzxid == prevPzxid + 1));
+    }
 }

+ 81 - 0
src/java/test/org/apache/zookeeper/test/LoadFromLogTest.java

@@ -21,6 +21,8 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.WatchedEvent;
@@ -37,11 +39,19 @@ import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.junit.Assert;
 import org.junit.Test;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.DeleteTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.jute.Record;
 
 public class LoadFromLogTest extends ZKTestCase implements  Watcher {
     private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
     private static final int CONNECTION_TIMEOUT = 3000;
     private static final int NUM_MESSAGES = 300;
+    protected static final Logger LOG = LoggerFactory.getLogger(LoadFromLogTest.class);
 
     // setting up the quorum has a transaction overhead for creating and closing the session
     private static final int TRANSACTION_OVERHEAD = 2;	
@@ -104,4 +114,75 @@ public class LoadFromLogTest extends ZKTestCase implements  Watcher {
         // do nothing
     }
 
+    /**
+     * For ZOOKEEPER-1046. Verify if cversion and pzxid if incremented
+     * after create/delete failure during restore.
+     */
+    @Test
+    public void testTxnFailure() throws Exception {
+        long count = 1;
+        File tmpDir = ClientBase.createTmpDir();
+        FileTxnSnapLog logFile = new FileTxnSnapLog(tmpDir, tmpDir);
+        DataTree dt = new DataTree();
+        dt.createNode("/test", new byte[0], null, 0, 1, 1);
+        for (count = 1; count <= 3; count++) {
+            dt.createNode("/test/" + count, new byte[0], null, 0, count,
+                    System.currentTimeMillis());
+        }
+        DataNode zk = dt.getNode("/test");
+
+        // Make create to fail, then verify cversion.
+        LOG.info("Attempting to create " + "/test/" + (count - 1));
+        doOp(logFile, OpCode.create, "/test/" + (count - 1), dt, zk);
+
+        // Make delete fo fail, then verify cversion.
+        LOG.info("Attempting to delete " + "/test/" + (count + 1));
+        doOp(logFile, OpCode.delete, "/test/" + (count + 1), dt, zk);
+    }
+    /*
+     * Does create/delete depending on the type and verifies
+     * if cversion before the operation is 1 less than cversion afer.
+     */
+    private void doOp(FileTxnSnapLog logFile, int type, String path,
+            DataTree dt, DataNode parent) throws Exception {
+        int lastSlash = path.lastIndexOf('/');
+        String parentName = path.substring(0, lastSlash);
+
+        long prevCversion = parent.stat.getCversion();
+        long prevPzxid = parent.stat.getPzxid();
+        List<String> child = dt.getChildren(parentName, null, null);
+        String childStr = "";
+        for (String s : child) {
+            childStr += s + " ";
+        }
+        LOG.info("Children: " + childStr + " for " + parentName);
+        LOG.info("(cverions, pzxid): " + prevCversion + ", " + prevPzxid);
+
+        Record txn = null;
+        TxnHeader txnHeader = null;
+        if (type == OpCode.delete) {
+            txn = new DeleteTxn(path);
+            txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1,
+                System.currentTimeMillis(), OpCode.delete);
+        } else if (type == OpCode.create) {
+            txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1,
+                    System.currentTimeMillis(), OpCode.create);
+            txn = new CreateTxn(path, new byte[0], null, false);
+        }
+        logFile.processTransaction(txnHeader, dt, null, txn);
+
+        long newCversion = parent.stat.getCversion();
+        long newPzxid = parent.stat.getPzxid();
+        child = dt.getChildren(parentName, null, null);
+        childStr = "";
+        for (String s : child) {
+            childStr += s + " ";
+        }
+        LOG.info("Children: " + childStr + " for " + parentName);
+        LOG.info("(cverions, pzxid): " +newCversion + ", " + newPzxid);
+        Assert.assertTrue("<cversion, pzxid> verification failed. Expected: <" +
+                (prevCversion + 1) + ", " + (prevPzxid + 1) + ">, found: <" +
+                newCversion + ", " + newPzxid + ">",
+                (newCversion == prevCversion + 1 && newPzxid == prevPzxid + 1));
+    }
 }