|
@@ -20,15 +20,21 @@ package org.apache.hadoop.mapred.gridmix;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.PrintStream;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.conf.Configured;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
@@ -78,6 +84,12 @@ public class Gridmix extends Configured implements Tool {
|
|
|
*/
|
|
|
public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier";
|
|
|
|
|
|
+ /**
|
|
|
+ * Class used to resolve users in the trace to the list of target users
|
|
|
+ * on the cluster.
|
|
|
+ */
|
|
|
+ public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
|
|
|
+
|
|
|
// Submit data structures
|
|
|
private JobFactory factory;
|
|
|
private JobSubmitter submitter;
|
|
@@ -128,14 +140,16 @@ public class Gridmix extends Configured implements Tool {
|
|
|
* @param startFlag Semaphore for starting job trace pipeline
|
|
|
*/
|
|
|
private void startThreads(Configuration conf, String traceIn, Path ioPath,
|
|
|
- Path scratchDir, CountDownLatch startFlag) throws IOException {
|
|
|
+ Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
|
|
|
+ throws IOException {
|
|
|
monitor = createJobMonitor();
|
|
|
submitter = createJobSubmitter(monitor,
|
|
|
conf.getInt(GRIDMIX_SUB_THR,
|
|
|
Runtime.getRuntime().availableProcessors() + 1),
|
|
|
conf.getInt(GRIDMIX_QUE_DEP, 5),
|
|
|
new FilePool(conf, ioPath));
|
|
|
- factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
|
|
|
+ factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag,
|
|
|
+ userResolver);
|
|
|
monitor.start();
|
|
|
submitter.start();
|
|
|
factory.start();
|
|
@@ -151,45 +165,64 @@ public class Gridmix extends Configured implements Tool {
|
|
|
}
|
|
|
|
|
|
protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn,
|
|
|
- Path scratchDir, Configuration conf, CountDownLatch startFlag)
|
|
|
- throws IOException {
|
|
|
+ Path scratchDir, Configuration conf, CountDownLatch startFlag,
|
|
|
+ UserResolver userResolver) throws IOException {
|
|
|
return new JobFactory(submitter, createInputStream(traceIn), scratchDir,
|
|
|
- conf, startFlag);
|
|
|
+ conf, startFlag, userResolver);
|
|
|
}
|
|
|
|
|
|
- public int run(String[] argv) throws IOException, InterruptedException {
|
|
|
+ public int run(final String[] argv) throws IOException, InterruptedException {
|
|
|
if (argv.length < 2) {
|
|
|
printUsage(System.err);
|
|
|
return 1;
|
|
|
}
|
|
|
- long genbytes = 0;
|
|
|
+ final Configuration conf = getConf();
|
|
|
+ long genbytes = -1L;
|
|
|
String traceIn = null;
|
|
|
Path ioPath = null;
|
|
|
+ URI userRsrc = null;
|
|
|
+ final UserResolver userResolver = ReflectionUtils.newInstance(
|
|
|
+ conf.getClass(GRIDMIX_USR_RSV, SubmitterUserResolver.class,
|
|
|
+ UserResolver.class), conf);
|
|
|
try {
|
|
|
- int i = 0;
|
|
|
- genbytes = "-generate".equals(argv[i++])
|
|
|
- ? StringUtils.TraditionalBinaryPrefix.string2long(argv[i++])
|
|
|
- : --i;
|
|
|
- ioPath = new Path(argv[i++]);
|
|
|
- traceIn = argv[i++];
|
|
|
- if (i != argv.length) {
|
|
|
- printUsage(System.err);
|
|
|
- return 1;
|
|
|
+ for (int i = 0; i < argv.length - 2; ++i) {
|
|
|
+ if ("-generate".equals(argv[i])) {
|
|
|
+ genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
|
|
|
+ } else if ("-users".equals(argv[i])) {
|
|
|
+ userRsrc = new URI(argv[++i]);
|
|
|
+ } else {
|
|
|
+ printUsage(System.err);
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!userResolver.setTargetUsers(userRsrc, conf)) {
|
|
|
+ LOG.warn("Resource " + userRsrc + " ignored");
|
|
|
}
|
|
|
+ ioPath = new Path(argv[argv.length - 2]);
|
|
|
+ traceIn = argv[argv.length - 1];
|
|
|
} catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
printUsage(System.err);
|
|
|
return 1;
|
|
|
}
|
|
|
+ return start(conf, traceIn, ioPath, genbytes, userResolver);
|
|
|
+ }
|
|
|
+
|
|
|
+ int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
|
|
|
+ UserResolver userResolver) throws IOException, InterruptedException {
|
|
|
InputStream trace = null;
|
|
|
try {
|
|
|
- final Configuration conf = getConf();
|
|
|
Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
|
|
|
+ final FileSystem scratchFs = scratchDir.getFileSystem(conf);
|
|
|
+ scratchFs.mkdirs(scratchDir, new FsPermission((short) 0777));
|
|
|
+ scratchFs.setPermission(scratchDir, new FsPermission((short) 0777));
|
|
|
// add shutdown hook for SIGINT, etc.
|
|
|
Runtime.getRuntime().addShutdownHook(sdh);
|
|
|
CountDownLatch startFlag = new CountDownLatch(1);
|
|
|
try {
|
|
|
// Create, start job submission threads
|
|
|
- startThreads(conf, traceIn, ioPath, scratchDir, startFlag);
|
|
|
+ startThreads(conf, traceIn, ioPath, scratchDir, startFlag,
|
|
|
+ userResolver);
|
|
|
// Write input data if specified
|
|
|
if (genbytes > 0) {
|
|
|
writeInputData(genbytes, ioPath);
|
|
@@ -301,7 +334,7 @@ public class Gridmix extends Configured implements Tool {
|
|
|
|
|
|
protected void printUsage(PrintStream out) {
|
|
|
ToolRunner.printGenericCommandUsage(out);
|
|
|
- out.println("Usage: gridmix [-generate <MiB>] <iopath> <trace>");
|
|
|
+ out.println("Usage: gridmix [-generate <MiB>] [-users URI] <iopath> <trace>");
|
|
|
out.println(" e.g. gridmix -generate 100m foo -");
|
|
|
out.println("Configuration parameters:");
|
|
|
out.printf(" %-40s : Output directory\n", GRIDMIX_OUT_DIR);
|
|
@@ -309,6 +342,7 @@ public class Gridmix extends Configured implements Tool {
|
|
|
out.printf(" %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
|
|
|
out.printf(" %-40s : Key fraction of rec\n",
|
|
|
AvgRecordFactory.GRIDMIX_KEY_FRC);
|
|
|
+ out.printf(" %-40s : User resolution class\n", GRIDMIX_USR_RSV);
|
|
|
}
|
|
|
|
|
|
/**
|