Pārlūkot izejas kodu

ZOOKEEPER-231. Quotas in ZooKeeper. (mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@740216 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 16 gadi atpakaļ
vecāks
revīzija
a4c675afff

+ 2 - 0
CHANGES.txt

@@ -154,6 +154,8 @@ flavio via mahadev)
 NEW FEATURES:
 NEW FEATURES:
 
 
    ZOOKEEPER-276. Bookkeeper contribution (Flavio and Luca Telloli via mahadev)
    ZOOKEEPER-276. Bookkeeper contribution (Flavio and Luca Telloli via mahadev)
+  
+   ZOOKEEPER-231. Quotas in ZooKeeper. (mahadev)
 
 
 Release 3.0.0 - 2008-10-21
 Release 3.0.0 - 2008-10-21
 
 

+ 1 - 0
src/docs/src/documentation/content/xdocs/index.xml

@@ -61,6 +61,7 @@
       <li><strong>Administrators &amp; Operators</strong> <p> Documents for Administrators and Operations Engineers of ZooKeeper Deployments</p>
       <li><strong>Administrators &amp; Operators</strong> <p> Documents for Administrators and Operations Engineers of ZooKeeper Deployments</p>
       <ul>
       <ul>
       <li><a href="zookeeperAdmin.html">Administrator's Guide</a> - a guide for system administrators and anyone else who might deploy ZooKeeper</li>
       <li><a href="zookeeperAdmin.html">Administrator's Guide</a> - a guide for system administrators and anyone else who might deploy ZooKeeper</li>
+      <li><a href="zookeeperQuotas.html">Quota Guide</a> - a guide for system administrators on Quotas in ZooKeeper. </li>
       <li><a href="zookeeperJMX.html">JMX</a> - how to enable JMX in ZooKeeper</li>
       <li><a href="zookeeperJMX.html">JMX</a> - how to enable JMX in ZooKeeper</li>
       </ul>
       </ul>
       </li>
       </li>

+ 1 - 0
src/docs/src/documentation/content/xdocs/site.xml

@@ -48,6 +48,7 @@ See http://forrest.apache.org/docs/linking.html for more info.
   
   
   <docs label="Admin &amp; Ops">
   <docs label="Admin &amp; Ops">
       <admin label="Administrator's Guide"  href="zookeeperAdmin.html" />
       <admin label="Administrator's Guide"  href="zookeeperAdmin.html" />
+      <quota label="Quota Guide"            href="zookeeperQuotas.html" />
       <jmx   label="JMX"                    href="zookeeperJMX.html" />
       <jmx   label="JMX"                    href="zookeeperJMX.html" />
   </docs>
   </docs>
   
   

+ 72 - 0
src/docs/src/documentation/content/xdocs/zookeeperQuotas.xml

@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+	<!--
+		Copyright 2002-2004 The Apache Software Foundation Licensed under the
+		Apache License, Version 2.0 (the "License"); you may not use this file
+		except in compliance with the License. You may obtain a copy of the
+		License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+		by applicable law or agreed to in writing, software distributed under
+		the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+		CONDITIONS OF ANY KIND, either express or implied. See the License for
+		the specific language governing permissions and limitations under the
+		License.
+	-->
+                        <!DOCTYPE article PUBLIC "-//OASIS//DTD Simplified DocBook XML V1.0//EN"
+                        "http://www.oasis-open.org/docbook/xml/simple/1.0/sdocbook.dtd">
+<article id="bk_Quota">
+	<title>ZooKeeper Quota's Guide</title>
+	<subtitle>A Guide to Deployment and Administration</subtitle>
+	<articleinfo>
+		<legalnotice>
+			<para>
+				Licensed under the Apache License, Version 2.0 (the "License"); you
+				may not use this file except in compliance with the License. You may
+				obtain a copy of the License at
+				<ulink url="http://www.apache.org/licenses/LICENSE-2.0">http://www.apache.org/licenses/LICENSE-2.0
+				</ulink>
+				.
+			</para>
+			<para>Unless required by applicable law or agreed to in
+				writing, software distributed under the License is distributed on an
+				"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+				express or implied. See the License for the specific language
+				governing permissions and limitations under the License.</para>
+		</legalnotice>
+		<abstract>
+			<para>This document contains information about deploying,
+				administering and mantaining ZooKeeper. It also discusses best
+				practices and common problems.</para>
+		</abstract>
+	</articleinfo>
+	<section id="zookeeper_quotas">
+	<title>Quotas</title>
+	<para> ZooKeeper has both namespace and bytes quotas. You can use the ZooKeeperMain class to setup quotas.
+	ZooKeeper prints <emphasis>WARN</emphasis> messages if users exceed the quota assigned to them. The messages 
+	are printed in the log of the ZooKeeper. 
+	</para>
+	<para><computeroutput>$java -cp zookeeper.jar:src/java/lib/log4j-1.2.15.jar/conf \
+	 org.apache.zookeeper.ZooKeeperMain host:port</computeroutput></para> 
+	 <para> The above command gives you a command line option of using quotas.</para>
+	 <section>
+	 <title>Setting Quotas</title>
+	<para>You can use 
+	 <emphasis>setquota</emphasis> to set a quota on a ZooKeeper node. It has an option of setting quota with
+	  -n (for namespace)
+	 and -b (for bytes). </para>
+	<para> The ZooKeeper quota are stored in ZooKeeper itself in /zookeeper/quota. To disable other people from
+	changing the quota's set the ACL for /zookeeper/quota such that only admins are able to read and write to it.
+	</para>
+	</section>
+	<section>
+	<title>Listing Quotas</title>
+	<para> You can use
+	<emphasis>listquota</emphasis> to list a quota on a ZooKeeper node.
+	</para>
+	</section>
+	<section>
+	<title> Deleting Quotas</title>
+	<para> You can use
+	<emphasis>delquota</emphasis> to delete quota on a ZooKeeper node.
+	</para>
+	</section>
+	</section>
+	</article>

+ 68 - 0
src/java/main/org/apache/zookeeper/Quotas.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+/**
+ * this class manages quotas
+ * and has many other utils
+ * for quota
+ */
+public class Quotas {
+
+    /** the zookeeper nodes that acts as the management and status node **/
+    public static final String procZookeeper = "/zookeeper";
+
+    /** the zookeeper quota node that acts as the quota
+     * management node for zookeeper */
+    public static final String quotaZookeeper = "/zookeeper/quota";
+
+    /**
+     * the limit node that has the limit of
+     * a subtree
+     */
+    public static final String limitNode = "zookeeper_limits";
+
+    /**
+     * the stat node that monitors the limit of
+     * a subtree.
+     */
+    public static final String statNode = "zookeeper_stats";
+
+    /**
+     * return the quota path associated with this
+     * prefix
+     * @param path the actual path in zookeeper.
+     * @return the limit quota path
+     */
+    public static String quotaPath(String path) {
+        return quotaZookeeper + path +
+        "/" + limitNode;
+    }
+
+    /**
+     * return the stat quota path associated with this
+     * prefix.
+     * @param path the actual path in zookeeper
+     * @return the stat quota path
+     */
+    public static String statPath(String path) {
+        return quotaZookeeper + path + "/" +
+        statNode;
+    }
+}

+ 101 - 0
src/java/main/org/apache/zookeeper/StatsTrack.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+/**
+ * a class that represents the stats associated with quotas
+ */
+public class StatsTrack {
+    private int count;
+    private long bytes;
+    private String countStr = "count";
+    private String byteStr = "bytes";
+
+    /**
+     * a default constructor for
+     * stats
+     */
+    public StatsTrack() {
+        this(null);
+    }
+    /**
+     * the stat string should be of the form count=int,bytes=long
+     * if stats is called with null the count and bytes are initialized
+     * to -1.
+     * @param stats the stat string to be intialized with
+     */
+    public StatsTrack(String stats) {
+        if (stats == null) {
+            stats = "count=-1,bytes=-1";
+        }
+        String[] split = stats.split(",");
+        if (split.length != 2) {
+            throw new IllegalArgumentException("invalid string " + stats);
+        }
+        count = Integer.parseInt(split[0].split("=")[1]);
+        bytes = Long.parseLong(split[1].split("=")[1]);
+    }
+
+
+    /**
+     * get the count of nodes allowed as part of quota
+     *
+     * @return the count as part of this string
+     */
+    public int getCount() {
+        return this.count;
+    }
+
+    /**
+     * set the count for this stat tracker.
+     *
+     * @param count
+     *            the count to set with
+     */
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    /**
+     * get the count of bytes allowed as part of quota
+     *
+     * @return the bytes as part of this string
+     */
+    public long getBytes() {
+        return this.bytes;
+    }
+
+    /**
+     * set teh bytes for this stat tracker.
+     *
+     * @param bytes
+     *            the bytes to set with
+     */
+    public void setBytes(long bytes) {
+        this.bytes = bytes;
+    }
+
+    @Override
+    /*
+     * returns the string that maps to this stat tracking.
+     */
+    public String toString() {
+        return countStr + "=" + count + "," + byteStr + "=" + bytes;
+    }
+}

