|
@@ -19,8 +19,8 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
@@ -38,14 +38,15 @@ import org.apache.hadoop.fs.permission.FsAction;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
|
|
|
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
|
|
|
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
|
|
|
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
|
|
|
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
|
|
|
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
|
|
|
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheEntryException.InvalidIdException;
|
|
|
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheEntryException.NoSuchIdException;
|
|
|
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheEntryException.UnexpectedRemovePathBasedCacheEntryException;
|
|
|
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheEntryException.RemovePermissionDeniedException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
|
|
|
import org.apache.hadoop.util.Fallible;
|
|
|
|
|
|
/**
|
|
@@ -57,18 +58,12 @@ public final class CacheManager {
|
|
|
/**
|
|
|
* Cache entries, sorted by ID.
|
|
|
*
|
|
|
- * listPathBasedCacheEntries relies on the ordering of elements in this map
|
|
|
+ * listPathBasedCacheDescriptors relies on the ordering of elements in this map
|
|
|
* to track what has already been listed by the client.
|
|
|
*/
|
|
|
private final TreeMap<Long, PathBasedCacheEntry> entriesById =
|
|
|
new TreeMap<Long, PathBasedCacheEntry>();
|
|
|
|
|
|
- /**
|
|
|
- * Cache entries, sorted by directive.
|
|
|
- */
|
|
|
- private final TreeMap<PathBasedCacheDirective, PathBasedCacheEntry> entriesByDirective =
|
|
|
- new TreeMap<PathBasedCacheDirective, PathBasedCacheEntry>();
|
|
|
-
|
|
|
/**
|
|
|
* Cache entries, sorted by path
|
|
|
*/
|
|
@@ -94,7 +89,7 @@ public final class CacheManager {
|
|
|
/**
|
|
|
* Maximum number of cache pool directives to list in one operation.
|
|
|
*/
|
|
|
- private final int maxListCacheDirectivesResponses;
|
|
|
+ private final int maxListCacheDescriptorsResponses;
|
|
|
|
|
|
final private FSNamesystem namesystem;
|
|
|
final private FSDirectory dir;
|
|
@@ -107,14 +102,13 @@ public final class CacheManager {
|
|
|
maxListCachePoolsResponses = conf.getInt(
|
|
|
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
|
|
|
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
|
|
|
- maxListCacheDirectivesResponses = conf.getInt(
|
|
|
- DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
|
|
|
- DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT);
|
|
|
+ maxListCacheDescriptorsResponses = conf.getInt(
|
|
|
+ DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES,
|
|
|
+ DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT);
|
|
|
}
|
|
|
|
|
|
synchronized void clear() {
|
|
|
entriesById.clear();
|
|
|
- entriesByDirective.clear();
|
|
|
entriesByPath.clear();
|
|
|
cachePools.clear();
|
|
|
nextEntryId = 1;
|
|
@@ -127,17 +121,32 @@ public final class CacheManager {
|
|
|
return nextEntryId++;
|
|
|
}
|
|
|
|
|
|
- private synchronized Fallible<PathBasedCacheEntry> addDirective(
|
|
|
+ private synchronized PathBasedCacheEntry
|
|
|
+ findEntry(PathBasedCacheDirective directive) {
|
|
|
+ List<PathBasedCacheEntry> existing =
|
|
|
+ entriesByPath.get(directive.getPath());
|
|
|
+ if (existing == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ for (PathBasedCacheEntry entry : existing) {
|
|
|
+ if (entry.getPool().getName().equals(directive.getPool())) {
|
|
|
+ return entry;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized Fallible<PathBasedCacheDescriptor> addDirective(
|
|
|
PathBasedCacheDirective directive, FSPermissionChecker pc) {
|
|
|
CachePool pool = cachePools.get(directive.getPool());
|
|
|
if (pool == null) {
|
|
|
LOG.info("addDirective " + directive + ": pool not found.");
|
|
|
- return new Fallible<PathBasedCacheEntry>(
|
|
|
+ return new Fallible<PathBasedCacheDescriptor>(
|
|
|
new InvalidPoolNameError(directive));
|
|
|
}
|
|
|
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
|
|
|
LOG.info("addDirective " + directive + ": write permission denied.");
|
|
|
- return new Fallible<PathBasedCacheEntry>(
|
|
|
+ return new Fallible<PathBasedCacheDescriptor>(
|
|
|
new PoolWritePermissionDeniedError(directive));
|
|
|
}
|
|
|
try {
|
|
@@ -145,22 +154,24 @@ public final class CacheManager {
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.info("addDirective " + directive + ": validation failed: "
|
|
|
+ ioe.getClass().getName() + ": " + ioe.getMessage());
|
|
|
- return new Fallible<PathBasedCacheEntry>(ioe);
|
|
|
+ return new Fallible<PathBasedCacheDescriptor>(ioe);
|
|
|
}
|
|
|
+
|
|
|
// Check if we already have this entry.
|
|
|
- PathBasedCacheEntry existing = entriesByDirective.get(directive);
|
|
|
+ PathBasedCacheEntry existing = findEntry(directive);
|
|
|
if (existing != null) {
|
|
|
- // Entry already exists: return existing entry.
|
|
|
LOG.info("addDirective " + directive + ": there is an " +
|
|
|
- "existing directive " + existing);
|
|
|
- return new Fallible<PathBasedCacheEntry>(existing);
|
|
|
+ "existing directive " + existing + " in this pool.");
|
|
|
+ return new Fallible<PathBasedCacheDescriptor>(
|
|
|
+ existing.getDescriptor());
|
|
|
}
|
|
|
// Add a new entry with the next available ID.
|
|
|
PathBasedCacheEntry entry;
|
|
|
try {
|
|
|
- entry = new PathBasedCacheEntry(getNextEntryId(), directive);
|
|
|
+ entry = new PathBasedCacheEntry(getNextEntryId(),
|
|
|
+ directive.getPath(), pool);
|
|
|
} catch (IOException ioe) {
|
|
|
- return new Fallible<PathBasedCacheEntry>(
|
|
|
+ return new Fallible<PathBasedCacheDescriptor>(
|
|
|
new UnexpectedAddPathBasedCacheDirectiveException(directive));
|
|
|
}
|
|
|
LOG.info("addDirective " + directive + ": added cache directive "
|
|
@@ -168,7 +179,6 @@ public final class CacheManager {
|
|
|
|
|
|
// Success!
|
|
|
// First, add it to the various maps
|
|
|
- entriesByDirective.put(directive, entry);
|
|
|
entriesById.put(entry.getEntryId(), entry);
|
|
|
String path = directive.getPath();
|
|
|
List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
|
|
@@ -181,7 +191,7 @@ public final class CacheManager {
|
|
|
// Next, set the path as cached in the namesystem
|
|
|
try {
|
|
|
INode node = dir.getINode(directive.getPath());
|
|
|
- if (node.isFile()) {
|
|
|
+ if (node != null && node.isFile()) {
|
|
|
INodeFile file = node.asFile();
|
|
|
// TODO: adjustable cache replication factor
|
|
|
namesystem.setCacheReplicationInt(directive.getPath(),
|
|
@@ -192,96 +202,90 @@ public final class CacheManager {
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.info("addDirective " + directive +": failed to cache file: " +
|
|
|
ioe.getClass().getName() +": " + ioe.getMessage());
|
|
|
- return new Fallible<PathBasedCacheEntry>(ioe);
|
|
|
+ return new Fallible<PathBasedCacheDescriptor>(ioe);
|
|
|
}
|
|
|
-
|
|
|
- return new Fallible<PathBasedCacheEntry>(entry);
|
|
|
+ return new Fallible<PathBasedCacheDescriptor>(
|
|
|
+ entry.getDescriptor());
|
|
|
}
|
|
|
|
|
|
- public synchronized List<Fallible<PathBasedCacheEntry>> addDirectives(
|
|
|
+ public synchronized List<Fallible<PathBasedCacheDescriptor>> addDirectives(
|
|
|
List<PathBasedCacheDirective> directives, FSPermissionChecker pc) {
|
|
|
- ArrayList<Fallible<PathBasedCacheEntry>> results =
|
|
|
- new ArrayList<Fallible<PathBasedCacheEntry>>(directives.size());
|
|
|
+ ArrayList<Fallible<PathBasedCacheDescriptor>> results =
|
|
|
+ new ArrayList<Fallible<PathBasedCacheDescriptor>>(directives.size());
|
|
|
for (PathBasedCacheDirective directive: directives) {
|
|
|
results.add(addDirective(directive, pc));
|
|
|
}
|
|
|
return results;
|
|
|
}
|
|
|
|
|
|
- private synchronized Fallible<Long> removeEntry(long entryId,
|
|
|
+ private synchronized Fallible<Long> removeDescriptor(long id,
|
|
|
FSPermissionChecker pc) {
|
|
|
// Check for invalid IDs.
|
|
|
- if (entryId <= 0) {
|
|
|
- LOG.info("removeEntry " + entryId + ": invalid non-positive entry ID.");
|
|
|
- return new Fallible<Long>(new InvalidIdException(entryId));
|
|
|
+ if (id <= 0) {
|
|
|
+ LOG.info("removeDescriptor " + id + ": invalid non-positive " +
|
|
|
+ "descriptor ID.");
|
|
|
+ return new Fallible<Long>(new InvalidIdException(id));
|
|
|
}
|
|
|
// Find the entry.
|
|
|
- PathBasedCacheEntry existing = entriesById.get(entryId);
|
|
|
+ PathBasedCacheEntry existing = entriesById.get(id);
|
|
|
if (existing == null) {
|
|
|
- LOG.info("removeEntry " + entryId + ": entry not found.");
|
|
|
- return new Fallible<Long>(new NoSuchIdException(entryId));
|
|
|
+ LOG.info("removeDescriptor " + id + ": entry not found.");
|
|
|
+ return new Fallible<Long>(new NoSuchIdException(id));
|
|
|
}
|
|
|
- CachePool pool = cachePools.get(existing.getDirective().getPool());
|
|
|
+ CachePool pool = cachePools.get(existing.getDescriptor().getPool());
|
|
|
if (pool == null) {
|
|
|
- LOG.info("removeEntry " + entryId + ": pool not found for directive " +
|
|
|
- existing.getDirective());
|
|
|
+ LOG.info("removeDescriptor " + id + ": pool not found for directive " +
|
|
|
+ existing.getDescriptor());
|
|
|
return new Fallible<Long>(
|
|
|
- new UnexpectedRemovePathBasedCacheEntryException(entryId));
|
|
|
+ new UnexpectedRemovePathBasedCacheDescriptorException(id));
|
|
|
}
|
|
|
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
|
|
|
- LOG.info("removeEntry " + entryId + ": write permission denied to " +
|
|
|
+ LOG.info("removeDescriptor " + id + ": write permission denied to " +
|
|
|
"pool " + pool + " for entry " + existing);
|
|
|
return new Fallible<Long>(
|
|
|
- new RemovePermissionDeniedException(entryId));
|
|
|
+ new RemovePermissionDeniedException(id));
|
|
|
}
|
|
|
|
|
|
- // Remove the corresponding entry in entriesByDirective.
|
|
|
- if (entriesByDirective.remove(existing.getDirective()) == null) {
|
|
|
- LOG.warn("removeEntry " + entryId + ": failed to find existing entry " +
|
|
|
- existing + " in entriesByDirective");
|
|
|
- return new Fallible<Long>(
|
|
|
- new UnexpectedRemovePathBasedCacheEntryException(entryId));
|
|
|
- }
|
|
|
// Remove the corresponding entry in entriesByPath.
|
|
|
- String path = existing.getDirective().getPath();
|
|
|
+ String path = existing.getDescriptor().getPath();
|
|
|
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
|
|
|
if (entries == null || !entries.remove(existing)) {
|
|
|
return new Fallible<Long>(
|
|
|
- new UnexpectedRemovePathBasedCacheEntryException(entryId));
|
|
|
+ new UnexpectedRemovePathBasedCacheDescriptorException(id));
|
|
|
}
|
|
|
if (entries.size() == 0) {
|
|
|
entriesByPath.remove(path);
|
|
|
}
|
|
|
- entriesById.remove(entryId);
|
|
|
+ entriesById.remove(id);
|
|
|
|
|
|
// Set the path as uncached in the namesystem
|
|
|
try {
|
|
|
- INode node = dir.getINode(existing.getDirective().getPath());
|
|
|
- if (node.isFile()) {
|
|
|
- namesystem.setCacheReplicationInt(existing.getDirective().getPath(),
|
|
|
+ INode node = dir.getINode(existing.getDescriptor().getPath());
|
|
|
+ if (node != null && node.isFile()) {
|
|
|
+ namesystem.setCacheReplicationInt(existing.getDescriptor().getPath(),
|
|
|
(short) 0);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
- LOG.warn("removeEntry " + entryId + ": failure while setting cache"
|
|
|
+ LOG.warn("removeDescriptor " + id + ": failure while setting cache"
|
|
|
+ " replication factor", e);
|
|
|
return new Fallible<Long>(e);
|
|
|
}
|
|
|
- LOG.info("removeEntry successful for PathCacheEntry id " + entryId);
|
|
|
- return new Fallible<Long>(entryId);
|
|
|
+ LOG.info("removeDescriptor successful for PathCacheEntry id " + id);
|
|
|
+ return new Fallible<Long>(id);
|
|
|
}
|
|
|
|
|
|
- public synchronized List<Fallible<Long>> removeEntries(List<Long> entryIds,
|
|
|
+ public synchronized List<Fallible<Long>> removeDescriptors(List<Long> ids,
|
|
|
FSPermissionChecker pc) {
|
|
|
ArrayList<Fallible<Long>> results =
|
|
|
- new ArrayList<Fallible<Long>>(entryIds.size());
|
|
|
- for (Long entryId : entryIds) {
|
|
|
- results.add(removeEntry(entryId, pc));
|
|
|
+ new ArrayList<Fallible<Long>>(ids.size());
|
|
|
+ for (Long id : ids) {
|
|
|
+ results.add(removeDescriptor(id, pc));
|
|
|
}
|
|
|
return results;
|
|
|
}
|
|
|
|
|
|
- public synchronized BatchedListEntries<PathBasedCacheEntry>
|
|
|
- listPathBasedCacheEntries(long prevId, String filterPool,
|
|
|
+ public synchronized BatchedListEntries<PathBasedCacheDescriptor>
|
|
|
+ listPathBasedCacheDescriptors(long prevId, String filterPool,
|
|
|
String filterPath, FSPermissionChecker pc) throws IOException {
|
|
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
|
if (filterPath != null) {
|
|
@@ -289,16 +293,16 @@ public final class CacheManager {
|
|
|
throw new IOException("invalid path name '" + filterPath + "'");
|
|
|
}
|
|
|
}
|
|
|
- ArrayList<PathBasedCacheEntry> replies =
|
|
|
- new ArrayList<PathBasedCacheEntry>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
|
+ ArrayList<PathBasedCacheDescriptor> replies =
|
|
|
+ new ArrayList<PathBasedCacheDescriptor>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
|
int numReplies = 0;
|
|
|
SortedMap<Long, PathBasedCacheEntry> tailMap = entriesById.tailMap(prevId + 1);
|
|
|
for (Entry<Long, PathBasedCacheEntry> cur : tailMap.entrySet()) {
|
|
|
- if (numReplies >= maxListCacheDirectivesResponses) {
|
|
|
- return new BatchedListEntries<PathBasedCacheEntry>(replies, true);
|
|
|
+ if (numReplies >= maxListCacheDescriptorsResponses) {
|
|
|
+ return new BatchedListEntries<PathBasedCacheDescriptor>(replies, true);
|
|
|
}
|
|
|
PathBasedCacheEntry curEntry = cur.getValue();
|
|
|
- PathBasedCacheDirective directive = cur.getValue().getDirective();
|
|
|
+ PathBasedCacheDirective directive = cur.getValue().getDescriptor();
|
|
|
if (filterPool != null &&
|
|
|
!directive.getPool().equals(filterPool)) {
|
|
|
continue;
|
|
@@ -307,17 +311,12 @@ public final class CacheManager {
|
|
|
!directive.getPath().equals(filterPath)) {
|
|
|
continue;
|
|
|
}
|
|
|
- CachePool pool = cachePools.get(curEntry.getDirective().getPool());
|
|
|
- if (pool == null) {
|
|
|
- LOG.error("invalid pool for PathBasedCacheEntry " + curEntry);
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (pc.checkPermission(pool, FsAction.READ)) {
|
|
|
- replies.add(cur.getValue());
|
|
|
+ if (pc.checkPermission(curEntry.getPool(), FsAction.READ)) {
|
|
|
+ replies.add(cur.getValue().getDescriptor());
|
|
|
numReplies++;
|
|
|
}
|
|
|
}
|
|
|
- return new BatchedListEntries<PathBasedCacheEntry>(replies, false);
|
|
|
+ return new BatchedListEntries<PathBasedCacheDescriptor>(replies, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -409,12 +408,12 @@ public final class CacheManager {
|
|
|
|
|
|
// Remove entries using this pool
|
|
|
// TODO: could optimize this somewhat to avoid the need to iterate
|
|
|
- // over all entries in entriesByDirective
|
|
|
- Iterator<Entry<PathBasedCacheDirective, PathBasedCacheEntry>> iter =
|
|
|
- entriesByDirective.entrySet().iterator();
|
|
|
+ // over all entries in entriesById
|
|
|
+ Iterator<Entry<Long, PathBasedCacheEntry>> iter =
|
|
|
+ entriesById.entrySet().iterator();
|
|
|
while (iter.hasNext()) {
|
|
|
- Entry<PathBasedCacheDirective, PathBasedCacheEntry> entry = iter.next();
|
|
|
- if (entry.getKey().getPool().equals(poolName)) {
|
|
|
+ Entry<Long, PathBasedCacheEntry> entry = iter.next();
|
|
|
+ if (entry.getValue().getPool() == pool) {
|
|
|
entriesById.remove(entry.getValue().getEntryId());
|
|
|
iter.remove();
|
|
|
}
|