
HDFS-5121. Add RPCs for creating and manipulating cache pools. (Contributed by Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1519841 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 12 years ago
parent
commit
97b7267977
16 changed files with 1303 additions and 121 deletions
  1. +121 -0   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchedRemoteIterator.java
  2. +4 -0     hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
  3. +10 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java
  4. +89 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java
  5. +49 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  6. +2 -3     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java
  7. +10 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java
  8. +119 -9   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  9. +162 -43  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  10. +162 -6  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
  11. +141 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
  12. +189 -10 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  13. +36 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  14. +57 -47  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  15. +57 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  16. +95 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java

+ 121 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchedRemoteIterator.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A RemoteIterator that fetches elements in batches.
+ */
+public abstract class BatchedRemoteIterator<K, E> implements RemoteIterator<E> {
+  public interface BatchedEntries<E> {
+    public E get(int i);
+    public int size();
+  }
+  
+  public static class BatchedListEntries<E> implements BatchedEntries<E> {
+    private final List<E> entries;
+
+    public BatchedListEntries(List<E> entries) {
+      this.entries = entries;
+    }
+
+    public E get(int i) {
+      return entries.get(i);
+      
+    }
+
+    public int size() {
+      return entries.size();
+    }
+  }
+
+  private K nextKey;
+  private final int maxRepliesPerRequest;
+  private BatchedEntries<E> entries;
+  private int idx;
+
+  public BatchedRemoteIterator(K nextKey, int maxRepliesPerRequest) {
+    this.nextKey = nextKey;
+    this.maxRepliesPerRequest = maxRepliesPerRequest;
+    this.entries = null;
+    this.idx = -1;
+  }
+
+  /**
+   * Perform the actual remote request.
+   *
+   * @param nextKey                The key to send.
+   * @param maxRepliesPerRequest   The maximum number of replies to allow.
+   * @return                       A list of replies.
+   */
+  public abstract BatchedEntries<E> makeRequest(K nextKey, int maxRepliesPerRequest)
+      throws IOException;
+
+  private void makeRequest() throws IOException {
+    idx = 0;
+    entries = null;
+    entries = makeRequest(nextKey, maxRepliesPerRequest);
+    if (entries.size() > maxRepliesPerRequest) {
+      throw new IOException("invalid number of replies returned: got " +
+          entries.size() + ", expected " + maxRepliesPerRequest +
+          " at most.");
+    }
+    if (entries.size() == 0) {
+      entries = null;
+    }
+  }
+
+  private void makeRequestIfNeeded() throws IOException {
+    if (idx == -1) {
+      makeRequest();
+    } else if ((entries != null) && (idx >= entries.size())) {
+      if (entries.size() < maxRepliesPerRequest) {
+        // Last time, we got fewer entries than requested.
+        // So we should be at the end.
+        entries = null;
+      } else {
+        makeRequest();
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    makeRequestIfNeeded();
+    return (entries != null);
+  }
+
+  /**
+   * Return the next list key associated with an element.
+   */
+  public abstract K elementToNextKey(E element);
+
+  @Override
+  public E next() throws IOException {
+    makeRequestIfNeeded();
+    if (entries == null) {
+      throw new NoSuchElementException();
+    }
+    E entry = entries.get(idx++);
+    nextKey = elementToNextKey(entry);
+    return entry;
+  }
+}
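The iterator above leaves only two hooks to implement: makeRequest and elementToNextKey; hasNext() and next() come from the base class. A minimal, hypothetical sketch of a subclass that pages through a sorted in-memory list (the SortedStringIterator name and the backing list are invented for illustration and are not part of this change):

// Hypothetical illustration only: page through a sorted in-memory list,
// using each returned element as the continuation key for the next batch.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.BatchedRemoteIterator;

class SortedStringIterator extends BatchedRemoteIterator<String, String> {
  private final List<String> backing; // stands in for a remote store

  SortedStringIterator(List<String> backing, int maxRepliesPerRequest) {
    super("", maxRepliesPerRequest);  // "" sorts before every non-empty key
    this.backing = backing;
  }

  @Override
  public BatchedEntries<String> makeRequest(String nextKey,
      int maxRepliesPerRequest) throws IOException {
    // Return up to maxRepliesPerRequest elements strictly greater than nextKey.
    ArrayList<String> batch = new ArrayList<String>();
    for (String s : backing) {
      if (s.compareTo(nextKey) > 0 && batch.size() < maxRepliesPerRequest) {
        batch.add(s);
      }
    }
    return new BatchedListEntries<String>(batch);
  }

  @Override
  public String elementToNextKey(String element) {
    return element; // the element itself marks where the next batch starts
  }
}

A caller then simply loops while hasNext() and calls next(); the batching and continuation keys stay hidden inside the iterator.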

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt

@@ -21,6 +21,10 @@ HDFS-4949 (Unreleased)
     HDFS-5141. Add cache status information to datanode heartbeat.
     (Contributed by Andrew Wang)
 
+    HDFS-5121. Add RPCs for creating and manipulating cache pools.
+    (Contributed by Colin Patrick McCabe)
+
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java

@@ -65,6 +65,16 @@ public abstract class AddPathCacheDirectiveException extends IOException {
     }
   }
 
+  public static class PoolWritePermissionDeniedError
+      extends AddPathCacheDirectiveException {
+    private static final long serialVersionUID = 1L;
+
+    public PoolWritePermissionDeniedError(PathCacheDirective directive) {
+      super("write permission denied for pool '" + directive.getPool() + "'",
+            directive);
+    }
+  }
+
   public static class UnexpectedAddPathCacheDirectiveException
       extends AddPathCacheDirectiveException {
     private static final long serialVersionUID = 1L;

+ 89 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Information about a cache pool.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class CachePoolInfo {
+  final String poolName;
+
+  @Nullable
+  String ownerName;
+
+  @Nullable
+  String groupName;
+
+  @Nullable
+  Integer mode;
+
+  @Nullable
+  Integer weight;
+
+  public CachePoolInfo(String poolName) {
+    this.poolName = poolName;
+  }
+  
+  public String getPoolName() {
+    return poolName;
+  }
+
+  public String getOwnerName() {
+    return ownerName;
+  }
+
+  public CachePoolInfo setOwnerName(String ownerName) {
+    this.ownerName = ownerName;
+    return this;
+  }
+
+  public String getGroupName() {
+    return groupName;
+  }
+
+  public CachePoolInfo setGroupName(String groupName) {
+    this.groupName = groupName;
+    return this;
+  }
+  
+  public Integer getMode() {
+    return mode;
+  }
+
+  public CachePoolInfo setMode(Integer mode) {
+    this.mode = mode;
+    return this;
+  }
+
+  public Integer getWeight() {
+    return weight;
+  }
+
+  public CachePoolInfo setWeight(Integer weight) {
+    this.weight = weight;
+    return this;
+  }
+}
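Because every setter returns this, callers can describe a pool fluently; the values below are made-up examples, not part of the patch:

// Illustrative only; the pool name, owner, group, mode and weight are examples.
CachePoolInfo info = new CachePoolInfo("pool1")
    .setOwnerName("hdfs")
    .setGroupName("hadoop")
    .setMode(0755)
    .setWeight(100);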

+ 49 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -1144,5 +1144,53 @@ public interface ClientProtocol {
   @Idempotent
   public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
       String pool, int maxRepliesPerRequest) throws IOException;
-}
+  
+  /**
+   * Add a cache pool.
+   *
+   * @param info
+   *          The info for the cache pool to create.
+   * @throws IOException 
+   *          If the request could not be completed.
+   */
+  @AtMostOnce
+  public void addCachePool(CachePoolInfo info) throws IOException;
+
+  /**
+   * Modify a cache pool.
+   *
+   * @param req
+   *          The request to modify a cache pool.
+   * @throws IOException 
+   *          If the request could not be completed.
+   */
+  @Idempotent
+  public void modifyCachePool(CachePoolInfo req) throws IOException;
+  
+  /**
+   * Remove a cache pool.
+   *
+   * @param cachePoolName
+   *          Name of the cache pool to remove.
+   * @throws IOException 
+   *          if the cache pool did not exist, or could not be removed.
+   */
+  @AtMostOnce
+  public void removeCachePool(String cachePoolName) throws IOException;
 
 
+  /**
+   * List some cache pools.
+   *
+   * @param prevKey
+   *          The previous key we listed.  We will list keys greater than this.
+   * @param maxRepliesPerRequest
+   *          Maximum number of cache pools to list.
+   * @return A remote iterator from which you can get CachePool objects.
+   *          Requests will be made as needed.
+   * @throws IOException
+   *          If there was an error listing cache pools.
+   */
+  @Idempotent
+  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey,
+      int maxRepliesPerRequest) throws IOException;
+}
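Taken together, a rough sketch of how a caller might exercise the four new RPCs; the pool name and attribute values are invented and error handling is omitted:

import java.io.IOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;

class CachePoolRpcSketch {
  // namenode would typically be an RPC proxy implementing ClientProtocol.
  static void run(ClientProtocol namenode) throws IOException {
    namenode.addCachePool(new CachePoolInfo("research")
        .setOwnerName("alice").setGroupName("researchers")
        .setMode(0755).setWeight(100));
    namenode.modifyCachePool(new CachePoolInfo("research").setWeight(200));
    // Pools are listed in batches of at most 16, keyed by pool name.
    RemoteIterator<CachePoolInfo> it = namenode.listCachePools("", 16);
    while (it.hasNext()) {
      System.out.println(it.next().getPoolName());
    }
    namenode.removeCachePool("research");
  }
}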

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java

@@ -36,7 +36,7 @@ public class PathCacheDirective implements Comparable<PathCacheDirective> {
 
 
   private final String pool;
 
-  public PathCacheDirective(String path, String pool) throws IOException {
+  public PathCacheDirective(String path, String pool) {
     Preconditions.checkNotNull(path);
     Preconditions.checkNotNull(pool);
     this.path = path;
@@ -67,10 +67,9 @@ public class PathCacheDirective implements Comparable<PathCacheDirective> {
     if (path.isEmpty()) {
       throw new EmptyPathError(this);
     }
-    if (DFSUtil.isValidName(path)) {
+    if (!DFSUtil.isValidName(path)) {
       throw new InvalidPathNameError(this);
     }
-
     if (pool.isEmpty()) {
       throw new InvalidPoolNameError(this);
     }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java

@@ -47,6 +47,16 @@ public abstract class RemovePathCacheEntryException extends IOException {
     }
   }
 
+  public final static class RemovePermissionDeniedException
+      extends RemovePathCacheEntryException {
+    private static final long serialVersionUID = 1L;
+
+    public RemovePermissionDeniedException(long entryId) {
+      super("permission denied when trying to remove path cache entry id " +
+        entryId, entryId);
+    }
+  }
+
   public final static class NoSuchIdException
       extends RemovePathCacheEntryException {
     private static final long serialVersionUID = 1L;

+ 119 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -27,26 +27,29 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
-import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
-import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
-import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
+import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
+import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
+import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesResponseProto;
@@ -77,8 +80,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncR
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
@@ -105,22 +108,30 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
@@ -160,6 +171,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.CachePool;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.io.Text;
@@ -1081,6 +1093,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
         } catch (NoSuchIdException ioe) {
           builder.addResults(RemovePathCacheEntryErrorProto.
               NO_SUCH_CACHED_PATH_ID_ERROR_VALUE);
+        } catch (RemovePermissionDeniedException ioe) {
+          builder.addResults(RemovePathCacheEntryErrorProto.
+              REMOVE_PERMISSION_DENIED_ERROR_VALUE);
         } catch (IOException ioe) {
           builder.addResults(RemovePathCacheEntryErrorProto.
               UNEXPECTED_REMOVE_ERROR_VALUE);
@@ -1115,4 +1130,99 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public AddCachePoolResponseProto addCachePool(RpcController controller,
+      AddCachePoolRequestProto request) throws ServiceException {
+    try {
+      CachePoolInfo info =
+          new CachePoolInfo(request.getPoolName());
+      if (request.hasOwnerName()) {
+        info.setOwnerName(request.getOwnerName());
+      }
+      if (request.hasGroupName()) {
+        info.setGroupName(request.getGroupName());
+      }
+      if (request.hasMode()) {
+        info.setMode(request.getMode());
+      }
+      if (request.hasWeight()) {
+        info.setWeight(request.getWeight());
+      }
+      server.addCachePool(info);
+      return AddCachePoolResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+  
+  @Override
+  public ModifyCachePoolResponseProto modifyCachePool(RpcController controller,
+      ModifyCachePoolRequestProto request) throws ServiceException {
+    try {
+      CachePoolInfo info =
+          new CachePoolInfo(request.getPoolName());
+      if (request.hasOwnerName()) {
+        info.setOwnerName(request.getOwnerName());
+      }
+      if (request.hasGroupName()) {
+        info.setGroupName(request.getGroupName());
+      }
+      if (request.hasMode()) {
+        info.setMode(request.getMode());
+      }
+      if (request.hasWeight()) {
+        info.setWeight(request.getWeight());
+      }
+      server.modifyCachePool(info);
+      return ModifyCachePoolResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public RemoveCachePoolResponseProto removeCachePool(RpcController controller,
+      RemoveCachePoolRequestProto request) throws ServiceException {
+    try {
+      server.removeCachePool(request.getPoolName());
+      return RemoveCachePoolResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public ListCachePoolsResponseProto listCachePools(RpcController controller,
+      ListCachePoolsRequestProto request) throws ServiceException {
+    try {
+      RemoteIterator<CachePoolInfo> iter =
+        server.listCachePools(request.getPrevPoolName(),
+            request.getMaxReplies());
+      ListCachePoolsResponseProto.Builder responseBuilder =
+        ListCachePoolsResponseProto.newBuilder();
+      while (iter.hasNext()) {
+        CachePoolInfo pool = iter.next();
+        ListCachePoolsResponseElementProto.Builder elemBuilder = 
+            ListCachePoolsResponseElementProto.newBuilder();
+        elemBuilder.setPoolName(pool.getPoolName());
+        if (pool.getOwnerName() != null) {
+          elemBuilder.setOwnerName(pool.getOwnerName());
+        }
+        if (pool.getGroupName() != null) {
+          elemBuilder.setGroupName(pool.getGroupName());
+        }
+        if (pool.getMode() != null) {
+          elemBuilder.setMode(pool.getMode());
+        }
+        if (pool.getWeight() != null) {
+          elemBuilder.setWeight(pool.getWeight());
+        }
+        responseBuilder.addElements(elemBuilder.build());
+      }
+      return responseBuilder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 162 - 43
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -27,6 +27,8 @@ import java.util.NoSuchElementException;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoo
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
+import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -58,11 +61,13 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto;
@@ -108,14 +113,19 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPa
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -1064,6 +1074,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
     } else if (code == RemovePathCacheEntryErrorProto.
         NO_SUCH_CACHED_PATH_ID_ERROR_VALUE) {
       return new NoSuchIdException(id);
+    } else if (code == RemovePathCacheEntryErrorProto.
+        REMOVE_PERMISSION_DENIED_ERROR_VALUE) {
+      return new RemovePermissionDeniedException(id);
     } else {
       return new UnexpectedRemovePathCacheEntryException(id);
     }
@@ -1098,32 +1111,49 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
+  private static class BatchedPathCacheEntries
+      implements BatchedEntries<PathCacheEntry> {
+    private ListPathCacheEntriesResponseProto response;
+
+    BatchedPathCacheEntries(ListPathCacheEntriesResponseProto response) {
+      this.response = response;
+    }
+
+    @Override
+    public PathCacheEntry get(int i) {
+      ListPathCacheEntriesElementProto elementProto =
+        response.getElements(i);
+      return new PathCacheEntry(elementProto.getId(), 
+          new PathCacheDirective(elementProto.getPath(),
+              elementProto.getPool()));
+    }
+
+    @Override
+    public int size() {
+      return response.getElementsCount();
+    }
+  }
+
   private class PathCacheEntriesIterator
-      implements RemoteIterator<PathCacheEntry> {
-    private long prevId;
+      extends BatchedRemoteIterator<Long, PathCacheEntry> {
     private final String pool;
-    private final int repliesPerRequest;
-    private ListPathCacheEntriesResponseProto response;
-    private int idx;
 
 
-    public PathCacheEntriesIterator(long prevId, String pool,
-        int repliesPerRequest) {
-      this.prevId = prevId;
+    public PathCacheEntriesIterator(long prevKey, int maxRepliesPerRequest,
+        String pool) {
+      super(prevKey, maxRepliesPerRequest);
       this.pool = pool;
-      this.repliesPerRequest = repliesPerRequest;
-      this.response = null;
-      this.idx = -1;
     }
 
-    private void makeRequest() throws IOException {
-      idx = 0;
-      response = null;
+    @Override
+    public BatchedEntries<PathCacheEntry> makeRequest(
+        Long nextKey, int maxRepliesPerRequest) throws IOException {
+      ListPathCacheEntriesResponseProto response;
       try {
         ListPathCacheEntriesRequestProto req =
             ListPathCacheEntriesRequestProto.newBuilder().
-              setPrevId(prevId).
+              setPrevId(nextKey).
               setPool(pool).
-              setMaxReplies(repliesPerRequest).
+              setMaxReplies(maxRepliesPerRequest).
               build();
         response = rpcProxy.listPathCacheEntries(null, req);
         if (response.getElementsCount() == 0) {
@@ -1132,45 +1162,134 @@ public class ClientNamenodeProtocolTranslatorPB implements
       } catch (ServiceException e) {
         throw ProtobufHelper.getRemoteException(e);
       }
+      return new BatchedPathCacheEntries(response);
     }
 
-    private void makeRequestIfNeeded() throws IOException {
-      if (idx == -1) {
-        makeRequest();
-      } else if ((response != null) && (idx >= response.getElementsCount())) {
-        if (response.getHasMore()) {
-          makeRequest();
-        } else {
-          response = null;
-        }
-      }
+    @Override
+    public Long elementToNextKey(PathCacheEntry element) {
+      return element.getEntryId();
+    }
+  }
+
+  @Override
+  public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
+      String pool, int repliesPerRequest) throws IOException {
+    return new PathCacheEntriesIterator(prevId, repliesPerRequest, pool);
+  }
+
+  @Override
+  public void addCachePool(CachePoolInfo info) throws IOException {
+    AddCachePoolRequestProto.Builder builder = 
+        AddCachePoolRequestProto.newBuilder();
+    builder.setPoolName(info.getPoolName());
+    if (info.getOwnerName() != null) {
+      builder.setOwnerName(info.getOwnerName());
+    }
+    if (info.getGroupName() != null) {
+      builder.setGroupName(info.getGroupName());
+    }
+    if (info.getMode() != null) {
+      builder.setMode(info.getMode());
+    }
+    if (info.getWeight() != null) {
+      builder.setWeight(info.getWeight());
+    }
+    try {
+      rpcProxy.addCachePool(null, builder.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void modifyCachePool(CachePoolInfo req) throws IOException {
+    ModifyCachePoolRequestProto.Builder builder = 
+        ModifyCachePoolRequestProto.newBuilder();
+    builder.setPoolName(req.getPoolName());
+    if (req.getOwnerName() != null) {
+      builder.setOwnerName(req.getOwnerName());
+    }
+    if (req.getGroupName() != null) {
+      builder.setGroupName(req.getGroupName());
+    }
+    if (req.getMode() != null) {
+      builder.setMode(req.getMode());
+    }
+    if (req.getWeight() != null) {
+      builder.setWeight(req.getWeight());
+    }
+    try {
+      rpcProxy.modifyCachePool(null, builder.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void removeCachePool(String cachePoolName) throws IOException {
+    try {
+      rpcProxy.removeCachePool(null, 
+          RemoveCachePoolRequestProto.newBuilder().
+            setPoolName(cachePoolName).build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  private static class BatchedPathDirectiveEntries
+      implements BatchedEntries<CachePoolInfo> {
+    private final ListCachePoolsResponseProto proto;
+    
+    public BatchedPathDirectiveEntries(ListCachePoolsResponseProto proto) {
+      this.proto = proto;
+    }
+      
+    @Override
+    public CachePoolInfo get(int i) {
+      ListCachePoolsResponseElementProto elem = proto.getElements(i);
+      return new CachePoolInfo(elem.getPoolName()).
+          setOwnerName(elem.getOwnerName()).
+          setGroupName(elem.getGroupName()).
+          setMode(elem.getMode()).
+          setWeight(elem.getWeight());
     }
 
     @Override
-    public boolean hasNext() throws IOException {
-      makeRequestIfNeeded();
-      return (response != null);
+    public int size() {
+      return proto.getElementsCount();
+    }
+  }
+  
+  private class CachePoolIterator 
+      extends BatchedRemoteIterator<String, CachePoolInfo> {
+
+    public CachePoolIterator(String prevKey, int maxRepliesPerRequest) {
+      super(prevKey, maxRepliesPerRequest);
     }
 
     @Override
-    public PathCacheEntry next() throws IOException {
-      makeRequestIfNeeded();
-      if (response == null) {
-        throw new NoSuchElementException();
+    public BatchedEntries<CachePoolInfo> makeRequest(String prevKey,
+        int maxRepliesPerRequest) throws IOException {
+      try {
+        return new BatchedPathDirectiveEntries(
+            rpcProxy.listCachePools(null, 
+              ListCachePoolsRequestProto.newBuilder().
+                setPrevPoolName(prevKey).
+                setMaxReplies(maxRepliesPerRequest).build()));
+      } catch (ServiceException e) {
+        throw ProtobufHelper.getRemoteException(e);
       }
-      ListPathCacheEntriesElementProto elementProto =
-        response.getElements(idx);
-      prevId = elementProto.getId();
-      idx++;
-      return new PathCacheEntry(elementProto.getId(), 
-          new PathCacheDirective(elementProto.getPath(),
-              elementProto.getPool()));
+    }
+
+    @Override
+    public String elementToNextKey(CachePoolInfo element) {
+      return element.getPoolName();
     }
   }
 
   @Override
-  public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
-      String pool, int repliesPerRequest) throws IOException {
-    return new PathCacheEntriesIterator(prevId, pool, repliesPerRequest);
+  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey,
+      int maxRepliesPerRequest) throws IOException {
+    return new CachePoolIterator(prevKey, maxRepliesPerRequest);
   }
 }

+ 162 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -27,12 +27,17 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
 import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
+import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
+import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
+import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
 import org.apache.hadoop.util.Fallible;
 
 /**
@@ -56,6 +61,12 @@ final class CacheManager {
   private final TreeMap<PathCacheDirective, PathCacheEntry> entriesByDirective =
       new TreeMap<PathCacheDirective, PathCacheEntry>();
 
+  /**
+   * Cache pools, sorted by name.
+   */
+  private final TreeMap<String, CachePool> cachePools =
+      new TreeMap<String, CachePool>();
+
   /**
    * The entry ID to use for a new entry.
    */
@@ -80,16 +91,31 @@ final class CacheManager {
   }
 
   private synchronized Fallible<PathCacheEntry> addDirective(
-        PathCacheDirective directive) {
+        PathCacheDirective directive, FSPermissionChecker pc) {
+    CachePool pool = cachePools.get(directive.getPool());
+    if (pool == null) {
+      LOG.info("addDirective " + directive + ": pool not found.");
+      return new Fallible<PathCacheEntry>(
+          new InvalidPoolNameError(directive));
+    }
+    if (!pc.checkWritePermission(pool.getOwnerName(),
+        pool.getGroupName(), pool.getMode())) {
+      LOG.info("addDirective " + directive + ": write permission denied.");
+      return new Fallible<PathCacheEntry>(
+          new PoolWritePermissionDeniedError(directive));
+    }
     try {
       directive.validate();
     } catch (IOException ioe) {
+      LOG.info("addDirective " + directive + ": validation failed.");
       return new Fallible<PathCacheEntry>(ioe);
     }
     // Check if we already have this entry.
     PathCacheEntry existing = entriesByDirective.get(directive);
     if (existing != null) {
       // Entry already exists: return existing entry.
+      LOG.info("addDirective " + directive + ": there is an " +
+          "existing directive " + existing);
       return new Fallible<PathCacheEntry>(existing);
     }
     // Add a new entry with the next available ID.
@@ -100,33 +126,57 @@ final class CacheManager {
       return new Fallible<PathCacheEntry>(
           new UnexpectedAddPathCacheDirectiveException(directive));
     }
+    LOG.info("addDirective " + directive + ": added cache directive "
+        + directive);
     entriesByDirective.put(directive, entry);
     entriesById.put(entry.getEntryId(), entry);
     return new Fallible<PathCacheEntry>(entry);
   }
 
   public synchronized List<Fallible<PathCacheEntry>> addDirectives(
-      List<PathCacheDirective> directives) {
+      List<PathCacheDirective> directives, FSPermissionChecker pc) {
     ArrayList<Fallible<PathCacheEntry>> results = 
         new ArrayList<Fallible<PathCacheEntry>>(directives.size());
     for (PathCacheDirective directive: directives) {
-      results.add(addDirective(directive));
+      results.add(addDirective(directive, pc));
     }
     return results;
   }
 
-  private synchronized Fallible<Long> removeEntry(long entryId) {
+  private synchronized Fallible<Long> removeEntry(long entryId,
+        FSPermissionChecker pc) {
     // Check for invalid IDs.
     if (entryId <= 0) {
+      LOG.info("removeEntry " + entryId + ": invalid non-positive entry ID.");
       return new Fallible<Long>(new InvalidIdException(entryId));
     }
     // Find the entry.
     PathCacheEntry existing = entriesById.get(entryId);
     if (existing == null) {
+      LOG.info("removeEntry " + entryId + ": entry not found.");
       return new Fallible<Long>(new NoSuchIdException(entryId));
     }
+    CachePool pool = cachePools.get(existing.getDirective().getPool());
+    if (pool == null) {
+      LOG.info("removeEntry " + entryId + ": pool not found for directive " +
+        existing.getDirective());
+      return new Fallible<Long>(
+          new UnexpectedRemovePathCacheEntryException(entryId));
+    }
+    if (!pc.isSuperUser()) {
+      if (!pc.checkWritePermission(pool.getOwnerName(),
+          pool.getGroupName(), pool.getMode())) {
+        LOG.info("removeEntry " + entryId + ": write permission denied to " +
+            "pool " + pool + " for entry " + existing);
+        return new Fallible<Long>(
+            new RemovePermissionDeniedException(entryId));
+      }
+    }
+    
     // Remove the corresponding entry in entriesByDirective.
     if (entriesByDirective.remove(existing.getDirective()) == null) {
+      LOG.warn("removeEntry " + entryId + ": failed to find existing entry " +
+          existing + " in entriesByDirective");
       return new Fallible<Long>(
           new UnexpectedRemovePathCacheEntryException(entryId));
     }
@@ -134,11 +184,12 @@ final class CacheManager {
     return new Fallible<Long>(entryId);
   }
 
-  public synchronized List<Fallible<Long>> removeEntries(List<Long> entryIds) {
+  public synchronized List<Fallible<Long>> removeEntries(List<Long> entryIds,
+      FSPermissionChecker pc) {
     ArrayList<Fallible<Long>> results = 
         new ArrayList<Fallible<Long>>(entryIds.size());
     for (Long entryId : entryIds) {
-      results.add(removeEntry(entryId));
+      results.add(removeEntry(entryId, pc));
     }
     return results;
   }
@@ -162,4 +213,109 @@ final class CacheManager {
     }
     return replies;
   }
+
+  /**
+   * Create a cache pool.
+   * 
+   * Only the superuser should be able to call this function.
+   *
+   * @param info
+   *          The info for the cache pool to create.
+   */
+  public synchronized void addCachePool(CachePoolInfo info)
+      throws IOException {
+    String poolName = info.getPoolName();
+    if (poolName.isEmpty()) {
+      throw new IOException("invalid empty cache pool name");
+    }
+    CachePool pool = cachePools.get(poolName);
+    if (pool != null) {
+      throw new IOException("cache pool " + poolName + " already exists.");
+    }
+    CachePool cachePool = new CachePool(poolName,
+      info.getOwnerName(), info.getGroupName(), info.getMode(),
+      info.getWeight());
+    cachePools.put(poolName, cachePool);
+    LOG.info("created new cache pool " + cachePool);
+  }
+
+  /**
+   * Modify a cache pool.
+   * 
+   * Only the superuser should be able to call this function.
+   *
+   * @param info
+   *          The info for the cache pool to modify.
+   */
+  public synchronized void modifyCachePool(CachePoolInfo info)
+      throws IOException {
+    String poolName = info.getPoolName();
+    if (poolName.isEmpty()) {
+      throw new IOException("invalid empty cache pool name");
+    }
+    CachePool pool = cachePools.get(poolName);
+    if (pool == null) {
+      throw new IOException("cache pool " + poolName + " does not exist.");
+    }
+    StringBuilder bld = new StringBuilder();
+    String prefix = "";
+    if (info.getOwnerName() != null) {
+      pool.setOwnerName(info.getOwnerName());
+      bld.append(prefix).
+        append("set owner to ").append(info.getOwnerName());
+      prefix = "; ";
+    }
+    if (info.getGroupName() != null) {
+      pool.setGroupName(info.getGroupName());
+      bld.append(prefix).
+        append("set group to ").append(info.getGroupName());
+      prefix = "; ";
+    }
+    if (info.getMode() != null) {
+      pool.setMode(info.getMode());
+      bld.append(prefix).
+        append(String.format("set mode to 0%3o", info.getMode()));
+      prefix = "; ";
+    }
+    if (info.getWeight() != null) {
+      pool.setWeight(info.getWeight());
+      bld.append(prefix).
+        append("set weight to ").append(info.getWeight());
+      prefix = "; ";
+    }
+    if (prefix.isEmpty()) {
+      bld.append("no changes.");
+    }
+    LOG.info("modified " + poolName + "; " + bld.toString());
+  }
+
+  /**
+   * Remove a cache pool.
+   * 
+   * Only the superuser should be able to call this function.
+   *
+   * @param poolName
+   *          The name for the cache pool to remove.
+   */
+  public synchronized void removeCachePool(String poolName)
+      throws IOException {
+    CachePool pool = cachePools.remove(poolName);
+    if (pool == null) {
+      throw new IOException("can't remove nonexistent cache pool " + poolName);
+    }
+  }
+
+  public synchronized List<CachePoolInfo>
+      listCachePools(FSPermissionChecker pc, String prevKey,
+          int maxRepliesPerRequest) {
+    final int MAX_PREALLOCATED_REPLIES = 16;
+    ArrayList<CachePoolInfo> results = 
+        new ArrayList<CachePoolInfo>(Math.min(MAX_PREALLOCATED_REPLIES,
+            maxRepliesPerRequest));
+    SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
+    for (Entry<String, CachePool> cur : tailMap.entrySet()) {
+      if (results.size() >= maxRepliesPerRequest) {
+        break;  // return at most maxRepliesPerRequest pools per batch
+      }
+      results.add(cur.getValue().getInfo(pc));
+    }
+    return results;
+  }
 }

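The listCachePools() method added above pages with an exclusive cursor: cachePools.tailMap(prevKey, false) resumes strictly after the last pool name the caller has already seen, and at most maxRepliesPerRequest pools come back per call. Below is a minimal, self-contained sketch of that tailMap-based paging; the class and method names are hypothetical and plain strings stand in for CachePool objects.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TailMapPagingExample {
  // Return up to maxReplies pool names strictly after prevKey,
  // mirroring the cursor semantics of CacheManager#listCachePools.
  static List<String> listAfter(TreeMap<String, String> pools,
      String prevKey, int maxReplies) {
    List<String> results = new ArrayList<String>();
    for (Map.Entry<String, String> cur :
        pools.tailMap(prevKey, false).entrySet()) {
      if (results.size() >= maxReplies) {
        break;
      }
      results.add(cur.getKey());
    }
    return results;
  }

  public static void main(String[] args) {
    TreeMap<String, String> pools = new TreeMap<String, String>();
    pools.put("pool1", "-");
    pools.put("pool2", "-");
    pools.put("pool3", "-");
    System.out.println(listAfter(pools, "", 2));       // [pool1, pool2]
    System.out.println(listAfter(pools, "pool2", 2));  // [pool3]
  }
}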
+ 141 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java

@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The NameNode uses CachePools to manage cache resources on the DataNodes.
+ */
+public final class CachePool {
+  public static final Log LOG = LogFactory.getLog(CachePool.class);
+
+  @Nonnull
+  private final String poolName;
+
+  @Nonnull
+  private String ownerName;
+
+  @Nonnull
+  private String groupName;
+  
+  private int mode;
+  
+  private int weight;
+  
+  public static String getCurrentUserPrimaryGroupName() throws IOException {
+    UserGroupInformation ugi = NameNode.getRemoteUser();
+    String[] groups = ugi.getGroupNames();
+    if (groups.length == 0) {
+      throw new IOException("failed to get group names from UGI " + ugi);
+    }
+    return groups[0];
+  }
+  
+  public CachePool(String poolName, String ownerName, String groupName,
+      Integer mode, Integer weight) throws IOException {
+    this.poolName = poolName;
+    this.ownerName = ownerName != null ? ownerName :
+      NameNode.getRemoteUser().getShortUserName();
+    this.groupName = groupName != null ? groupName :
+      getCurrentUserPrimaryGroupName();
+    this.mode = mode != null ? mode : 0644;
+    this.weight = weight != null ? weight : 100;
+  }
+
+  public String getName() {
+    return poolName;
+  }
+
+  public String getOwnerName() {
+    return ownerName;
+  }
+
+  public CachePool setOwnerName(String ownerName) {
+    this.ownerName = ownerName;
+    return this;
+  }
+
+  public String getGroupName() {
+    return groupName;
+  }
+
+  public CachePool setGroupName(String groupName) {
+    this.groupName = groupName;
+    return this;
+  }
+
+  public int getMode() {
+    return mode;
+  }
+
+  public CachePool setMode(int mode) {
+    this.mode = mode;
+    return this;
+  }
+  
+  public int getWeight() {
+    return weight;
+  }
+
+  public CachePool setWeight(int weight) {
+    this.weight = weight;
+    return this;
+  }
+  
+  /**
+   * Get information about this cache pool.
+   *
+   * @param fullInfo
+   *          If true, return all fields.  If false, only the pool name is
+   *          returned (i.e., what you would get if you didn't have read
+   *          permission for this pool.)
+   * @return
+   *          Cache pool information.
+   */
+  public CachePoolInfo getInfo(boolean fullInfo) {
+    CachePoolInfo info = new CachePoolInfo(poolName);
+    if (!fullInfo) {
+      return info;
+    }
+    return info.setOwnerName(ownerName).
+        setGroupName(groupName).
+        setMode(mode).
+        setWeight(weight);
+  }
+
+  public CachePoolInfo getInfo(FSPermissionChecker pc) {
+    return getInfo(pc.checkReadPermission(ownerName, groupName, mode));
+  }
+
+  public String toString() {
+    return new StringBuilder().
+        append("{ ").append("poolName:").append(poolName).
+        append(", ownerName:").append(ownerName).
+        append(", groupName:").append(groupName).
+        append(", mode:").append(String.format("%3o", mode)).
+        append(", weight:").append(weight).
+        append(" }").toString();
+  }
+}

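The CachePool constructor above fills in a default for every field the caller leaves null: the owner and group fall back to the remote user's short name and primary group, the mode defaults to octal 0644, and the weight defaults to 100. A small standalone illustration of that null-to-default pattern (the class name is hypothetical and there are no NameNode dependencies):

public class CachePoolDefaultsExample {
  public static void main(String[] args) {
    // Simulate a CachePoolInfo where the caller did not set mode or weight.
    Integer requestedMode = null;
    Integer requestedWeight = null;

    // Same fallback logic as the CachePool constructor above.
    int mode = (requestedMode != null) ? requestedMode : 0644;     // octal default
    int weight = (requestedWeight != null) ? requestedWeight : 100;

    System.out.printf("mode=%03o weight=%d%n", mode, weight);      // mode=644 weight=100
  }
}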
+ 189 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -6700,6 +6701,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       return; // Return previous response
     }
     boolean success = false;
+    checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -6748,17 +6750,198 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   List<Fallible<PathCacheEntry>> addPathCacheDirectives(
-      List<PathCacheDirective> directives) {
-    return cacheManager.addDirectives(directives);
+      List<PathCacheDirective> directives) throws IOException {
+    CacheEntryWithPayload retryCacheEntry =
+        RetryCache.waitForCompletion(retryCache, null);
+    if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
+      return (List<Fallible<PathCacheEntry>>) retryCacheEntry.getPayload();
+    }
+    final FSPermissionChecker pc = getPermissionChecker();
+    boolean success = false;
+    List<Fallible<PathCacheEntry>> results = null;
+    checkOperation(OperationCategory.WRITE);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot add path cache directive", safeMode);
+      }
+      results = cacheManager.addDirectives(directives, pc);
+      //getEditLog().logAddPathCacheDirectives(results); FIXME: HDFS-5119
+      success = true;
+    } finally {
+      writeUnlock();
+      if (success) {
+        getEditLog().logSync();
+      }
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "addPathCacheDirectives", null, null, null);
+      }
+      RetryCache.setState(retryCacheEntry, success, results);
+    }
+    return results;
   }
 
-  List<Fallible<Long>> removePathCacheEntries(List<Long> ids) {
-    return cacheManager.removeEntries(ids);
+  List<Fallible<Long>> removePathCacheEntries(List<Long> ids) throws IOException {
+    CacheEntryWithPayload retryCacheEntry =
+        RetryCache.waitForCompletion(retryCache, null);
+    if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
+      return (List<Fallible<Long>>) retryCacheEntry.getPayload();
+    }
+    final FSPermissionChecker pc = getPermissionChecker();
+    boolean success = false;
+    List<Fallible<Long>> results = null;
+    checkOperation(OperationCategory.WRITE);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot remove path cache directives", safeMode);
+      }
+      results = cacheManager.removeEntries(ids, pc);
+      //getEditLog().logRemovePathCacheEntries(results); FIXME: HDFS-5119
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "removePathCacheEntries", null, null, null);
+      }
+      RetryCache.setState(retryCacheEntry, success, results);
+    }
+    getEditLog().logSync();
+    return results;
   }
   }
 
   List<PathCacheEntry> listPathCacheEntries(long startId, String pool,
-    return cacheManager.listPathCacheEntries(startId, pool, maxReplies);
+      int maxReplies) throws IOException {
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return cacheManager.listPathCacheEntries(startId, pool, maxReplies);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  public void addCachePool(CachePoolInfo req) throws IOException {
+    final FSPermissionChecker pc = getPermissionChecker();
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    checkOperation(OperationCategory.WRITE);
+    writeLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (!pc.isSuperUser()) {
+        throw new AccessControlException("Non-super users cannot " +
+            "add cache pools.");
+      }
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot add cache pool " + req.getPoolName(), safeMode);
+      }
+      cacheManager.addCachePool(req);
+      //getEditLog().logAddCachePool(req); // FIXME: HDFS-5119
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "addCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
+    }
+    
+    getEditLog().logSync();
+  }
+
+  public void modifyCachePool(CachePoolInfo req) throws IOException {
+    final FSPermissionChecker pc = getPermissionChecker();
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    checkOperation(OperationCategory.WRITE);
+    writeLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (!pc.isSuperUser()) {
+        throw new AccessControlException("Non-super users cannot " +
+            "modify cache pools.");
+      }
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot modify cache pool " + req.getPoolName(), safeMode);
+      }
+      cacheManager.modifyCachePool(req);
+      //getEditLog().logModifyCachePool(req); // FIXME: HDFS-5119
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "modifyCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
+    }
+
+    getEditLog().logSync();
+  }
+
+  public void removeCachePool(String cachePoolName) throws IOException {
+    final FSPermissionChecker pc = getPermissionChecker();
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    checkOperation(OperationCategory.WRITE);
+    writeLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (!pc.isSuperUser()) {
+        throw new AccessControlException("Non-super users cannot " +
+            "remove cache pools.");
+      }
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot remove cache pool " + cachePoolName, safeMode);
+      }
+      cacheManager.removeCachePool(cachePoolName);
+      //getEditLog().logRemoveCachePool(req); // FIXME: HDFS-5119
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "removeCachePool", cachePoolName, null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
+    }
+    
+    getEditLog().logSync();
+  }
+
+  public List<CachePoolInfo> listCachePools(String prevKey,
+      int maxRepliesPerRequest) throws IOException {
+    final FSPermissionChecker pc = getPermissionChecker();
+    List<CachePoolInfo> results;
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      results = cacheManager.listCachePools(pc, prevKey, maxRepliesPerRequest);
+    } finally {
+      readUnlock();
+    }
+    return results;
+  }
+
+  public CacheManager getCacheManager() {
+    return cacheManager;
   }
   }
 
   /**
       }
       }
     }
   }
-
-  public CacheManager getCacheManager() {
-    return cacheManager;
-  }
 }

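Each new FSNamesystem method above follows the same shape: consult the retry cache, take the namesystem write lock, reject the call in safe mode (and, for pool operations, for non-superusers), apply the change through the CacheManager, release the lock, and only then sync the edit log. The sketch below is a minimal, self-contained illustration of that lock-then-sync ordering; the Runnables and class name are stand-ins, not the real NameNode types.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriteLockPatternExample {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // mutation stands in for e.g. cacheManager.addCachePool(info);
  // logSync stands in for getEditLog().logSync().
  public void runWriteOperation(Runnable mutation, Runnable logSync) {
    boolean success = false;
    lock.writeLock().lock();
    try {
      mutation.run();
      success = true;
    } finally {
      lock.writeLock().unlock();
      // FSNamesystem also records the audit event and retry-cache state here.
    }
    if (success) {
      logSync.run();   // edit-log sync happens outside the write lock
    }
  }
}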
+ 36 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -255,4 +255,40 @@ class FSPermissionChecker {
     throw new AccessControlException("Permission denied by sticky bit setting:" +
       " user=" + user + ", inode=" + inode);
   }
+
+  /**
+   * Check whether a cache pool with the given owner, group, and mode can be
+   * accessed by the current user.
+   *
+   * @param userName
+   *          Owner name of the cache pool.
+   * @param groupName
+   *          Group name of the cache pool.
+   * @param mode
+   *          Permission mode (octal) of the cache pool.
+   * @param mask
+   *          Permission bits to test against the "other" part of the mode;
+   *          the owner and group parts are derived by shifting this mask.
+   * @return
+   *          True only if the cache pool is accessible.
+   */
+  private boolean checkPermission(String userName, 
+      String groupName, int mode, int mask) {
+    if ((mode & mask) != 0) {               // "other" permission bits
+      return true;
+    }
+    if (((mode & (mask << 6)) != 0)         // owner permission bits
+        && (getUser().equals(userName))) {
+      return true;
+    }
+    if (((mode & (mask << 3)) != 0)         // group permission bits
+        && (containsGroup(groupName))) {
+      return true;
+    }
+    return false;
+  }
+
+  public boolean checkWritePermission(String userName,
+      String groupName, int mode) {
+    return checkPermission(userName, groupName, mode, 02);
+  }
+
+  public boolean checkReadPermission(String userName,
+      String groupName, int mode) {
+    return checkPermission(userName, groupName, mode, 04);
+  }
 }

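checkWritePermission() and checkReadPermission() above pass octal masks of 02 and 04. checkPermission() tests the mask directly against the "other" bits of the pool mode, and reaches the owner and group bits by shifting the mask left by 6 and 3 places. A standalone illustration of that bit arithmetic (hypothetical class name):

public class PoolModeBitsExample {
  public static void main(String[] args) {
    int mode = 0750;        // rwxr-x--- in octal
    int readMask = 04;      // the mask checkReadPermission passes in

    boolean otherCanRead = (mode & readMask) != 0;         // 0750 & 0004 == 0    -> false
    boolean ownerCanRead = (mode & (readMask << 6)) != 0;  // 0750 & 0400 == 0400 -> true
    boolean groupCanRead = (mode & (readMask << 3)) != 0;  // 0750 & 0040 == 0040 -> true

    System.out.println("other=" + otherCanRead
        + " owner=" + ownerCanRead + " group=" + groupCanRead);
  }
}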
+ 57 - 47
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -36,6 +36,8 @@ import java.util.NoSuchElementException;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
 import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -1219,68 +1222,75 @@ class NameNodeRpcServer implements NamenodeProtocols {
     return namesystem.removePathCacheEntries(ids);
   }
 
-  private class PathCacheEntriesIterator
-      implements RemoteIterator<PathCacheEntry> {
-    private long prevId;
+  private class ServerSidePathCacheEntriesIterator
+      extends BatchedRemoteIterator<Long, PathCacheEntry> {
+
     private final String pool;
-    private final int repliesPerRequest;
-    private List<PathCacheEntry> entries;
-    private int idx;
 
-    public PathCacheEntriesIterator(long prevId, String pool,
-        int repliesPerRequest) {
-      this.prevId = prevId;
+    public ServerSidePathCacheEntriesIterator(Long firstKey,
+        int maxRepliesPerRequest, String pool) {
+      super(firstKey, maxRepliesPerRequest);
       this.pool = pool;
-      this.repliesPerRequest = repliesPerRequest;
-      this.entries = null;
-      this.idx = -1;
     }
 
-    private void makeRequest() throws IOException {
-      idx = 0;
-      entries = null;
-      entries = namesystem.listPathCacheEntries(prevId, pool,
-          repliesPerRequest);
-      if (entries.isEmpty()) {
-        entries = null;
-      }
+    @Override
+    public BatchedEntries<PathCacheEntry> makeRequest(
+        Long nextKey, int maxRepliesPerRequest) throws IOException {
+      return new BatchedListEntries<PathCacheEntry>(
+          namesystem.listPathCacheEntries(nextKey, pool,
+              maxRepliesPerRequest));
     }
 
-    private void makeRequestIfNeeded() throws IOException {
-      if (idx == -1) {
-        makeRequest();
-      } else if ((entries != null) && (idx >= entries.size())) {
-        if (entries.size() < repliesPerRequest) {
-          // Last time, we got fewer entries than requested.
-          // So we should be at the end.
-          entries = null;
-        } else {
-          makeRequest();
-        }
-      }
+    @Override
+    public Long elementToNextKey(PathCacheEntry entry) {
+      return entry.getEntryId();
+    }
+  }
+  
+  @Override
+  public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId, String pool,
+      int maxReplies) throws IOException {
+    return new ServerSidePathCacheEntriesIterator(prevId, maxReplies, pool);
+  }
+
+  @Override
+  public void addCachePool(CachePoolInfo info) throws IOException {
+    namesystem.addCachePool(info);
+  }
+
+  @Override
+  public void modifyCachePool(CachePoolInfo info) throws IOException {
+    namesystem.modifyCachePool(info);
+  }
+
+  @Override
+  public void removeCachePool(String cachePoolName) throws IOException {
+    namesystem.removeCachePool(cachePoolName);
+  }
+
+  private class ServerSideCachePoolIterator 
+      extends BatchedRemoteIterator<String, CachePoolInfo> {
+
+    public ServerSideCachePoolIterator(String prevKey, int maxRepliesPerRequest) {
+      super(prevKey, maxRepliesPerRequest);
     }
 
     @Override
-    public boolean hasNext() throws IOException {
-      makeRequestIfNeeded();
-      return (entries != null);
+    public BatchedEntries<CachePoolInfo> makeRequest(String prevKey,
+        int maxRepliesPerRequest) throws IOException {
+      return new BatchedListEntries<CachePoolInfo>(
+          namesystem.listCachePools(prevKey, maxRepliesPerRequest));
     }
 
     @Override
-    public PathCacheEntry next() throws IOException {
-      makeRequestIfNeeded();
-      if (entries == null) {
-        throw new NoSuchElementException();
-      }
-      PathCacheEntry entry = entries.get(idx++);
-      prevId = entry.getEntryId();
-      return entry;
+    public String elementToNextKey(CachePoolInfo element) {
+      return element.getPoolName();
     }
   }
-  
+
   @Override
-  public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId, String pool,
-      int maxReplies) throws IOException {
-    return new PathCacheEntriesIterator(prevId, pool, maxReplies);
+  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey,
+      int maxRepliesPerRequest) throws IOException {
+    return new ServerSideCachePoolIterator(prevKey, maxRepliesPerRequest);
   }
 }

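Both server-side iterators above delegate the paging mechanics to BatchedRemoteIterator: makeRequest() fetches one batch starting after the current key, and elementToNextKey() advances the cursor so the next batch resumes where the previous one ended. The sketch below is a simplified, self-contained analogue of that batching loop, not the real BatchedRemoteIterator API.

import java.util.ArrayList;
import java.util.List;

public class BatchedIterationExample {
  // Stand-in for the makeRequest/elementToNextKey pair.
  interface BatchFetcher<K, E> {
    List<E> fetch(K afterKey, int maxReplies);
    K keyOf(E element);
  }

  // Pull every element by repeatedly fetching batches after the last key seen.
  static <K, E> List<E> drain(BatchFetcher<K, E> fetcher, K firstKey,
      int batchSize) {
    List<E> all = new ArrayList<E>();
    K cursor = firstKey;
    while (true) {
      List<E> batch = fetcher.fetch(cursor, batchSize);
      all.addAll(batch);
      if (batch.size() < batchSize) {
        return all;    // a short batch means there is nothing left to fetch
      }
      cursor = fetcher.keyOf(batch.get(batch.size() - 1));
    }
  }
}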
+ 57 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -394,7 +394,8 @@ message RemovePathCacheEntriesResponseProto {
 enum RemovePathCacheEntryErrorProto {
   INVALID_CACHED_PATH_ID_ERROR = -1;
   NO_SUCH_CACHED_PATH_ID_ERROR = -2;
-  UNEXPECTED_REMOVE_ERROR = -3;
+  REMOVE_PERMISSION_DENIED_ERROR = -3;
+  UNEXPECTED_REMOVE_ERROR = -4;
 }
 
 message ListPathCacheEntriesRequestProto {
@@ -414,6 +415,53 @@ message ListPathCacheEntriesResponseProto {
   required bool hasMore = 2;
 }
 
+message AddCachePoolRequestProto {
+  required string poolName = 1;
+  optional string ownerName = 2;
+  optional string groupName = 3;
+  optional int32 mode = 4;
+  optional int32 weight = 5;
+}
+
+message AddCachePoolResponseProto { // void response
+}
+
+message ModifyCachePoolRequestProto {
+  required string poolName = 1;
+  optional string ownerName = 2;
+  optional string groupName = 3;
+  optional int32 mode = 4;
+  optional int32 weight = 5;
+}
+
+message ModifyCachePoolResponseProto { // void response
+}
+
+message RemoveCachePoolRequestProto {
+  required string poolName = 1;
+}
+
+message RemoveCachePoolResponseProto { // void response
+}
+
+message ListCachePoolsRequestProto {
+  required string prevPoolName = 1;
+  required int32 maxReplies = 2;
+}
+
+message ListCachePoolsResponseProto {
+  repeated ListCachePoolsResponseElementProto elements = 1;
+  optional bool hasMore = 2;
+}
+
+message ListCachePoolsResponseElementProto {
+  required string poolName = 1;
+  required string ownerName = 2;
+  required string groupName = 3;
+  required int32 mode = 4;
+  required int32 weight = 5;
+}
+
 message GetFileLinkInfoRequestProto {
   required string src = 1;
 }
@@ -601,6 +649,14 @@ service ClientNamenodeProtocol {
       returns (RemovePathCacheEntriesResponseProto);
   rpc listPathCacheEntries(ListPathCacheEntriesRequestProto)
       returns (ListPathCacheEntriesResponseProto);
+  rpc addCachePool(AddCachePoolRequestProto)
+      returns(AddCachePoolResponseProto);
+  rpc modifyCachePool(ModifyCachePoolRequestProto)
+      returns(ModifyCachePoolResponseProto);
+  rpc removeCachePool(RemoveCachePoolRequestProto)
+      returns(RemoveCachePoolResponseProto);
+  rpc listCachePools(ListCachePoolsRequestProto)
+      returns(ListCachePoolsResponseProto);
   rpc getFileLinkInfo(GetFileLinkInfoRequestProto)
       returns(GetFileLinkInfoResponseProto);
   rpc getContentSummary(GetContentSummaryRequestProto)

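On the wire, pool listing is cursor-based as well: ListCachePoolsRequestProto carries the last pool name already seen (prevPoolName, empty string to start from the beginning) plus a batch size, and the response reports whether more pools remain. A hypothetical client-side paging loop against a stand-in stub interface (not the generated protobuf classes) might look like this:

import java.util.List;

public class ListCachePoolsPagingExample {
  interface Page {
    List<String> poolNames();
    boolean hasMore();
  }

  // Stand-in for the listCachePools RPC defined above.
  interface CachePoolLister {
    Page list(String prevPoolName, int maxReplies);
  }

  static void printAllPools(CachePoolLister lister) {
    String prev = "";                    // empty string: start from the beginning
    Page page;
    do {
      page = lister.list(prev, 16);      // ask for up to 16 pools per call
      for (String name : page.poolNames()) {
        System.out.println(name);
        prev = name;                     // the last name seen becomes the next cursor
      }
    } while (page.hasMore());
  }
}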
+ 95 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -32,17 +34,89 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
 import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
+import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
 import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
 import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Fallible;
 import org.junit.Test;
 
 public class TestPathCacheRequests {
   static final Log LOG = LogFactory.getLog(TestPathCacheRequests.class);
 
+  @Test
+  public void testCreateAndRemovePools() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    NamenodeProtocols proto = cluster.getNameNodeRpc();
+    CachePoolInfo req = new CachePoolInfo("pool1").
+        setOwnerName("bob").setGroupName("bobgroup").
+        setMode(0755).setWeight(150);
+    proto.addCachePool(req);
+    try {
+      proto.removeCachePool("pool99");
+      Assert.fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("can't remove " +
+          "nonexistent cache pool", ioe);
+    }
+    proto.removeCachePool("pool1");
+    try {
+      proto.removeCachePool("pool1");
+      Assert.fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("can't remove " +
+          "nonexistent cache pool", ioe);
+    }
+    req = new CachePoolInfo("pool2");
+    proto.addCachePool(req);
+  }
+
+  @Test
+  public void testCreateAndModifyPools() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    NamenodeProtocols proto = cluster.getNameNodeRpc();
+    proto.addCachePool(new CachePoolInfo("pool1").
+        setOwnerName("abc").setGroupName("123").
+        setMode(0755).setWeight(150));
+    proto.modifyCachePool(new CachePoolInfo("pool1").
+        setOwnerName("def").setGroupName("456"));
+    RemoteIterator<CachePoolInfo> iter = proto.listCachePools("", 1);
+    CachePoolInfo info = iter.next();
+    assertEquals("pool1", info.getPoolName());
+    assertEquals("def", info.getOwnerName());
+    assertEquals("456", info.getGroupName());
+    assertEquals(Integer.valueOf(0755), info.getMode());
+    assertEquals(Integer.valueOf(150), info.getWeight());
+
+    try {
+      proto.removeCachePool("pool99");
+      Assert.fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+    }
+    proto.removeCachePool("pool1");
+    try {
+      proto.removeCachePool("pool1");
+      Assert.fail("expected to get an exception when " +
+          "removing a non-existent pool.");
+    } catch (IOException ioe) {
+    }
+  }
+
   private static void validateListAll(
       RemoteIterator<PathCacheEntry> iter,
       long id0, long id1, long id2) throws Exception {
@@ -67,12 +141,18 @@ public class TestPathCacheRequests {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
       NamenodeProtocols proto = cluster.getNameNodeRpc();
+      proto.addCachePool(new CachePoolInfo("pool1"));
+      proto.addCachePool(new CachePoolInfo("pool2"));
+      proto.addCachePool(new CachePoolInfo("pool3"));
+      proto.addCachePool(new CachePoolInfo("pool4").setMode(0));
       List<Fallible<PathCacheEntry>> addResults1 = 
           proto.addPathCacheDirectives(Arrays.asList(
             new PathCacheDirective[] {
         new PathCacheDirective("/alpha", "pool1"),
         new PathCacheDirective("/beta", "pool2"),
-        new PathCacheDirective("", "pool3")
+        new PathCacheDirective("", "pool3"),
+        new PathCacheDirective("/zeta", "nonexistent_pool"),
+        new PathCacheDirective("/zeta", "pool4")
       }));
       long ids1[] = new long[2];
       ids1[0] = addResults1.get(0).get().getEntryId();
@@ -83,6 +163,20 @@ public class TestPathCacheRequests {
       } catch (IOException ioe) {
         Assert.assertTrue(ioe.getCause() instanceof EmptyPathError);
       }
+      try {
+        addResults1.get(3).get();
+        Assert.fail("expected an error when adding to a nonexistent pool.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
+      }
+      try {
+        addResults1.get(4).get();
+        Assert.fail("expected an error when adding to a pool with " +
+            "mode 0 (no permissions for anyone).");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getCause()
+            instanceof PoolWritePermissionDeniedError);
+      }
 
       List<Fallible<PathCacheEntry>> addResults2 = 
           proto.addPathCacheDirectives(Arrays.asList(