
MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1463806 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves, 12 years ago
parent commit 75ef1b4845
16 changed files with 3565 additions and 48 deletions
  1. + 1 - 0    hadoop-mapreduce-project/CHANGES.txt
  2. + 14 - 0   hadoop-tools/hadoop-gridmix/pom.xml
  3. + 8 - 2    hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
  4. + 4 - 0    hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
  5. + 384 - 0  hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/CommonJobTest.java
  6. + 2 - 2    hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
  7. + 2 - 2    hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
  8. + 59 - 36  hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java
  9. + 430 - 0  hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
  10. + 989 - 0 hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java
  11. + 202 - 0 hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
  12. + 5 - 6   hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
  13. + 81 - 0  hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestLoadJob.java
  14. + 142 - 0 hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
  15. + 414 - 0 hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount.json
  16. + 828 - 0 hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount2.json

+ 1 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -759,6 +759,7 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear
     the value from the Task commitAttempt member (Robert Parker via jeagles)
 
+    MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
 
 Release 0.23.6 - UNRELEASED
 

+ 14 - 0
hadoop-tools/hadoop-gridmix/pom.xml

@@ -91,6 +91,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -115,6 +120,15 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        <excludes>
+        <exclude>src/test/resources/data/*</exclude>
+        </excludes>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>

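The mockito-all test dependency added above is what lets the new unit tests stub Rumen and Gridmix interfaces instead of running real jobs, and the apache-rat-plugin exclude keeps the license checker from flagging the JSON trace fixtures under src/test/resources/data. A minimal sketch of the Mockito usage (a hypothetical helper, not code from this commit), stubbing only the JobStory accessors that the tests below read:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.tools.rumen.JobStory;

    public class MockJobStoryExample {
      public static JobStory newStubStory() {
        // Stub only the accessors a test needs; unstubbed calls return defaults.
        JobStory story = mock(JobStory.class);
        when(story.getName()).thenReturn("MOCKJOB000001");
        when(story.getUser()).thenReturn("foobar1");
        when(story.getNumberMaps()).thenReturn(2);
        when(story.getNumberReduces()).thenReturn(1);
        return story;
      }
    }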
+ 8 - 2
hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/Gridmix.java

@@ -36,7 +36,9 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
 import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -693,7 +695,7 @@ public class Gridmix extends Configured implements Tool {
     try {
       res = ToolRunner.run(new Configuration(), new Gridmix(argv), argv);
     } finally {
-      System.exit(res);
+      ExitUtil.terminate(res);
     }
   }
 
@@ -800,6 +802,10 @@ public class Gridmix extends Configured implements Tool {
      */
     void abort();
   }
-
+  // it is need for tests
+  protected Summarizer getSummarizer() {
+    return summarizer;
+  }
+  
 }
 

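Swapping System.exit for ExitUtil.terminate in main() is what makes the command-line path testable: a test can disable real JVM exits and assert on the captured status. A rough sketch of that pattern (illustrative only, not part of this commit), assuming the usual org.apache.hadoop.util.ExitUtil behaviour where terminate() throws ExitException once system exit is disabled:

    import org.apache.hadoop.util.ExitUtil;

    public class ExitUtilExample {
      public static void main(String[] args) {
        // A test would call this first so terminate() no longer kills the JVM.
        ExitUtil.disableSystemExit();
        try {
          // Code under test now calls ExitUtil.terminate(status) instead of System.exit.
          ExitUtil.terminate(1);
        } catch (ExitUtil.ExitException e) {
          // With system exit disabled, terminate() throws and exposes the status.
          System.out.println("captured exit status: " + e.status);
        }
      }
    }

The protected getSummarizer() accessor serves the same purpose: it gives tests a handle on internal state without widening the public API.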
+ 4 - 0
hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java

@@ -175,4 +175,8 @@ public class SerialJobFactory extends JobFactory<JobStats> {
     LOG.info(" Starting Serial submission ");
     LOG.info(" Starting Serial submission ");
     this.rThread.start();
     this.rThread.start();
   }
   }
+  // it is need for test 
+  void setDistCacheEmulator(DistributedCacheEmulator e) {
+    jobCreator.setDistCacheEmulator(e);
+  }
 }
 }

+ 384 - 0
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/CommonJobTest.java

@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.util.ToolRunner;
+
+public class CommonJobTest {
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
+  protected static int NJOBS = 2;
+  protected static final long GENDATA = 1; // in megabytes
+  protected static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+  private static File workspace = new File("target" + File.separator
+          + TestGridmixSubmission.class.getName() + "-test");
+
+  static class DebugGridmix extends Gridmix {
+
+    private JobFactory<?> factory;
+    private TestMonitor monitor;
+
+    @Override
+    protected JobMonitor createJobMonitor(Statistics stats, Configuration conf)
+            throws IOException {
+      monitor = new TestMonitor(3, stats);
+      return monitor;
+    }
+
+    @Override
+    protected JobFactory<?> createJobFactory(JobSubmitter submitter,
+                                             String traceIn, Path scratchDir, Configuration conf,
+                                             CountDownLatch startFlag, UserResolver userResolver) throws IOException {
+      factory = DebugJobFactory.getFactory(submitter, scratchDir, NJOBS, conf,
+              startFlag, userResolver);
+      return factory;
+    }
+
+    public void checkMonitor() throws Exception {
+      monitor.verify(((DebugJobFactory.Debuggable) factory).getSubmitted());
+    }
+  }
+
+  static class TestMonitor extends JobMonitor {
+    private final BlockingQueue<Job> retiredJobs;
+    private final int expected;
+    static final long SLOPBYTES = 1024;
+
+    public TestMonitor(int expected, Statistics stats) {
+      super(3, TimeUnit.SECONDS, stats, 1);
+      this.expected = expected;
+      retiredJobs = new LinkedBlockingQueue<Job>();
+    }
+
+    @Override
+    protected void onSuccess(Job job) {
+      LOG.info(" Job Success " + job);
+      retiredJobs.add(job);
+    }
+
+    @Override
+    protected void onFailure(Job job) {
+      fail("Job failure: " + job);
+    }
+
+    public void verify(ArrayList<JobStory> submitted) throws Exception {
+      assertEquals("Bad job count", expected, retiredJobs.size());
+
+      final ArrayList<Job> succeeded = new ArrayList<Job>();
+      assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+      final HashMap<String, JobStory> sub = new HashMap<String, JobStory>();
+      for (JobStory spec : submitted) {
+        sub.put(spec.getJobID().toString(), spec);
+      }
+      for (Job job : succeeded) {
+        final String jobName = job.getJobName();
+        Configuration configuration = job.getConfiguration();
+        if (GenerateData.JOB_NAME.equals(jobName)) {
+          RemoteIterator<LocatedFileStatus> rit = GridmixTestUtils.dfs
+                  .listFiles(new Path("/"), true);
+          while (rit.hasNext()) {
+            System.out.println(rit.next().toString());
+          }
+          final Path in = new Path("foo").makeQualified(
+                  GridmixTestUtils.dfs.getUri(),
+                  GridmixTestUtils.dfs.getWorkingDirectory());
+          // data was compressed. All files = compressed test size+ logs= 1000000/2 + logs
+          final ContentSummary generated = GridmixTestUtils.dfs
+                  .getContentSummary(in);
+          assertEquals(550000, generated.getLength(), 10000);
+
+          Counter counter = job.getCounters()
+                  .getGroup("org.apache.hadoop.mapreduce.FileSystemCounter")
+                  .findCounter("HDFS_BYTES_WRITTEN");
+
+          assertEquals(generated.getLength(), counter.getValue());
+
+          continue;
+        } else if (GenerateDistCacheData.JOB_NAME.equals(jobName)) {
+          continue;
+        }
+
+        final String originalJobId = configuration.get(Gridmix.ORIGINAL_JOB_ID);
+        final JobStory spec = sub.get(originalJobId);
+        assertNotNull("No spec for " + jobName, spec);
+        assertNotNull("No counters for " + jobName, job.getCounters());
+        final String originalJobName = spec.getName();
+        System.out.println("originalJobName=" + originalJobName
+                + ";GridmixJobName=" + jobName + ";originalJobID=" + originalJobId);
+        assertTrue("Original job name is wrong.",
+                originalJobName.equals(configuration.get(Gridmix.ORIGINAL_JOB_NAME)));
+
+        // Gridmix job seqNum contains 6 digits
+        int seqNumLength = 6;
+        String jobSeqNum = new DecimalFormat("000000").format(configuration.getInt(
+                GridmixJob.GRIDMIX_JOB_SEQ, -1));
+        // Original job name is of the format MOCKJOB<6 digit sequence number>
+        // because MockJob jobNames are of this format.
+        assertTrue(originalJobName.substring(
+                originalJobName.length() - seqNumLength).equals(jobSeqNum));
+
+        assertTrue("Gridmix job name is not in the expected format.",
+                jobName.equals(GridmixJob.JOB_NAME_PREFIX + jobSeqNum));
+        final FileStatus stat = GridmixTestUtils.dfs.getFileStatus(new Path(
+                GridmixTestUtils.DEST, "" + Integer.valueOf(jobSeqNum)));
+        assertEquals("Wrong owner for " + jobName, spec.getUser(),
+                stat.getOwner());
+        final int nMaps = spec.getNumberMaps();
+        final int nReds = spec.getNumberReduces();
+
+        final JobClient client = new JobClient(
+                GridmixTestUtils.mrvl.getConfig());
+        final TaskReport[] mReports = client.getMapTaskReports(JobID
+                .downgrade(job.getJobID()));
+        assertEquals("Mismatched map count", nMaps, mReports.length);
+        check(TaskType.MAP, spec, mReports, 0, 0, SLOPBYTES, nReds);
+
+        final TaskReport[] rReports = client.getReduceTaskReports(JobID
+                .downgrade(job.getJobID()));
+        assertEquals("Mismatched reduce count", nReds, rReports.length);
+        check(TaskType.REDUCE, spec, rReports, nMaps * SLOPBYTES, 2 * nMaps, 0,
+                0);
+
+      }
+
+    }
+    // Verify if correct job queue is used
+    private void check(final TaskType type, JobStory spec,
+                       final TaskReport[] runTasks, long extraInputBytes,
+                       int extraInputRecords, long extraOutputBytes, int extraOutputRecords)
+            throws Exception {
+
+      long[] runInputRecords = new long[runTasks.length];
+      long[] runInputBytes = new long[runTasks.length];
+      long[] runOutputRecords = new long[runTasks.length];
+      long[] runOutputBytes = new long[runTasks.length];
+      long[] specInputRecords = new long[runTasks.length];
+      long[] specInputBytes = new long[runTasks.length];
+      long[] specOutputRecords = new long[runTasks.length];
+      long[] specOutputBytes = new long[runTasks.length];
+
+      for (int i = 0; i < runTasks.length; ++i) {
+        final TaskInfo specInfo;
+        final Counters counters = runTasks[i].getCounters();
+        switch (type) {
+          case MAP:
+            runInputBytes[i] = counters.findCounter("FileSystemCounters",
+                    "HDFS_BYTES_READ").getValue()
+                    - counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue();
+            runInputRecords[i] = (int) counters.findCounter(
+                    TaskCounter.MAP_INPUT_RECORDS).getValue();
+            runOutputBytes[i] = counters
+                    .findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
+            runOutputRecords[i] = (int) counters.findCounter(
+                    TaskCounter.MAP_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.MAP, i);
+            specInputRecords[i] = specInfo.getInputRecords();
+            specInputBytes[i] = specInfo.getInputBytes();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+
+            LOG.info(String.format(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+                    specInputBytes[i], specOutputBytes[i], specInputRecords[i],
+                    specOutputRecords[i]));
+            LOG.info(String.format(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
+                    runInputBytes[i], runOutputBytes[i], runInputRecords[i],
+                    runOutputRecords[i]));
+            break;
+          case REDUCE:
+            runInputBytes[i] = 0;
+            runInputRecords[i] = (int) counters.findCounter(
+                    TaskCounter.REDUCE_INPUT_RECORDS).getValue();
+            runOutputBytes[i] = counters.findCounter("FileSystemCounters",
+                    "HDFS_BYTES_WRITTEN").getValue();
+            runOutputRecords[i] = (int) counters.findCounter(
+                    TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+            // There is no reliable counter for reduce input bytes. The
+            // variable-length encoding of intermediate records and other noise
+            // make this quantity difficult to estimate. The shuffle and spec
+            // input bytes are included in debug output for reference, but are
+            // not checked
+            specInputBytes[i] = 0;
+            specInputRecords[i] = specInfo.getInputRecords();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+            LOG.info(String.format(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
+                    specInfo.getInputBytes(), specOutputBytes[i],
+                    specInputRecords[i], specOutputRecords[i]));
+            LOG.info(String
+                    .format(type + " RUN:  (%9d) -> %9d :: %5d -> %5d\n", counters
+                            .findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES).getValue(),
+                            runOutputBytes[i], runInputRecords[i], runOutputRecords[i]));
+            break;
+          default:
+            fail("Unexpected type: " + type);
+        }
+      }
+
+      // Check input bytes
+      Arrays.sort(specInputBytes);
+      Arrays.sort(runInputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched " + type + " input bytes " + specInputBytes[i]
+                + "/" + runInputBytes[i],
+                eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
+      }
+
+      // Check input records
+      Arrays.sort(specInputRecords);
+      Arrays.sort(runInputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue(
+                "Mismatched " + type + " input records " + specInputRecords[i]
+                        + "/" + runInputRecords[i],
+                eqPlusMinus(runInputRecords[i], specInputRecords[i],
+                        extraInputRecords));
+      }
+
+      // Check output bytes
+      Arrays.sort(specOutputBytes);
+      Arrays.sort(runOutputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue(
+                "Mismatched " + type + " output bytes " + specOutputBytes[i] + "/"
+                        + runOutputBytes[i],
+                eqPlusMinus(runOutputBytes[i], specOutputBytes[i], extraOutputBytes));
+      }
+
+      // Check output records
+      Arrays.sort(specOutputRecords);
+      Arrays.sort(runOutputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue(
+                "Mismatched " + type + " output records " + specOutputRecords[i]
+                        + "/" + runOutputRecords[i],
+                eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
+                        extraOutputRecords));
+      }
+
+    }
+
+    private static boolean eqPlusMinus(long a, long b, long x) {
+      final long diff = Math.abs(a - b);
+      return diff <= x;
+    }
+
+  }
+
+  protected void doSubmission(String jobCreatorName, boolean defaultOutputPath)
+          throws Exception {
+    final Path in = new Path("foo").makeQualified(
+            GridmixTestUtils.dfs.getUri(),
+            GridmixTestUtils.dfs.getWorkingDirectory());
+    final Path out = GridmixTestUtils.DEST.makeQualified(
+            GridmixTestUtils.dfs.getUri(),
+            GridmixTestUtils.dfs.getWorkingDirectory());
+    final Path root = new Path(workspace.getAbsolutePath());
+    if (!workspace.exists()) {
+      assertTrue(workspace.mkdirs());
+    }
+    Configuration conf = null;
+
+    try {
+      ArrayList<String> argsList = new ArrayList<String>();
+
+      argsList.add("-D" + FilePool.GRIDMIX_MIN_FILE + "=0");
+      argsList.add("-D" + Gridmix.GRIDMIX_USR_RSV + "="
+              + EchoUserResolver.class.getName());
+      if (jobCreatorName != null) {
+        argsList.add("-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + jobCreatorName);
+      }
+
+      // Set the config property gridmix.output.directory only if
+      // defaultOutputPath is false. If defaultOutputPath is true, then
+      // let us allow gridmix to use the path foo/gridmix/ as output dir.
+      if (!defaultOutputPath) {
+        argsList.add("-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out);
+      }
+      argsList.add("-generate");
+      argsList.add(String.valueOf(GENDATA) + "m");
+      argsList.add(in.toString());
+      argsList.add("-"); // ignored by DebugGridmix
+
+      String[] argv = argsList.toArray(new String[argsList.size()]);
+
+      DebugGridmix client = new DebugGridmix();
+      conf = GridmixTestUtils.mrvl.getConfig();
+
+      CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+      conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
+
+      conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true);
+      UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+      conf.set(MRJobConfig.USER_NAME, ugi.getUserName());
+
+      // allow synthetic users to create home directories
+      GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 777));
+      GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 777));
+
+      int res = ToolRunner.run(conf, client, argv);
+      assertEquals("Client exited with nonzero status", 0, res);
+      client.checkMonitor();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      in.getFileSystem(conf).delete(in, true);
+      out.getFileSystem(conf).delete(out, true);
+      root.getFileSystem(conf).delete(root, true);
+    }
+  }
+}

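The check() helper above tolerates bounded drift between the trace spec and the observed task counters: both sides are sorted and compared element-wise with eqPlusMinus. A self-contained illustration of that tolerance comparison (the predicate is copied from the test; the sample numbers are made up):

    import java.util.Arrays;

    public class SlopCompareExample {
      // Same predicate as CommonJobTest.eqPlusMinus: |a - b| <= x.
      static boolean eqPlusMinus(long a, long b, long x) {
        return Math.abs(a - b) <= x;
      }

      public static void main(String[] args) {
        long[] spec = {1000, 2000, 3000};
        long[] run = {1900, 1050, 3010};
        long slop = 1024; // SLOPBYTES in the test

        // Sorting both sides makes the comparison order-insensitive, as in check().
        Arrays.sort(spec);
        Arrays.sort(run);
        for (int i = 0; i < spec.length; i++) {
          System.out.println(spec[i] + " vs " + run[i] + " within slop: "
              + eqPlusMinus(run[i], spec[i], slop));
        }
      }
    }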
+ 2 - 2
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java

@@ -29,13 +29,13 @@ import java.util.concurrent.CountDownLatch;
 /**
  * Component generating random job traces for testing on a single node.
  */
-class DebugJobFactory {
+public class DebugJobFactory {
 
   interface Debuggable {
     ArrayList<JobStory> getSubmitted();
   }
 
-  public static JobFactory getFactory(
+  public static JobFactory<?> getFactory(
     JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
     CountDownLatch startFlag, UserResolver resolver) throws IOException {
     GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(

+ 2 - 2
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java

@@ -216,7 +216,7 @@ public class DebugJobProducer implements JobStoryProducer {
      if (user == null) {
        user = String.format("foobar%d", id);
      }
-     GridmixTestUtils.createHomeAndStagingDirectory(user, (JobConf)conf);
+     GridmixTestUtils.createHomeAndStagingDirectory(user, conf);
      return user;
    }
 
@@ -300,7 +300,7 @@ public class DebugJobProducer implements JobStoryProducer {
 
 
     @Override
     public String getQueueName() {
-      String qName = "q" + ((id % 2) + 1);
+      String qName = "default";
       return qName;
     }
     

+ 59 - 36
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java

@@ -4,55 +4,76 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
-import org.apache.hadoop.security.Groups;
 
 import java.io.IOException;
 
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 public class GridmixTestUtils {
   private static final Log LOG = LogFactory.getLog(GridmixTestUtils.class);
   static final Path DEST = new Path("/gridmix");
   static FileSystem dfs = null;
   static MiniDFSCluster dfsCluster = null;
-  static MiniMRCluster mrCluster = null;
+  static MiniMRClientCluster mrvl = null;
+  protected static final String GRIDMIX_USE_QUEUE_IN_TRACE = 
+      "gridmix.job-submission.use-queue-in-trace";
+  protected static final String GRIDMIX_DEFAULT_QUEUE = 
+      "gridmix.job-submission.default-queue";
 
-  public static void initCluster() throws IOException {
+  public static void initCluster(Class<?> caller) throws IOException {
     Configuration conf = new Configuration();
-    conf.set("mapred.queue.names", "default,q1,q2");
-    dfsCluster = new MiniDFSCluster(conf, 3, true, null);
+//    conf.set("mapred.queue.names", "default,q1,q2");
+  conf.set("mapred.queue.names", "default");
+    conf.set("yarn.scheduler.capacity.root.queues", "default");
+    conf.set("yarn.scheduler.capacity.root.default.capacity", "100.0");
+    
+    
+    conf.setBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false);
+    conf.set(GRIDMIX_DEFAULT_QUEUE, "default");
+    
+
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true)
+        .build();// MiniDFSCluster(conf, 3, true, null);
     dfs = dfsCluster.getFileSystem();
     conf.set(JTConfig.JT_RETIREJOBS, "false");
-    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null, 
-                                  new JobConf(conf));
+    mrvl = MiniMRClientClusterFactory.create(caller, 2, conf);
+    
+    conf = mrvl.getConfig();
+    String[] files = conf.getStrings(MRJobConfig.CACHE_FILES);
+    if (files != null) {
+      String[] timestamps = new String[files.length];
+      for (int i = 0; i < files.length; i++) {
+        timestamps[i] = Long.toString(System.currentTimeMillis());
+      }
+      conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timestamps);
+    }
+    
   }
 
   public static void shutdownCluster() throws IOException {
-    if (mrCluster != null) {
-      mrCluster.shutdown();
+    if (mrvl != null) {
+      mrvl.stop();
     }
     if (dfsCluster != null) {
       dfsCluster.shutdown();
@@ -61,23 +82,25 @@ public class GridmixTestUtils {
 
 
   /**
    * Methods to generate the home directory for dummy users.
-   *
+   * 
    * @param conf
    */
-  public static void createHomeAndStagingDirectory(String user, JobConf conf) {
+  public static void createHomeAndStagingDirectory(String user,
+      Configuration conf) {
     try {
       FileSystem fs = dfsCluster.getFileSystem();
       String path = "/user/" + user;
       Path homeDirectory = new Path(path);
-      if(fs.exists(homeDirectory)) {
-        fs.delete(homeDirectory,true);
-      }
-      LOG.info("Creating Home directory : " + homeDirectory);
-      fs.mkdirs(homeDirectory);
-      changePermission(user,homeDirectory, fs);
-      Path stagingArea = 
-        new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
-                          "/tmp/hadoop/mapred/staging"));
+      if (!fs.exists(homeDirectory)) {
+        LOG.info("Creating Home directory : " + homeDirectory);
+        fs.mkdirs(homeDirectory);
+        changePermission(user, homeDirectory, fs);
+
+      }    
+      changePermission(user, homeDirectory, fs);
+      Path stagingArea = new Path(
+          conf.get("mapreduce.jobtracker.staging.root.dir",
+              "/tmp/hadoop/mapred/staging"));
       LOG.info("Creating Staging root directory : " + stagingArea);
       fs.mkdirs(stagingArea);
       fs.setPermission(stagingArea, new FsPermission((short) 0777));
@@ -87,7 +110,7 @@ public class GridmixTestUtils {
   }
 
   static void changePermission(String user, Path homeDirectory, FileSystem fs)
-    throws IOException {
+      throws IOException {
     fs.setOwner(homeDirectory, user, "");
   }
 }

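GridmixTestUtils now builds its clusters with MiniDFSCluster.Builder and MiniMRClientClusterFactory, so each test class passes itself as the caller and drives the lifecycle from JUnit class hooks. A minimal usage sketch (a hypothetical test class, assuming it lives in the same org.apache.hadoop.mapred.gridmix test package since the dfs and mrvl fields are package-private):

    package org.apache.hadoop.mapred.gridmix;

    import java.io.IOException;

    import org.junit.AfterClass;
    import org.junit.Assert;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class ExampleGridmixClusterTest {

      @BeforeClass
      public static void startClusters() throws IOException {
        // Spins up the single-datanode HDFS and the 2-node MR client cluster.
        GridmixTestUtils.initCluster(ExampleGridmixClusterTest.class);
      }

      @AfterClass
      public static void stopClusters() throws IOException {
        // Stops the MR client cluster, then shuts down HDFS.
        GridmixTestUtils.shutdownCluster();
      }

      @Test
      public void clusterIsUp() throws IOException {
        // initCluster populates the shared cluster handles used by the gridmix tests.
        Assert.assertNotNull(GridmixTestUtils.mrvl.getConfig());
        Assert.assertNotNull(GridmixTestUtils.dfs.getUri());
      }
    }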
+ 430 - 0
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java

@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Validate emulation of distributed cache load in gridmix simulated jobs.
+ * 
+ */
+public class TestDistCacheEmulation {
+
+  private DistributedCacheEmulator dce = null;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster(TestDistCacheEmulation.class);
+    File target=new File("target"+File.separator+TestDistCacheEmulation.class.getName());
+    if(!target.exists()){
+      assertTrue(target.mkdirs());
+    }
+    
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+  /**
+   * Validate the dist cache files generated by GenerateDistCacheData job.
+   * 
+   * @param jobConf
+   *          configuration of GenerateDistCacheData job.
+   * @param sortedFileSizes
+   *          array of sorted distributed cache file sizes
+   * @throws IOException
+   * @throws FileNotFoundException
+   */
+  private void validateDistCacheData(Configuration jobConf,
+      long[] sortedFileSizes) throws FileNotFoundException, IOException {
+    Path distCachePath = dce.getDistributedCacheDir();
+    String filesListFile = jobConf
+        .get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST);
+    FileSystem fs = FileSystem.get(jobConf);
+
+    // Validate the existence of Distributed Cache files list file directly
+    // under distributed cache directory
+    Path listFile = new Path(filesListFile);
+    assertTrue("Path of Distributed Cache files list file is wrong.",
+        distCachePath.equals(listFile.getParent().makeQualified(fs.getUri(), fs.getWorkingDirectory())));
+
+    // Delete the dist cache files list file
+    assertTrue(
+        "Failed to delete distributed Cache files list file " + listFile,
+        fs.delete(listFile,true));
+
+    List<Long> fileSizes = new ArrayList<Long>();
+    for (long size : sortedFileSizes) {
+      fileSizes.add(size);
+    }
+    // validate dist cache files after deleting the 'files list file'
+    validateDistCacheFiles(fileSizes, distCachePath);
+  }
+
+  /**
+   * Validate private/public distributed cache files.
+   * 
+   * @param filesSizesExpected
+   *          list of sizes of expected dist cache files
+   * @param distCacheDir
+   *          the distributed cache dir to be validated
+   * @throws IOException
+   * @throws FileNotFoundException
+   */
+  private void validateDistCacheFiles(List<Long> filesSizesExpected, Path distCacheDir)
+      throws FileNotFoundException, IOException {
+    // RemoteIterator<LocatedFileStatus> iter =
+    FileStatus[] statuses = GridmixTestUtils.dfs.listStatus(distCacheDir);
+    int numFiles = filesSizesExpected.size();
+    assertEquals("Number of files under distributed cache dir is wrong.",
+        numFiles, statuses.length);
+    for (int i = 0; i < numFiles; i++) {
+      FileStatus stat = statuses[i];
+      assertTrue("File size of distributed cache file "
+          + stat.getPath().toUri().getPath() + " is wrong.",
+          filesSizesExpected.remove(stat.getLen()));
+
+      FsPermission perm = stat.getPermission();
+      assertEquals("Wrong permissions for distributed cache file "
+          + stat.getPath().toUri().getPath(), new FsPermission((short) 0644),
+          perm);
+    }
+  }
+
+  /**
+   * Configures 5 HDFS-based dist cache files and 1 local-FS-based dist cache
+   * file in the given Configuration object <code>conf</code>.
+   * 
+   * @param conf
+   *          configuration where dist cache config properties are to be set
+   * @return array of sorted HDFS-based distributed cache file sizes
+   * @throws IOException
+   */
+  private long[] configureDummyDistCacheFiles(Configuration conf)
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    conf.set("user.name", user);
+    
+    // Set some dummy dist cache files in gridmix configuration so that they go
+    // into the configuration of JobStory objects.
+    String[] distCacheFiles = { "hdfs:///tmp/file1.txt",
+        "/tmp/" + user + "/.staging/job_1/file2.txt",
+        "hdfs:///user/user1/file3.txt", "/home/user2/file4.txt",
+        "subdir1/file5.txt", "subdir2/file6.gz" };
+
+    String[] fileSizes = { "400", "2500", "700", "1200", "1500", "500" };
+
+    String[] visibilities = { "true", "false", "false", "true", "true", "false" };
+    String[] timeStamps = { "1234", "2345", "34567", "5434", "125", "134" };
+
+    // DistributedCache.setCacheFiles(fileCaches, conf);
+    conf.setStrings(MRJobConfig.CACHE_FILES, distCacheFiles);
+    conf.setStrings(MRJobConfig.CACHE_FILES_SIZES, fileSizes);
+    conf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, visibilities);
+    conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timeStamps);
+
+    // local FS based dist cache file whose path contains <user>/.staging is
+    // not created on HDFS. So file size 2500 is not added to sortedFileSizes.
+    long[] sortedFileSizes = new long[] { 1500, 1200, 700, 500, 400 };
+    return sortedFileSizes;
+  }
+
+  /**
+   * Runs setupGenerateDistCacheData() on a new DistrbutedCacheEmulator and and
+   * returns the jobConf. Fills the array <code>sortedFileSizes</code> that can
+   * be used for validation. Validation of exit code from
+   * setupGenerateDistCacheData() is done.
+   * 
+   * @param generate
+   *          true if -generate option is specified
+   * @param sortedFileSizes
+   *          sorted HDFS-based distributed cache file sizes
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private Configuration runSetupGenerateDistCacheData(boolean generate,
+      long[] sortedFileSizes) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    long[] fileSizes = configureDummyDistCacheFiles(conf);
+    System.arraycopy(fileSizes, 0, sortedFileSizes, 0, fileSizes.length);
+
+    // Job stories of all 3 jobs will have same dist cache files in their
+    // configurations
+    final int numJobs = 3;
+    DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf);
+
+    Configuration jobConf = GridmixTestUtils.mrvl.getConfig();
+    Path ioPath = new Path("testSetupGenerateDistCacheData")
+        .makeQualified(GridmixTestUtils.dfs.getUri(),GridmixTestUtils.dfs.getWorkingDirectory());
+    FileSystem fs = FileSystem.get(jobConf);
+    if (fs.exists(ioPath)) {
+      fs.delete(ioPath, true);
+    }
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short) 0777));
+
+    dce = createDistributedCacheEmulator(jobConf, ioPath, generate);
+    int exitCode = dce.setupGenerateDistCacheData(jobProducer);
+    int expectedExitCode = generate ? 0
+        : Gridmix.MISSING_DIST_CACHE_FILES_ERROR;
+    assertEquals("setupGenerateDistCacheData failed.", expectedExitCode,
+        exitCode);
+
+    // reset back
+    resetDistCacheConfigProperties(jobConf);
+    return jobConf;
+  }
+
+  /**
+   * Reset the config properties related to Distributed Cache in the given job
+   * configuration <code>jobConf</code>.
+   * 
+   * @param jobConf
+   *          job configuration
+   */
+  private void resetDistCacheConfigProperties(Configuration jobConf) {
+    // reset current/latest property names
+    jobConf.setStrings(MRJobConfig.CACHE_FILES, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "");
+    jobConf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, "");
+    // reset old property names
+    jobConf.setStrings("mapred.cache.files", "");
+    jobConf.setStrings("mapred.cache.files.filesizes", "");
+    jobConf.setStrings("mapred.cache.files.visibilities", "");
+    jobConf.setStrings("mapred.cache.files.timestamps", "");
+  }
+
+  /**
+   * Validate GenerateDistCacheData job if it creates dist cache files properly.
+   * 
+   * @throws Exception
+   */
+  @Test (timeout=200000)
+  public void testGenerateDistCacheData() throws Exception {
+    long[] sortedFileSizes = new long[5];
+    Configuration jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes);
+    GridmixJob gridmixJob = new GenerateDistCacheData(jobConf);
+    Job job = gridmixJob.call();
+    assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.",
+        0, job.getNumReduceTasks());
+    assertTrue("GenerateDistCacheData job failed.",
+        job.waitForCompletion(false));
+    validateDistCacheData(jobConf, sortedFileSizes);
+  }
+
+  /**
+   * Validate setupGenerateDistCacheData by validating <li>permissions of the
+   * distributed cache directories and <li>content of the generated sequence
+   * file. This includes validation of dist cache file paths and their file
+   * sizes.
+   */
+  private void validateSetupGenDC(Configuration jobConf, long[] sortedFileSizes)
+      throws IOException, InterruptedException {
+    // build things needed for validation
+    long sumOfFileSizes = 0;
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      sumOfFileSizes += sortedFileSizes[i];
+    }
+
+    FileSystem fs = FileSystem.get(jobConf);
+    assertEquals("Number of distributed cache files to be generated is wrong.",
+        sortedFileSizes.length,
+        jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
+    assertEquals("Total size of dist cache files to be generated is wrong.",
+        sumOfFileSizes,
+        jobConf.getLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+    Path filesListFile = new Path(
+        jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
+    FileStatus stat = fs.getFileStatus(filesListFile);
+    assertEquals("Wrong permissions of dist Cache files list file "
+        + filesListFile, new FsPermission((short) 0644), stat.getPermission());
+
+    InputSplit split = new FileSplit(filesListFile, 0, stat.getLen(),
+        (String[]) null);
+    TaskAttemptContext taskContext = MapReduceTestUtil
+        .createDummyMapTaskAttemptContext(jobConf);
+    RecordReader<LongWritable, BytesWritable> reader = new GenerateDistCacheData.GenDCDataFormat()
+        .createRecordReader(split, taskContext);
+    MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable> mapContext = new MapContextImpl<LongWritable, BytesWritable, NullWritable, BytesWritable>(
+        jobConf, taskContext.getTaskAttemptID(), reader, null, null,
+        MapReduceTestUtil.createDummyReporter(), split);
+    reader.initialize(split, mapContext);
+
+    // start validating setupGenerateDistCacheData
+    doValidateSetupGenDC(reader, fs, sortedFileSizes);
+  }
+
+  /**
+   * Validate setupGenerateDistCacheData by validating <li>permissions of the
+   * distributed cache directory and <li>content of the generated sequence file.
+   * This includes validation of dist cache file paths and their file sizes.
+   */
+  private void doValidateSetupGenDC(
+      RecordReader<LongWritable, BytesWritable> reader, FileSystem fs,
+      long[] sortedFileSizes) throws IOException, InterruptedException {
+
+    // Validate permissions of dist cache directory
+    Path distCacheDir = dce.getDistributedCacheDir();
+    assertEquals(
+        "Wrong permissions for distributed cache dir " + distCacheDir,
+        fs.getFileStatus(distCacheDir).getPermission().getOtherAction()
+            .and(FsAction.EXECUTE), FsAction.EXECUTE);
+
+    // Validate the content of the sequence file generated by
+    // dce.setupGenerateDistCacheData().
+    LongWritable key = new LongWritable();
+    BytesWritable val = new BytesWritable();
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      assertTrue("Number of files written to the sequence file by "
+          + "setupGenerateDistCacheData is less than the expected.",
+          reader.nextKeyValue());
+      key = reader.getCurrentKey();
+      val = reader.getCurrentValue();
+      long fileSize = key.get();
+      String file = new String(val.getBytes(), 0, val.getLength());
+
+      // Dist Cache files should be sorted based on file size.
+      assertEquals("Dist cache file size is wrong.", sortedFileSizes[i],
+          fileSize);
+
+      // Validate dist cache file path.
+
+      // parent dir of dist cache file
+      Path parent = new Path(file).getParent().makeQualified(fs.getUri(),fs.getWorkingDirectory());
+      // should exist in dist cache dir
+      assertTrue("Public dist cache file path is wrong.",
+          distCacheDir.equals(parent));
+    }
+  }
+
+  /**
+   * Test if DistributedCacheEmulator's setup of GenerateDistCacheData is
+   * working as expected.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test  (timeout=20000)
+  public void testSetupGenerateDistCacheData() throws IOException,
+      InterruptedException {
+    long[] sortedFileSizes = new long[5];
+    Configuration jobConf = runSetupGenerateDistCacheData(true, sortedFileSizes);
+    validateSetupGenDC(jobConf, sortedFileSizes);
+
+    // Verify if correct exit code is seen when -generate option is missing and
+    // distributed cache files are missing in the expected path.
+    runSetupGenerateDistCacheData(false, sortedFileSizes);
+  }
+
+  /**
+   * Create DistributedCacheEmulator object and do the initialization by calling
+   * init() on it with dummy trace. Also configure the pseudo local FS.
+   */
+  private DistributedCacheEmulator createDistributedCacheEmulator(
+      Configuration conf, Path ioPath, boolean generate) throws IOException {
+    DistributedCacheEmulator dce = new DistributedCacheEmulator(conf, ioPath);
+    JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
+    jobCreator.setDistCacheEmulator(dce);
+    dce.init("dummytrace", jobCreator, generate);
+    return dce;
+  }
+
+  /**
+   * Test the configuration property for disabling/enabling emulation of
+   * distributed cache load.
+   */
+  @Test  (timeout=2000)
+  public void testDistCacheEmulationConfigurability() throws IOException {
+    Configuration jobConf = GridmixTestUtils.mrvl.getConfig();
+    Path ioPath = new Path("testDistCacheEmulationConfigurability")
+        .makeQualified(GridmixTestUtils.dfs.getUri(),GridmixTestUtils.dfs.getWorkingDirectory());
+    FileSystem fs = FileSystem.get(jobConf);
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short) 0777));
+
+    // default config
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertTrue("Default configuration of "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+    // config property set to false
+    jobConf.setBoolean(
+        DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false);
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertFalse("Disabling of emulation of distributed cache load by setting "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " to false is not working.", dce.shouldEmulateDistCacheLoad());
+  }
+/** 
+ * test method configureDistCacheFiles
+ * 
+ */
+  @Test  (timeout=2000)
+  public void testDistCacheEmulator() throws Exception {
+
+    Configuration conf = new Configuration();
+    configureDummyDistCacheFiles(conf);
+    File ws = new File("target" + File.separator + this.getClass().getName());
+    Path ioPath = new Path(ws.getAbsolutePath());
+
+    DistributedCacheEmulator dce = new DistributedCacheEmulator(conf, ioPath);
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+    File fin=new File("src"+File.separator+"test"+File.separator+"resources"+File.separator+"data"+File.separator+"wordcount.json");
+    dce.init(fin.getAbsolutePath(), JobCreator.LOADJOB, true);
+    dce.configureDistCacheFiles(conf, jobConf);
+    
+    String[] caches=conf.getStrings(MRJobConfig.CACHE_FILES);
+    String[] tmpfiles=conf.getStrings("tmpfiles");
+    // this method should fill caches AND tmpfiles  from MRJobConfig.CACHE_FILES property 
+    assertEquals(6, ((caches==null?0:caches.length)+(tmpfiles==null?0:tmpfiles.length)));
+  }
+}

+ 989 - 0
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridMixClasses.java

@@ -0,0 +1,989 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.CustomOutputCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.gridmix.GridmixKey.Spec;
+import org.apache.hadoop.mapred.gridmix.SleepJob.SleepReducer;
+import org.apache.hadoop.mapred.gridmix.SleepJob.SleepSplit;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.counters.GenericCounter;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+import org.apache.hadoop.util.Progress;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+import static org.junit.Assert.*;
+
+public class TestGridMixClasses {
+  private static final Log LOG = LogFactory.getLog(TestGridMixClasses.class);
+
+  /*
+   * simple test LoadSplit (getters,copy, write, read...)
+   */
+  @Test (timeout=1000)
+  public void testLoadSplit() throws Exception {
+
+    LoadSplit test = getLoadSplit();
+
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(data);
+    test.write(out);
+    LoadSplit copy = new LoadSplit();
+    copy.readFields(new DataInputStream(new ByteArrayInputStream(data
+            .toByteArray())));
+
+    // data should be the same
+    assertEquals(test.getId(), copy.getId());
+    assertEquals(test.getMapCount(), copy.getMapCount());
+    assertEquals(test.getInputRecords(), copy.getInputRecords());
+
+    assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
+    assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
+    assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
+    assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));
+    assertEquals(test.getMapResourceUsageMetrics().getCumulativeCpuUsage(),
+            copy.getMapResourceUsageMetrics().getCumulativeCpuUsage());
+    assertEquals(test.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage(),
+            copy.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage());
+
+  }
+
+  /*
+   * simple test GridmixSplit (copy, getters, write, read..)
+   */
+  @Test (timeout=1000)
+  public void testGridmixSplit() throws Exception {
+    Path[] files = {new Path("one"), new Path("two")};
+    long[] start = {1, 2};
+    long[] lengths = {100, 200};
+    String[] locations = {"locOne", "loctwo"};
+
+    CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
+            locations);
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+    metrics.setCumulativeCpuUsage(200);
+
+    double[] reduceBytes = {8.1d, 8.2d};
+    double[] reduceRecords = {9.1d, 9.2d};
+    long[] reduceOutputBytes = {101L, 102L};
+    long[] reduceOutputRecords = {111L, 112L};
+
+    GridmixSplit test = new GridmixSplit(cfSplit, 2, 3, 4L, 5L, 6L, 7L,
+            reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords);
+
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(data);
+    test.write(out);
+    GridmixSplit copy = new GridmixSplit();
+    copy.readFields(new DataInputStream(new ByteArrayInputStream(data
+            .toByteArray())));
+
+    // data should be the same
+    assertEquals(test.getId(), copy.getId());
+    assertEquals(test.getMapCount(), copy.getMapCount());
+    assertEquals(test.getInputRecords(), copy.getInputRecords());
+
+    assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
+    assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
+    assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
+    assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));
+
+  }
+
+  /*
+   * test LoadMapper: it should write one record to the writer for each reduce
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Test (timeout=10000)
+  public void testLoadMapper() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.NUM_REDUCES, 2);
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+
+    TaskAttemptID taskId = new TaskAttemptID();
+    RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
+
+    LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
+
+    OutputCommitter committer = new CustomOutputCommitter();
+    StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
+    LoadSplit split = getLoadSplit();
+
+    MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
+            conf, taskId, reader, writer, committer, reporter, split);
+    // wrap the raw MapContext so LoadMapper sees the standard Mapper.Context API
+    Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
+            .getMapContext(mapContext);
+
+    reader.initialize(split, ctx);
+    ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    CompressionEmulationUtil.setCompressionEmulationEnabled(
+            ctx.getConfiguration(), true);
+
+    LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
+    // setup, map, cleanup
+    mapper.run(ctx);
+
+    Map<GridmixKey, GridmixRecord> data = writer.getData();
+    // check result
+    assertEquals(2, data.size());
+
+  }
+
+  private LoadSplit getLoadSplit() throws Exception {
+
+    Path[] files = {new Path("one"), new Path("two")};
+    long[] start = {1, 2};
+    long[] lengths = {100, 200};
+    String[] locations = {"locOne", "loctwo"};
+
+    CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
+            locations);
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+    metrics.setCumulativeCpuUsage(200);
+    ResourceUsageMetrics[] rMetrics = {metrics};
+
+    double[] reduceBytes = {8.1d, 8.2d};
+    double[] reduceRecords = {9.1d, 9.2d};
+    long[] reduceOutputBytes = {101L, 102L};
+    long[] reduceOutputRecords = {111L, 112L};
+
+    return new LoadSplit(cfSplit, 2, 1, 4L, 5L, 6L, 7L,
+            reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
+            metrics, rMetrics);
+  }
+
+  private class FakeRecordLLReader extends
+          RecordReader<LongWritable, LongWritable> {
+
+    int counter = 10;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      counter--;
+      return counter > 0;
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException,
+            InterruptedException {
+
+      return new LongWritable(counter);
+    }
+
+    @Override
+    public LongWritable getCurrentValue() throws IOException,
+            InterruptedException {
+      return new LongWritable(counter * 10);
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return counter / 10.0f;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // reset the counter so the reader can be reused
+      counter = 10;
+    }
+  }
+
+  private class FakeRecordReader extends
+          RecordReader<NullWritable, GridmixRecord> {
+
+    int counter = 10;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      counter--;
+      return counter > 0;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException,
+            InterruptedException {
+
+      return NullWritable.get();
+    }
+
+    @Override
+    public GridmixRecord getCurrentValue() throws IOException,
+            InterruptedException {
+      return new GridmixRecord(100, 100L);
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return counter / 10.0f;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // reset the counter so the reader can be reused
+      counter = 10;
+    }
+  }
+
+  private class LoadRecordGkGrWriter extends
+          RecordWriter<GridmixKey, GridmixRecord> {
+    private Map<GridmixKey, GridmixRecord> data = new HashMap<GridmixKey, GridmixRecord>();
+
+    @Override
+    public void write(GridmixKey key, GridmixRecord value) throws IOException,
+            InterruptedException {
+      data.put(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+    }
+
+    public Map<GridmixKey, GridmixRecord> getData() {
+      return data;
+    }
+
+  }
+
+  private class LoadRecordGkNullWriter extends
+          RecordWriter<GridmixKey, NullWritable> {
+    private Map<GridmixKey, NullWritable> data = new HashMap<GridmixKey, NullWritable>();
+
+    @Override
+    public void write(GridmixKey key, NullWritable value) throws IOException,
+            InterruptedException {
+      data.put(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+    }
+
+    public Map<GridmixKey, NullWritable> getData() {
+      return data;
+    }
+
+  }
+
+  private class LoadRecordWriter extends
+          RecordWriter<NullWritable, GridmixRecord> {
+    private Map<NullWritable, GridmixRecord> data = new HashMap<NullWritable, GridmixRecord>();
+
+    @Override
+    public void write(NullWritable key, GridmixRecord value)
+            throws IOException, InterruptedException {
+      data.put(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+    }
+
+    public Map<NullWritable, GridmixRecord> getData() {
+      return data;
+    }
+
+  }
+
+  /*
+   * test LoadSortComparator
+   */
+  @Test (timeout=1000)
+  public void testLoadJobLoadSortComparator() throws Exception {
+    LoadJob.LoadSortComparator test = new LoadJob.LoadSortComparator();
+
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(data);
+    WritableUtils.writeVInt(dos, 2);
+    WritableUtils.writeVInt(dos, 1);
+    WritableUtils.writeVInt(dos, 4);
+    WritableUtils.writeVInt(dos, 7);
+    WritableUtils.writeVInt(dos, 4);
+
+    byte[] b1 = data.toByteArray();
+
+    byte[] b2 = data.toByteArray();
+
+    // the same data should compare as equal
+    assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1));
+    b2[2] = 5;
+    // compared like a GridmixKey: the differing byte gives -1 = 4 - 5
+    assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
+    b2[2] = 2;
+    // compared like a GridmixKey: the differing byte gives 2 = 4 - 2
+    assertEquals(2, test.compare(b1, 0, 1, b2, 0, 1));
+    // compare with b2 shifted by one byte; 4 == 4, so the next byte decides (2 - 1)
+    b2[2] = 4;
+    assertEquals(1, test.compare(b1, 0, 1, b2, 1, 1));
+
+  }
+
+  /*
+   * test SpecGroupingComparator
+   */
+  @Test (timeout=1000)
+  public void testGridmixJobSpecGroupingComparator() throws Exception {
+    GridmixJob.SpecGroupingComparator test = new GridmixJob.SpecGroupingComparator();
+
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(data);
+    WritableUtils.writeVInt(dos, 2);
+    WritableUtils.writeVInt(dos, 1);
+    // 0: REDUCE SPEC
+    WritableUtils.writeVInt(dos, 0);
+    WritableUtils.writeVInt(dos, 7);
+    WritableUtils.writeVInt(dos, 4);
+
+    byte[] b1 = data.toByteArray();
+
+    byte[] b2 = data.toByteArray();
+
+    // identical serialized data should compare as equal
+    assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1));
+    b2[2] = 1;
+    // for Reduce
+    assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
+    // by Reduce spec
+    b2[2] = 1; // 1: DATA SPEC
+    assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
+    // comparing GridmixKey objects: equal keys should compare as equal
+    assertEquals(0, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2),
+            new GridmixKey(GridmixKey.DATA, 100, 2)));
+    // REDUCE SPEC
+    assertEquals(-1, test.compare(
+            new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2), new GridmixKey(
+            GridmixKey.DATA, 100, 2)));
+    assertEquals(1, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2),
+            new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2)));
+    // only DATA
+    assertEquals(2, test.compare(new GridmixKey(GridmixKey.DATA, 102, 2),
+            new GridmixKey(GridmixKey.DATA, 100, 2)));
+
+  }
+
+  /*
+   * test GridmixJob comparison: equals() and compareTo() only
+   */
+  @Test (timeout=10000)
+  public void testCompareGridmixJob() throws Exception {
+    Configuration conf = new Configuration();
+    Path outRoot = new Path("target");
+    JobStory jobDesc = mock(JobStory.class);
+    when(jobDesc.getName()).thenReturn("JobName");
+    when(jobDesc.getJobConf()).thenReturn(new JobConf(conf));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    GridmixJob j1 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 0);
+    GridmixJob j2 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 0);
+    GridmixJob j3 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 1);
+    GridmixJob j4 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 1);
+
+    assertTrue(j1.equals(j2));
+    assertEquals(0, j1.compareTo(j2));
+    // only the sequence number differs
+    assertFalse(j1.equals(j3));
+    // compare id and submissionMillis
+    assertEquals(-1, j1.compareTo(j3));
+    assertEquals(-1, j1.compareTo(j4));
+
+  }
+
+  /*
+   * test ReadRecordFactory. It should read all data from the input stream.
+   */
+  @Test (timeout=1000)
+  public void testReadRecordFactory() throws Exception {
+
+    // RecordFactory factory, InputStream src, Configuration conf
+    RecordFactory rf = new FakeRecordFactory();
+    FakeInputStream input = new FakeInputStream();
+    ReadRecordFactory test = new ReadRecordFactory(rf, input,
+            new Configuration());
+    GridmixKey key = new GridmixKey(GridmixKey.DATA, 100, 2);
+    GridmixRecord val = new GridmixRecord(200, 2);
+    while (test.next(key, val)) {
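+      // drain the factory until it reports no more records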
+
+    }
+    // 10 * (GridmixKey size + GridmixRecord size) bytes should have been read
+    assertEquals(3000, input.getCounter());
+    // should be -1 because all data has been read
+    assertEquals(-1, rf.getProgress(), 0.01);
+
+    test.close();
+  }
+
+  private class FakeRecordFactory extends RecordFactory {
+
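+    // produces exactly 10 records; getProgress() ends at -1 once they are exhausted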
+    private int counter = 10;
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+      counter--;
+      return counter >= 0;
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return counter;
+    }
+
+  }
+
+  private class FakeInputStream extends InputStream implements Seekable,
+          PositionedReadable {
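+    // counts the data served through read(byte[], int, int) so the test can check how much was consumed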
+    private long counter;
+
+    @Override
+    public int read() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int realLen = len - off;
+      counter += realLen;
+      for (int i = 0; i < b.length; i++) {
+        b[i] = 0;
+      }
+      return realLen;
+    }
+
+    public long getCounter() {
+      return counter;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return counter;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+            throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length)
+            throws IOException {
+
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+
+    }
+  }
+
+  private class FakeFSDataInputStream extends FSDataInputStream {
+
+    public FakeFSDataInputStream(InputStream in) throws IOException {
+      super(in);
+
+    }
+
+  }
+
+  /*
+   * test LoadRecordReader. This class reads data from several files.
+   */
+  @Test (timeout=1000)
+  public void testLoadJobLoadRecordReader() throws Exception {
+    LoadJob.LoadRecordReader test = new LoadJob.LoadRecordReader();
+    Configuration conf = new Configuration();
+
+    FileSystem fs1 = mock(FileSystem.class);
+    when(fs1.open((Path) anyObject())).thenReturn(
+            new FakeFSDataInputStream(new FakeInputStream()));
+    Path p1 = mock(Path.class);
+    when(p1.getFileSystem((JobConf) anyObject())).thenReturn(fs1);
+
+    FileSystem fs2 = mock(FileSystem.class);
+    when(fs2.open((Path) anyObject())).thenReturn(
+            new FakeFSDataInputStream(new FakeInputStream()));
+    Path p2 = mock(Path.class);
+    when(p2.getFileSystem((JobConf) anyObject())).thenReturn(fs2);
+
+    Path[] paths = {p1, p2};
+
+    long[] start = {0, 0};
+    long[] lengths = {1000, 1000};
+    String[] locations = {"temp1", "temp2"};
+    CombineFileSplit cfsplit = new CombineFileSplit(paths, start, lengths,
+            locations);
+    double[] reduceBytes = {100, 100};
+    double[] reduceRecords = {2, 2};
+    long[] reduceOutputBytes = {500, 500};
+    long[] reduceOutputRecords = {2, 2};
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+    ResourceUsageMetrics[] rMetrics = {new ResourceUsageMetrics(),
+            new ResourceUsageMetrics()};
+    LoadSplit input = new LoadSplit(cfsplit, 2, 3, 1500L, 2L, 3000L, 2L,
+            reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
+            metrics, rMetrics);
+    TaskAttemptID taskId = new TaskAttemptID();
+    TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, taskId);
+    test.initialize(input, ctx);
+    GridmixRecord gr = test.getCurrentValue();
+    int counter = 0;
+    while (test.nextKeyValue()) {
+      gr = test.getCurrentValue();
+      if (counter == 0) {
+        // read first file
+        assertEquals(0.5, test.getProgress(), 0.001);
+      } else if (counter == 1) {
+        // read second file
+        assertEquals(1.0, test.getProgress(), 0.001);
+      }
+      // every record should have the expected size
+      assertEquals(1000, gr.getSize());
+      counter++;
+    }
+    assertEquals(1000, gr.getSize());
+    // Two files have been read
+    assertEquals(2, counter);
+
+    test.close();
+  }
+
+  /*
+   * test LoadReducer
+   */
+
+  @Test (timeout=1000)
+  public void testLoadJobLoadReducer() throws Exception {
+    LoadJob.LoadReducer test = new LoadJob.LoadReducer();
+
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.NUM_REDUCES, 2);
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    TaskAttemptID taskid = new TaskAttemptID();
+
+    RawKeyValueIterator input = new FakeRawKeyValueIterator();
+
+    Counter counter = new GenericCounter();
+    Counter inputValueCounter = new GenericCounter();
+    LoadRecordWriter output = new LoadRecordWriter();
+
+    OutputCommitter committer = new CustomOutputCommitter();
+
+    StatusReporter reporter = new DummyReporter();
+    RawComparator<GridmixKey> comparator = new FakeRawComparator();
+
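+    // build a real ReduceContextImpl on top of the fake iterator, writer and comparator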
+    ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reduceContext = new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
+            conf, taskid, input, counter, inputValueCounter, output, committer,
+            reporter, comparator, GridmixKey.class, GridmixRecord.class);
+    // consume one key/value pair up front
+    reduceContext.nextKeyValue();
+    org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context context = new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>()
+            .getReducerContext(reduceContext);
+
+    // test.setup(context);
+    test.run(context);
+    // 9 records have been read (one was consumed up front)
+    assertEquals(9, counter.getValue());
+    assertEquals(10, inputValueCounter.getValue());
+    assertEquals(1, output.getData().size());
+    GridmixRecord record = output.getData().values().iterator()
+            .next();
+
+    assertEquals(1593, record.getSize());
+  }
+
+  protected class FakeRawKeyValueIterator implements RawKeyValueIterator {
+
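+    // serves 10 pairs: each key is a GridmixKey carrying a REDUCE_SPEC derived from the counter, each value a fixed GridmixRecord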
+    int counter = 10;
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
+      Spec spec = new Spec();
+      spec.rec_in = counter;
+      spec.rec_out = counter;
+      spec.bytes_out = counter * 100;
+
+      key.setSpec(spec);
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      GridmixRecord key = new GridmixRecord(100, 1);
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      counter--;
+      return counter >= 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+
+  }
+
+  private class FakeRawComparator implements RawComparator<GridmixKey> {
+
+    @Override
+    public int compare(GridmixKey o1, GridmixKey o2) {
+      return o1.compareTo(o2);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      if ((l1 - s1) != (l2 - s2)) {
+        return (l1 - s1) - (l2 - s2);
+      }
+      int len = l1 - s1;
+      for (int i = 0; i < len; i++) {
+        if (b1[s1 + i] != b2[s2 + i]) {
+          return b1[s1 + i] - b2[s2 + i];
+        }
+      }
+      return 0;
+    }
+
+  }
+
+  /*
+   * test SerialJobFactory
+   */
+  @Test (timeout=40000)
+  public void testSerialReaderThread() throws Exception {
+
+    Configuration conf = new Configuration();
+    File fin = new File("src" + File.separator + "test" + File.separator
+            + "resources" + File.separator + "data" + File.separator
+            + "wordcount2.json");
+    // read a couple of jobs from wordcount2.json
+    JobStoryProducer jobProducer = new ZombieJobProducer(new Path(
+            fin.getAbsolutePath()), null, conf);
+    CountDownLatch startFlag = new CountDownLatch(1);
+    UserResolver resolver = new SubmitterUserResolver();
+    FakeJobSubmitter submitter = new FakeJobSubmitter();
+    File ws = new File("target" + File.separator + this.getClass().getName());
+    if (!ws.exists()) {
+      Assert.assertTrue(ws.mkdirs());
+    }
+
+    SerialJobFactory jobFactory = new SerialJobFactory(submitter, jobProducer,
+            new Path(ws.getAbsolutePath()), conf, startFlag, resolver);
+
+    Path ioPath = new Path(ws.getAbsolutePath());
+    jobFactory.setDistCacheEmulator(new DistributedCacheEmulator(conf, ioPath));
+    Thread test = jobFactory.createReaderThread();
+    test.start();
+    Thread.sleep(1000);
+    // SerialReaderThread waits for startFlag, so nothing should be submitted yet
+    assertEquals(0, submitter.getJobs().size());
+    // start!
+    startFlag.countDown();
+    while (test.isAlive()) {
+      Thread.sleep(1000);
+      jobFactory.update(null);
+    }
+    // submitter was called twice
+    assertEquals(2, submitter.getJobs().size());
+  }
+
+  private class FakeJobSubmitter extends JobSubmitter {
+    // collects the submitted jobs
+    private List<GridmixJob> jobs = new ArrayList<GridmixJob>();
+
+    public FakeJobSubmitter() {
+      super(null, 1, 1, null, null);
+
+    }
+
+    @Override
+    public void add(GridmixJob job) throws InterruptedException {
+      jobs.add(job);
+    }
+
+    public List<GridmixJob> getJobs() {
+      return jobs;
+    }
+  }
+
+  /*
+   * test SleepMapper
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Test (timeout=10000)
+  public void testSleepMapper() throws Exception {
+    SleepJob.SleepMapper test = new SleepJob.SleepMapper();
+
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.NUM_REDUCES, 2);
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    TaskAttemptID taskId = new TaskAttemptID();
+    FakeRecordLLReader reader = new FakeRecordLLReader();
+    LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
+    OutputCommitter committer = new CustomOutputCommitter();
+    StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
+    SleepSplit split = getSleepSplit();
+    MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
+            conf, taskId, reader, writer, committer, reporter, split);
+    Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
+            .getMapContext(mapcontext);
+
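+    // the key carries a timestamp two seconds in the future; SleepMapper should not return before it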
+    long start = System.currentTimeMillis();
+    LOG.info("start:" + start);
+    LongWritable key = new LongWritable(start + 2000);
+    LongWritable value = new LongWritable(start + 2000);
+    // should sleep for 2 seconds
+    test.map(key, value, context);
+    LOG.info("finish:" + System.currentTimeMillis());
+    assertTrue(System.currentTimeMillis() >= (start + 2000));
+
+    test.cleanup(context);
+    assertEquals(1, writer.getData().size());
+  }
+
+  private SleepSplit getSleepSplit() throws Exception {
+
+    String[] locations = {"locOne", "loctwo"};
+
+    long[] reduceDurations = {101L, 102L};
+
+    return new SleepSplit(0, 2000L, reduceDurations, 2, locations);
+  }
+
+  /*
+   * test SleepReducer
+   */
+  @Test (timeout=1000)
+  public void testSleepReducer() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.NUM_REDUCES, 2);
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    TaskAttemptID taskId = new TaskAttemptID();
+
+    RawKeyValueIterator input = new FakeRawKeyValueReducerIterator();
+
+    Counter counter = new GenericCounter();
+    Counter inputValueCounter = new GenericCounter();
+    RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter();
+
+    OutputCommitter committer = new CustomOutputCommitter();
+
+    StatusReporter reporter = new DummyReporter();
+    RawComparator<GridmixKey> comparator = new FakeRawComparator();
+
+    ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reducecontext = new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
+            conf, taskId, input, counter, inputValueCounter, output, committer,
+            reporter, comparator, GridmixKey.class, NullWritable.class);
+    org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context context = new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>()
+            .getReducerContext(reducecontext);
+
+    SleepReducer test = new SleepReducer();
+    long start = System.currentTimeMillis();
+    test.setup(context);
+    long sleeper = context.getCurrentKey().getReduceOutputBytes();
+    // status has been changed
+    assertEquals("Sleeping... " + sleeper + " ms left", context.getStatus());
+    // should sleep 0.9 sec
+
+    assertTrue(System.currentTimeMillis() >= (start + sleeper));
+    test.cleanup(context);
+    // status has been changed again
+
+    assertEquals("Slept for " + sleeper, context.getStatus());
+
+  }
+
+  private class LoadRecordReduceWriter extends
+          RecordWriter<NullWritable, NullWritable> {
+
+    @Override
+    public void write(NullWritable key, NullWritable value) throws IOException,
+            InterruptedException {
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+    }
+
+  }
+
+  protected class FakeRawKeyValueReducerIterator implements RawKeyValueIterator {
+
+    int counter = 10;
+
+    @Override
+    public DataInputBuffer getKey() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
+      Spec spec = new Spec();
+      spec.rec_in = counter;
+      spec.rec_out = counter;
+      spec.bytes_out = counter * 100;
+
+      key.setSpec(spec);
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public DataInputBuffer getValue() throws IOException {
+      ByteArrayOutputStream dt = new ByteArrayOutputStream();
+      NullWritable key = NullWritable.get();
+      key.write(new DataOutputStream(dt));
+      DataInputBuffer result = new DataInputBuffer();
+      byte[] b = dt.toByteArray();
+      result.reset(b, 0, b.length);
+      return result;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      counter--;
+      return counter >= 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Progress getProgress() {
+      return null;
+    }
+
+  }
+}

+ 202 - 0
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.zip.GZIPInputStream;
+
+import static org.junit.Assert.*;
+
+public class TestGridmixSubmission extends CommonJobTest {
+  private static File inSpace = new File("src" + File.separator + "test"
+          + File.separator + "resources" + File.separator + "data");
+
+
+  static {
+    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+            .getLogger().setLevel(Level.DEBUG);
+  }
+
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster(TestGridmixSubmission.class);
+
+    System.setProperty("src.test.data", inSpace.getAbsolutePath());
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+  /**
+   * Verifies that the given {@code JobStory} corresponds to the checked-in
+   * WordCount {@code JobStory}. The verification is effected via JUnit
+   * assertions.
+   *
+   * @param js the candidate JobStory.
+   */
+  private void verifyWordCountJobStory(JobStory js) {
+    assertNotNull("Null JobStory", js);
+    String expectedJobStory = "WordCount:johndoe:default:1285322645148:3:1";
+    String actualJobStory = js.getName() + ":" + js.getUser() + ":"
+            + js.getQueueName() + ":" + js.getSubmissionTime() + ":"
+            + js.getNumberMaps() + ":" + js.getNumberReduces();
+    assertEquals("Unexpected JobStory", expectedJobStory, actualJobStory);
+  }
+
+  /**
+   * Expands a file compressed using {@code gzip}.
+   *
+   * @param fs  the {@code FileSystem} corresponding to the given file.
+   * @param in  the path to the compressed file.
+   * @param out the path to the uncompressed output.
+   * @throws Exception if there was an error during the operation.
+   */
+  private void expandGzippedTrace(FileSystem fs, Path in, Path out)
+          throws Exception {
+    byte[] buff = new byte[4096];
+    GZIPInputStream gis = new GZIPInputStream(fs.open(in));
+    FSDataOutputStream fsdOs = fs.create(out);
+    int numRead;
+    while ((numRead = gis.read(buff, 0, buff.length)) != -1) {
+      fsdOs.write(buff, 0, numRead);
+    }
+    gis.close();
+    fsdOs.close();
+  }
+
+  /**
+   * Tests the reading of traces in GridMix3. These traces are generated by
+   * Rumen and are in the JSON format. The traces can optionally be compressed
+   * and uncompressed traces can also be passed to GridMix3 via its standard
+   * input stream. The testing is effected via JUnit assertions.
+   *
+   * @throws Exception if there was an error.
+   */
+  @Test (timeout=20000)
+  public void testTraceReader() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    Path rootInputDir = new Path(System.getProperty("src.test.data"));
+    rootInputDir = rootInputDir.makeQualified(lfs.getUri(),
+            lfs.getWorkingDirectory());
+    Path rootTempDir = new Path(System.getProperty("test.build.data",
+            System.getProperty("java.io.tmpdir")), "testTraceReader");
+    rootTempDir = rootTempDir.makeQualified(lfs.getUri(),
+            lfs.getWorkingDirectory());
+    Path inputFile = new Path(rootInputDir, "wordcount.json.gz");
+    Path tempFile = new Path(rootTempDir, "gridmix3-wc.json");
+
+    InputStream origStdIn = System.in;
+    InputStream tmpIs = null;
+    try {
+      DebugGridmix dgm = new DebugGridmix();
+      JobStoryProducer jsp = dgm.createJobStoryProducer(inputFile.toString(),
+              conf);
+
+      LOG.info("Verifying JobStory from compressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      expandGzippedTrace(lfs, inputFile, tempFile);
+      jsp = dgm.createJobStoryProducer(tempFile.toString(), conf);
+      LOG.info("Verifying JobStory from uncompressed trace...");
+      verifyWordCountJobStory(jsp.getNextJob());
+
+      tmpIs = lfs.open(tempFile);
+      System.setIn(tmpIs);
+      LOG.info("Verifying JobStory from trace in standard input...");
+      jsp = dgm.createJobStoryProducer("-", conf);
+      verifyWordCountJobStory(jsp.getNextJob());
+    } finally {
+      System.setIn(origStdIn);
+      if (tmpIs != null) {
+        tmpIs.close();
+      }
+      lfs.delete(rootTempDir, true);
+    }
+  }
+
+  @Test (timeout=500000)
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission(null, false);
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+
+  }
+
+  @Test (timeout=500000)
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    LOG.info(" Stress started at " + System.currentTimeMillis());
+    doSubmission(null, false);
+    LOG.info(" Stress ended at " + System.currentTimeMillis());
+  }
+
+  // an empty argument list should produce a usage hint
+  @Test (timeout=100000)
+  public void testMain() throws Exception {
+
+    SecurityManager securityManager = System.getSecurityManager();
+
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    final PrintStream out = new PrintStream(bytes);
+    final PrintStream oldErr = System.err;
+    System.setErr(out);
+    ExitUtil.disableSystemExit();
+    try {
+      String[] argv = new String[0];
+      DebugGridmix.main(argv);
+
+    } catch (ExitUtil.ExitException e) {
+      assertEquals("ExitException", e.getMessage());
+      ExitUtil.resetFirstExitException();
+    } finally {
+      System.setErr(oldErr);
+      System.setSecurityManager(securityManager);
+    }
+    String print = bytes.toString();
+    // the usage tip should have been printed to the standard error stream
+    assertTrue(print
+            .contains("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>"));
+    assertTrue(print.contains("e.g. gridmix -generate 100m foo -"));
+  }
+
+ 
+}

+ 5 - 6
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java

@@ -133,7 +133,7 @@ public class TestGridmixSummary {
   /**
   /**
    * A fake {@link JobFactory}.
    * A fake {@link JobFactory}.
    */
    */
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings("rawtypes")
   private static class FakeJobFactory extends JobFactory {
   private static class FakeJobFactory extends JobFactory {
     /**
     /**
      * A fake {@link JobStoryProducer} for {@link FakeJobFactory}.
      * A fake {@link JobStoryProducer} for {@link FakeJobFactory}.
@@ -167,7 +167,7 @@ public class TestGridmixSummary {
    * Test {@link ExecutionSummarizer}.
    * Test {@link ExecutionSummarizer}.
    */
    */
   @Test
   @Test
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   public void testExecutionSummarizer() throws IOException {
   public void testExecutionSummarizer() throws IOException {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     
     
@@ -360,7 +360,6 @@ public class TestGridmixSummary {
    * Test {@link ClusterSummarizer}.
    * Test {@link ClusterSummarizer}.
    */
    */
   @Test
   @Test
-  @SuppressWarnings("deprecation")
   public void testClusterSummarizer() throws IOException {
   public void testClusterSummarizer() throws IOException {
     ClusterSummarizer cs = new ClusterSummarizer();
     ClusterSummarizer cs = new ClusterSummarizer();
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
@@ -374,13 +373,13 @@ public class TestGridmixSummary {
     assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo());
     assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo());
     assertEquals("NN name mismatch", nn, cs.getNamenodeInfo());
     assertEquals("NN name mismatch", nn, cs.getNamenodeInfo());
     
     
-    ClusterStats cstats = ClusterStats.getClusterStats();
+    ClusterStats cStats = ClusterStats.getClusterStats();
     conf.set(JTConfig.JT_IPC_ADDRESS, "local");
     conf.set(JTConfig.JT_IPC_ADDRESS, "local");
     conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local");
     conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local");
     JobClient jc = new JobClient(conf);
     JobClient jc = new JobClient(conf);
-    cstats.setClusterMetric(jc.getClusterStatus());
+    cStats.setClusterMetric(jc.getClusterStatus());
     
     
-    cs.update(cstats);
+    cs.update(cStats);
     
     
     // test
     // test
     assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks());
     assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks());

+ 81 - 0
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestLoadJob.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/*
+ Test LoadJob: Gridmix sends data to the job and verifies the result afterwards.
+ */
+public class TestLoadJob extends CommonJobTest {
+
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
+  static {
+    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+            .getLogger().setLevel(Level.DEBUG);
+    ((Log4JLogger) LogFactory.getLog(StressJobFactory.class)).getLogger()
+            .setLevel(Level.DEBUG);
+  }
+
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster(TestLoadJob.class);
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+
+  /*
+  * test the serial policy with LoadJob. Tasks should execute without exceptions.
+  */
+  @Test (timeout=500000)
+  public void testSerialSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    LOG.info("Serial started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.LOADJOB.name(), false);
+
+    LOG.info("Serial ended at " + System.currentTimeMillis());
+  }
+
+  /*
+   * test the replay policy with LoadJob
+   */
+  @Test  (timeout=500000)
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.LOADJOB.name(), false);
+
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+  }
+
+
+}

+ 142 - 0
hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestSleepJob.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestSleepJob extends CommonJobTest {
+
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
+  static {
+    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.mapred.gridmix"))
+            .getLogger().setLevel(Level.DEBUG);
+  }
+
+  static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster(TestSleepJob.class);
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+
+  /*
+  * test RandomLocation
+  */
+  @Test
+  public void testRandomLocation() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+
+    testRandomLocation(1, 10, ugi);
+    testRandomLocation(2, 10, ugi);
+  }
+
+  @Test
+  public void testMapTasksOnlySleepJobs() throws Exception {
+    Configuration configuration = GridmixTestUtils.mrvl.getConfig();
+
+    DebugJobProducer jobProducer = new DebugJobProducer(5, configuration);
+    configuration.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);
+
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    JobStory story;
+    int seq = 1;
+    while ((story = jobProducer.getNextJob()) != null) {
+      GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(configuration, 0,
+              story, new Path("ignored"), ugi, seq++);
+      gridmixJob.buildSplits(null);
+      Job job = gridmixJob.call();
+      assertEquals(0, job.getNumReduceTasks());
+    }
+    jobProducer.close();
+    assertEquals(6, seq);
+  }
+
+  // test Serial submit
+  @Test
+  public void testSerialSubmit() throws Exception {
+    // set policy
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    LOG.info("Serial started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.SLEEPJOB.name(), false);
+    LOG.info("Serial ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.SLEEPJOB.name(), false);
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    LOG.info(" Replay started at " + System.currentTimeMillis());
+    doSubmission(JobCreator.SLEEPJOB.name(), false);
+    LOG.info(" Replay ended at " + System.currentTimeMillis());
+  }
+
+  private void testRandomLocation(int locations, int njobs,
+                                  UserGroupInformation ugi) throws Exception {
+    Configuration configuration = new Configuration();
+
+    DebugJobProducer jobProducer = new DebugJobProducer(njobs, configuration);
+    Configuration jconf = GridmixTestUtils.mrvl.getConfig();
+    jconf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations);
+
+    JobStory story;
+    int seq = 1;
+    while ((story = jobProducer.getNextJob()) != null) {
+      GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
+              story, new Path("ignored"), ugi, seq++);
+      gridmixJob.buildSplits(null);
+      List<InputSplit> splits = new SleepJob.SleepInputFormat()
+              .getSplits(gridmixJob.getJob());
+      for (InputSplit split : splits) {
+        assertEquals(locations, split.getLocations().length);
+      }
+    }
+    jobProducer.close();
+  }
+
+}

+ 414 - 0
hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount.json

@@ -0,0 +1,414 @@
+{
+  "priority" : "NORMAL",
+  "jobID" : "job_201009241532_0001",
+  "user" : "johndoe",
+  "jobName" : "WordCount",
+  "mapTasks" : [ {
+    "startTime" : 1285322651360,
+    "taskID" : "task_201009241532_0001_m_000000",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322651366,
+      "finishTime" : 1285322658262,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000000_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 704270,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 48266,
+      "mapInputRecords" : 13427,
+      "mapOutputBytes" : 1182333,
+      "mapOutputRecords" : 126063,
+      "combineInputRecords" : 126063,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 6612,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322660778,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 704270,
+    "inputRecords" : 13427,
+    "outputBytes" : 48266,
+    "outputRecords" : 126063
+  }, {
+    "startTime" : 1285322651361,
+    "taskID" : "task_201009241532_0001_m_000001",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322651378,
+      "finishTime" : 1285322657906,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000001_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 577214,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 58143,
+      "mapInputRecords" : 13015,
+      "mapOutputBytes" : 985534,
+      "mapOutputRecords" : 108400,
+      "combineInputRecords" : 108400,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 8214,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322660781,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 577214,
+    "inputRecords" : 13015,
+    "outputBytes" : 58143,
+    "outputRecords" : 108400
+  }, {
+    "startTime" : 1285322660789,
+    "taskID" : "task_201009241532_0001_m_000002",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322660807,
+      "finishTime" : 1285322664865,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000002_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 163907,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 21510,
+      "mapInputRecords" : 3736,
+      "mapOutputBytes" : 275796,
+      "mapOutputRecords" : 30528,
+      "combineInputRecords" : 30528,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 3040,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322666805,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 163907,
+    "inputRecords" : 3736,
+    "outputBytes" : 21510,
+    "outputRecords" : 30528
+  } ],
+  "finishTime" : 1285322675837,
+  "reduceTasks" : [ {
+    "startTime" : 1285322660790,
+    "taskID" : "task_201009241532_0001_r_000000",
+    "taskType" : "REDUCE",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322660807,
+      "finishTime" : 1285322670759,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_r_000000_0",
+      "shuffleFinished" : 1285322667962,
+      "sortFinished" : 1285322668146,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : 122793,
+      "fileBytesRead" : 111026,
+      "fileBytesWritten" : 111026,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : 0,
+      "reduceInputGroups" : 11713,
+      "reduceInputRecords" : 17866,
+      "reduceShuffleBytes" : 127823,
+      "reduceOutputRecords" : 11713,
+      "spilledRecords" : 17866,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322672821,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 127823,
+    "inputRecords" : 17866,
+    "outputBytes" : 122793,
+    "outputRecords" : 11713
+  } ],
+  "submitTime" : 1285322645148,
+  "launchTime" : 1285322645614,
+  "totalMaps" : 3,
+  "totalReduces" : 1,
+  "otherTasks" : [ {
+    "startTime" : 1285322648294,
+    "taskID" : "task_201009241532_0001_m_000004",
+    "taskType" : "SETUP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322648482,
+      "finishTime" : 1285322649588,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000004_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : -1,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : -1,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 0,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322651351,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : -1,
+    "inputRecords" : -1,
+    "outputBytes" : -1,
+    "outputRecords" : -1
+  }, {
+    "startTime" : 1285322672829,
+    "taskID" : "task_201009241532_0001_m_000003",
+    "taskType" : "CLEANUP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322672838,
+      "finishTime" : 1285322673971,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000003_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : -1,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : -1,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 0,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322675835,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : -1,
+    "inputRecords" : -1,
+    "outputBytes" : -1,
+    "outputRecords" : -1
+  } ],
+  "computonsPerMapInputByte" : -1,
+  "computonsPerMapOutputByte" : -1,
+  "computonsPerReduceInputByte" : -1,
+  "computonsPerReduceOutputByte" : -1,
+  "heapMegabytes" : 1024,
+  "outcome" : "SUCCESS",
+  "jobtype" : "JAVA",
+  "directDependantJobs" : [ ],
+  "successfulMapAttemptCDFs" : [ {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 6896,
+    "minimum" : 4058,
+    "rankings" : [ {
+      "datum" : 4058,
+      "relativeRanking" : 0.05
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.1
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.15
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.2
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.25
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.3
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.35
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.4
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.45
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.5
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.55
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.6
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.65
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.7
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.75
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.8
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.85
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.9
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.95
+    } ],
+    "numberValues" : 3
+  } ],
+  "failedMapAttemptCDFs" : [ {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  } ],
+  "successfulReduceAttemptCDF" : {
+    "maximum" : 9952,
+    "minimum" : 9952,
+    "rankings" : [ {
+      "datum" : 9952,
+      "relativeRanking" : 0.05
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.1
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.15
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.2
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.25
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.3
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.35
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.4
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.45
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.5
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.55
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.6
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.65
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.7
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.75
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.8
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.85
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.9
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.95
+    } ],
+    "numberValues" : 1
+  },
+  "failedReduceAttemptCDF" : {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  },
+  "mapperTriesToSucceed" : [ 1.0 ],
+  "failedMapperFraction" : 0.0,
+  "relativeTime" : 0,
+  "queue" : "default",
+  "clusterMapMB" : -1,
+  "clusterReduceMB" : -1,
+  "jobMapMB" : 1024,
+  "jobReduceMB" : 1024
+}

+ 828 - 0
hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount2.json

@@ -0,0 +1,828 @@
+{
+  "priority" : "NORMAL",
+  "jobID" : "job_201009241532_0001",
+  "user" : "johndoe",
+  "jobName" : "WordCount",
+  "mapTasks" : [ {
+    "startTime" : 1285322651360,
+    "taskID" : "task_201009241532_0001_m_000000",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322651366,
+      "finishTime" : 1285322658262,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000000_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 704270,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 48266,
+      "mapInputRecords" : 13427,
+      "mapOutputBytes" : 1182333,
+      "mapOutputRecords" : 126063,
+      "combineInputRecords" : 126063,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 6612,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322660778,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 704270,
+    "inputRecords" : 13427,
+    "outputBytes" : 48266,
+    "outputRecords" : 126063
+  }, {
+    "startTime" : 1285322651361,
+    "taskID" : "task_201009241532_0001_m_000001",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322651378,
+      "finishTime" : 1285322657906,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000001_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 577214,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 58143,
+      "mapInputRecords" : 13015,
+      "mapOutputBytes" : 985534,
+      "mapOutputRecords" : 108400,
+      "combineInputRecords" : 108400,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 8214,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322660781,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 577214,
+    "inputRecords" : 13015,
+    "outputBytes" : 58143,
+    "outputRecords" : 108400
+  }, {
+    "startTime" : 1285322660789,
+    "taskID" : "task_201009241532_0001_m_000002",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322660807,
+      "finishTime" : 1285322664865,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000002_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 163907,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 21510,
+      "mapInputRecords" : 3736,
+      "mapOutputBytes" : 275796,
+      "mapOutputRecords" : 30528,
+      "combineInputRecords" : 30528,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 3040,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322666805,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 163907,
+    "inputRecords" : 3736,
+    "outputBytes" : 21510,
+    "outputRecords" : 30528
+  } ],
+  "finishTime" : 1285322675837,
+  "reduceTasks" : [ {
+    "startTime" : 1285322660790,
+    "taskID" : "task_201009241532_0001_r_000000",
+    "taskType" : "REDUCE",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322660807,
+      "finishTime" : 1285322670759,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_r_000000_0",
+      "shuffleFinished" : 1285322667962,
+      "sortFinished" : 1285322668146,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : 122793,
+      "fileBytesRead" : 111026,
+      "fileBytesWritten" : 111026,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : 0,
+      "reduceInputGroups" : 11713,
+      "reduceInputRecords" : 17866,
+      "reduceShuffleBytes" : 127823,
+      "reduceOutputRecords" : 11713,
+      "spilledRecords" : 17866,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322672821,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 127823,
+    "inputRecords" : 17866,
+    "outputBytes" : 122793,
+    "outputRecords" : 11713
+  } ],
+  "submitTime" : 1285322645148,
+  "launchTime" : 1285322645614,
+  "totalMaps" : 3,
+  "totalReduces" : 1,
+  "otherTasks" : [ {
+    "startTime" : 1285322648294,
+    "taskID" : "task_201009241532_0001_m_000004",
+    "taskType" : "SETUP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322648482,
+      "finishTime" : 1285322649588,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000004_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : -1,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : -1,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 0,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322651351,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : -1,
+    "inputRecords" : -1,
+    "outputBytes" : -1,
+    "outputRecords" : -1
+  }, {
+    "startTime" : 1285322672829,
+    "taskID" : "task_201009241532_0001_m_000003",
+    "taskType" : "CLEANUP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322672838,
+      "finishTime" : 1285322673971,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000003_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : -1,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : -1,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 0,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322675835,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : -1,
+    "inputRecords" : -1,
+    "outputBytes" : -1,
+    "outputRecords" : -1
+  } ],
+  "computonsPerMapInputByte" : -1,
+  "computonsPerMapOutputByte" : -1,
+  "computonsPerReduceInputByte" : -1,
+  "computonsPerReduceOutputByte" : -1,
+  "heapMegabytes" : 1024,
+  "outcome" : "SUCCESS",
+  "jobtype" : "JAVA",
+  "directDependantJobs" : [ ],
+  "successfulMapAttemptCDFs" : [ {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 6896,
+    "minimum" : 4058,
+    "rankings" : [ {
+      "datum" : 4058,
+      "relativeRanking" : 0.05
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.1
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.15
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.2
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.25
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.3
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.35
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.4
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.45
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.5
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.55
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.6
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.65
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.7
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.75
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.8
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.85
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.9
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.95
+    } ],
+    "numberValues" : 3
+  } ],
+  "failedMapAttemptCDFs" : [ {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  } ],
+  "successfulReduceAttemptCDF" : {
+    "maximum" : 9952,
+    "minimum" : 9952,
+    "rankings" : [ {
+      "datum" : 9952,
+      "relativeRanking" : 0.05
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.1
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.15
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.2
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.25
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.3
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.35
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.4
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.45
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.5
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.55
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.6
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.65
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.7
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.75
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.8
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.85
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.9
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.95
+    } ],
+    "numberValues" : 1
+  },
+  "failedReduceAttemptCDF" : {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  },
+  "mapperTriesToSucceed" : [ 1.0 ],
+  "failedMapperFraction" : 0.0,
+  "relativeTime" : 0,
+  "queue" : "default",
+  "clusterMapMB" : -1,
+  "clusterReduceMB" : -1,
+  "jobMapMB" : 1024,
+  "jobReduceMB" : 1024
+}
+{
+  "priority" : "NORMAL",
+  "jobID" : "job_201009241532_0001",
+  "user" : "johndoe",
+  "jobName" : "WordCount",
+  "mapTasks" : [ {
+    "startTime" : 1285322651360,
+    "taskID" : "task_201009241532_0001_m_000000",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322651366,
+      "finishTime" : 1285322658262,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000000_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 704270,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 48266,
+      "mapInputRecords" : 13427,
+      "mapOutputBytes" : 1182333,
+      "mapOutputRecords" : 126063,
+      "combineInputRecords" : 126063,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 6612,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322660778,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 704270,
+    "inputRecords" : 13427,
+    "outputBytes" : 48266,
+    "outputRecords" : 126063
+  }, {
+    "startTime" : 1285322651361,
+    "taskID" : "task_201009241532_0001_m_000001",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322651378,
+      "finishTime" : 1285322657906,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000001_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 577214,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 58143,
+      "mapInputRecords" : 13015,
+      "mapOutputBytes" : 985534,
+      "mapOutputRecords" : 108400,
+      "combineInputRecords" : 108400,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 8214,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322660781,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 577214,
+    "inputRecords" : 13015,
+    "outputBytes" : 58143,
+    "outputRecords" : 108400
+  }, {
+    "startTime" : 1285322660789,
+    "taskID" : "task_201009241532_0001_m_000002",
+    "taskType" : "MAP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322660807,
+      "finishTime" : 1285322664865,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000002_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : 163907,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : 21510,
+      "mapInputRecords" : 3736,
+      "mapOutputBytes" : 275796,
+      "mapOutputRecords" : 30528,
+      "combineInputRecords" : 30528,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 3040,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322666805,
+    "preferredLocations" : [ {
+      "layers" : [ "default-rack", "foo.example.com" ]
+    } ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 163907,
+    "inputRecords" : 3736,
+    "outputBytes" : 21510,
+    "outputRecords" : 30528
+  } ],
+  "finishTime" : 1285322675837,
+  "reduceTasks" : [ {
+    "startTime" : 1285322660790,
+    "taskID" : "task_201009241532_0001_r_000000",
+    "taskType" : "REDUCE",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322660807,
+      "finishTime" : 1285322670759,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_r_000000_0",
+      "shuffleFinished" : 1285322667962,
+      "sortFinished" : 1285322668146,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : 122793,
+      "fileBytesRead" : 111026,
+      "fileBytesWritten" : 111026,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : 0,
+      "reduceInputGroups" : 11713,
+      "reduceInputRecords" : 17866,
+      "reduceShuffleBytes" : 127823,
+      "reduceOutputRecords" : 11713,
+      "spilledRecords" : 17866,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322672821,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : 127823,
+    "inputRecords" : 17866,
+    "outputBytes" : 122793,
+    "outputRecords" : 11713
+  } ],
+  "submitTime" : 1285322645148,
+  "launchTime" : 1285322645614,
+  "totalMaps" : 3,
+  "totalReduces" : 1,
+  "otherTasks" : [ {
+    "startTime" : 1285322648294,
+    "taskID" : "task_201009241532_0001_m_000004",
+    "taskType" : "SETUP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322648482,
+      "finishTime" : 1285322649588,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000004_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : -1,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : -1,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 0,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322651351,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : -1,
+    "inputRecords" : -1,
+    "outputBytes" : -1,
+    "outputRecords" : -1
+  }, {
+    "startTime" : 1285322672829,
+    "taskID" : "task_201009241532_0001_m_000003",
+    "taskType" : "CLEANUP",
+    "attempts" : [ {
+      "location" : null,
+      "hostName" : "/default-rack/foo.example.com",
+      "startTime" : 1285322672838,
+      "finishTime" : 1285322673971,
+      "result" : "SUCCESS",
+      "attemptID" : "attempt_201009241532_0001_m_000003_0",
+      "shuffleFinished" : -1,
+      "sortFinished" : -1,
+      "hdfsBytesRead" : -1,
+      "hdfsBytesWritten" : -1,
+      "fileBytesRead" : -1,
+      "fileBytesWritten" : -1,
+      "mapInputRecords" : -1,
+      "mapOutputBytes" : -1,
+      "mapOutputRecords" : -1,
+      "combineInputRecords" : -1,
+      "reduceInputGroups" : -1,
+      "reduceInputRecords" : -1,
+      "reduceShuffleBytes" : -1,
+      "reduceOutputRecords" : -1,
+      "spilledRecords" : 0,
+      "mapInputBytes" : -1
+    } ],
+    "finishTime" : 1285322675835,
+    "preferredLocations" : [ ],
+    "taskStatus" : "SUCCESS",
+    "inputBytes" : -1,
+    "inputRecords" : -1,
+    "outputBytes" : -1,
+    "outputRecords" : -1
+  } ],
+  "computonsPerMapInputByte" : -1,
+  "computonsPerMapOutputByte" : -1,
+  "computonsPerReduceInputByte" : -1,
+  "computonsPerReduceOutputByte" : -1,
+  "heapMegabytes" : 1024,
+  "outcome" : "SUCCESS",
+  "jobtype" : "JAVA",
+  "directDependantJobs" : [ ],
+  "successfulMapAttemptCDFs" : [ {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 6896,
+    "minimum" : 4058,
+    "rankings" : [ {
+      "datum" : 4058,
+      "relativeRanking" : 0.05
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.1
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.15
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.2
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.25
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.3
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.35
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.4
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.45
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.5
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.55
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.6
+    }, {
+      "datum" : 4058,
+      "relativeRanking" : 0.65
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.7
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.75
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.8
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.85
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.9
+    }, {
+      "datum" : 6528,
+      "relativeRanking" : 0.95
+    } ],
+    "numberValues" : 3
+  } ],
+  "failedMapAttemptCDFs" : [ {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  }, {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  } ],
+  "successfulReduceAttemptCDF" : {
+    "maximum" : 9952,
+    "minimum" : 9952,
+    "rankings" : [ {
+      "datum" : 9952,
+      "relativeRanking" : 0.05
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.1
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.15
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.2
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.25
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.3
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.35
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.4
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.45
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.5
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.55
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.6
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.65
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.7
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.75
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.8
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.85
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.9
+    }, {
+      "datum" : 9952,
+      "relativeRanking" : 0.95
+    } ],
+    "numberValues" : 1
+  },
+  "failedReduceAttemptCDF" : {
+    "maximum" : 9223372036854775807,
+    "minimum" : -9223372036854775808,
+    "rankings" : [ ],
+    "numberValues" : 0
+  },
+  "mapperTriesToSucceed" : [ 1.0 ],
+  "failedMapperFraction" : 0.0,
+  "relativeTime" : 0,
+  "queue" : "default",
+  "clusterMapMB" : -1,
+  "clusterReduceMB" : -1,
+  "jobMapMB" : 1024,
+  "jobReduceMB" : 1024
+}
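
Note on the trace resources above: wordcount2.json concatenates two copies of the same job object, and Gridmix reads such traces through Rumen's trace reader, yielding one job story per JSON object. The following is a minimal sketch of inspecting a trace that way, assuming the Rumen API as I understand it (ZombieJobProducer(Path, ZombieCluster, Configuration) and JobStoryProducer.getNextJob()); the file path and class name are illustrative only and not part of this change.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.tools.rumen.JobStory;
  import org.apache.hadoop.tools.rumen.ZombieJobProducer;

  // Illustrative helper (not part of this commit): iterate the concatenated
  // job objects in a Rumen trace file and print a one-line summary of each.
  public class TraceDump {
    public static void main(String[] args) throws Exception {
      // Path is an example; point it at the resource in a local checkout.
      Path trace = new Path(
          "hadoop-tools/hadoop-gridmix/src/test/resources/data/wordcount2.json");
      // Passing null for the ZombieCluster skips topology mapping, which is
      // assumed to be acceptable for simple inspection of the trace.
      ZombieJobProducer producer =
          new ZombieJobProducer(trace, null, new Configuration());
      try {
        JobStory job;
        // wordcount2.json should yield two JobStory instances, one per object.
        while ((job = producer.getNextJob()) != null) {
          System.out.println(job.getName() + ": maps=" + job.getNumberMaps()
              + ", reduces=" + job.getNumberReduces());
        }
      } finally {
        producer.close();
      }
    }
  }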