|
@@ -17,28 +17,34 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
|
|
|
+
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.Map.Entry;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
-import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolError;
|
|
|
-import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
|
|
|
-import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
|
|
|
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
|
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
|
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
|
|
|
-import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
|
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
|
|
|
import org.apache.hadoop.util.Fallible;
|
|
|
|
|
|
/**
|
|
@@ -65,62 +71,58 @@ final class CacheManager {
|
|
|
/**
|
|
|
* Cache pools, sorted by name.
|
|
|
*/
|
|
|
- private final TreeMap<String, CachePool> cachePoolsByName =
|
|
|
+ private final TreeMap<String, CachePool> cachePools =
|
|
|
new TreeMap<String, CachePool>();
|
|
|
|
|
|
/**
|
|
|
- * Cache pools, sorted by ID
|
|
|
+ * The entry ID to use for a new entry.
|
|
|
*/
|
|
|
- private final TreeMap<Long, CachePool> cachePoolsById =
|
|
|
- new TreeMap<Long, CachePool>();
|
|
|
+ private long nextEntryId;
|
|
|
|
|
|
/**
|
|
|
- * The entry ID to use for a new entry.
|
|
|
+ * Maximum number of cache pools to list in one operation.
|
|
|
*/
|
|
|
- private long nextEntryId;
|
|
|
+ private final int maxListCachePoolsResponses;
|
|
|
|
|
|
/**
|
|
|
- * The pool ID to use for a new pool.
|
|
|
+ * Maximum number of cache pool directives to list in one operation.
|
|
|
*/
|
|
|
- private long nextPoolId;
|
|
|
+ private final int maxListCacheDirectivesResponses;
|
|
|
|
|
|
CacheManager(FSDirectory dir, Configuration conf) {
|
|
|
// TODO: support loading and storing of the CacheManager state
|
|
|
clear();
|
|
|
+ maxListCachePoolsResponses = conf.getInt(
|
|
|
+ DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
|
|
|
+ DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
|
|
|
+ maxListCacheDirectivesResponses = conf.getInt(
|
|
|
+ DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
|
|
|
+ DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT);
|
|
|
}
|
|
|
|
|
|
synchronized void clear() {
|
|
|
entriesById.clear();
|
|
|
entriesByDirective.clear();
|
|
|
- cachePoolsByName.clear();
|
|
|
- cachePoolsById.clear();
|
|
|
+ cachePools.clear();
|
|
|
nextEntryId = 1;
|
|
|
- nextPoolId = 1;
|
|
|
}
|
|
|
|
|
|
synchronized long getNextEntryId() throws IOException {
|
|
|
if (nextEntryId == Long.MAX_VALUE) {
|
|
|
- throw new IOException("no more available entry IDs");
|
|
|
+ throw new IOException("no more available IDs");
|
|
|
}
|
|
|
return nextEntryId++;
|
|
|
}
|
|
|
|
|
|
- synchronized long getNextPoolId() throws IOException {
|
|
|
- if (nextPoolId == Long.MAX_VALUE) {
|
|
|
- throw new IOException("no more available pool IDs");
|
|
|
- }
|
|
|
- return nextPoolId++;
|
|
|
- }
|
|
|
-
|
|
|
private synchronized Fallible<PathCacheEntry> addDirective(
|
|
|
- FSPermissionChecker pc, PathCacheDirective directive) {
|
|
|
- CachePool pool = cachePoolsById.get(directive.getPoolId());
|
|
|
+ PathCacheDirective directive, FSPermissionChecker pc) {
|
|
|
+ CachePool pool = cachePools.get(directive.getPool());
|
|
|
if (pool == null) {
|
|
|
LOG.info("addDirective " + directive + ": pool not found.");
|
|
|
return new Fallible<PathCacheEntry>(
|
|
|
- new InvalidPoolError(directive));
|
|
|
+ new InvalidPoolNameError(directive));
|
|
|
}
|
|
|
- if (!pc.checkPermission(pool, FsAction.WRITE)) {
|
|
|
+ if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
|
|
|
LOG.info("addDirective " + directive + ": write permission denied.");
|
|
|
return new Fallible<PathCacheEntry>(
|
|
|
new PoolWritePermissionDeniedError(directive));
|
|
@@ -155,17 +157,17 @@ final class CacheManager {
|
|
|
}
|
|
|
|
|
|
public synchronized List<Fallible<PathCacheEntry>> addDirectives(
|
|
|
- FSPermissionChecker pc, List<PathCacheDirective> directives) {
|
|
|
+ List<PathCacheDirective> directives, FSPermissionChecker pc) {
|
|
|
ArrayList<Fallible<PathCacheEntry>> results =
|
|
|
new ArrayList<Fallible<PathCacheEntry>>(directives.size());
|
|
|
for (PathCacheDirective directive: directives) {
|
|
|
- results.add(addDirective(pc, directive));
|
|
|
+ results.add(addDirective(directive, pc));
|
|
|
}
|
|
|
return results;
|
|
|
}
|
|
|
|
|
|
- private synchronized Fallible<Long> removeEntry(FSPermissionChecker pc,
|
|
|
- long entryId) {
|
|
|
+ private synchronized Fallible<Long> removeEntry(long entryId,
|
|
|
+ FSPermissionChecker pc) {
|
|
|
// Check for invalid IDs.
|
|
|
if (entryId <= 0) {
|
|
|
LOG.info("removeEntry " + entryId + ": invalid non-positive entry ID.");
|
|
@@ -177,20 +179,20 @@ final class CacheManager {
|
|
|
LOG.info("removeEntry " + entryId + ": entry not found.");
|
|
|
return new Fallible<Long>(new NoSuchIdException(entryId));
|
|
|
}
|
|
|
- CachePool pool = cachePoolsById.get(existing.getDirective().getPoolId());
|
|
|
+ CachePool pool = cachePools.get(existing.getDirective().getPool());
|
|
|
if (pool == null) {
|
|
|
LOG.info("removeEntry " + entryId + ": pool not found for directive " +
|
|
|
existing.getDirective());
|
|
|
return new Fallible<Long>(
|
|
|
new UnexpectedRemovePathCacheEntryException(entryId));
|
|
|
}
|
|
|
- if (!pc.checkPermission(pool, FsAction.WRITE)) {
|
|
|
+ if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
|
|
|
LOG.info("removeEntry " + entryId + ": write permission denied to " +
|
|
|
"pool " + pool + " for entry " + existing);
|
|
|
return new Fallible<Long>(
|
|
|
new RemovePermissionDeniedException(entryId));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Remove the corresponding entry in entriesByDirective.
|
|
|
if (entriesByDirective.remove(existing.getDirective()) == null) {
|
|
|
LOG.warn("removeEntry " + entryId + ": failed to find existing entry " +
|
|
@@ -202,41 +204,43 @@ final class CacheManager {
|
|
|
return new Fallible<Long>(entryId);
|
|
|
}
|
|
|
|
|
|
- public synchronized List<Fallible<Long>> removeEntries(FSPermissionChecker pc,
|
|
|
- List<Long> entryIds) {
|
|
|
+ public synchronized List<Fallible<Long>> removeEntries(List<Long> entryIds,
|
|
|
+ FSPermissionChecker pc) {
|
|
|
ArrayList<Fallible<Long>> results =
|
|
|
new ArrayList<Fallible<Long>>(entryIds.size());
|
|
|
for (Long entryId : entryIds) {
|
|
|
- results.add(removeEntry(pc, entryId));
|
|
|
+ results.add(removeEntry(entryId, pc));
|
|
|
}
|
|
|
return results;
|
|
|
}
|
|
|
|
|
|
- public synchronized List<PathCacheEntry> listPathCacheEntries(
|
|
|
- FSPermissionChecker pc, long prevId, Long poolId, int maxReplies) {
|
|
|
- final int MAX_PRE_ALLOCATED_ENTRIES = 16;
|
|
|
- ArrayList<PathCacheEntry> replies = new ArrayList<PathCacheEntry>(
|
|
|
- Math.min(MAX_PRE_ALLOCATED_ENTRIES, maxReplies));
|
|
|
+ public synchronized BatchedListEntries<PathCacheEntry>
|
|
|
+ listPathCacheEntries(long prevId, String filterPool, FSPermissionChecker pc) {
|
|
|
+ final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
|
+ ArrayList<PathCacheEntry> replies =
|
|
|
+ new ArrayList<PathCacheEntry>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
|
int numReplies = 0;
|
|
|
SortedMap<Long, PathCacheEntry> tailMap = entriesById.tailMap(prevId + 1);
|
|
|
- for (PathCacheEntry entry : tailMap.values()) {
|
|
|
- if (numReplies >= maxReplies) {
|
|
|
- return replies;
|
|
|
+ for (Entry<Long, PathCacheEntry> cur : tailMap.entrySet()) {
|
|
|
+ if (numReplies >= maxListCacheDirectivesResponses) {
|
|
|
+ return new BatchedListEntries<PathCacheEntry>(replies, true);
|
|
|
}
|
|
|
- long entryPoolId = entry.getDirective().getPoolId();
|
|
|
- if (poolId == null || poolId <= 0 || entryPoolId == poolId) {
|
|
|
- if (pc.checkPermission(
|
|
|
- cachePoolsById.get(entryPoolId), FsAction.EXECUTE)) {
|
|
|
- replies.add(entry);
|
|
|
- numReplies++;
|
|
|
- }
|
|
|
+ PathCacheEntry curEntry = cur.getValue();
|
|
|
+ if (!filterPool.isEmpty() &&
|
|
|
+ !cur.getValue().getDirective().getPool().equals(filterPool)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ CachePool pool = cachePools.get(curEntry.getDirective().getPool());
|
|
|
+ if (pool == null) {
|
|
|
+ LOG.error("invalid pool for PathCacheEntry " + curEntry);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (pc.checkPermission(pool, FsAction.EXECUTE)) {
|
|
|
+ replies.add(cur.getValue());
|
|
|
+ numReplies++;
|
|
|
}
|
|
|
}
|
|
|
- return replies;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized CachePool getCachePool(long id) {
|
|
|
- return cachePoolsById.get(id);
|
|
|
+ return new BatchedListEntries<PathCacheEntry>(replies, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -246,24 +250,22 @@ final class CacheManager {
|
|
|
*
|
|
|
* @param info
|
|
|
* The info for the cache pool to create.
|
|
|
- * @return created CachePool
|
|
|
*/
|
|
|
- public synchronized CachePool addCachePool(CachePoolInfo info)
|
|
|
+ public synchronized void addCachePool(CachePoolInfo info)
|
|
|
throws IOException {
|
|
|
String poolName = info.getPoolName();
|
|
|
- if (poolName == null || poolName.isEmpty()) {
|
|
|
+ if (poolName.isEmpty()) {
|
|
|
throw new IOException("invalid empty cache pool name");
|
|
|
}
|
|
|
- if (cachePoolsByName.containsKey(poolName)) {
|
|
|
+ CachePool pool = cachePools.get(poolName);
|
|
|
+ if (pool != null) {
|
|
|
throw new IOException("cache pool " + poolName + " already exists.");
|
|
|
}
|
|
|
- CachePool cachePool = new CachePool(getNextPoolId(), poolName,
|
|
|
+ CachePool cachePool = new CachePool(poolName,
|
|
|
info.getOwnerName(), info.getGroupName(), info.getMode(),
|
|
|
info.getWeight());
|
|
|
- cachePoolsById.put(cachePool.getId(), cachePool);
|
|
|
- cachePoolsByName.put(poolName, cachePool);
|
|
|
+ cachePools.put(poolName, cachePool);
|
|
|
LOG.info("created new cache pool " + cachePool);
|
|
|
- return cachePool;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -274,62 +276,46 @@ final class CacheManager {
|
|
|
* @param info
|
|
|
* The info for the cache pool to modify.
|
|
|
*/
|
|
|
- public synchronized void modifyCachePool(long poolId, CachePoolInfo info)
|
|
|
+ public synchronized void modifyCachePool(CachePoolInfo info)
|
|
|
throws IOException {
|
|
|
- if (poolId <= 0) {
|
|
|
- throw new IOException("invalid pool id " + poolId);
|
|
|
+ String poolName = info.getPoolName();
|
|
|
+ if (poolName.isEmpty()) {
|
|
|
+ throw new IOException("invalid empty cache pool name");
|
|
|
}
|
|
|
- if (!cachePoolsById.containsKey(poolId)) {
|
|
|
- throw new IOException("cache pool id " + poolId + " does not exist.");
|
|
|
+ CachePool pool = cachePools.get(poolName);
|
|
|
+ if (pool == null) {
|
|
|
+ throw new IOException("cache pool " + poolName + " does not exist.");
|
|
|
}
|
|
|
- CachePool pool = cachePoolsById.get(poolId);
|
|
|
- // Remove the old CachePoolInfo
|
|
|
- removeCachePool(poolId);
|
|
|
- // Build up the new CachePoolInfo
|
|
|
- CachePoolInfo.Builder newInfo = CachePoolInfo.newBuilder(pool.getInfo());
|
|
|
StringBuilder bld = new StringBuilder();
|
|
|
String prefix = "";
|
|
|
- if (info.getPoolName() != null) {
|
|
|
- newInfo.setPoolName(info.getPoolName());
|
|
|
- bld.append(prefix).
|
|
|
- append("set name to ").append(info.getOwnerName());
|
|
|
- prefix = "; ";
|
|
|
- }
|
|
|
if (info.getOwnerName() != null) {
|
|
|
- newInfo.setOwnerName(info.getOwnerName());
|
|
|
+ pool.setOwnerName(info.getOwnerName());
|
|
|
bld.append(prefix).
|
|
|
append("set owner to ").append(info.getOwnerName());
|
|
|
prefix = "; ";
|
|
|
}
|
|
|
if (info.getGroupName() != null) {
|
|
|
- newInfo.setGroupName(info.getGroupName());
|
|
|
+ pool.setGroupName(info.getGroupName());
|
|
|
bld.append(prefix).
|
|
|
append("set group to ").append(info.getGroupName());
|
|
|
prefix = "; ";
|
|
|
}
|
|
|
if (info.getMode() != null) {
|
|
|
- newInfo.setMode(info.getMode());
|
|
|
+ pool.setMode(info.getMode());
|
|
|
bld.append(prefix).
|
|
|
- append(String.format("set mode to ", info.getMode()));
|
|
|
+ append(String.format("set mode to 0%3o", info.getMode()));
|
|
|
prefix = "; ";
|
|
|
}
|
|
|
if (info.getWeight() != null) {
|
|
|
- newInfo.setWeight(info.getWeight());
|
|
|
+ pool.setWeight(info.getWeight());
|
|
|
bld.append(prefix).
|
|
|
append("set weight to ").append(info.getWeight());
|
|
|
prefix = "; ";
|
|
|
}
|
|
|
if (prefix.isEmpty()) {
|
|
|
bld.append("no changes.");
|
|
|
- } else {
|
|
|
- pool.setInfo(newInfo.build());
|
|
|
}
|
|
|
- // Put the newly modified info back in
|
|
|
- cachePoolsById.put(poolId, pool);
|
|
|
- cachePoolsByName.put(info.getPoolName(), pool);
|
|
|
- LOG.info("modified pool id " + pool.getId()
|
|
|
- + " (" + pool.getInfo().getPoolName() + "); "
|
|
|
- + bld.toString());
|
|
|
+ LOG.info("modified " + poolName + "; " + bld.toString());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -337,39 +323,47 @@ final class CacheManager {
|
|
|
*
|
|
|
* Only the superuser should be able to call this function.
|
|
|
*
|
|
|
- * @param poolId
|
|
|
- * The id of the cache pool to remove.
|
|
|
+ * @param poolName
|
|
|
+ * The name for the cache pool to remove.
|
|
|
*/
|
|
|
- public synchronized void removeCachePool(long poolId) throws IOException {
|
|
|
- if (!cachePoolsById.containsKey(poolId)) {
|
|
|
- throw new IOException("can't remove nonexistent cache pool id " + poolId);
|
|
|
+ public synchronized void removeCachePool(String poolName)
|
|
|
+ throws IOException {
|
|
|
+ CachePool pool = cachePools.remove(poolName);
|
|
|
+ if (pool == null) {
|
|
|
+ throw new IOException("can't remove nonexistent cache pool " + poolName);
|
|
|
}
|
|
|
- // Remove all the entries associated with the pool
|
|
|
- Iterator<Map.Entry<Long, PathCacheEntry>> it =
|
|
|
- entriesById.entrySet().iterator();
|
|
|
- while (it.hasNext()) {
|
|
|
- Map.Entry<Long, PathCacheEntry> entry = it.next();
|
|
|
- if (entry.getValue().getDirective().getPoolId() == poolId) {
|
|
|
- it.remove();
|
|
|
- entriesByDirective.remove(entry.getValue().getDirective());
|
|
|
+
|
|
|
+ // Remove entries using this pool
|
|
|
+ // TODO: could optimize this somewhat to avoid the need to iterate
|
|
|
+ // over all entries in entriesByDirective
|
|
|
+ Iterator<Entry<PathCacheDirective, PathCacheEntry>> iter =
|
|
|
+ entriesByDirective.entrySet().iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ Entry<PathCacheDirective, PathCacheEntry> entry = iter.next();
|
|
|
+ if (entry.getKey().getPool().equals(poolName)) {
|
|
|
+ entriesById.remove(entry.getValue().getEntryId());
|
|
|
+ iter.remove();
|
|
|
}
|
|
|
}
|
|
|
- // Remove the pool
|
|
|
- CachePool pool = cachePoolsById.remove(poolId);
|
|
|
- cachePoolsByName.remove(pool.getInfo().getPoolName());
|
|
|
}
|
|
|
|
|
|
- public synchronized List<CachePool> listCachePools(Long prevKey,
|
|
|
- int maxRepliesPerRequest) {
|
|
|
- final int MAX_PREALLOCATED_REPLIES = 16;
|
|
|
- ArrayList<CachePool> results =
|
|
|
- new ArrayList<CachePool>(Math.min(MAX_PREALLOCATED_REPLIES,
|
|
|
- maxRepliesPerRequest));
|
|
|
- SortedMap<Long, CachePool> tailMap =
|
|
|
- cachePoolsById.tailMap(prevKey, false);
|
|
|
- for (CachePool pool : tailMap.values()) {
|
|
|
- results.add(pool);
|
|
|
+ public synchronized BatchedListEntries<CachePoolInfo>
|
|
|
+ listCachePools(FSPermissionChecker pc, String prevKey) {
|
|
|
+ final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
|
+ ArrayList<CachePoolInfo> results =
|
|
|
+ new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
|
+ SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
|
|
|
+ int numListed = 0;
|
|
|
+ for (Entry<String, CachePool> cur : tailMap.entrySet()) {
|
|
|
+ if (numListed++ >= maxListCachePoolsResponses) {
|
|
|
+ return new BatchedListEntries<CachePoolInfo>(results, true);
|
|
|
+ }
|
|
|
+ if (pc == null) {
|
|
|
+ results.add(cur.getValue().getInfo(true));
|
|
|
+ } else {
|
|
|
+ results.add(cur.getValue().getInfo(pc));
|
|
|
+ }
|
|
|
}
|
|
|
- return results;
|
|
|
+ return new BatchedListEntries<CachePoolInfo>(results, false);
|
|
|
}
|
|
|
}
|