
MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1144403 13f79535-47bb-0310-9956-ffa450edef68
Amar Kamat, 14 years ago
Parent commit: 787dcfb8cd

+ 3 - 1
mapreduce/CHANGES.txt

@@ -35,7 +35,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
-    MAPREDUCE-2563. [Gridmix]  Add High-Ram emulation system tests to 
+    MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk)
+
+    MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to 
     Gridmix. (Vinay Kumar Thota via amarrk)
 
     MAPREDUCE-2104. [Rumen] Add Cpu, Memory and Heap usages to 

+ 117 - 0
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ClusterSummarizer.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+/**
+ * Summarizes the Hadoop cluster used in this {@link Gridmix} run. The 
+ * statistics reported are
+ * <ul>
+ *   <li>Total number of active trackers in the cluster</li>
+ *   <li>Total number of blacklisted trackers in the cluster</li>
+ *   <li>Max map task capacity of the cluster</li>
+ *   <li>Max reduce task capacity of the cluster</li>
+ * </ul>
+ * 
+ * Apart from these statistics, {@link JobTracker} and {@link FileSystem} 
+ * addresses are also recorded in the summary.
+ */
+class ClusterSummarizer implements StatListener<ClusterStats> {
+  static final Log LOG = LogFactory.getLog(ClusterSummarizer.class);
+  
+  private int numBlacklistedTrackers;
+  private int numActiveTrackers;
+  private int maxMapTasks;
+  private int maxReduceTasks;
+  private String jobTrackerInfo = Summarizer.NA;
+  private String namenodeInfo = Summarizer.NA;
+  
+  @Override
+  @SuppressWarnings("deprecation")
+  public void update(ClusterStats item) {
+    try {
+      numBlacklistedTrackers = item.getStatus().getBlacklistedTrackers();
+      numActiveTrackers = item.getStatus().getTaskTrackers();
+      maxMapTasks = item.getStatus().getMaxMapTasks();
+      maxReduceTasks = item.getStatus().getMaxReduceTasks();
+    } catch (Exception e) {
+      long time = System.currentTimeMillis();
+      LOG.info("Error in processing cluster status at " 
+               + FastDateFormat.getInstance().format(time), e);
+    }
+  }
+  
+  /**
+   * Summarizes the cluster used for this {@link Gridmix} run.
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Cluster Summary:-");
+    builder.append("\nJobTracker: ").append(getJobTrackerInfo());
+    builder.append("\nFileSystem: ").append(getNamenodeInfo());
+    builder.append("\nNumber of blacklisted trackers: ")
+           .append(getNumBlacklistedTrackers());
+    builder.append("\nNumber of active trackers: ")
+           .append(getNumActiveTrackers());
+    builder.append("\nMax map task capacity: ")
+           .append(getMaxMapTasks());
+    builder.append("\nMax reduce task capacity: ").append(getMaxReduceTasks());
+    builder.append("\n\n");
+    return builder.toString();
+  }
+  
+  void start(Configuration conf) {
+    jobTrackerInfo = conf.get(JTConfig.JT_IPC_ADDRESS);
+    namenodeInfo = conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
+  }
+  
+  // Getters
+  protected int getNumBlacklistedTrackers() {
+    return numBlacklistedTrackers;
+  }
+  
+  protected int getNumActiveTrackers() {
+    return numActiveTrackers;
+  }
+  
+  protected int getMaxMapTasks() {
+    return maxMapTasks;
+  }
+  
+  protected int getMaxReduceTasks() {
+    return maxReduceTasks;
+  }
+  
+  protected String getJobTrackerInfo() {
+    return jobTrackerInfo;
+  }
+  
+  protected String getNamenodeInfo() {
+    return namenodeInfo;
+  }
+}
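
A minimal, self-contained sketch of the push-style listener contract that ClusterSummarizer implements: Statistics periodically pushes a cluster snapshot into update() on every registered listener. The types below are simplified stand-ins for illustration, not the actual Gridmix classes.

    import java.util.ArrayList;
    import java.util.List;

    public class ListenerSketch {
      // Simplified stand-in for StatListener<T>.
      interface Listener<T> {
        void update(T snapshot);
      }

      // Simplified stand-in for ClusterStats: an immutable cluster snapshot.
      static class Snapshot {
        final int activeTrackers;
        final int blacklistedTrackers;
        Snapshot(int active, int blacklisted) {
          this.activeTrackers = active;
          this.blacklistedTrackers = blacklisted;
        }
      }

      public static void main(String[] args) {
        List<Listener<Snapshot>> observers = new ArrayList<Listener<Snapshot>>();
        observers.add(new Listener<Snapshot>() {
          public void update(Snapshot s) {
            System.out.println("active=" + s.activeTrackers
                               + ", blacklisted=" + s.blacklistedTrackers);
          }
        });
        // The Statistics polling thread would make this call on each interval.
        for (Listener<Snapshot> l : observers) {
          l.update(new Snapshot(10, 1));
        }
      }
    }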

+ 5 - 2
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
 import org.apache.hadoop.mapred.gridmix.GenerateData.GenDataFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -307,8 +308,8 @@ class CompressionEmulationUtil {
    *   <li>Random text word size</li>
    * </ul>
    */
-  static void publishCompressedDataStatistics(Path inputDir, Configuration conf,
-                                              long uncompressedDataSize) 
+  static DataStatistics publishCompressedDataStatistics(Path inputDir, 
+                          Configuration conf, long uncompressedDataSize) 
   throws IOException {
     FileSystem fs = inputDir.getFileSystem(conf);
     CompressionCodecFactory compressionCodecs = 
@@ -356,6 +357,8 @@ class CompressionEmulationUtil {
       // publish the compression ratio
       LOG.info("Input Data Compression Ratio : " + ratio);
     }
+    
+    return new DataStatistics(compressedDataSize, numCompressedFiles, true);
   }
   
   /**

+ 307 - 0
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java

@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Summarizes a {@link Gridmix} run. The statistics reported are
+ * <ul>
+ *   <li>Total number of jobs in the input trace</li>
+ *   <li>Trace signature</li>
+ *   <li>Total number of jobs processed from the input trace</li>
+ *   <li>Total number of jobs submitted</li>
+ *   <li>Total number of successful and failed jobs</li>
+ *   <li>Total number of map/reduce tasks launched</li>
+ *   <li>Gridmix start & end time</li>
+ *   <li>Total time for the Gridmix run (data-generation and simulation)</li>
+ *   <li>Gridmix Configuration (i.e., job-type, submission-type, resolver)</li>
+ * </ul>
+ */
+class ExecutionSummarizer implements StatListener<JobStats> {
+  static final Log LOG = LogFactory.getLog(ExecutionSummarizer.class);
+  private static final FastDateFormat UTIL = FastDateFormat.getInstance();
+  
+  private int numJobsInInputTrace;
+  private int totalSuccessfulJobs;
+  private int totalFailedJobs;
+  private int totalMapTasksLaunched;
+  private int totalReduceTasksLaunched;
+  private long totalSimulationTime;
+  private long totalRuntime;
+  private final String commandLineArgs;
+  private long startTime;
+  private long endTime;
+  private long simulationStartTime;
+  private String inputTraceLocation;
+  private String inputTraceSignature;
+  private String jobSubmissionPolicy;
+  private String resolver;
+  private DataStatistics dataStats;
+  private String expectedDataSize;
+  
+  /**
+   * Basic constructor initialized with the runtime arguments. 
+   */
+  ExecutionSummarizer(String[] args) {
+    startTime = System.currentTimeMillis();
+    // flatten the args string and store it
+    commandLineArgs = 
+      org.apache.commons.lang.StringUtils.join(args, ' '); 
+  }
+  
+  /**
+   * Default constructor. 
+   */
+  ExecutionSummarizer() {
+    startTime = System.currentTimeMillis();
+    commandLineArgs = Summarizer.NA; 
+  }
+  
+  void start(Configuration conf) {
+    simulationStartTime = System.currentTimeMillis();
+  }
+  
+  private void processJobState(JobStats stats) throws Exception {
+    Job job = stats.getJob();
+    if (job.isSuccessful()) {
+      ++totalSuccessfulJobs;
+    } else {
+      ++totalFailedJobs;
+    }
+  }
+  
+  private void processJobTasks(JobStats stats) throws Exception {
+    totalMapTasksLaunched += stats.getNoOfMaps();
+    Job job = stats.getJob();
+    totalReduceTasksLaunched += job.getNumReduceTasks();
+  }
+  
+  private void process(JobStats stats) {
+    try {
+      // process the job run state
+      processJobState(stats);
+      
+      // process the tasks information
+      processJobTasks(stats);
+    } catch (Exception e) {
+      LOG.info("Error in processing job " + stats.getJob().getJobID() + ".");
+    }
+  }
+  
+  @Override
+  public void update(JobStats item) {
+    // process only if the simulation has started
+    if (simulationStartTime > 0) {
+      process(item);
+      totalSimulationTime = 
+        System.currentTimeMillis() - getSimulationStartTime();
+    }
+  }
+  
+  // Generates a signature for the trace file based on
+  //   - filename
+  //   - modification time
+  //   - file length
+  //   - owner
+  protected static String getTraceSignature(String input) throws IOException {
+    Path inputPath = new Path(input);
+    FileSystem fs = inputPath.getFileSystem(new Configuration());
+    FileStatus status = fs.getFileStatus(inputPath);
+    Path qPath = fs.makeQualified(status.getPath());
+    String traceID = status.getModificationTime() + qPath.toString()
+                     + status.getOwner() + status.getLen();
+    return MD5Hash.digest(traceID).toString();
+  }
+  
+  @SuppressWarnings("unchecked")
+  void finalize(JobFactory factory, String inputPath, long dataSize, 
+                UserResolver userResolver, DataStatistics stats,
+                Configuration conf) 
+  throws IOException {
+    numJobsInInputTrace = factory.numJobsInTrace;
+    endTime = System.currentTimeMillis();
+    Path inputTracePath = new Path(inputPath);
+    FileSystem fs = inputTracePath.getFileSystem(conf);
+    inputTraceLocation = fs.makeQualified(inputTracePath).toString();
+    inputTraceSignature = getTraceSignature(inputTraceLocation);
+    jobSubmissionPolicy = Gridmix.getJobSubmissionPolicy(conf).name();
+    resolver = userResolver.getClass().getName();
+    if (dataSize > 0) {
+      expectedDataSize = StringUtils.humanReadableInt(dataSize);
+    } else {
+      expectedDataSize = Summarizer.NA;
+    }
+    dataStats = stats;
+    totalRuntime = System.currentTimeMillis() - getStartTime();
+  }
+  
+  /**
+   * Summarizes the current {@link Gridmix} run.
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Execution Summary:-");
+    builder.append("\nInput trace: ").append(getInputTraceLocation());
+    builder.append("\nInput trace signature: ")
+           .append(getInputTraceSignature());
+    builder.append("\nTotal number of jobs in trace: ")
+           .append(getNumJobsInTrace());
+    builder.append("\nExpected input data size: ")
+           .append(getExpectedDataSize());
+    builder.append("\nInput data statistics: ")
+           .append(getInputDataStatistics());
+    builder.append("\nTotal number of jobs processed: ")
+           .append(getNumSubmittedJobs());
+    builder.append("\nTotal number of successful jobs: ")
+           .append(getNumSuccessfulJobs());
+    builder.append("\nTotal number of failed jobs: ")
+           .append(getNumFailedJobs());
+    builder.append("\nTotal number of map tasks launched: ")
+           .append(getNumMapTasksLaunched());
+    builder.append("\nTotal number of reduce task launched: ")
+           .append(getNumReduceTasksLaunched());
+    builder.append("\nGridmix start time: ")
+           .append(UTIL.format(getStartTime()));
+    builder.append("\nGridmix end time: ").append(UTIL.format(getEndTime()));
+    builder.append("\nGridmix simulation start time: ")
+           .append(UTIL.format(getSimulationStartTime()));
+    builder.append("\nGridmix runtime: ")
+           .append(StringUtils.formatTime(getRuntime()));
+    builder.append("\nTime spent in initialization (data-gen etc): ")
+           .append(StringUtils.formatTime(getInitTime()));
+    builder.append("\nTime spent in simulation: ")
+           .append(StringUtils.formatTime(getSimulationTime()));
+    builder.append("\nGridmix configuration parameters: ")
+           .append(getCommandLineArgsString());
+    builder.append("\nGridmix job submission policy: ")
+           .append(getJobSubmissionPolicy());
+    builder.append("\nGridmix resolver: ").append(getUserResolver());
+    builder.append("\n\n");
+    return builder.toString();
+  }
+  
+  // Gets the stringified version of DataStatistics
+  static String stringifyDataStatistics(DataStatistics stats) {
+    if (stats != null) {
+      StringBuilder buffer = new StringBuilder();
+      String compressionStatus = stats.isDataCompressed() 
+                                 ? "Compressed" 
+                                 : "Uncompressed";
+      buffer.append(compressionStatus).append(" input data size: ");
+      buffer.append(StringUtils.humanReadableInt(stats.getDataSize()));
+      buffer.append(", ");
+      buffer.append("Number of files: ").append(stats.getNumFiles());
+
+      return buffer.toString();
+    } else {
+      return Summarizer.NA;
+    }
+  }
+  
+  // Getters
+  protected String getExpectedDataSize() {
+    return expectedDataSize;
+  }
+  
+  protected String getUserResolver() {
+    return resolver;
+  }
+  
+  protected String getInputDataStatistics() {
+    return stringifyDataStatistics(dataStats);
+  }
+  
+  protected String getInputTraceSignature() {
+    return inputTraceSignature;
+  }
+  
+  protected String getInputTraceLocation() {
+    return inputTraceLocation;
+  }
+  
+  protected int getNumJobsInTrace() {
+    return numJobsInInputTrace;
+  }
+  
+  protected int getNumSuccessfulJobs() {
+    return totalSuccessfulJobs;
+  }
+  
+  protected int getNumFailedJobs() {
+    return totalFailedJobs;
+  }
+  
+  protected int getNumSubmittedJobs() {
+    return totalSuccessfulJobs + totalFailedJobs;
+  }
+  
+  protected int getNumMapTasksLaunched() {
+    return totalMapTasksLaunched;
+  }
+  
+  protected int getNumReduceTasksLaunched() {
+    return totalReduceTasksLaunched;
+  }
+  
+  protected long getStartTime() {
+    return startTime;
+  }
+  
+  protected long getEndTime() {
+    return endTime;
+  }
+  
+  protected long getInitTime() {
+    return simulationStartTime - startTime;
+  }
+  
+  protected long getSimulationStartTime() {
+    return simulationStartTime;
+  }
+  
+  protected long getSimulationTime() {
+    return totalSimulationTime;
+  }
+  
+  protected long getRuntime() {
+    return totalRuntime;
+  }
+  
+  protected String getCommandLineArgsString() {
+    return commandLineArgs;
+  }
+  
+  protected String getJobSubmissionPolicy() {
+    return jobSubmissionPolicy;
+  }
+}
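
The trace signature above hashes a few cheap file attributes (modification time, qualified path, owner, length) so that re-running against an unchanged trace yields the same ID, while any edit to the file changes it. Below is a rough sketch of the same idea using only JDK classes; it substitutes java.io.File for the Hadoop FileSystem API and the local user name for the file owner, so it is an illustration rather than a drop-in equivalent.

    import java.io.File;
    import java.math.BigInteger;
    import java.security.MessageDigest;

    public class TraceSignatureSketch {
      // Same attribute mix as getTraceSignature(): mtime + path + owner + length.
      static String signature(File trace) throws Exception {
        String id = trace.lastModified() + trace.getAbsolutePath()
                    + System.getProperty("user.name") + trace.length();
        byte[] md5 = MessageDigest.getInstance("MD5").digest(id.getBytes("UTF-8"));
        return String.format("%032x", new BigInteger(1, md5));
      }

      public static void main(String[] args) throws Exception {
        System.out.println(signature(new File(args[0])));
      }
    }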

+ 36 - 6
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java

@@ -99,21 +99,49 @@ class GenerateData extends GridmixJob {
     FileOutputFormat.setOutputPath(job, outdir);
   }
 
+  /**
+   * Represents the input data characteristics.
+   */
+  static class DataStatistics {
+    private long dataSize;
+    private long numFiles;
+    private boolean isDataCompressed;
+    
+    DataStatistics(long dataSize, long numFiles, boolean isCompressed) {
+      this.dataSize = dataSize;
+      this.numFiles = numFiles;
+      this.isDataCompressed = isCompressed;
+    }
+    
+    long getDataSize() {
+      return dataSize;
+    }
+    
+    long getNumFiles() {
+      return numFiles;
+    }
+    
+    boolean isDataCompressed() {
+      return isDataCompressed;
+    }
+  }
+  
   /**
    * Publish the data statistics.
    */
-  static void publishDataStatistics(Path inputDir, long genBytes, 
-                                    Configuration conf) 
+  static DataStatistics publishDataStatistics(Path inputDir, long genBytes, 
+                                              Configuration conf) 
   throws IOException {
     if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
-      CompressionEmulationUtil.publishCompressedDataStatistics(inputDir, 
-                                                               conf, genBytes);
+      return CompressionEmulationUtil.publishCompressedDataStatistics(inputDir, 
+                                        conf, genBytes);
     } else {
-      publishPlainDataStatistics(conf, inputDir);
+      return publishPlainDataStatistics(conf, inputDir);
     }
   }
   
-  static void publishPlainDataStatistics(Configuration conf, Path inputDir) 
+  static DataStatistics publishPlainDataStatistics(Configuration conf, 
+                                                   Path inputDir) 
   throws IOException {
     FileSystem fs = inputDir.getFileSystem(conf);
 
@@ -134,6 +162,8 @@ class GenerateData extends GridmixJob {
     LOG.info("Total size of input data : " 
              + StringUtils.humanReadableInt(dataSize));
     LOG.info("Total number of input data files : " + fileCount);
+    
+    return new DataStatistics(dataSize, fileCount, false);
   }
   
   @Override
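
The data sizes collected here are rendered in the summary via StringUtils.humanReadableInt (the test later in this patch expects "1.0k" and "10.0g"). A tiny stand-in for that formatting, shown only to make the expected output concrete:

    public class HumanReadableSketch {
      // Rough equivalent of StringUtils.humanReadableInt: 1024-based units,
      // one decimal place, lower-case suffixes.
      static String humanReadable(long bytes) {
        String[] units = {"", "k", "m", "g", "t"};
        double value = bytes;
        int i = 0;
        while (value >= 1024 && i < units.length - 1) {
          value /= 1024;
          i++;
        }
        return i == 0 ? Long.toString(bytes)
                      : String.format("%.1f%s", value, units[i]);
      }

      public static void main(String[] args) {
        System.out.println(humanReadable(1024L));                    // 1.0k
        System.out.println(humanReadable(10L * 1024 * 1024 * 1024)); // 10.0g
      }
    }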

+ 36 - 4
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -113,10 +114,19 @@ public class Gridmix extends Configured implements Tool {
   private JobSubmitter submitter;
   private JobMonitor monitor;
   private Statistics statistics;
+  private Summarizer summarizer;
 
   // Shutdown hook
   private final Shutdown sdh = new Shutdown();
 
+  Gridmix(String[] args) {
+    summarizer = new Summarizer(args);
+  }
+  
+  Gridmix() {
+    summarizer = new Summarizer();
+  }
+  
   // Get the input data directory for Gridmix. Input directory is 
   // <io-path>/input
   static Path getGridmixInputDataPath(Path ioPath) {
@@ -205,6 +215,13 @@ public class Gridmix extends Configured implements Tool {
     return new ZombieJobProducer(new Path(traceIn), null, conf);
   }
 
+  // get the gridmix job submission policy
+  protected static GridmixJobSubmissionPolicy getJobSubmissionPolicy(
+                                                Configuration conf) {
+    return GridmixJobSubmissionPolicy.getPolicy(conf, 
+                                        GridmixJobSubmissionPolicy.STRESS);
+  }
+  
   /**
    * Create each component in the pipeline and start it.
    * @param conf Configuration data, no keys specific to this context
@@ -221,8 +238,7 @@ public class Gridmix extends Configured implements Tool {
       throws IOException {
     try {
       Path inputDir = getGridmixInputDataPath(ioPath);
-      GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
-        conf, GridmixJobSubmissionPolicy.STRESS);
+      GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf);
       LOG.info(" Submission policy is " + policy.name());
       statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
       monitor = createJobMonitor(statistics);
@@ -248,6 +264,10 @@ public class Gridmix extends Configured implements Tool {
         statistics.addClusterStatsObservers(factory);
       }
 
+      // add the gridmix run summarizer to the statistics
+      statistics.addJobStatsListeners(summarizer.getExecutionSummarizer());
+      statistics.addClusterStatsObservers(summarizer.getClusterSummarizer());
+      
       monitor.start();
       submitter.start();
     }catch(Exception e) {
@@ -293,6 +313,10 @@ public class Gridmix extends Configured implements Tool {
         return runJob(conf, argv);
       }
     });
+    
+    // print the run summary
+    System.out.print("\n\n");
+    System.out.println(summarizer.toString());
     return val; 
   }
 
@@ -373,6 +397,7 @@ public class Gridmix extends Configured implements Tool {
   int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
       UserResolver userResolver, boolean generate)
       throws IOException, InterruptedException {
+    DataStatistics stats = null;
     InputStream trace = null;
     ioPath = ioPath.makeQualified(ioPath.getFileSystem(conf));
 
@@ -395,7 +420,7 @@ public class Gridmix extends Configured implements Tool {
         }
         
         // publish the data statistics
-        GenerateData.publishDataStatistics(inputDir, genbytes, conf);
+        stats = GenerateData.publishDataStatistics(inputDir, genbytes, conf);
         
         // scan input dir contents
         submitter.refreshFilePool();
@@ -407,6 +432,9 @@ public class Gridmix extends Configured implements Tool {
           return exitCode;
         }
 
+        // start the summarizer
+        summarizer.start(conf);
+        
         factory.start();
         statistics.start();
       } catch (Throwable e) {
@@ -436,6 +464,10 @@ public class Gridmix extends Configured implements Tool {
 
       }
     } finally {
+      if (factory != null) {
+        summarizer.finalize(factory, traceIn, genbytes, userResolver, stats, 
+                            conf);
+      }
       IOUtils.cleanup(LOG, trace);
     }
     return 0;
@@ -567,7 +599,7 @@ public class Gridmix extends Configured implements Tool {
   public static void main(String[] argv) throws Exception {
     int res = -1;
     try {
-      res = ToolRunner.run(new Configuration(), new Gridmix(), argv);
+      res = ToolRunner.run(new Configuration(), new Gridmix(argv), argv);
     } finally {
       System.exit(res);
     }
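
Taken together, the Gridmix changes give the summarizer a simple lifecycle: construct it with argv, call start() once data generation finishes and simulation begins, feed it update() events, finalize() it in the driver's finally block, and print it last. A condensed, self-contained sketch of that flow, with a stand-in class rather than the real Summarizer:

    public class LifecycleSketch {
      static class Summarizer {
        private final long startTime = System.currentTimeMillis();
        private long simulationStartTime;   // stays 0 until start() is called
        private int processed;

        void start() { simulationStartTime = System.currentTimeMillis(); }

        void update() {
          // Mirrors ExecutionSummarizer.update(): ignore events that arrive
          // before the simulation has started.
          if (simulationStartTime > 0) {
            processed++;
          }
        }

        public String toString() {
          return "jobs processed: " + processed
                 + ", init time (ms): " + (simulationStartTime - startTime);
        }
      }

      public static void main(String[] args) {
        Summarizer summarizer = new Summarizer();
        try {
          summarizer.start();
          summarizer.update();
          summarizer.update();
        } finally {
          // Like Gridmix.start(): summarize even when the run aborts early.
          System.out.println(summarizer);
        }
      }
    }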

+ 26 - 6
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
-import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
@@ -64,6 +63,7 @@ abstract class JobFactory<T> implements Gridmix.Component<Void>,StatListener<T>
   protected volatile IOException error = null;
   protected final JobStoryProducer jobProducer;
   protected final ReentrantLock lock = new ReentrantLock(true);
+  protected int numJobsInTrace = 0;
 
   /**
    * Creating a new instance does not start the thread.
@@ -168,13 +168,33 @@ abstract class JobFactory<T> implements Gridmix.Component<Void>,StatListener<T>
 
   protected abstract Thread createReaderThread() ; 
 
+  // gets the next job from the trace and does some bookkeeping for the same
+  private JobStory getNextJobFromTrace() throws IOException {
+    JobStory story = jobProducer.getNextJob();
+    if (story != null) {
+      ++numJobsInTrace;
+    }
+    return story;
+  }
+  
   protected JobStory getNextJobFiltered() throws IOException {
-    JobStory job;
-    do {
-      job = jobProducer.getNextJob();
-    } while (job != null &&
+    JobStory job = getNextJobFromTrace();
+    while (job != null &&
       (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
-        job.getSubmissionTime() < 0));
+        job.getSubmissionTime() < 0)) {
+      if (LOG.isDebugEnabled()) {
+        String reason = "";
+        if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) {
+          reason = "STATE (" + job.getOutcome().name() + ") ";
+        }
+        if (job.getSubmissionTime() < 0) {
+          reason += "SUBMISSION-TIME (" + job.getSubmissionTime() + ")";
+        }
+        LOG.debug("Ignoring job " + job.getJobID() + " from the input trace."
+                  + " Reason: " + reason == null ? "N/A" : reason);
+      }
+      job = getNextJobFromTrace();
+    }
     return null == job ? null : new FilterJobStory(job) {
       @Override
       public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
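
One subtlety worth noting in the debug logging above: in Java, string concatenation (+) binds tighter than ==, so an unparenthesized conditional of the form "..." + reason == null ? "N/A" : reason compares the concatenated string to null and never takes the fallback. The conditional must be parenthesized, as in the LOG.debug call above. A two-line demonstration:

    public class PrecedencePitfall {
      public static void main(String[] args) {
        String reason = null;
        // + binds tighter than ==: ("Reason: " + reason) == null is always
        // false, so this prints "null" instead of the intended fallback.
        System.out.println("Reason: " + reason == null ? "N/A" : reason);
        // The parenthesized conditional prints "Reason: N/A".
        System.out.println("Reason: " + (reason == null ? "N/A" : reason));
      }
    }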

+ 2 - 2
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java

@@ -78,13 +78,13 @@ class JobMonitor implements Gridmix.Component<Job> {
   }
 
   /**
-   * Add a submission failed job , such tht it can be communicated
+   * Add a submission failed job, such that it can be communicated
    * back to serial.
    * TODO: Cleaner solution for this problem
    * @param job
    */
   public void submissionFailed(Job job) {
-    LOG.info(" Job submission failed notify if anyone is waiting " + job);
+    LOG.info("Job submission failed notification for job " + job.getJobID());
     this.statistics.add(job);
   }
 

+ 1 - 1
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java

@@ -126,7 +126,7 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
         monitor.submissionFailed(job.getJob());
       } catch(Exception e) {
        //Due to some exception the job wasn't submitted.
-        LOG.info(" Job " + job.getJob() + " submission failed " , e);
+        LOG.info(" Job " + job.getJob().getJobID() + " submission failed " , e);
         monitor.submissionFailed(job.getJob());
       } finally {
         sem.release();

+ 75 - 0
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Summarizer.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+
+/**
+ * Summarizes various aspects of a {@link Gridmix} run.
+ */
+class Summarizer {
+  private ExecutionSummarizer executionSummarizer;
+  private ClusterSummarizer clusterSummarizer;
+  protected static final String NA = "N/A";
+  
+  Summarizer() {
+    this(new String[]{NA});
+  }
+  
+  Summarizer(String[] args) {
+    executionSummarizer = new ExecutionSummarizer(args);
+    clusterSummarizer = new ClusterSummarizer();
+  }
+  
+  ExecutionSummarizer getExecutionSummarizer() {
+    return executionSummarizer;
+  }
+  
+  ClusterSummarizer getClusterSummarizer() {
+    return clusterSummarizer;
+  }
+  
+  void start(Configuration conf) {
+    executionSummarizer.start(conf);
+    clusterSummarizer.start(conf);
+  }
+  
+  /**
+   * This finalizes the summarizer.
+   */
+  @SuppressWarnings("unchecked")
+  void finalize(JobFactory factory, String path, long size, 
+                UserResolver resolver, DataStatistics stats, Configuration conf)
+  throws IOException {
+    executionSummarizer.finalize(factory, path, size, resolver, stats, conf);
+  }
+  
+  /**
+   * Summarizes the current {@link Gridmix} run and the cluster used. 
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(executionSummarizer.toString());
+    builder.append(clusterSummarizer.toString());
+    return builder.toString();
+  }
+}

+ 371 - 0
mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java

@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.junit.Test;
+
+/**
+ * Test {@link ExecutionSummarizer} and {@link ClusterSummarizer}.
+ */
+public class TestGridmixSummary {
+  
+  /**
+   * Test {@link DataStatistics}.
+   */
+  @Test
+  public void testDataStatistics() throws Exception {
+    // test data-statistics getters with compression enabled
+    DataStatistics stats = new DataStatistics(10, 2, true);
+    assertEquals("Data size mismatch", 10, stats.getDataSize());
+    assertEquals("Num files mismatch", 2, stats.getNumFiles());
+    assertTrue("Compression configuration mismatch", stats.isDataCompressed());
+    
+    // test data-statistics getters with compression disabled
+    stats = new DataStatistics(100, 5, false);
+    assertEquals("Data size mismatch", 100, stats.getDataSize());
+    assertEquals("Num files mismatch", 5, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch", stats.isDataCompressed());
+    
+    // test publish data stats
+    Configuration conf = new Configuration();
+    Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp"));
+    Path testDir = new Path(rootTempDir, "testDataStatistics");
+    FileSystem fs = testDir.getFileSystem(conf);
+    fs.delete(testDir, true);
+    Path testInputDir = new Path(testDir, "test");
+    fs.mkdirs(testInputDir);
+    
+    // test empty folder (compression = true)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    Boolean failed = null;
+    try {
+      GenerateData.publishDataStatistics(testInputDir, 1024L, conf);
+      failed = false;
+    } catch (RuntimeException e) {
+      failed = true;
+    }
+    assertNotNull("Expected failure!", failed);
+    assertTrue("Compression data publishing error", failed);
+    
+    // test with empty folder (compression = off)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    stats = GenerateData.publishDataStatistics(testInputDir, 1024L, conf);
+    assertEquals("Data size mismatch", 0, stats.getDataSize());
+    assertEquals("Num files mismatch", 0, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch", stats.isDataCompressed());
+    
+    // test with some plain input data (compression = off)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    Path inputDataFile = new Path(testInputDir, "test");
+    long size = 
+      UtilsForTests.createTmpFileDFS(fs, inputDataFile, 
+          FsPermission.createImmutable((short)777), "hi hello bye").size();
+    stats = GenerateData.publishDataStatistics(testInputDir, -1, conf);
+    assertEquals("Data size mismatch", size, stats.getDataSize());
+    assertEquals("Num files mismatch", 1, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch", stats.isDataCompressed());
+    
+    // test with some plain input data (compression = on)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    failed = null;
+    try {
+      GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
+      failed = false;
+    } catch (RuntimeException e) {
+      failed = true;
+    }
+    assertNotNull("Expected failure!", failed);
+    assertTrue("Compression data publishing error", failed);
+    
+    // test with some compressed input data (compression = off)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    fs.delete(inputDataFile, false);
+    inputDataFile = new Path(testInputDir, "test.gz");
+    size = 
+      UtilsForTests.createTmpFileDFS(fs, inputDataFile, 
+          FsPermission.createImmutable((short)777), "hi hello").size();
+    stats =  GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
+    assertEquals("Data size mismatch", size, stats.getDataSize());
+    assertEquals("Num files mismatch", 1, stats.getNumFiles());
+    assertFalse("Compression configuration mismatch", stats.isDataCompressed());
+    
+    // test with some compressed input data (compression = on)
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    stats = GenerateData.publishDataStatistics(testInputDir, 1234L, conf);
+    assertEquals("Data size mismatch", size, stats.getDataSize());
+    assertEquals("Num files mismatch", 1, stats.getNumFiles());
+    assertTrue("Compression configuration mismatch", stats.isDataCompressed());
+  }
+  
+  /**
+   * A fake {@link JobFactory}.
+   */
+  @SuppressWarnings("unchecked")
+  private static class FakeJobFactory extends JobFactory {
+    /**
+     * A fake {@link JobStoryProducer} for {@link FakeJobFactory}.
+     */
+    private static class FakeJobStoryProducer implements JobStoryProducer {
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public JobStory getNextJob() throws IOException {
+        return null;
+      }
+    }
+    
+    FakeJobFactory(Configuration conf) {
+      super(null, new FakeJobStoryProducer(), null, conf, null, null);
+    }
+    
+    @Override
+    public void update(Object item) {
+    }
+    
+    @Override
+    protected Thread createReaderThread() {
+      return null;
+    }
+  }
+  
+  /**
+   * Test {@link ExecutionSummarizer}.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testExecutionSummarizer() throws IOException {
+    Configuration conf = new Configuration();
+    
+    ExecutionSummarizer es = new ExecutionSummarizer();
+    assertEquals("ExecutionSummarizer init failed", 
+                 Summarizer.NA, es.getCommandLineArgsString());
+    
+    long startTime = System.currentTimeMillis();
+    // test configuration parameters
+    String[] initArgs = new String[] {"-Xmx20m", "-Dtest.args='test'"};
+    es = new ExecutionSummarizer(initArgs);
+    
+    assertEquals("ExecutionSummarizer init failed", 
+                 "-Xmx20m -Dtest.args='test'", 
+                 es.getCommandLineArgsString());
+    
+    // test start time
+    assertTrue("Start time mismatch", es.getStartTime() >= startTime);
+    assertTrue("Start time mismatch", 
+               es.getStartTime() <= System.currentTimeMillis());
+    
+    // test start() of ExecutionSummarizer
+    es.update(null);
+    assertEquals("ExecutionSummarizer init failed", 0, 
+                 es.getSimulationStartTime());
+    testExecutionSummarizer(0, 0, 0, 0, 0, 0, es);
+    
+    long simStartTime = System.currentTimeMillis();
+    es.start(null);
+    assertTrue("Simulation start time mismatch", 
+               es.getSimulationStartTime() >= simStartTime);
+    assertTrue("Simulation start time mismatch", 
+               es.getSimulationStartTime() <= System.currentTimeMillis());
+    
+    // test with job stats
+    JobStats stats = generateFakeJobStats(1, 10, true);
+    es.update(stats);
+    testExecutionSummarizer(1, 10, 0, 1, 1, 0, es);
+    
+    // test with failed job 
+    stats = generateFakeJobStats(5, 1, false);
+    es.update(stats);
+    testExecutionSummarizer(6, 11, 0, 2, 1, 1, es);
+    
+    // test finalize
+    //  define a fake job factory
+    JobFactory factory = new FakeJobFactory(conf);
+    
+    // fake the num jobs in trace
+    factory.numJobsInTrace = 3;
+    
+    Path rootTempDir = new Path(System.getProperty("test.build.data", "/tmp"));
+    Path testDir = new Path(rootTempDir, "testGridmixSummary");
+    Path testTraceFile = new Path(testDir, "test-trace.json");
+    FileSystem fs = FileSystem.getLocal(conf);
+    fs.create(testTraceFile).close();
+    
+    // finalize the summarizer
+    UserResolver resolver = new RoundRobinUserResolver();
+    DataStatistics dataStats = new DataStatistics(100, 2, true);
+    String policy = GridmixJobSubmissionPolicy.REPLAY.name();
+    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
+    es.finalize(factory, testTraceFile.toString(), 1024L, resolver, dataStats, 
+                conf);
+    
+    // test num jobs in trace
+    assertEquals("Mismtach in num jobs in trace", 3, es.getNumJobsInTrace());
+    
+    // test trace signature
+    String tid = 
+      ExecutionSummarizer.getTraceSignature(testTraceFile.toString());
+    assertEquals("Mismatch in trace signature", 
+                 tid, es.getInputTraceSignature());
+    // test trace location
+    Path qPath = fs.makeQualified(testTraceFile);
+    assertEquals("Mismatch in trace signature", 
+                 qPath.toString(), es.getInputTraceLocation());
+    // test expected data size
+    assertEquals("Mismatch in expected data size", 
+                 "1.0k", es.getExpectedDataSize());
+    // test input data statistics
+    assertEquals("Mismatch in input data statistics", 
+                 ExecutionSummarizer.stringifyDataStatistics(dataStats), 
+                 es.getInputDataStatistics());
+    // test user resolver
+    assertEquals("Mismatch in user resolver", 
+                 resolver.getClass().getName(), es.getUserResolver());
+    // test policy
+    assertEquals("Mismatch in policy", policy, es.getJobSubmissionPolicy());
+    
+    // test data stringification using large data
+    es.finalize(factory, testTraceFile.toString(), 1024*1024*1024*10L, resolver,
+                dataStats, conf);
+    assertEquals("Mismatch in expected data size", 
+                 "10.0g", es.getExpectedDataSize());
+    
+    // test trace signature uniqueness
+    //  touch the trace file
+    fs.delete(testTraceFile, false);
+    //  sleep for 1 sec
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ie) {}
+    fs.create(testTraceFile).close();
+    es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats, 
+                conf);
+    // test missing expected data size
+    assertEquals("Mismatch in trace signature", 
+                 Summarizer.NA, es.getExpectedDataSize());
+    assertFalse("Mismatch in trace signature", 
+                tid.equals(es.getInputTraceSignature()));
+    // get the new identifier
+    tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString());
+    assertEquals("Mismatch in trace signature", 
+                 tid, es.getInputTraceSignature());
+    
+    testTraceFile = new Path(testDir, "test-trace2.json");
+    fs.create(testTraceFile).close();
+    es.finalize(factory, testTraceFile.toString(), 0L, resolver, dataStats, 
+                conf);
+    assertFalse("Mismatch in trace signature", 
+                tid.equals(es.getInputTraceSignature()));
+    // get the new identifier
+    tid = ExecutionSummarizer.getTraceSignature(testTraceFile.toString());
+    assertEquals("Mismatch in trace signature", 
+                 tid, es.getInputTraceSignature());
+    
+  }
+  
+  // test the ExecutionSummarizer
+  private static void testExecutionSummarizer(int numMaps, int numReds,
+      int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob, 
+      int numFailedJobs, ExecutionSummarizer es) {
+    assertEquals("ExecutionSummarizer test failed [num-maps]", 
+                 numMaps, es.getNumMapTasksLaunched());
+    assertEquals("ExecutionSummarizer test failed [num-reducers]", 
+                 numReds, es.getNumReduceTasksLaunched());
+    assertEquals("ExecutionSummarizer test failed [num-jobs-in-trace]", 
+                 totalJobsInTrace, es.getNumJobsInTrace());
+    assertEquals("ExecutionSummarizer test failed [num-submitted jobs]", 
+                 totalJobSubmitted, es.getNumSubmittedJobs());
+    assertEquals("ExecutionSummarizer test failed [num-successful-jobs]", 
+                 numSuccessfulJob, es.getNumSuccessfulJobs());
+    assertEquals("ExecutionSummarizer test failed [num-failed jobs]", 
+                 numFailedJobs, es.getNumFailedJobs());
+  }
+  
+  // generate fake job stats
+  @SuppressWarnings("deprecation")
+  private static JobStats generateFakeJobStats(final int numMaps, 
+      final int numReds, final boolean isSuccessful) 
+  throws IOException {
+    // A fake job 
+    Job fakeJob = new Job() {
+      @Override
+      public int getNumReduceTasks() {
+        return numReds;
+      };
+      
+      @Override
+      public boolean isSuccessful() throws IOException, InterruptedException {
+        return isSuccessful;
+      };
+    };
+    return new JobStats(numMaps, fakeJob);
+  }
+  
+  /**
+   * Test {@link ClusterSummarizer}.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testClusterSummarizer() throws IOException {
+    ClusterSummarizer cs = new ClusterSummarizer();
+    Configuration conf = new Configuration();
+    
+    String jt = "test-jt:1234";
+    String nn = "test-nn:5678";
+    conf.set(JTConfig.JT_IPC_ADDRESS, jt);
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn);
+    cs.start(conf);
+    
+    assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo());
+    assertEquals("NN name mismatch", nn, cs.getNamenodeInfo());
+    
+    ClusterStats cstats = ClusterStats.getClusterStats();
+    conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local");
+    JobClient jc = new JobClient(conf);
+    cstats.setClusterMetric(jc.getClusterStatus());
+    
+    cs.update(cstats);
+    
+    // test
+    assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks());
+    assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks());
+    assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers());
+    assertEquals("Cluster summary test failed!", 0, 
+                 cs.getNumBlacklistedTrackers());
+  }
+}