+ 284 - 11
src/java/main/org/apache/zookeeper/ZooKeeperMain.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.Date;
 import java.util.List;
 import java.util.List;
 
 
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.ACL;
@@ -35,8 +36,13 @@ import org.apache.zookeeper.data.Stat;
  * 
  * 
  */
  */
 public class ZooKeeperMain {
 public class ZooKeeperMain {
-
+    /**
+     * the logger for this class
+     */
+    private static final Logger LOG = Logger.getLogger(ZooKeeperMain.class);
+    
     static void usage() {
     static void usage() {
+        LOG.info("message", new IOException("USAGE"));
         System.err.println("ZooKeeper host:port cmd args");
         System.err.println("ZooKeeper host:port cmd args");
         System.err.println("\tcreate path data acl");
         System.err.println("\tcreate path data acl");
         System.err.println("\tdelete path [version]");
         System.err.println("\tdelete path [version]");
@@ -47,7 +53,10 @@ public class ZooKeeperMain {
         System.err.println("\tsetAcl path acl");
         System.err.println("\tsetAcl path acl");
         System.err.println("\tstat path [watch]");
         System.err.println("\tstat path [watch]");
         System.err.println("\tsync path");
         System.err.println("\tsync path");
-    }
+        System.err.println("\tsetquota -n|-b val path");
+        System.err.println("\tlistquota path");
+        System.err.println("\tdelquotsssa [-n|-b] path");
+        }
 
 
     static private class MyWatcher implements Watcher {
     static private class MyWatcher implements Watcher {
         public void process(WatchedEvent event) {
         public void process(WatchedEvent event) {
@@ -88,11 +97,9 @@ public class ZooKeeperMain {
         System.err.println("mZxid = " + stat.getMzxid());
         System.err.println("mZxid = " + stat.getMzxid());
         System.err.println("mtime = " + new Date(stat.getMtime()).toString());
         System.err.println("mtime = " + new Date(stat.getMtime()).toString());
         System.err.println("pZxid = " + stat.getPzxid());
         System.err.println("pZxid = " + stat.getPzxid());
-
         System.err.println("cversion = " + stat.getCversion());
         System.err.println("cversion = " + stat.getCversion());
         System.err.println("dataVersion = " + stat.getVersion());
         System.err.println("dataVersion = " + stat.getVersion());
         System.err.println("aclVersion = " + stat.getAversion());
         System.err.println("aclVersion = " + stat.getAversion());
-        
         System.err.println("ephemeralOwner = " + stat.getEphemeralOwner());
         System.err.println("ephemeralOwner = " + stat.getEphemeralOwner());
         System.err.println("dataLength = " + stat.getDataLength());
         System.err.println("dataLength = " + stat.getDataLength());
         System.err.println("numChildren = " + stat.getNumChildren());
         System.err.println("numChildren = " + stat.getNumChildren());
@@ -112,15 +119,16 @@ public class ZooKeeperMain {
             }
             }
         } else if (args.length < 3) {
         } else if (args.length < 3) {
             usage();
             usage();
+            System.exit(-1);
         }
         }
 
 
         ZooKeeper zooKeeper = new ZooKeeper(args[0], 5000, new MyWatcher());
         ZooKeeper zooKeeper = new ZooKeeper(args[0], 5000, new MyWatcher());
         boolean watch = processCmd(args, zooKeeper);
         boolean watch = processCmd(args, zooKeeper);
         if (!watch) {
         if (!watch) {
             System.exit(0);
             System.exit(0);
-        }
+        } 
     }
     }
-
+ 
     private static DataCallback dataCallback = new DataCallback() {
     private static DataCallback dataCallback = new DataCallback() {
 
 
         public void processResult(int rc, String path, Object ctx, byte[] data,
         public void processResult(int rc, String path, Object ctx, byte[] data,
@@ -131,57 +139,322 @@ public class ZooKeeperMain {
         }
         }
 
 
     };
     };
+    
+    /**
+     * trim the quota tree to recover unwanted tree elements
+     * in the quota's tree
+     * @param zk the zookeeper client
+     * @param path the path to start from and go up and see if their
+     * is any unwanted parent in the path.
+     * @return true if sucessful
+     * @throws KeeperException
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private static boolean trimProcQuotas(ZooKeeper zk, String path) throws
+            KeeperException, IOException, InterruptedException {
+        if (Quotas.quotaZookeeper.equals(path)) {
+            return true;
+        }
+        List<String> children = zk.getChildren(path, false);
+        if (children.size() == 0) {
+            zk.delete(path, -1);
+            String parent = path.substring(0, path.lastIndexOf('/'));
+            return trimProcQuotas(zk, parent);
+        }
+        else {
+            return true;
+        }
+    }
 
 
+    /**
+     * this method deletes quota for a node.
+     * @param zk the zookeeper client
+     * @param path the path to delete quota for
+     * @param bytes true if number of bytes needs to
+     * be unset
+     * @param numNodes true if number of nodes needs 
+     * to be unset
+     * @return true if quota deletion is successful
+     * @throws KeeperException
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static boolean delQuota(ZooKeeper zk, String path,
+            boolean bytes, boolean numNodes) throws KeeperException,
+            IOException, InterruptedException {
+        String parentPath = Quotas.quotaZookeeper + path;
+        String quotaPath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
+        if (zk.exists(quotaPath, false) == null) {
+            System.out.println("Quota does not exist for " + path);
+            return true;
+        }
+        byte[] data = null;
+        try {
+           data = zk.getData(quotaPath, false, new Stat());
+        } catch(KeeperException.NoNodeException ne) {
+            System.err.println("quota does not exist for " + path);
+        }
+        StatsTrack strack = new StatsTrack(new String(data));
+        if (bytes && !numNodes) {
+            strack.setBytes(-1L);
+            zk.setData(quotaPath, strack.toString().getBytes(), -1);
+        }
+        else if (!bytes && numNodes) {
+            strack.setCount(-1);
+            zk.setData(quotaPath, strack.toString().getBytes(), -1);
+        }
+        else if (bytes && numNodes) {
+            // delete till you can find a node with more than
+            // one child
+            List<String> children = zk.getChildren(parentPath, false);
+            /// delete the direct children first
+            for (String child: children) {
+                zk.delete(parentPath + "/" + child, -1);
+            }
+            // cut the tree till their is more than one child
+            trimProcQuotas(zk, parentPath);
+        }
+        return true;
+    }
+    
+    private static void checkIfParentQuota(ZooKeeper zk, String path)
+        throws InterruptedException, KeeperException
+    {
+        final String[] splits = path.split("/");
+        String quotaPath = Quotas.quotaZookeeper;
+        for (String str: splits) {
+            if (str.length() == 0) {
+                // this should only be for the beginning of the path
+                // i.e. "/..." - split(path)[0] is empty string before first '/'
+                continue;
+            }
+            quotaPath += "/" + str;
+            List<String> children =  null;
+            try {
+                children = zk.getChildren(quotaPath, false);
+            } catch(KeeperException.NoNodeException ne) {
+                return;
+            }
+            if (children.size() == 0) {
+                return;
+            }
+            for (String child: children) {
+                if (Quotas.limitNode.equals(child)) {
+                    throw new IllegalArgumentException(path + " has a parent " 
+                            + quotaPath + " which has a quota");
+                }
+            }
+        }
+    }
+        
+    /**
+     * this method creates a quota node for the path
+     * @param zk the ZooKeeper client
+     * @param path the path for which quota needs to be created
+     * @param bytes the limit of bytes on this path
+     * @param numNodes the limit of number of nodes on this path
+     * @return true if its successful and false if not.
+     */
+    public static boolean createQuota(ZooKeeper zk, String path, 
+            long bytes, int numNodes) throws KeeperException, IOException,
+            InterruptedException {
+        // check if the path exists. We cannot create 
+        // quota for a path that already exists in zookeeper
+        // for now.
+        Stat initStat = zk.exists(path, false);
+        if (initStat == null) {
+            throw new IllegalArgumentException(path + " does not exist.");
+        }
+        // now check if their is already existing 
+        // parent or child that has quota
+        
+        String quotaPath = Quotas.quotaZookeeper;
+        // check for more than 2 children -- 
+        // if zookeeper_stats and zookeeper_qutoas
+        // are not the children then this path 
+        // is an ancestor of some path that 
+        // already has quota
+        String realPath = Quotas.quotaZookeeper + path;
+        try {
+           List<String> children = zk.getChildren(realPath, false);
+           for (String child: children) {
+               if (!child.startsWith("zookeeper_")) {
+                   throw new IllegalArgumentException(path + " has child " + 
+                           child + " which has a quota");
+               }
+           }
+        } catch(KeeperException.NoNodeException ne) {
+            //this is fine
+            // we can proceed further
+        }
+        
+        //check for any parent that has been quota
+        checkIfParentQuota(zk, path);
+        
+        // this is valid node for quota
+        // start creating all the parents
+        if (zk.exists(quotaPath, false) == null) {
+            try {
+                zk.create(Quotas.procZookeeper, null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+                zk.create(Quotas.quotaZookeeper, null, Ids.OPEN_ACL_UNSAFE, 
+                        CreateMode.PERSISTENT);
+            } catch(KeeperException.NodeExistsException ne) {
+                // do nothing
+            }
+        }
+        
+        // now create the direct children 
+        // and the stat and quota nodes
+        String[] splits = path.split("/");
+        for (int i=1; i<splits.length; i++) {
+            quotaPath = quotaPath + "/" + splits[i];
+            try {
+                zk.create(quotaPath, null, Ids.OPEN_ACL_UNSAFE , 
+                        CreateMode.PERSISTENT);
+            } catch(KeeperException.NodeExistsException ne) {
+                //do nothing
+            }
+        }
+        String statPath = quotaPath + "/" + Quotas.statNode;
+        quotaPath = quotaPath + "/" + Quotas.limitNode;
+        StatsTrack strack = new StatsTrack(null);
+        strack.setBytes(bytes);
+        strack.setCount(numNodes);
+        try {
+            zk.create(quotaPath, strack.toString().getBytes(), 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            StatsTrack stats = new StatsTrack(null);
+            stats.setBytes(0L);
+            stats.setCount(0);
+            zk.create(statPath, stats.toString().getBytes(), 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch(KeeperException.NodeExistsException ne) {
+            byte[] data = zk.getData(quotaPath, false , new Stat());
+            StatsTrack strackC = new StatsTrack(new String(data));
+            if (bytes != -1L) {
+                strackC.setBytes(bytes);
+            }
+            if (numNodes != -1) {
+                strackC.setCount(numNodes);
+            }
+            zk.setData(quotaPath, strackC.toString().getBytes(), -1);
+        }
+        return true;
+    }
+   
     private static boolean processCmd(String[] args, ZooKeeper zooKeeper)
     private static boolean processCmd(String[] args, ZooKeeper zooKeeper)
             throws KeeperException, IOException, InterruptedException {
             throws KeeperException, IOException, InterruptedException {
         Stat stat = new Stat();
         Stat stat = new Stat();
         if (args.length < 2) {
         if (args.length < 2) {
-            return false;
-        }
-        if (args.length < 3) {
             usage();
             usage();
             return false;
             return false;
         }
         }
+        
         String cmd = args[1];
         String cmd = args[1];
         boolean watch = args.length > 3;
         boolean watch = args.length > 3;
-        String path = args[2];
+        String path = null;
         List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
         List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
         System.out.println("Processing " + cmd);
         System.out.println("Processing " + cmd);
         if (cmd.equals("create") && args.length >= 4) {
         if (cmd.equals("create") && args.length >= 4) {
             if (args.length == 5) {
             if (args.length == 5) {
                 acl = parseACLs(args[4]);
                 acl = parseACLs(args[4]);
             }
             }
+            path = args[2];
             String newPath = zooKeeper.create(path, args[3].getBytes(), acl, CreateMode.PERSISTENT);
             String newPath = zooKeeper.create(path, args[3].getBytes(), acl, CreateMode.PERSISTENT);
             System.err.println("Created " + newPath);
             System.err.println("Created " + newPath);
         } else if (cmd.equals("delete") && args.length >= 3) {
         } else if (cmd.equals("delete") && args.length >= 3) {
+            path = args[2];
             zooKeeper.delete(path, watch ? Integer.parseInt(args[3]) : -1);
             zooKeeper.delete(path, watch ? Integer.parseInt(args[3]) : -1);
         } else if (cmd.equals("set") && args.length >= 4) {
         } else if (cmd.equals("set") && args.length >= 4) {
+            path = args[2];
             stat = zooKeeper.setData(path, args[3].getBytes(),
             stat = zooKeeper.setData(path, args[3].getBytes(),
                     args.length > 4 ? Integer.parseInt(args[4]) : -1);
                     args.length > 4 ? Integer.parseInt(args[4]) : -1);
             printStat(stat);
             printStat(stat);
         } else if (cmd.equals("aget") && args.length >= 3) {
         } else if (cmd.equals("aget") && args.length >= 3) {
+            path = args[2];
             zooKeeper.getData(path, watch, dataCallback, path);
             zooKeeper.getData(path, watch, dataCallback, path);
         } else if (cmd.equals("get") && args.length >= 3) {
         } else if (cmd.equals("get") && args.length >= 3) {
+            path = args[2];
             byte data[] = zooKeeper.getData(path, watch, stat);
             byte data[] = zooKeeper.getData(path, watch, stat);
             System.out.println(new String(data));
             System.out.println(new String(data));
             printStat(stat);
             printStat(stat);
         } else if (cmd.equals("ls") && args.length >= 3) {
         } else if (cmd.equals("ls") && args.length >= 3) {
+            path = args[2];
             List<String> children = zooKeeper.getChildren(path, watch);
             List<String> children = zooKeeper.getChildren(path, watch);
             System.out.println(children);
             System.out.println(children);
         } else if (cmd.equals("getAcl") && args.length >= 2) {
         } else if (cmd.equals("getAcl") && args.length >= 2) {
+            path = args[2];
             acl = zooKeeper.getACL(path, stat);
             acl = zooKeeper.getACL(path, stat);
             for (ACL a : acl) {
             for (ACL a : acl) {
                 System.out.println(a.getId() + ": "
                 System.out.println(a.getId() + ": "
                         + getPermString(a.getPerms()));
                         + getPermString(a.getPerms()));
             }
             }
         } else if (cmd.equals("setAcl") && args.length >= 4) {
         } else if (cmd.equals("setAcl") && args.length >= 4) {
-
+            path = args[2];
             stat = zooKeeper.setACL(path, parseACLs(args[3]),
             stat = zooKeeper.setACL(path, parseACLs(args[3]),
                     args.length > 4 ? Integer.parseInt(args[4]) : -1);
                     args.length > 4 ? Integer.parseInt(args[4]) : -1);
             printStat(stat);
             printStat(stat);
         } else if (cmd.equals("stat") && args.length >= 3) {
         } else if (cmd.equals("stat") && args.length >= 3) {
+            path = args[2];
             stat = zooKeeper.exists(path, watch);
             stat = zooKeeper.exists(path, watch);
             printStat(stat);
             printStat(stat);
+        } else if (cmd.equals("listquota") && args.length >= 3) {
+            path = args[2];
+            String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
+            byte[] data =  null;
+            try {   
+                System.err.println("absolute path is " + absolutePath);
+                data = zooKeeper.getData(absolutePath, false, stat);
+                StatsTrack st = new StatsTrack(new String(data));
+                System.out.println("Output quota for " + path + " " 
+                        + st.toString());
+                
+                data = zooKeeper.getData(Quotas.quotaZookeeper + path + "/" +
+                        Quotas.statNode, false, stat);
+                System.out.println("Output stat for " + path + " " +
+                            new StatsTrack(new String(data)).toString());
+            } catch(KeeperException.NoNodeException ne) {
+                System.err.println("quota for " + path + " does not exist.");
+            }
+            
+        } else if (cmd.equals("setquota") && args.length > 4) {
+            String option = args[2];
+            path = args[4];
+            System.err.println("Comment: the parts are " +
+            		"option " + option + " path " + 
+            		args[4] + " val " + args[3]);
+            String val = args[3];
+            if ("-b".equals(option)) {
+                // we are setting the bytes quota
+                createQuota(zooKeeper, path, Long.parseLong(val), -1);
+            } else if ("-n".equals(option)) {
+                // we are setting the num quota
+                createQuota(zooKeeper, path, -1L, Integer.parseInt(val));
+            }
+            else {
+                usage();
+            }
+        } else if (cmd.equals("delquota") && args.length >= 3) {
+            //if neither option -n or -b is specified, we delete 
+            // the quota node for thsi node.
+            if (args.length == 4) {
+                //this time we have an option
+                String option = args[2];
+                path = args[3];
+                if ("-b".equals(option)) {
+                    delQuota(zooKeeper, path, true, false);
+                } else if ("-n".equals(option)) {
+                    delQuota(zooKeeper, path, false, true);
+                }
+            }
+            else if (args.length == 3) {
+                path = args[2];
+                // we dont have an option specified.
+                // just delete whole quota node
+                delQuota(zooKeeper, path, true, true);
+            }
         } else {
         } else {
             usage();
             usage();
         }
         }

+ 280 - 0
src/java/main/org/apache/zookeeper/common/PathTrie.java

@@ -0,0 +1,280 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+/**
+ * a class that implements prefix matching for 
+ * components of a filesystem path. the trie
+ * looks like a tree with edges mapping to 
+ * the component of a path.
+ * example /ab/bc/cf would map to a trie
+ *           /
+ *        ab/
+ *        (ab)
+ *      bc/
+ *       / 
+ *      (bc)
+ *   cf/
+ *   (cf)
+ */    
+public class PathTrie {
+    /**
+     * the logger for this class
+     */
+    private static final Logger LOG = Logger.getLogger(PathTrie.class);
+    
+    /**
+     * the root node of PathTrie
+     */
+    private TrieNode rootNode ;
+    
+    static class TrieNode {
+        boolean property = false;
+        HashMap<String, TrieNode> children;
+        TrieNode parent = null;
+        /**
+         * create a trienode with parent
+         * as parameter
+         * @param parent the parent of this trienode
+         */
+        private TrieNode(TrieNode parent) {
+            children = new HashMap<String, TrieNode>();
+            this.parent = parent;
+        }
+        
+        /**
+         * get the parent of this node
+         * @return the parent node
+         */
+        TrieNode getParent() {
+            return this.parent;
+        }
+        
+        /**
+         * set the parent of this node
+         * @param parent the parent to set to
+         */
+        void setParent(TrieNode parent) {
+            this.parent = parent;
+        }
+        
+        /**
+         * a property that is set 
+         * for a node - making it 
+         * special.
+         */
+        void setProperty(boolean prop) {
+            this.property = prop;
+        }
+        
+        /** the property of this
+         * node 
+         * @return the property for this
+         * node
+         */
+        boolean getProperty() {
+            return this.property;
+        }
+        /**
+         * add a child to the existing node
+         * @param childName the string name of the child
+         * @param node the node that is the child
+         */
+        void addChild(String childName, TrieNode node) {
+            synchronized(children) {
+                if (children.containsKey(childName)) {
+                    return;
+                }
+                children.put(childName, node);
+            }
+        }
+     
+        /**
+         * delete child from this node
+         * @param childName the string name of the child to 
+         * be deleted
+         */
+        void deleteChild(String childName) {
+            synchronized(children) {
+                if (!children.containsKey(childName)) {
+                    return;
+                }
+                TrieNode childNode = children.get(childName);
+                // this is the only child node.
+                if (childNode.getChildren().length == 1) { 
+                    childNode.setParent(null);
+                    children.remove(childName);
+                }
+                else {
+                    // their are more child nodes
+                    // so just reset property.
+                    childNode.setProperty(false);
+                }
+            }
+        }
+        
+        /**
+         * return the child of a node mapping
+         * to the input childname
+         * @param childName the name of the child
+         * @return the child of a node
+         */
+        TrieNode getChild(String childName) {
+            synchronized(children) {
+               if (!children.containsKey(childName)) {
+                   return null;
+               }
+               else {
+                   return children.get(childName);
+               }
+            }
+        }
+
+        /**
+         * get the list of children of this 
+         * trienode.
+         * @param node to get its children
+         * @return the string list of its children
+         */
+        String[] getChildren() {
+           synchronized(children) {
+               return children.keySet().toArray(new String[0]);
+           }
+        }
+        
+        /**
+         * get the string representation
+         * for this node
+         */
+        public String toString() {
+            StringBuffer sb = new StringBuffer();
+            sb.append("Children of trienode: ");
+            synchronized(children) {
+                for (String str: children.keySet()) {
+                    sb.append(" " + str);
+                }
+            }
+            return sb.toString();
+        }
+    }
+    
+    /**
+     * construct a new PathTrie with
+     * a root node of /
+     */
+    public PathTrie() {
+        this.rootNode = new TrieNode(null);
+    }
+    
+    /**
+     * add a path to the path trie 
+     * @param path
+     */
+    public void addPath(String path) {
+        if (path == null) {
+            return;
+        }
+        String[] pathComponents = path.split("/");
+        TrieNode parent = rootNode;
+        String part = null;
+        if (pathComponents.length <= 1) {
+            throw new IllegalArgumentException("Invalid path " + path);
+        }
+        for (int i=1; i<pathComponents.length; i++) {
+            part = pathComponents[i];
+            if (parent.getChild(part) == null) {
+                parent.addChild(part, new TrieNode(parent));
+            }
+            parent = parent.getChild(part);
+        }
+        parent.setProperty(true);
+    }
+    
+    /**
+     * delete a path from the trie
+     * @param path the path to be deleted
+     */
+    public void deletePath(String path) {
+        if (path == null) {
+            return;
+        }
+        String[] pathComponents = path.split("/");
+        TrieNode parent = rootNode;
+        String part = null;
+        if (pathComponents.length <= 1) { 
+            throw new IllegalArgumentException("Invalid path " + path);
+        }
+        for (int i=1; i<pathComponents.length; i++) {
+            part = pathComponents[i];
+            if (parent.getChild(part) == null) {
+                //the path does not exist 
+                return;
+            }
+            parent = parent.getChild(part);
+            LOG.info(parent);
+        }
+        TrieNode realParent  = parent.getParent();
+        realParent.deleteChild(part);
+    }
+    
+    /**
+     * return the largest prefix for the input path.
+     * @param path the input path
+     * @return the largest prefix for the input path.
+     */
+    public String findMaxPrefix(String path) {
+        if (path == null) {
+            return null;
+        }
+        String[] pathComponents = path.split("/");
+        TrieNode parent = rootNode;
+        List<String> components = new ArrayList<String>();
+        if (pathComponents.length <= 1) {
+            throw new IllegalArgumentException("Invalid path " + path);
+        }
+        int i = 1;
+        String part = null;
+        StringBuffer sb = new StringBuffer();
+        int lastindex = -1;
+        while((i < pathComponents.length)) {
+            if (parent.getChild(pathComponents[i]) != null) {
+                part = pathComponents[i];
+                parent = parent.getChild(part);
+                components.add(part);
+                if (parent.getProperty()) {
+                    lastindex = i-1;
+                }
+            }
+            else {
+                break;
+            }
+            i++;
+        }
+        for (int j=0; j< (lastindex+1); j++) {
+            sb.append("/" + components.get(j));
+        }
+        return sb.toString();
+    }
+}

+ 29 - 19
src/java/main/org/apache/zookeeper/server/DataNode.java

@@ -20,7 +20,7 @@ package org.apache.zookeeper.server;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.HashSet;
-
+import java.util.Set;
 import org.apache.jute.InputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
@@ -35,9 +35,32 @@ import org.apache.zookeeper.data.StatPersisted;
  * 
  * 
  */
  */
 public class DataNode implements Record {
 public class DataNode implements Record {
-    DataNode() {
-        // default rather than public constructor
-    }
+    /** the parent of this datanode */
+    DataNode parent; 
+    
+    /** the data for this datanode */
+    byte data[];     
+    
+    /** the acl map long for this datanode. 
+    the datatree has the map */
+    Long acl;       
+    
+    /** the stat for this node that is persisted 
+     * to disk.
+     */
+    public StatPersisted stat; 
+
+    /** the list of children for this node. note 
+     * that the list of children string does not 
+     * contain the parent path -- just the last
+     * part of the path.
+     */
+    Set<String> children = new HashSet<String>();
+
+    /**
+     * default constructor for the datanode
+     */
+    DataNode() {}
 
 
     /**
     /**
      * create a DataNode with parent, data, acls and stat
      * create a DataNode with parent, data, acls and stat
@@ -68,20 +91,11 @@ public class DataNode implements Record {
      * convenience methods to get the children
      * convenience methods to get the children
      * @return the children of this datanode
      * @return the children of this datanode
      */
      */
-    public HashSet<String> getChildren() {
+    public Set<String> getChildren() {
         return this.children;
         return this.children;
     }
     }
     
     
-    DataNode parent;
-
-    byte data[];
-
-    Long acl;
-
-    public StatPersisted stat;
-
-    HashSet<String> children = new HashSet<String>();
-
+   
     public void copyStat(Stat to) {
     public void copyStat(Stat to) {
         to.setAversion(stat.getAversion());
         to.setAversion(stat.getAversion());
         to.setCtime(stat.getCtime());
         to.setCtime(stat.getCtime());
@@ -114,8 +128,4 @@ public class DataNode implements Record {
         stat.serialize(archive, "statpersisted");
         stat.serialize(archive, "statpersisted");
         archive.endRecord(this, "node");
         archive.endRecord(this, "node");
     }
     }
-    
-    public int compareTo(Object o) {
-        return -1;
-    }
 }
 }

+ 277 - 20
src/java/main/org/apache/zookeeper/server/DataTree.java

@@ -34,6 +34,8 @@ import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.Code;
@@ -42,6 +44,7 @@ import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.common.PathTrie;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.data.StatPersisted;
@@ -63,7 +66,7 @@ import org.apache.zookeeper.txn.TxnHeader;
  */
  */
 public class DataTree {
 public class DataTree {
     private static final Logger LOG = Logger.getLogger(DataTree.class);
     private static final Logger LOG = Logger.getLogger(DataTree.class);
-
+    
     /**
     /**
      * This hashtable provides a fast lookup to the datanodes. The tree is the
      * This hashtable provides a fast lookup to the datanodes. The tree is the
      * source of truth and is where all the locking occurs
      * source of truth and is where all the locking occurs
@@ -75,10 +78,34 @@ public class DataTree {
 
 
     private WatchManager childWatches = new WatchManager();
     private WatchManager childWatches = new WatchManager();
 
 
+    /** the root of zookeeper tree */
+    private final String rootZookeeper = "/";
+    
+    /** the zookeeper nodes that acts as the management and status node **/
+    private final String procZookeeper = Quotas.procZookeeper;
+    
+    /** this will be the string thats stored as a child of root */
+    private final String procChildZookeeper = procZookeeper.substring(1);
+    
+    /** the zookeeper quota node that acts as the quota 
+     * management node for zookeeper */
+    private final String quotaZookeeper = Quotas.quotaZookeeper;
+
+    /** this will be the string thats stored as a child of /zookeeper */
+    private final String quotaChildZookeeper = quotaZookeeper.substring(
+            procZookeeper.length() + 1);
+    
+    /** 
+     * the path trie that keeps track fo the quota nodes in 
+     * this datatree
+     */
+    PathTrie pTrie = new PathTrie();
+    
     /**
     /**
      * This hashtable lists the paths of the ephemeral nodes of a session.
      * This hashtable lists the paths of the ephemeral nodes of a session.
      */
      */
-    private Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
+    private Map<Long, HashSet<String>> ephemerals = new
+                            ConcurrentHashMap<Long, HashSet<String>>();
 
 
     /**
     /**
      * this is map from longs to acl's. It saves acl's being stored 
      * this is map from longs to acl's. It saves acl's being stored 
@@ -229,13 +256,48 @@ public class DataTree {
      */
      */
     private DataNode root =
     private DataNode root =
         new DataNode(null, new byte[0], -1L, new StatPersisted());
         new DataNode(null, new byte[0], -1L, new StatPersisted());
+    
+    /**
+     * create a /zookeeper filesystem that is the proc
+     * filesystem of zookeeper
+     */
+    private DataNode procDataNode = 
+        new DataNode(root, new byte[0], -1L, new StatPersisted());
+    
+    /** 
+     * create a /zookeeper/quota node for maintaining quota properties
+     * for zookeeper
+     */
+    private DataNode quotaDataNode = 
+        new DataNode(procDataNode, new byte[0], -1L, new StatPersisted());
 
 
     public DataTree() {
     public DataTree() {
         /* Rather than fight it, let root have an alias */
         /* Rather than fight it, let root have an alias */
         nodes.put("", root);
         nodes.put("", root);
-        nodes.put("/", root);
+        nodes.put(rootZookeeper, root);
+        /** add the proc ndoe and quota node */
+        Set<String> children = root.getChildren();
+        
+        children.add(procChildZookeeper);
+        nodes.put(procZookeeper, procDataNode);
+        children = procDataNode.getChildren();
+        children.add(quotaChildZookeeper);
+        nodes.put(quotaZookeeper, quotaDataNode);
     }
     }
-
+    
+    /**
+     * is the path one of the special paths owned by zookeeper.
+     * @param path the path to be checked
+     * @return true if a special path. false if not.
+     */
+    boolean isSpecialPath(String path) {
+        if (rootZookeeper.equals(path) || procZookeeper.equals(path)
+                || quotaZookeeper.equals(path)) {
+            return true;
+        }
+        return false;
+    }
+    
     static public void copyStatPersisted(StatPersisted from, StatPersisted to) {
     static public void copyStatPersisted(StatPersisted from, StatPersisted to) {
         to.setAversion(from.getAversion());
         to.setAversion(from.getAversion());
         to.setCtime(from.getCtime());
         to.setCtime(from.getCtime());
@@ -261,22 +323,64 @@ public class DataTree {
         to.setDataLength(from.getDataLength());
         to.setDataLength(from.getDataLength());
         to.setNumChildren(from.getNumChildren());
         to.setNumChildren(from.getNumChildren());
     }
     }
-
-    // public void remooveInterest(String path, Watcher nw) {
-    // DataNode n = nodes.get(path);
-    // if (n == null) {
-    // synchronized (nonExistentWatches) {
-    // HashSet<Watcher> list = nonExistentWatches.get(path);
-    // if (list != null) {
-    // list.remove(nw);
-    // }
-    // }
-    // }
-    // synchronized (n) {
-    // n.dataWatchers.remove(nw);
-    // n.childWatchers.remove(nw);
-    // }
-    // }
+    
+  
+    
+    /** 
+     * update the count of this stat datanode
+     * @param lastPrefix the path of the node that is quotaed.
+     * @param diff the diff to be added to the count
+     */
+    public void updateCount(String lastPrefix, int diff) {
+        String statNode = Quotas.statPath(lastPrefix);
+        DataNode node = nodes.get(statNode);
+        StatsTrack updatedStat = null;
+        synchronized(node) {
+            updatedStat = new StatsTrack(new String(node.data));
+            updatedStat.setCount(updatedStat.getCount() + diff);
+            node.data = updatedStat.toString().getBytes();
+        }
+        //now check if the counts match the quota
+        String quotaNode = Quotas.quotaPath(lastPrefix);
+        node = nodes.get(quotaNode);
+        StatsTrack thisStats = null;
+        synchronized(node) {
+            thisStats = new StatsTrack(new String(node.data));
+        }
+        if (thisStats.getCount() < updatedStat.getCount()) {
+            LOG.warn("Quota exceeded: " + lastPrefix + " count="
+                    + updatedStat.getCount() + " limit=" + 
+                    thisStats.getCount());
+        }
+    }
+    
+    /**
+     * update the count of bytes of this stat datanode
+     * @param lastPrefix the path of the node that is quotaed
+     * @param diff the diff to added to number of bytes
+     */
+    public void updateBytes(String lastPrefix, long diff) {
+        String statNode = Quotas.statPath(lastPrefix);
+        DataNode node = nodes.get(statNode);
+        StatsTrack updatedStat = null;
+        synchronized(node) {
+            updatedStat = new StatsTrack(new String(node.data));
+            updatedStat.setBytes(updatedStat.getBytes() + diff);
+            node.data = updatedStat.toString().getBytes();
+        }
+        // now check if the bytes match the quota
+        String quotaNode = Quotas.quotaPath(lastPrefix);
+        node = nodes.get(quotaNode);
+        StatsTrack thisStats = null;
+        synchronized(node) {
+            thisStats = new StatsTrack(new String(node.data));
+        }
+        if (thisStats.getBytes() < updatedStat.getBytes()) {
+            LOG.warn("Quota exceeded: " + lastPrefix + " bytes="
+                    + updatedStat.getBytes() + " limit=" + 
+                    thisStats.getBytes());
+        }
+    }
 
 
     /**
     /**
      * @param path
      * @param path
@@ -332,6 +436,25 @@ public class DataTree {
                 }
                 }
             }
             }
         }
         }
+        // now check if its one of the zookeeper node child
+        if (parentName.startsWith(quotaZookeeper)) {
+            //now check if its the limit node
+            if (Quotas.limitNode.equals(childName)) {
+                // this is the limit node
+                // get the parent and add it to the trie
+                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
+            }
+            if (Quotas.statNode.equals(childName)) {
+                updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
+            }
+        }
+        //also check to update the quotas for this node
+        String lastPrefix = pTrie.findMaxPrefix(path);
+        if (!rootZookeeper.equals(lastPrefix) && !("".equals(lastPrefix))) {
+            // ok we have some match and need to update 
+            updateCount(lastPrefix, 1);
+            updateBytes(lastPrefix, data == null? 0:data.length);
+        }
         dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
         dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
         childWatches.triggerWatch(parentName.equals("")?"/":parentName, Event.EventType.NodeChildrenChanged);
         childWatches.triggerWatch(parentName.equals("")?"/":parentName, Event.EventType.NodeChildrenChanged);
         return path;
         return path;
@@ -373,6 +496,22 @@ public class DataTree {
             }
             }
             node.parent = null;
             node.parent = null;
         }
         }
+        if (parentName.startsWith(procZookeeper)) {
+            // delete the node in the trie.
+            if (Quotas.limitNode.equals(childName)) {
+                // we need to update the trie 
+                // as well
+                pTrie.deletePath(parentName.substring(quotaZookeeper.length()));
+            }
+        }
+        
+        //also check to update the quotas for this node
+        String lastPrefix = pTrie.findMaxPrefix(path);
+        if (!rootZookeeper.equals(lastPrefix) && !("".equals(lastPrefix))) {
+            // ok we have some match and need to update 
+            updateCount(lastPrefix, -1);
+            updateBytes(lastPrefix, node.data == null? 0:-(node.data.length));
+        }
         ZooTrace.logTraceMessage(LOG,
         ZooTrace.logTraceMessage(LOG,
                                  ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                  ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                  "dataWatches.triggerWatch " + path);
                                  "dataWatches.triggerWatch " + path);
@@ -392,6 +531,7 @@ public class DataTree {
         if (n == null) {
         if (n == null) {
             throw new KeeperException.NoNodeException();
             throw new KeeperException.NoNodeException();
         }
         }
+        byte lastdata[] = n.data;
         synchronized (n) {
         synchronized (n) {
             n.data = data;
             n.data = data;
             n.stat.setMtime(time);
             n.stat.setMtime(time);
@@ -399,6 +539,15 @@ public class DataTree {
             n.stat.setVersion(version);
             n.stat.setVersion(version);
             n.copyStat(s);
             n.copyStat(s);
         }
         }
+        // now update if the path is in a quota subtree.
+        String lastPrefix = pTrie.findMaxPrefix(path);
+        // do nothing for the root. 
+        // we are not keeping a quota on the zookeeper 
+        // root node for now.
+        if (!rootZookeeper.equals(lastPrefix) && !("".equals(lastPrefix))) {
+            this.updateBytes(lastPrefix, (data == null?0:data.length) - 
+                    (lastdata == null?0:lastdata.length));
+        }
         dataWatches.triggerWatch(path, EventType.NodeDataChanged);
         dataWatches.triggerWatch(path, EventType.NodeDataChanged);
         return s;
         return s;
     }
     }
@@ -603,7 +752,110 @@ public class DataTree {
             }
             }
         }
         }
     }
     }
+    
+    /** 
+     * a encapsultaing class 
+     * for return value
+     */
+    private static class Counts {
+        long bytes;
+        int count;
+    }
 
 
+    /**
+     * this method gets the count of nodes 
+     * and the bytes under a subtree
+     * @param path the path to be used
+     * @param bytes the long bytes
+     * @param count the int count
+     */
+    private void getCounts(String path, Counts counts) {
+        DataNode node = getNode(path);
+        if (node == null) {
+            return;
+        }
+        String[] children = null;
+        synchronized (node) {
+            children = node.children.toArray(new
+                    String[node.children.size()]);
+        }
+        // add itself
+        counts.count += 1;
+        counts.bytes += (long)node.data.length;
+        if (children.length == 0) {
+            return;
+        }
+        for (String child: children) {
+            getCounts(path + "/" + child, counts);
+        }
+    }
+    /** 
+     * update the quota for the given path
+     * @param path the path to be used
+     */
+    private void updateQuotaForPath(String path) {
+        Counts c = new Counts();
+        getCounts(path, c);
+        StatsTrack strack = new StatsTrack();
+        strack.setBytes(c.bytes);
+        strack.setCount(c.count);
+        String statPath = Quotas.quotaZookeeper + path + 
+                        "/" + Quotas.statNode;
+        DataNode node = getNode(statPath);
+        //it should exist
+        if (node == null) {
+            LOG.warn("Missing quota stat node " + statPath);
+            return;
+        }
+        synchronized(node) {
+            node.data = strack.toString().getBytes();
+        }
+    }
+    /**
+     * this method traverses the quota 
+     * path and update the path trie and sets
+     * @param path
+     */
+    private void traverseNode(String path) {
+       DataNode node = getNode(path);
+       String children[] = null;
+       synchronized (node) {
+           children = node.children.toArray(new
+                   String[node.children.size()]);
+       }
+       if (children.length == 0) {
+           // this node does not have a child
+           // is the leaf node 
+           // check if its the leaf node
+           String endString = "/" + Quotas.limitNode;
+           if (path.endsWith(endString)) {
+               //ok this is the limit node
+               // get the real node and update 
+               // the count and the bytes
+               String realPath = path.substring(Quotas.quotaZookeeper.length(), 
+                       path.indexOf(endString));
+               updateQuotaForPath(realPath);
+               this.pTrie.addPath(realPath);
+           }
+           return;
+       }
+       for (String child: children) {
+           traverseNode(path + "/" + child);
+       }
+    }
+    
+    /**
+     * this method sets up the path trie
+     * and sets up stats for quota nodes
+     */
+    private void setupQuota() {
+        String quotaPath = Quotas.quotaZookeeper;
+        DataNode node = getNode(quotaPath);
+        if (node == null) {
+            return;
+        }
+        traverseNode(quotaPath);
+    }
     /**
     /**
      * this method uses a stringbuilder to create a new
      * this method uses a stringbuilder to create a new
      * path for children. This is faster than string
      * path for children. This is faster than string
@@ -721,6 +973,11 @@ public class DataTree {
             path = ia.readString("path");
             path = ia.readString("path");
         }
         }
         nodes.put("/", root);
         nodes.put("/", root);
+        // we are done with deserializing the 
+        // the datatree
+        // update the quotas - create path trie
+        // and also update the stat nodes
+        setupQuota();
     }
     }
 
 
     public String dumpEphemerals() {
     public String dumpEphemerals() {

+ 1 - 1
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -258,7 +258,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 path = deleteRequest.getPath();
                 path = deleteRequest.getPath();
                 lastSlash = path.lastIndexOf('/');
                 lastSlash = path.lastIndexOf('/');
                 if (lastSlash == -1 || path.indexOf('\0') != -1
                 if (lastSlash == -1 || path.indexOf('\0') != -1
-                        || path.equals("/")) {
+                        || zks.dataTree.isSpecialPath(path)) {
                     throw new KeeperException.BadArgumentsException();
                     throw new KeeperException.BadArgumentsException();
                 }
                 }
                 parentPath = path.substring(0, lastSlash);
                 parentPath = path.substring(0, lastSlash);

+ 9 - 11
src/java/test/org/apache/zookeeper/test/AsyncTest.java

@@ -50,7 +50,7 @@ public class AsyncTest extends TestCase
 {
 {
     private static final Logger LOG = Logger.getLogger(AsyncTest.class);
     private static final Logger LOG = Logger.getLogger(AsyncTest.class);
 
 
-    private QuorumTest quorumTest = new QuorumTest();
+    private QuorumBase qb = new QuorumBase();
 
 
     private volatile boolean bang;
     private volatile boolean bang;
 
 
@@ -58,21 +58,18 @@ public class AsyncTest extends TestCase
     @Override
     @Override
     protected void setUp() throws Exception {
     protected void setUp() throws Exception {
         LOG.info("STARTING " + getName());
         LOG.info("STARTING " + getName());
-
-        ClientBase.setupTestEnv();
-
-        quorumTest.setUp();
+        qb.setUp();
     }
     }
 
 
     protected void restart() throws Exception {
     protected void restart() throws Exception {
-        quorumTest.startServers();
+        qb.startServers();
     }
     }
 
 
     @After
     @After
     @Override
     @Override
     protected void tearDown() throws Exception {
     protected void tearDown() throws Exception {
         LOG.info("Test clients shutting down");
         LOG.info("Test clients shutting down");
-        quorumTest.tearDown();
+        qb.tearDown();
         LOG.info("FINISHED " + getName());
         LOG.info("FINISHED " + getName());
     }
     }
 
 
@@ -87,7 +84,7 @@ public class AsyncTest extends TestCase
     }
     }
 
 
     private ZooKeeper createClient() throws IOException,InterruptedException {
     private ZooKeeper createClient() throws IOException,InterruptedException {
-        return createClient(quorumTest.hostPort);
+        return createClient(qb.hostPort);
     }
     }
 
 
     private ZooKeeper createClient(String hp)
     private ZooKeeper createClient(String hp)
@@ -120,7 +117,7 @@ public class AsyncTest extends TestCase
 
 
         public void run() {
         public void run() {
             try {
             try {
-                zk = new ZooKeeper(quorumTest.hostPort, 30000, this);
+                zk = new ZooKeeper(qb.hostPort, 30000, this);
                 while(bang) {
                 while(bang) {
                     incOutstanding(); // before create otw race
                     incOutstanding(); // before create otw race
                     zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE,
                     zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE,
@@ -191,13 +188,14 @@ public class AsyncTest extends TestCase
             verifyThreadTerminated(hammers[i], 60000);
             verifyThreadTerminated(hammers[i], 60000);
         }
         }
         // before restart
         // before restart
-        quorumTest.verifyRootOfAllServersMatch(quorumTest.hostPort);
+        QuorumTest qt = new QuorumTest();
+        qt.verifyRootOfAllServersMatch(qb.hostPort);
         tearDown();
         tearDown();
 
 
         restart();
         restart();
 
 
         // after restart
         // after restart
-        quorumTest.verifyRootOfAllServersMatch(quorumTest.hostPort);
+        qt.verifyRootOfAllServersMatch(qb.hostPort);
     }
     }
 
 
     LinkedList<Integer> results = new LinkedList<Integer>();
     LinkedList<Integer> results = new LinkedList<Integer>();

+ 2 - 2
src/java/test/org/apache/zookeeper/test/ClientTest.java

@@ -321,8 +321,8 @@ public class ClientTest extends ClientBase {
             zk.create("/ben", "Ben was here".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             zk.create("/ben", "Ben was here".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             LOG.info("Before getChildren /");
             LOG.info("Before getChildren /");
             List<String> children = zk.getChildren("/", false);
             List<String> children = zk.getChildren("/", false);
-            assertEquals(1, children.size());
-            assertEquals("ben", children.get(0));
+            assertEquals(2, children.size());
+            assertEquals("ben", children.get(1));
             String value = new String(zk.getData("/ben", false, stat));
             String value = new String(zk.getData("/ben", false, stat));
             assertEquals("Ben was here", value);
             assertEquals("Ben was here", value);
             // Test stat and watch of non existent node
             // Test stat and watch of non existent node

+ 2 - 8
src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java

@@ -22,21 +22,15 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Random;
+
+import junit.framework.TestCase;
 
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.quorum.FastLeaderElection;
 import org.apache.zookeeper.server.quorum.FastLeaderElection;
-import org.apache.zookeeper.server.quorum.QuorumCnxManager;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumStats;
 import org.apache.zookeeper.server.quorum.Vote;
 import org.apache.zookeeper.server.quorum.Vote;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-
-import junit.framework.TestCase;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
 public class FLENewEpochTest extends TestCase {
 public class FLENewEpochTest extends TestCase {

+ 0 - 3
src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java

@@ -19,8 +19,6 @@
 package org.apache.zookeeper.test;
 package org.apache.zookeeper.test;
 
 
 import java.io.File;
 import java.io.File;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 
 
 import junit.framework.TestCase;
 import junit.framework.TestCase;
@@ -36,7 +34,6 @@ import org.apache.zookeeper.server.PurgeTxnLog;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.test.ClientBase;
 
 
 /** 
 /** 
  * test the purging of the logs
  * test the purging of the logs

+ 147 - 0
src/java/test/org/apache/zookeeper/test/QuorumBase.java

@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.junit.After;
+
+public class QuorumBase extends ClientBase {
+    private static final Logger LOG = Logger.getLogger(QuorumBase.class);
+
+
+
+    File s1dir, s2dir, s3dir, s4dir, s5dir;
+    QuorumPeer s1, s2, s3, s4, s5;
+
+    protected void setUp() throws Exception {
+        LOG.info("STARTING " + getName());
+
+        setupTestEnv();
+
+        hostPort = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
+
+        s1dir = ClientBase.createTmpDir();
+        s2dir = ClientBase.createTmpDir();
+        s3dir = ClientBase.createTmpDir();
+        s4dir = ClientBase.createTmpDir();
+        s5dir = ClientBase.createTmpDir();
+
+        startServers();
+
+        LOG.info("Setup finished");
+    }
+    void startServers() throws IOException, InterruptedException {
+        int tickTime = 2000;
+        int initLimit = 3;
+        int syncLimit = 3;
+        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+        peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", 3181)));
+        peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", 3182)));
+        peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", 3183)));
+        peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", 3184)));
+        peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", 3185)));
+
+        LOG.info("creating QuorumPeer 1");
+        s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1, tickTime, initLimit, syncLimit);
+        assertEquals(2181, s1.getClientPort());
+        LOG.info("creating QuorumPeer 2");
+        s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 2, tickTime, initLimit, syncLimit);
+        assertEquals(2182, s2.getClientPort());
+        LOG.info("creating QuorumPeer 3");
+        s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 3, tickTime, initLimit, syncLimit);
+        assertEquals(2183, s3.getClientPort());
+        LOG.info("creating QuorumPeer 4");
+        s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 4, tickTime, initLimit, syncLimit);
+        assertEquals(2184, s4.getClientPort());
+        LOG.info("creating QuorumPeer 5");
+        s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 5, tickTime, initLimit, syncLimit);
+        assertEquals(2185, s5.getClientPort());
+        LOG.info("start QuorumPeer 1");
+        s1.start();
+        LOG.info("start QuorumPeer 2");
+        s2.start();
+        LOG.info("start QuorumPeer 3");
+        s3.start();
+        LOG.info("start QuorumPeer 4");
+        s4.start();
+        LOG.info("start QuorumPeer 5");
+        s5.start();
+        LOG.info("started QuorumPeer 5");
+
+        for (String hp : hostPort.split(",")) {
+            assertTrue("waiting for server up",
+                       ClientBase.waitForServerUp(hp,
+                                    CONNECTION_TIMEOUT));
+            LOG.info(hp + " is accepting client connections");
+        }
+    }
+
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        LOG.info("TearDown started");
+        shutdown(s1);
+        shutdown(s2);
+        shutdown(s3);
+        shutdown(s4);
+        shutdown(s5);
+
+        for (String hp : hostPort.split(",")) {
+            assertTrue("waiting for server down",
+                       ClientBase.waitForServerDown(hp,
+                                           ClientBase.CONNECTION_TIMEOUT));
+            LOG.info(hp + " is no longer accepting client connections");
+        }
+
+        LOG.info("FINISHED " + getName());
+    }
+
+    protected void shutdown(QuorumPeer qp) {
+        try {
+            qp.shutdown();
+            qp.join(30000);
+            if (qp.isAlive()) {
+                fail("QP failed to shutdown in 30 seconds");
+            }
+        } catch (InterruptedException e) {
+            LOG.debug("QP interrupted", e);
+        }
+    }
+
+    protected ZooKeeper createClient()
+        throws IOException, InterruptedException
+        {
+        return createClient(hostPort);
+        }
+
+    protected ZooKeeper createClient(String hp)
+        throws IOException, InterruptedException
+        {
+        CountdownWatcher watcher = new CountdownWatcher();
+        return createClient(watcher, hp);
+        }
+}

+ 74 - 0
src/java/test/org/apache/zookeeper/test/QuorumQuotaTest.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeperMain;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * this class tests quota on a quorum
+ * zookeeper server.
+ *
+ */
+
+public class QuorumQuotaTest extends QuorumBase {
+    private static final Logger LOG = Logger.getLogger(
+            QuorumQuotaTest.class);
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    public void testQuotaWithQuorum() throws Exception {
+        ZooKeeper zk = createClient();
+        zk.create("/a", "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        int i = 0;
+        for (i=0; i < 300;i++) {
+            zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+        ZooKeeperMain.createQuota(zk, "/a", 1000L, 5000);
+        String statPath = Quotas.quotaZookeeper + "/a"+ "/" + Quotas.statNode;
+        byte[] data = zk.getData(statPath, false, new Stat());
+        StatsTrack st = new StatsTrack(new String(data));
+        assertTrue("bytes are set", st.getBytes() == 1204L);
+        assertTrue("num count is set", st.getCount() == 301);
+        for (i=300; i < 600; i++) {
+            zk.create("/a/" + i, "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+        data = zk.getData(statPath, false, new Stat());
+        st = new StatsTrack(new String(data));
+        assertTrue("bytes are set", st.getBytes() == 2404L);
+        assertTrue("num count is set", st.getCount() == 601);
+    }
+}

+ 7 - 103
src/java/test/org/apache/zookeeper/test/QuorumTest.java

@@ -17,123 +17,27 @@
  */
  */
 
 
 package org.apache.zookeeper.test;
 package org.apache.zookeeper.test;
-
-import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
 
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
-public class QuorumTest extends ClientBase {
+public class QuorumTest extends QuorumBase {
     private static final Logger LOG = Logger.getLogger(QuorumTest.class);
     private static final Logger LOG = Logger.getLogger(QuorumTest.class);
-
+    private QuorumBase qb = new QuorumBase();
     private final ClientTest ct = new ClientTest();
     private final ClientTest ct = new ClientTest();
-
-    File s1dir, s2dir, s3dir, s4dir, s5dir;
-    QuorumPeer s1, s2, s3, s4, s5;
-
+    
     @Before
     @Before
     @Override
     @Override
     protected void setUp() throws Exception {
     protected void setUp() throws Exception {
-        LOG.info("STARTING " + getName());
-
-        setupTestEnv();
-
-        hostPort = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
-        ct.hostPort = hostPort;
-
-        s1dir = ClientBase.createTmpDir();
-        s2dir = ClientBase.createTmpDir();
-        s3dir = ClientBase.createTmpDir();
-        s4dir = ClientBase.createTmpDir();
-        s5dir = ClientBase.createTmpDir();
-
-        startServers();
-
-        LOG.info("Setup finished");
-    }
-    void startServers() throws IOException, InterruptedException {
-        int tickTime = 2000;
-        int initLimit = 3;
-        int syncLimit = 3;
-        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
-        peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", 3181)));
-        peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", 3182)));
-        peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", 3183)));
-        peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", 3184)));
-        peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", 3185)));
-        
-        LOG.info("creating QuorumPeer 1");
-        s1 = new QuorumPeer(peers, s1dir, s1dir, 2181, 0, 1, tickTime, initLimit, syncLimit);
-        assertEquals(2181, s1.getClientPort());
-        LOG.info("creating QuorumPeer 2");
-        s2 = new QuorumPeer(peers, s2dir, s2dir, 2182, 0, 2, tickTime, initLimit, syncLimit);
-        assertEquals(2182, s2.getClientPort());
-        LOG.info("creating QuorumPeer 3");
-        s3 = new QuorumPeer(peers, s3dir, s3dir, 2183, 0, 3, tickTime, initLimit, syncLimit);
-        assertEquals(2183, s3.getClientPort());
-        LOG.info("creating QuorumPeer 4");
-        s4 = new QuorumPeer(peers, s4dir, s4dir, 2184, 0, 4, tickTime, initLimit, syncLimit);
-        assertEquals(2184, s4.getClientPort());
-        LOG.info("creating QuorumPeer 5");
-        s5 = new QuorumPeer(peers, s5dir, s5dir, 2185, 0, 5, tickTime, initLimit, syncLimit);
-        assertEquals(2185, s5.getClientPort());
-        LOG.info("start QuorumPeer 1");
-        s1.start();
-        LOG.info("start QuorumPeer 2");
-        s2.start();
-        LOG.info("start QuorumPeer 3");
-        s3.start();
-        LOG.info("start QuorumPeer 4");
-        s4.start();
-        LOG.info("start QuorumPeer 5");
-        s5.start();
-        LOG.info("started QuorumPeer 5");
-
-        for (String hp : hostPort.split(",")) {
-            assertTrue("waiting for server up",
-                       ClientBase.waitForServerUp(hp,
-                                    CONNECTION_TIMEOUT));
-            LOG.info(hp + " is accepting client connections");
-        }
+        qb.setUp();        
+        ct.hostPort = qb.hostPort;
     }
     }
-    @After
-    @Override
+    
     protected void tearDown() throws Exception {
     protected void tearDown() throws Exception {
-        LOG.info("TearDown started");
-        shutdown(s1);
-        shutdown(s2);
-        shutdown(s3);
-        shutdown(s4);
-        shutdown(s5);
-
-        for (String hp : hostPort.split(",")) {
-            assertTrue("waiting for server down",
-                       ClientBase.waitForServerDown(hp,
-                                           ClientBase.CONNECTION_TIMEOUT));
-            LOG.info(hp + " is no longer accepting client connections");
-        }
-
-        LOG.info("FINISHED " + getName());
-    }
-
-    protected void shutdown(QuorumPeer qp) {
-        try {
-            qp.shutdown();
-            qp.join(30000);
-            if (qp.isAlive()) {
-                fail("QP failed to shutdown in 30 seconds");
-            }
-        } catch (InterruptedException e) {
-            LOG.debug("QP interrupted", e);
-        }
+        qb.tearDown();
     }
     }
 
 
     @Test
     @Test

+ 12 - 12
src/java/test/org/apache/zookeeper/test/RepeatStartupTest.java

@@ -38,20 +38,20 @@ public class RepeatStartupTest extends TestCase {
      * @throws Exception
      * @throws Exception
      */
      */
     public void testFail() throws Exception {
     public void testFail() throws Exception {
-        QuorumTest qt = new QuorumTest();
-        qt.setUp();
-        System.out.println("Comment: the servers are at " + qt.hostPort);
-        ZooKeeper zk = qt.createClient();
+        QuorumBase qb = new QuorumBase();
+        qb.setUp();
+        System.out.println("Comment: the servers are at " + qb.hostPort);
+        ZooKeeper zk = qb.createClient();
         zk.create("/test", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         zk.create("/test", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         zk.close();
         zk.close();
-        qt.shutdown(qt.s1);
-        qt.shutdown(qt.s2);
-        qt.shutdown(qt.s3);
-        qt.shutdown(qt.s4);
-        qt.shutdown(qt.s5);
-        String hp = qt.hostPort.split(",")[0];
-        ZooKeeperServer zks = new ZooKeeperServer(qt.s1.getTxnFactory().getSnapDir(), 
-                qt.s1.getTxnFactory().getDataDir(), 3000);
+        qb.shutdown(qb.s1);
+        qb.shutdown(qb.s2);
+        qb.shutdown(qb.s3);
+        qb.shutdown(qb.s4);
+        qb.shutdown(qb.s5);
+        String hp = qb.hostPort.split(",")[0];
+        ZooKeeperServer zks = new ZooKeeperServer(qb.s1.getTxnFactory().getSnapDir(), 
+                qb.s1.getTxnFactory().getDataDir(), 3000);
         final int PORT = Integer.parseInt(hp.split(":")[1]);
         final int PORT = Integer.parseInt(hp.split(":")[1]);
         NIOServerCnxn.Factory factory = null;
         NIOServerCnxn.Factory factory = null;
         if (factory == null) {
         if (factory == null) {

+ 81 - 0
src/java/test/org/apache/zookeeper/test/ZooKeeperQuotaTest.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeperMain;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * this class tests quota on a single
+ * zookeeper server.
+ *
+ */
+public class ZooKeeperQuotaTest extends ClientBase {
+    private static final Logger LOG = Logger.getLogger(
+            ZooKeeperQuotaTest.class);
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    public void testQuota() throws IOException,
+        InterruptedException, KeeperException {
+        final ZooKeeper zk = createClient();
+        final String path = "/a/b/v";
+
+        zk.create("/a", "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        zk.create("/a/b", "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        zk.create("/a/b/v", "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        zk.create("/a/b/v/d", "some".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        ZooKeeperMain.createQuota(zk, path, 1000L, 1000);
+        // see if its set
+        String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
+        byte[] data = zk.getData(absolutePath, false, new Stat());
+        StatsTrack st = new StatsTrack(new String(data));
+        assertTrue("bytes are set", st.getBytes() == 1000L);
+        assertTrue("num count is set", st.getCount() == 1000);
+
+        String statPath = Quotas.quotaZookeeper + path + "/" + Quotas.statNode;
+        byte[] qdata = zk.getData(statPath, false, new Stat());
+        StatsTrack qst = new StatsTrack(new String(qdata));
+        assertTrue("bytes are set", qst.getBytes() == 8L);
+        assertTrue("cound is set", qst.getCount() == 2);
+    }
+}