|
@@ -35,14 +35,12 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
|
|
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
|
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.CachePool;
|
|
|
import org.apache.hadoop.hdfs.tools.TableListing.Justification;
|
|
|
-import org.apache.hadoop.ipc.RemoteException;
|
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Tool;
|
|
|
|
|
@@ -120,6 +118,23 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
return listing;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Parses a time-to-live value from a string
|
|
|
+ * @return The ttl in milliseconds
|
|
|
+ * @throws IOException if it could not be parsed
|
|
|
+ */
|
|
|
+ private static Long parseTtlString(String maxTtlString) throws IOException {
|
|
|
+ Long maxTtl = null;
|
|
|
+ if (maxTtlString != null) {
|
|
|
+ if (maxTtlString.equalsIgnoreCase("never")) {
|
|
|
+ maxTtl = CachePoolInfo.RELATIVE_EXPIRY_NEVER;
|
|
|
+ } else {
|
|
|
+ maxTtl = DFSUtil.parseRelativeTime(maxTtlString);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return maxTtl;
|
|
|
+ }
|
|
|
+
|
|
|
interface Command {
|
|
|
String getName();
|
|
|
String getShortUsage();
|
|
@@ -154,7 +169,7 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
listing.addRow("<replication>", "The cache replication factor to use. " +
|
|
|
"Defaults to 1.");
|
|
|
listing.addRow("<time-to-live>", "How long the directive is " +
|
|
|
- "valid. Can be specified in minutes, hours, and days via e.g. " +
|
|
|
+ "valid. Can be specified in minutes, hours, and days, e.g. " +
|
|
|
"30m, 4h, 2d. Valid units are [smhd]." +
|
|
|
" If unspecified, the directive never expires.");
|
|
|
return getShortUsage() + "\n" +
|
|
@@ -309,7 +324,7 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
"added. You must have write permission on the cache pool "
|
|
|
+ "in order to move a directive into it. (optional)");
|
|
|
listing.addRow("<time-to-live>", "How long the directive is " +
|
|
|
- "valid. Can be specified in minutes, hours, and days via e.g. " +
|
|
|
+ "valid. Can be specified in minutes, hours, and days, e.g. " +
|
|
|
"30m, 4h, 2d. Valid units are [smhd]." +
|
|
|
" If unspecified, the directive never expires.");
|
|
|
return getShortUsage() + "\n" +
|
|
@@ -419,22 +434,27 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
System.err.println("Usage is " + getShortUsage());
|
|
|
return 1;
|
|
|
}
|
|
|
- DistributedFileSystem dfs = getDFS(conf);
|
|
|
- RemoteIterator<CacheDirectiveEntry> iter =
|
|
|
- dfs.listCacheDirectives(
|
|
|
- new CacheDirectiveInfo.Builder().
|
|
|
- setPath(new Path(path)).build());
|
|
|
int exitCode = 0;
|
|
|
- while (iter.hasNext()) {
|
|
|
- CacheDirectiveEntry entry = iter.next();
|
|
|
- try {
|
|
|
- dfs.removeCacheDirective(entry.getInfo().getId());
|
|
|
- System.out.println("Removed cache directive " +
|
|
|
- entry.getInfo().getId());
|
|
|
- } catch (IOException e) {
|
|
|
- System.err.println(prettifyException(e));
|
|
|
- exitCode = 2;
|
|
|
+ try {
|
|
|
+ DistributedFileSystem dfs = getDFS(conf);
|
|
|
+ RemoteIterator<CacheDirectiveEntry> iter =
|
|
|
+ dfs.listCacheDirectives(
|
|
|
+ new CacheDirectiveInfo.Builder().
|
|
|
+ setPath(new Path(path)).build());
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ CacheDirectiveEntry entry = iter.next();
|
|
|
+ try {
|
|
|
+ dfs.removeCacheDirective(entry.getInfo().getId());
|
|
|
+ System.out.println("Removed cache directive " +
|
|
|
+ entry.getInfo().getId());
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.err.println(prettifyException(e));
|
|
|
+ exitCode = 2;
|
|
|
+ }
|
|
|
}
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.err.println(prettifyException(e));
|
|
|
+ exitCode = 2;
|
|
|
}
|
|
|
if (exitCode == 0) {
|
|
|
System.out.println("Removed every cache directive with path " +
|
|
@@ -500,41 +520,46 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
addField("FILES_CACHED", Justification.RIGHT);
|
|
|
}
|
|
|
TableListing tableListing = tableBuilder.build();
|
|
|
-
|
|
|
- DistributedFileSystem dfs = getDFS(conf);
|
|
|
- RemoteIterator<CacheDirectiveEntry> iter =
|
|
|
- dfs.listCacheDirectives(builder.build());
|
|
|
- int numEntries = 0;
|
|
|
- while (iter.hasNext()) {
|
|
|
- CacheDirectiveEntry entry = iter.next();
|
|
|
- CacheDirectiveInfo directive = entry.getInfo();
|
|
|
- CacheDirectiveStats stats = entry.getStats();
|
|
|
- List<String> row = new LinkedList<String>();
|
|
|
- row.add("" + directive.getId());
|
|
|
- row.add(directive.getPool());
|
|
|
- row.add("" + directive.getReplication());
|
|
|
- String expiry;
|
|
|
- if (directive.getExpiration().getMillis() ==
|
|
|
- CacheDirectiveInfo.Expiration.EXPIRY_NEVER) {
|
|
|
- expiry = "never";
|
|
|
- } else {
|
|
|
- expiry = directive.getExpiration().toString();
|
|
|
+ try {
|
|
|
+ DistributedFileSystem dfs = getDFS(conf);
|
|
|
+ RemoteIterator<CacheDirectiveEntry> iter =
|
|
|
+ dfs.listCacheDirectives(builder.build());
|
|
|
+ int numEntries = 0;
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ CacheDirectiveEntry entry = iter.next();
|
|
|
+ CacheDirectiveInfo directive = entry.getInfo();
|
|
|
+ CacheDirectiveStats stats = entry.getStats();
|
|
|
+ List<String> row = new LinkedList<String>();
|
|
|
+ row.add("" + directive.getId());
|
|
|
+ row.add(directive.getPool());
|
|
|
+ row.add("" + directive.getReplication());
|
|
|
+ String expiry;
|
|
|
+ // This is effectively never, round for nice printing
|
|
|
+ if (directive.getExpiration().getMillis() >
|
|
|
+ Expiration.MAX_RELATIVE_EXPIRY_MS / 2) {
|
|
|
+ expiry = "never";
|
|
|
+ } else {
|
|
|
+ expiry = directive.getExpiration().toString();
|
|
|
+ }
|
|
|
+ row.add(expiry);
|
|
|
+ row.add(directive.getPath().toUri().getPath());
|
|
|
+ if (printStats) {
|
|
|
+ row.add("" + stats.getBytesNeeded());
|
|
|
+ row.add("" + stats.getBytesCached());
|
|
|
+ row.add("" + stats.getFilesNeeded());
|
|
|
+ row.add("" + stats.getFilesCached());
|
|
|
+ }
|
|
|
+ tableListing.addRow(row.toArray(new String[0]));
|
|
|
+ numEntries++;
|
|
|
}
|
|
|
- row.add(expiry);
|
|
|
- row.add(directive.getPath().toUri().getPath());
|
|
|
- if (printStats) {
|
|
|
- row.add("" + stats.getBytesNeeded());
|
|
|
- row.add("" + stats.getBytesCached());
|
|
|
- row.add("" + stats.getFilesNeeded());
|
|
|
- row.add("" + stats.getFilesCached());
|
|
|
+ System.out.print(String.format("Found %d entr%s\n",
|
|
|
+ numEntries, numEntries == 1 ? "y" : "ies"));
|
|
|
+ if (numEntries > 0) {
|
|
|
+ System.out.print(tableListing);
|
|
|
}
|
|
|
- tableListing.addRow(row.toArray(new String[0]));
|
|
|
- numEntries++;
|
|
|
- }
|
|
|
- System.out.print(String.format("Found %d entr%s\n",
|
|
|
- numEntries, numEntries == 1 ? "y" : "ies"));
|
|
|
- if (numEntries > 0) {
|
|
|
- System.out.print(tableListing);
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.err.println(prettifyException(e));
|
|
|
+ return 2;
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
@@ -552,7 +577,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
@Override
|
|
|
public String getShortUsage() {
|
|
|
return "[" + NAME + " <name> [-owner <owner>] " +
|
|
|
- "[-group <group>] [-mode <mode>] [-limit <limit>]]\n";
|
|
|
+ "[-group <group>] [-mode <mode>] [-limit <limit>] " +
|
|
|
+ "[-maxttl <maxTtl>]\n";
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -571,7 +597,11 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
listing.addRow("<limit>", "The maximum number of bytes that can be " +
|
|
|
"cached by directives in this pool, in aggregate. By default, " +
|
|
|
"no limit is set.");
|
|
|
-
|
|
|
+ listing.addRow("<maxTtl>", "The maximum allowed time-to-live for " +
|
|
|
+ "directives being added to the pool. This can be specified in " +
|
|
|
+ "seconds, minutes, hours, and days, e.g. 120s, 30m, 4h, 2d. " +
|
|
|
+ "Valid units are [smhd]. By default, no maximum is set. " +
|
|
|
+ "This can also be manually specified by \"never\".");
|
|
|
return getShortUsage() + "\n" +
|
|
|
"Add a new cache pool.\n\n" +
|
|
|
listing.toString();
|
|
@@ -605,6 +635,18 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
long limit = Long.parseLong(limitString);
|
|
|
info.setLimit(limit);
|
|
|
}
|
|
|
+ String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
|
|
|
+ try {
|
|
|
+ Long maxTtl = parseTtlString(maxTtlString);
|
|
|
+ if (maxTtl != null) {
|
|
|
+ info.setMaxRelativeExpiryMs(maxTtl);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.err.println(
|
|
|
+ "Error while parsing maxTtl value: " + e.getMessage());
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
if (!args.isEmpty()) {
|
|
|
System.err.print("Can't understand arguments: " +
|
|
|
Joiner.on(" ").join(args) + "\n");
|
|
@@ -615,7 +657,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
try {
|
|
|
dfs.addCachePool(info);
|
|
|
} catch (IOException e) {
|
|
|
- throw new RemoteException(e.getClass().getName(), e.getMessage());
|
|
|
+ System.err.println(prettifyException(e));
|
|
|
+ return 2;
|
|
|
}
|
|
|
System.out.println("Successfully added cache pool " + name + ".");
|
|
|
return 0;
|
|
@@ -632,7 +675,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
@Override
|
|
|
public String getShortUsage() {
|
|
|
return "[" + getName() + " <name> [-owner <owner>] " +
|
|
|
- "[-group <group>] [-mode <mode>] [-limit <limit>]]\n";
|
|
|
+ "[-group <group>] [-mode <mode>] [-limit <limit>] " +
|
|
|
+ "[-maxTtl <maxTtl>]]\n";
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -645,6 +689,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
listing.addRow("<mode>", "Unix-style permissions of the pool in octal.");
|
|
|
listing.addRow("<limit>", "Maximum number of bytes that can be cached " +
|
|
|
"by this pool.");
|
|
|
+ listing.addRow("<maxTtl>", "The maximum allowed time-to-live for " +
|
|
|
+ "directives being added to the pool.");
|
|
|
|
|
|
return getShortUsage() + "\n" +
|
|
|
WordUtils.wrap("Modifies the metadata of an existing cache pool. " +
|
|
@@ -663,6 +709,15 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
String limitString = StringUtils.popOptionWithArgument("-limit", args);
|
|
|
Long limit = (limitString == null) ?
|
|
|
null : Long.parseLong(limitString);
|
|
|
+ String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
|
|
|
+ Long maxTtl = null;
|
|
|
+ try {
|
|
|
+ maxTtl = parseTtlString(maxTtlString);
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.err.println(
|
|
|
+ "Error while parsing maxTtl value: " + e.getMessage());
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
String name = StringUtils.popFirstNonOption(args);
|
|
|
if (name == null) {
|
|
|
System.err.println("You must specify a name when creating a " +
|
|
@@ -693,6 +748,10 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
info.setLimit(limit);
|
|
|
changed = true;
|
|
|
}
|
|
|
+ if (maxTtl != null) {
|
|
|
+ info.setMaxRelativeExpiryMs(maxTtl);
|
|
|
+ changed = true;
|
|
|
+ }
|
|
|
if (!changed) {
|
|
|
System.err.println("You must specify at least one attribute to " +
|
|
|
"change in the cache pool.");
|
|
@@ -702,7 +761,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
try {
|
|
|
dfs.modifyCachePool(info);
|
|
|
} catch (IOException e) {
|
|
|
- throw new RemoteException(e.getClass().getName(), e.getMessage());
|
|
|
+ System.err.println(prettifyException(e));
|
|
|
+ return 2;
|
|
|
}
|
|
|
System.out.print("Successfully modified cache pool " + name);
|
|
|
String prefix = " to have ";
|
|
@@ -722,6 +782,9 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
System.out.print(prefix + "limit " + limit);
|
|
|
prefix = " and ";
|
|
|
}
|
|
|
+ if (maxTtl != null) {
|
|
|
+ System.out.print(prefix + "max time-to-live " + maxTtlString);
|
|
|
+ }
|
|
|
System.out.print("\n");
|
|
|
return 0;
|
|
|
}
|
|
@@ -765,7 +828,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
try {
|
|
|
dfs.removeCachePool(name);
|
|
|
} catch (IOException e) {
|
|
|
- throw new RemoteException(e.getClass().getName(), e.getMessage());
|
|
|
+ System.err.println(prettifyException(e));
|
|
|
+ return 2;
|
|
|
}
|
|
|
System.out.println("Successfully removed cache pool " + name + ".");
|
|
|
return 0;
|
|
@@ -813,7 +877,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
addField("OWNER", Justification.LEFT).
|
|
|
addField("GROUP", Justification.LEFT).
|
|
|
addField("MODE", Justification.LEFT).
|
|
|
- addField("LIMIT", Justification.RIGHT);
|
|
|
+ addField("LIMIT", Justification.RIGHT).
|
|
|
+ addField("MAXTTL", Justification.RIGHT);
|
|
|
if (printStats) {
|
|
|
builder.
|
|
|
addField("BYTES_NEEDED", Justification.RIGHT).
|
|
@@ -837,12 +902,23 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
row.add(info.getMode() != null ? info.getMode().toString() : null);
|
|
|
Long limit = info.getLimit();
|
|
|
String limitString;
|
|
|
- if (limit != null && limit.equals(CachePool.DEFAULT_LIMIT)) {
|
|
|
+ if (limit != null && limit.equals(CachePoolInfo.LIMIT_UNLIMITED)) {
|
|
|
limitString = "unlimited";
|
|
|
} else {
|
|
|
limitString = "" + limit;
|
|
|
}
|
|
|
row.add(limitString);
|
|
|
+ Long maxTtl = info.getMaxRelativeExpiryMs();
|
|
|
+ String maxTtlString = null;
|
|
|
+
|
|
|
+ if (maxTtl != null) {
|
|
|
+ if (maxTtl.longValue() == CachePoolInfo.RELATIVE_EXPIRY_NEVER) {
|
|
|
+ maxTtlString = "never";
|
|
|
+ } else {
|
|
|
+ maxTtlString = DFSUtil.durationToString(maxTtl);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ row.add(maxTtlString);
|
|
|
if (printStats) {
|
|
|
CachePoolStats stats = entry.getStats();
|
|
|
row.add(Long.toString(stats.getBytesNeeded()));
|
|
@@ -859,7 +935,8 @@ public class CacheAdmin extends Configured implements Tool {
|
|
|
}
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
- throw new RemoteException(e.getClass().getName(), e.getMessage());
|
|
|
+ System.err.println(prettifyException(e));
|
|
|
+ return 2;
|
|
|
}
|
|
|
System.out.print(String.format("Found %d result%s.\n", numResults,
|
|
|
(numResults == 1 ? "" : "s")));
|