|
@@ -43,7 +43,7 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
-import org.apache.hadoop.fs.IdNotFoundException;
|
|
|
|
|
|
+import org.apache.hadoop.fs.InvalidRequestException;
|
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
@@ -250,11 +250,87 @@ public final class CacheManager {
|
|
private long getNextEntryId() throws IOException {
|
|
private long getNextEntryId() throws IOException {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
if (nextEntryId == Long.MAX_VALUE) {
|
|
if (nextEntryId == Long.MAX_VALUE) {
|
|
- throw new IOException("No more available IDs");
|
|
|
|
|
|
+ throw new IOException("No more available IDs.");
|
|
}
|
|
}
|
|
return nextEntryId++;
|
|
return nextEntryId++;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Helper getter / validation methods
|
|
|
|
+
|
|
|
|
+ private static void checkWritePermission(FSPermissionChecker pc,
|
|
|
|
+ CachePool pool) throws AccessControlException {
|
|
|
|
+ if ((pc != null)) {
|
|
|
|
+ pc.checkPermission(pool, FsAction.WRITE);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static String validatePoolName(PathBasedCacheDirective directive)
|
|
|
|
+ throws InvalidRequestException {
|
|
|
|
+ String pool = directive.getPool();
|
|
|
|
+ if (pool == null) {
|
|
|
|
+ throw new InvalidRequestException("No pool specified.");
|
|
|
|
+ }
|
|
|
|
+ if (pool.isEmpty()) {
|
|
|
|
+ throw new InvalidRequestException("Invalid empty pool name.");
|
|
|
|
+ }
|
|
|
|
+ return pool;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static String validatePath(PathBasedCacheDirective directive)
|
|
|
|
+ throws InvalidRequestException {
|
|
|
|
+ if (directive.getPath() == null) {
|
|
|
|
+ throw new InvalidRequestException("No path specified.");
|
|
|
|
+ }
|
|
|
|
+ String path = directive.getPath().toUri().getPath();
|
|
|
|
+ if (!DFSUtil.isValidName(path)) {
|
|
|
|
+ throw new InvalidRequestException("Invalid path '" + path + "'.");
|
|
|
|
+ }
|
|
|
|
+ return path;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static short validateReplication(PathBasedCacheDirective directive,
|
|
|
|
+ short defaultValue) throws InvalidRequestException {
|
|
|
|
+ short repl = (directive.getReplication() != null)
|
|
|
|
+ ? directive.getReplication() : defaultValue;
|
|
|
|
+ if (repl <= 0) {
|
|
|
|
+ throw new InvalidRequestException("Invalid replication factor " + repl
|
|
|
|
+ + " <= 0");
|
|
|
|
+ }
|
|
|
|
+ return repl;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get a PathBasedCacheEntry by ID, validating the ID and that the entry
|
|
|
|
+ * exists.
|
|
|
|
+ */
|
|
|
|
+ private PathBasedCacheEntry getById(long id) throws InvalidRequestException {
|
|
|
|
+ // Check for invalid IDs.
|
|
|
|
+ if (id <= 0) {
|
|
|
|
+ throw new InvalidRequestException("Invalid negative ID.");
|
|
|
|
+ }
|
|
|
|
+ // Find the entry.
|
|
|
|
+ PathBasedCacheEntry entry = entriesById.get(id);
|
|
|
|
+ if (entry == null) {
|
|
|
|
+ throw new InvalidRequestException("No directive with ID " + id
|
|
|
|
+ + " found.");
|
|
|
|
+ }
|
|
|
|
+ return entry;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get a CachePool by name, validating that it exists.
|
|
|
|
+ */
|
|
|
|
+ private CachePool getCachePool(String poolName)
|
|
|
|
+ throws InvalidRequestException {
|
|
|
|
+ CachePool pool = cachePools.get(poolName);
|
|
|
|
+ if (pool == null) {
|
|
|
|
+ throw new InvalidRequestException("Unknown pool " + poolName);
|
|
|
|
+ }
|
|
|
|
+ return pool;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // RPC handlers
|
|
|
|
+
|
|
private void addInternal(PathBasedCacheEntry entry) {
|
|
private void addInternal(PathBasedCacheEntry entry) {
|
|
entriesById.put(entry.getEntryId(), entry);
|
|
entriesById.put(entry.getEntryId(), entry);
|
|
String path = entry.getPath();
|
|
String path = entry.getPath();
|
|
@@ -272,34 +348,10 @@ public final class CacheManager {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
PathBasedCacheEntry entry;
|
|
PathBasedCacheEntry entry;
|
|
try {
|
|
try {
|
|
- if (directive.getPool() == null) {
|
|
|
|
- throw new IdNotFoundException("addDirective: no pool was specified.");
|
|
|
|
- }
|
|
|
|
- if (directive.getPool().isEmpty()) {
|
|
|
|
- throw new IdNotFoundException("addDirective: pool name was empty.");
|
|
|
|
- }
|
|
|
|
- CachePool pool = cachePools.get(directive.getPool());
|
|
|
|
- if (pool == null) {
|
|
|
|
- throw new IdNotFoundException("addDirective: no such pool as " +
|
|
|
|
- directive.getPool());
|
|
|
|
- }
|
|
|
|
- if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
|
|
|
|
- throw new AccessControlException("addDirective: write " +
|
|
|
|
- "permission denied for pool " + directive.getPool());
|
|
|
|
- }
|
|
|
|
- if (directive.getPath() == null) {
|
|
|
|
- throw new IOException("addDirective: no path was specified.");
|
|
|
|
- }
|
|
|
|
- String path = directive.getPath().toUri().getPath();
|
|
|
|
- if (!DFSUtil.isValidName(path)) {
|
|
|
|
- throw new IOException("addDirective: path '" + path + "' is invalid.");
|
|
|
|
- }
|
|
|
|
- short replication = directive.getReplication() == null ?
|
|
|
|
- (short)1 : directive.getReplication();
|
|
|
|
- if (replication <= 0) {
|
|
|
|
- throw new IOException("addDirective: replication " + replication +
|
|
|
|
- " is invalid.");
|
|
|
|
- }
|
|
|
|
|
|
+ CachePool pool = getCachePool(validatePoolName(directive));
|
|
|
|
+ checkWritePermission(pc, pool);
|
|
|
|
+ String path = validatePath(directive);
|
|
|
|
+ short replication = validateReplication(directive, (short)1);
|
|
long id;
|
|
long id;
|
|
if (directive.getId() != null) {
|
|
if (directive.getId() != null) {
|
|
// We are loading an entry from the edit log.
|
|
// We are loading an entry from the edit log.
|
|
@@ -312,10 +364,10 @@ public final class CacheManager {
|
|
entry = new PathBasedCacheEntry(id, path, replication, pool);
|
|
entry = new PathBasedCacheEntry(id, path, replication, pool);
|
|
addInternal(entry);
|
|
addInternal(entry);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("addDirective " + directive + ": failed.", e);
|
|
|
|
|
|
+ LOG.warn("addDirective of " + directive + " failed: ", e);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
- LOG.info("addDirective " + directive + ": succeeded.");
|
|
|
|
|
|
+ LOG.info("addDirective of " + directive + " successful.");
|
|
if (monitor != null) {
|
|
if (monitor != null) {
|
|
monitor.kick();
|
|
monitor.kick();
|
|
}
|
|
}
|
|
@@ -332,75 +384,43 @@ public final class CacheManager {
|
|
// Check for invalid IDs.
|
|
// Check for invalid IDs.
|
|
Long id = directive.getId();
|
|
Long id = directive.getId();
|
|
if (id == null) {
|
|
if (id == null) {
|
|
- throw new IdNotFoundException("modifyDirective: " +
|
|
|
|
- "no ID to modify was supplied.");
|
|
|
|
- }
|
|
|
|
- if (id <= 0) {
|
|
|
|
- throw new IdNotFoundException("modifyDirective " + id +
|
|
|
|
- ": invalid non-positive directive ID.");
|
|
|
|
- }
|
|
|
|
- // Find the entry.
|
|
|
|
- PathBasedCacheEntry prevEntry = entriesById.get(id);
|
|
|
|
- if (prevEntry == null) {
|
|
|
|
- throw new IdNotFoundException("modifyDirective " + id +
|
|
|
|
- ": id not found.");
|
|
|
|
- }
|
|
|
|
- if ((pc != null) &&
|
|
|
|
- (!pc.checkPermission(prevEntry.getPool(), FsAction.WRITE))) {
|
|
|
|
- throw new AccessControlException("modifyDirective " + id +
|
|
|
|
- ": permission denied for initial pool " + prevEntry.getPool());
|
|
|
|
|
|
+ throw new InvalidRequestException("Must supply an ID.");
|
|
}
|
|
}
|
|
|
|
+ PathBasedCacheEntry prevEntry = getById(id);
|
|
|
|
+ checkWritePermission(pc, prevEntry.getPool());
|
|
String path = prevEntry.getPath();
|
|
String path = prevEntry.getPath();
|
|
if (directive.getPath() != null) {
|
|
if (directive.getPath() != null) {
|
|
- path = directive.getPath().toUri().getPath();
|
|
|
|
- if (!DFSUtil.isValidName(path)) {
|
|
|
|
- throw new IOException("modifyDirective " + id + ": new path " +
|
|
|
|
- path + " is not valid.");
|
|
|
|
- }
|
|
|
|
|
|
+ path = validatePath(directive);
|
|
}
|
|
}
|
|
- short replication = (directive.getReplication() != null) ?
|
|
|
|
- directive.getReplication() : prevEntry.getReplication();
|
|
|
|
- if (replication <= 0) {
|
|
|
|
- throw new IOException("modifyDirective: replication " + replication +
|
|
|
|
- " is invalid.");
|
|
|
|
|
|
+ short replication = prevEntry.getReplication();
|
|
|
|
+ if (directive.getReplication() != null) {
|
|
|
|
+ replication = validateReplication(directive, replication);
|
|
}
|
|
}
|
|
CachePool pool = prevEntry.getPool();
|
|
CachePool pool = prevEntry.getPool();
|
|
if (directive.getPool() != null) {
|
|
if (directive.getPool() != null) {
|
|
- pool = cachePools.get(directive.getPool());
|
|
|
|
- if (pool == null) {
|
|
|
|
- throw new IdNotFoundException("modifyDirective " + id +
|
|
|
|
- ": pool " + directive.getPool() + " not found.");
|
|
|
|
- }
|
|
|
|
- if (directive.getPool().isEmpty()) {
|
|
|
|
- throw new IdNotFoundException("modifyDirective: pool name was " +
|
|
|
|
- "empty.");
|
|
|
|
- }
|
|
|
|
- if ((pc != null) &&
|
|
|
|
- (!pc.checkPermission(pool, FsAction.WRITE))) {
|
|
|
|
- throw new AccessControlException("modifyDirective " + id +
|
|
|
|
- ": permission denied for target pool " + pool);
|
|
|
|
- }
|
|
|
|
|
|
+ pool = getCachePool(validatePoolName(directive));
|
|
|
|
+ checkWritePermission(pc, pool);
|
|
}
|
|
}
|
|
removeInternal(prevEntry);
|
|
removeInternal(prevEntry);
|
|
PathBasedCacheEntry newEntry =
|
|
PathBasedCacheEntry newEntry =
|
|
new PathBasedCacheEntry(id, path, replication, pool);
|
|
new PathBasedCacheEntry(id, path, replication, pool);
|
|
addInternal(newEntry);
|
|
addInternal(newEntry);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("modifyDirective " + idString + ": failed.", e);
|
|
|
|
|
|
+ LOG.warn("modifyDirective of " + idString + " failed: ", e);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
- LOG.info("modifyDirective " + idString + ": successfully applied " +
|
|
|
|
- directive);
|
|
|
|
|
|
+ LOG.info("modifyDirective of " + idString + " successfully applied " +
|
|
|
|
+ directive + ".");
|
|
}
|
|
}
|
|
|
|
|
|
public void removeInternal(PathBasedCacheEntry existing)
|
|
public void removeInternal(PathBasedCacheEntry existing)
|
|
- throws IOException {
|
|
|
|
|
|
+ throws InvalidRequestException {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
// Remove the corresponding entry in entriesByPath.
|
|
// Remove the corresponding entry in entriesByPath.
|
|
String path = existing.getPath();
|
|
String path = existing.getPath();
|
|
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
|
|
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
|
|
if (entries == null || !entries.remove(existing)) {
|
|
if (entries == null || !entries.remove(existing)) {
|
|
- throw new IdNotFoundException("removeInternal: failed to locate entry " +
|
|
|
|
|
|
+ throw new InvalidRequestException("Failed to locate entry " +
|
|
existing.getEntryId() + " by path " + existing.getPath());
|
|
existing.getEntryId() + " by path " + existing.getPath());
|
|
}
|
|
}
|
|
if (entries.size() == 0) {
|
|
if (entries.size() == 0) {
|
|
@@ -413,32 +433,17 @@ public final class CacheManager {
|
|
throws IOException {
|
|
throws IOException {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
try {
|
|
try {
|
|
- // Check for invalid IDs.
|
|
|
|
- if (id <= 0) {
|
|
|
|
- throw new IdNotFoundException("removeDirective " + id + ": invalid " +
|
|
|
|
- "non-positive directive ID.");
|
|
|
|
- }
|
|
|
|
- // Find the entry.
|
|
|
|
- PathBasedCacheEntry existing = entriesById.get(id);
|
|
|
|
- if (existing == null) {
|
|
|
|
- throw new IdNotFoundException("removeDirective " + id +
|
|
|
|
- ": id not found.");
|
|
|
|
- }
|
|
|
|
- if ((pc != null) &&
|
|
|
|
- (!pc.checkPermission(existing.getPool(), FsAction.WRITE))) {
|
|
|
|
- throw new AccessControlException("removeDirective " + id +
|
|
|
|
- ": write permission denied on pool " +
|
|
|
|
- existing.getPool().getPoolName());
|
|
|
|
- }
|
|
|
|
|
|
+ PathBasedCacheEntry existing = getById(id);
|
|
|
|
+ checkWritePermission(pc, existing.getPool());
|
|
removeInternal(existing);
|
|
removeInternal(existing);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.warn("removeDirective " + id + " failed.", e);
|
|
|
|
|
|
+ LOG.warn("removeDirective of " + id + " failed: ", e);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
if (monitor != null) {
|
|
if (monitor != null) {
|
|
monitor.kick();
|
|
monitor.kick();
|
|
}
|
|
}
|
|
- LOG.info("removeDirective " + id + ": succeeded.");
|
|
|
|
|
|
+ LOG.info("removeDirective of " + id + " successful.");
|
|
}
|
|
}
|
|
|
|
|
|
public BatchedListEntries<PathBasedCacheDirective>
|
|
public BatchedListEntries<PathBasedCacheDirective>
|
|
@@ -449,18 +454,13 @@ public final class CacheManager {
|
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
String filterPath = null;
|
|
String filterPath = null;
|
|
if (filter.getId() != null) {
|
|
if (filter.getId() != null) {
|
|
- throw new IOException("we currently don't support filtering by ID");
|
|
|
|
|
|
+ throw new IOException("Filtering by ID is unsupported.");
|
|
}
|
|
}
|
|
if (filter.getPath() != null) {
|
|
if (filter.getPath() != null) {
|
|
- filterPath = filter.getPath().toUri().getPath();
|
|
|
|
- if (!DFSUtil.isValidName(filterPath)) {
|
|
|
|
- throw new IOException("listPathBasedCacheDirectives: invalid " +
|
|
|
|
- "path name '" + filterPath + "'");
|
|
|
|
- }
|
|
|
|
|
|
+ filterPath = validatePath(filter);
|
|
}
|
|
}
|
|
if (filter.getReplication() != null) {
|
|
if (filter.getReplication() != null) {
|
|
- throw new IOException("we currently don't support filtering " +
|
|
|
|
- "by replication");
|
|
|
|
|
|
+ throw new IOException("Filtering by replication is unsupported.");
|
|
}
|
|
}
|
|
ArrayList<PathBasedCacheDirective> replies =
|
|
ArrayList<PathBasedCacheDirective> replies =
|
|
new ArrayList<PathBasedCacheDirective>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
new ArrayList<PathBasedCacheDirective>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
@@ -481,8 +481,15 @@ public final class CacheManager {
|
|
!directive.getPath().toUri().getPath().equals(filterPath)) {
|
|
!directive.getPath().toUri().getPath().equals(filterPath)) {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
- if ((pc == null) ||
|
|
|
|
- (pc.checkPermission(curEntry.getPool(), FsAction.READ))) {
|
|
|
|
|
|
+ boolean hasPermission = true;
|
|
|
|
+ if (pc != null) {
|
|
|
|
+ try {
|
|
|
|
+ pc.checkPermission(curEntry.getPool(), FsAction.READ);
|
|
|
|
+ } catch (AccessControlException e) {
|
|
|
|
+ hasPermission = false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (hasPermission) {
|
|
replies.add(cur.getValue().toDirective());
|
|
replies.add(cur.getValue().toDirective());
|
|
numReplies++;
|
|
numReplies++;
|
|
}
|
|
}
|
|
@@ -505,12 +512,13 @@ public final class CacheManager {
|
|
String poolName = info.getPoolName();
|
|
String poolName = info.getPoolName();
|
|
CachePool pool = cachePools.get(poolName);
|
|
CachePool pool = cachePools.get(poolName);
|
|
if (pool != null) {
|
|
if (pool != null) {
|
|
- throw new IOException("cache pool " + poolName + " already exists.");
|
|
|
|
|
|
+ throw new InvalidRequestException("Cache pool " + poolName
|
|
|
|
+ + " already exists.");
|
|
}
|
|
}
|
|
pool = CachePool.createFromInfoAndDefaults(info);
|
|
pool = CachePool.createFromInfoAndDefaults(info);
|
|
cachePools.put(pool.getPoolName(), pool);
|
|
cachePools.put(pool.getPoolName(), pool);
|
|
- LOG.info("created new cache pool " + pool);
|
|
|
|
- return pool.getInfo(true);
|
|
|
|
|
|
+ LOG.info("Created new cache pool " + pool);
|
|
|
|
+ return pool.getInfo(null);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -528,7 +536,8 @@ public final class CacheManager {
|
|
String poolName = info.getPoolName();
|
|
String poolName = info.getPoolName();
|
|
CachePool pool = cachePools.get(poolName);
|
|
CachePool pool = cachePools.get(poolName);
|
|
if (pool == null) {
|
|
if (pool == null) {
|
|
- throw new IOException("cache pool " + poolName + " does not exist.");
|
|
|
|
|
|
+ throw new InvalidRequestException("Cache pool " + poolName
|
|
|
|
+ + " does not exist.");
|
|
}
|
|
}
|
|
StringBuilder bld = new StringBuilder();
|
|
StringBuilder bld = new StringBuilder();
|
|
String prefix = "";
|
|
String prefix = "";
|
|
@@ -575,7 +584,8 @@ public final class CacheManager {
|
|
CachePoolInfo.validateName(poolName);
|
|
CachePoolInfo.validateName(poolName);
|
|
CachePool pool = cachePools.remove(poolName);
|
|
CachePool pool = cachePools.remove(poolName);
|
|
if (pool == null) {
|
|
if (pool == null) {
|
|
- throw new IOException("can't remove non-existent cache pool " + poolName);
|
|
|
|
|
|
+ throw new InvalidRequestException(
|
|
|
|
+ "Cannot remove non-existent cache pool " + poolName);
|
|
}
|
|
}
|
|
|
|
|
|
// Remove entries using this pool
|
|
// Remove entries using this pool
|
|
@@ -607,11 +617,7 @@ public final class CacheManager {
|
|
if (numListed++ >= maxListCachePoolsResponses) {
|
|
if (numListed++ >= maxListCachePoolsResponses) {
|
|
return new BatchedListEntries<CachePoolInfo>(results, true);
|
|
return new BatchedListEntries<CachePoolInfo>(results, true);
|
|
}
|
|
}
|
|
- if (pc == null) {
|
|
|
|
- results.add(cur.getValue().getInfo(true));
|
|
|
|
- } else {
|
|
|
|
- results.add(cur.getValue().getInfo(pc));
|
|
|
|
- }
|
|
|
|
|
|
+ results.add(cur.getValue().getInfo(pc));
|
|
}
|
|
}
|
|
return new BatchedListEntries<CachePoolInfo>(results, false);
|
|
return new BatchedListEntries<CachePoolInfo>(results, false);
|
|
}
|
|
}
|
|
@@ -755,7 +761,7 @@ public final class CacheManager {
|
|
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
|
|
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
|
|
out.writeInt(cachePools.size());
|
|
out.writeInt(cachePools.size());
|
|
for (CachePool pool: cachePools.values()) {
|
|
for (CachePool pool: cachePools.values()) {
|
|
- pool.getInfo(true).writeTo(out);
|
|
|
|
|
|
+ pool.getInfo(null).writeTo(out);
|
|
counter.increment();
|
|
counter.increment();
|
|
}
|
|
}
|
|
prog.endStep(Phase.SAVING_CHECKPOINT, step);
|
|
prog.endStep(Phase.SAVING_CHECKPOINT, step);
|