
HDFS-14824. [Dynamometer] Dynamometer in org.apache.hadoop.tools does not output the benchmark results. (#1685)

Takanobu Asanuma, 5 years ago
commit 477505ccfc
12 changed files with 366 additions and 48 deletions
  1. +7 -0
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java
  2. +6 -1
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java
  3. +2 -1
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java
  4. +10 -17
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java
  5. +18 -13
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java
  6. +30 -9
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
  7. +44 -0
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java
  8. +18 -0
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java
  9. +82 -0
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java
  10. +111 -0
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java
  11. +35 -7
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java
  12. +3 -0
      hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md

+ 7 - 0
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java

@@ -166,6 +166,7 @@ public class Client extends Configured implements Tool {
   public static final String WORKLOAD_REPLAY_ENABLE_ARG =
       "workload_replay_enable";
   public static final String WORKLOAD_INPUT_PATH_ARG = "workload_input_path";
+  public static final String WORKLOAD_OUTPUT_PATH_ARG = "workload_output_path";
   public static final String WORKLOAD_THREADS_PER_MAPPER_ARG =
       "workload_threads_per_mapper";
   public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay";
@@ -231,6 +232,8 @@ public class Client extends Configured implements Tool {
   private volatile Job workloadJob;
   // The input path for the workload job.
   private String workloadInputPath = "";
+  // The output path for the workload job metric results.
+  private String workloadOutputPath = "";
   // The number of threads to use per mapper for the workload job.
   private int workloadThreadsPerMapper;
   // The startup delay for the workload job.
@@ -347,6 +350,8 @@ public class Client extends Configured implements Tool {
         + "audit logs against the HDFS cluster which is started.");
     opts.addOption(WORKLOAD_INPUT_PATH_ARG, true,
         "Location of the audit traces to replay (Required for workload)");
+    opts.addOption(WORKLOAD_OUTPUT_PATH_ARG, true,
+        "Location of the metrics output (Required for workload)");
     opts.addOption(WORKLOAD_THREADS_PER_MAPPER_ARG, true, "Number of threads "
         + "per mapper to use to replay the workload. (default "
         + AuditReplayMapper.NUM_THREADS_DEFAULT + ")");
@@ -476,6 +481,7 @@ public class Client extends Configured implements Tool {
       }
       launchWorkloadJob = true;
       workloadInputPath = commandLine.getOptionValue(WORKLOAD_INPUT_PATH_ARG);
+      workloadOutputPath = commandLine.getOptionValue(WORKLOAD_OUTPUT_PATH_ARG);
       workloadThreadsPerMapper = Integer
           .parseInt(commandLine.getOptionValue(WORKLOAD_THREADS_PER_MAPPER_ARG,
               String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT)));
@@ -1032,6 +1038,7 @@ public class Client extends Configured implements Tool {
           + workloadStartDelayMs;
       Configuration workloadConf = new Configuration(getConf());
       workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
+      workloadConf.set(AuditReplayMapper.OUTPUT_PATH_KEY, workloadOutputPath);
       workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY,
           workloadThreadsPerMapper);
       workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY,

+ 6 - 1
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java

@@ -122,7 +122,7 @@ public class TestDynamometerInfra {
   private static final String HADOOP_BIN_PATH_KEY = "dyno.hadoop.bin.path";
   private static final String HADOOP_BIN_VERSION_KEY =
       "dyno.hadoop.bin.version";
-  private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.2";
+  private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.3";
   private static final String FSIMAGE_FILENAME = "fsimage_0000000000000061740";
   private static final String VERSION_FILENAME = "VERSION";

@@ -132,6 +132,8 @@ public class TestDynamometerInfra {
   private static final String NAMENODE_NODELABEL = "dyno_namenode";
   private static final String DATANODE_NODELABEL = "dyno_datanode";

+  private static final String OUTPUT_PATH = "/tmp/trace_output_direct";
+
   private static MiniDFSCluster miniDFSCluster;
   private static MiniYARNCluster miniYARNCluster;
   private static YarnClient yarnClient;
@@ -408,6 +410,7 @@ public class TestDynamometerInfra {
         return false;
       }
     }, 3000, 60000);
+    assertTrue(fs.exists(new Path(OUTPUT_PATH)));
   }

   private void assertClusterIsFunctional(Configuration localConf,
@@ -477,6 +480,8 @@ public class TestDynamometerInfra {
             "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
             "-" + Client.WORKLOAD_INPUT_PATH_ARG,
             fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(),
+            "-" + Client.WORKLOAD_OUTPUT_PATH_ARG,
+            fs.makeQualified(new Path(OUTPUT_PATH)).toString(),
             "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1",
             "-" + Client.WORKLOAD_START_DELAY_ARG, "10s",
             "-" + AMOptions.NAMENODE_ARGS_ARG,

+ 2 - 1
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/CreateFileMapper.java

@@ -48,7 +48,8 @@ import org.apache.hadoop.mapreduce.Mapper;
  * </ul>
  */
 public class CreateFileMapper
-    extends WorkloadMapper<NullWritable, NullWritable> {
+    extends WorkloadMapper<NullWritable, NullWritable, NullWritable,
+    NullWritable> {

   public static final String NUM_MAPPERS_KEY = "createfile.num-mappers";
   public static final String DURATION_MIN_KEY = "createfile.duration-min";

+ 10 - 17
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java

@@ -32,9 +32,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -124,7 +122,7 @@ public class WorkloadDriver extends Configured implements Tool {
       startTimestampMs = tmpConf.getTimeDuration(tmpConfKey, 0,
           TimeUnit.MILLISECONDS) + System.currentTimeMillis();
     }
-    Class<? extends WorkloadMapper<?, ?>> mapperClass = getMapperClass(
+    Class<? extends WorkloadMapper<?, ?, ?, ?>> mapperClass = getMapperClass(
         cli.getOptionValue(MAPPER_CLASS_NAME));
     if (!mapperClass.newInstance().verifyConfigurations(getConf())) {
       System.err
@@ -140,8 +138,9 @@ public class WorkloadDriver extends Configured implements Tool {
   }

   public static Job getJobForSubmission(Configuration baseConf, String nnURI,
-      long startTimestampMs, Class<? extends WorkloadMapper<?, ?>> mapperClass)
-      throws IOException, InstantiationException, IllegalAccessException {
+      long startTimestampMs, Class<? extends WorkloadMapper<?, ?, ?, ?>>
+      mapperClass) throws IOException, InstantiationException,
+      IllegalAccessException {
     Configuration conf = new Configuration(baseConf);
     conf.set(NN_URI, nnURI);
     conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
@@ -153,16 +152,9 @@ public class WorkloadDriver extends Configured implements Tool {
     conf.setLong(START_TIMESTAMP_MS, startTimestampMs);

     Job job = Job.getInstance(conf, "Dynamometer Workload Driver");
-    job.setOutputFormatClass(NullOutputFormat.class);
     job.setJarByClass(mapperClass);
     job.setMapperClass(mapperClass);
-    job.setInputFormatClass(mapperClass.newInstance().getInputFormat(conf));
-    job.setOutputFormatClass(NullOutputFormat.class);
-    job.setNumReduceTasks(0);
-    job.setMapOutputKeyClass(NullWritable.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(NullWritable.class);
+    mapperClass.newInstance().configureJob(job);

     return job;
   }
@@ -175,8 +167,8 @@ public class WorkloadDriver extends Configured implements Tool {
   // The cast is actually checked via isAssignableFrom but the compiler doesn't
   // recognize this
   @SuppressWarnings("unchecked")
-  private Class<? extends WorkloadMapper<?, ?>> getMapperClass(String className)
-      throws ClassNotFoundException {
+  private Class<? extends WorkloadMapper<?, ?, ?, ?>> getMapperClass(
+      String className) throws ClassNotFoundException {
     if (!className.contains(".")) {
       className = WorkloadDriver.class.getPackage().getName() + "." + className;
     }
@@ -185,13 +177,14 @@ public class WorkloadDriver extends Configured implements Tool {
       throw new IllegalArgumentException(className + " is not a subclass of "
           + WorkloadMapper.class.getCanonicalName());
     }
-    return (Class<? extends WorkloadMapper<?, ?>>) mapperClass;
+    return (Class<? extends WorkloadMapper<?, ?, ?, ?>>) mapperClass;
   }

   private String getMapperUsageInfo(String mapperClassName)
       throws ClassNotFoundException, InstantiationException,
       IllegalAccessException {
-    WorkloadMapper<?, ?> mapper = getMapperClass(mapperClassName).newInstance();
+    WorkloadMapper<?, ?, ?, ?> mapper = getMapperClass(mapperClassName)
+        .newInstance();
     StringBuilder builder = new StringBuilder("Usage for ");
     builder.append(mapper.getClass().getSimpleName());
     builder.append(":\n");

+ 18 - 13
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadMapper.java

@@ -21,25 +21,18 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

 /**
  * Represents the base class for a generic workload-generating mapper. By
  * default, it will expect to use {@link VirtualInputFormat} as its
- * {@link InputFormat}. Subclasses expecting a different {@link InputFormat}
- * should override the {@link #getInputFormat(Configuration)} method.
+ * {@link InputFormat}. Subclasses requiring a reducer or expecting a different
+ * {@link InputFormat} should override the {@link #configureJob(Job)} method.
  */
-public abstract class WorkloadMapper<KEYIN, VALUEIN>
-    extends Mapper<KEYIN, VALUEIN, NullWritable, NullWritable> {
-
-  /**
-   * Return the input class to be used by this mapper.
-   * @param conf configuration.
-   * @return the {@link InputFormat} implementation for the mapper.
-   */
-  public Class<? extends InputFormat> getInputFormat(Configuration conf) {
-    return VirtualInputFormat.class;
-  }
+public abstract class WorkloadMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
+    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

   /**
    * Get the description of the behavior of this mapper.
@@ -62,4 +55,16 @@ public abstract class WorkloadMapper<KEYIN, VALUEIN>
    */
   public abstract boolean verifyConfigurations(Configuration conf);

+  /**
+   * Setup input and output formats and optional reducer.
+   */
+  public void configureJob(Job job) {
+    job.setInputFormatClass(VirtualInputFormat.class);
+
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+  }
+
 }
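
The four type parameters plus the overridable `configureJob(Job)` hook are what allow a mapper such as `AuditReplayMapper` (below) to attach keyed output and a reducer. As a minimal sketch of a custom subclass using that hook (hypothetical code, not part of this commit; it assumes the abstract methods visible in context and invents a `counting.output-path` key for illustration):

```java
package org.apache.hadoop.tools.dynamometer.workloadgenerator;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/** Hypothetical workload mapper that emits a per-mapper operation count. */
public class CountingWorkloadMapper extends
    WorkloadMapper<NullWritable, NullWritable, Text, LongWritable> {

  @Override
  public String getDescription() {
    return "Example mapper emitting one operation count per mapper.";
  }

  @Override
  public List<String> getConfigDescriptions() {
    return Collections.emptyList(); // no extra configuration keys
  }

  @Override
  public boolean verifyConfigurations(Configuration conf) {
    return true; // nothing required for this sketch
  }

  @Override
  protected void map(NullWritable key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    long opsPerformed = 0;
    // ... run the actual workload here, counting operations ...
    context.write(new Text("ops"), new LongWritable(opsPerformed));
  }

  @Override
  public void configureJob(Job job) {
    // Replace the NullOutputFormat defaults with keyed text output; with one
    // reduce task and no reducer class set, the identity reducer merges the
    // map output into a single file.
    job.setInputFormatClass(VirtualInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // "counting.output-path" is a made-up key for this illustration.
    TextOutputFormat.setOutputPath(job,
        new Path(job.getConfiguration().get("counting.output-path")));
  }
}
```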

+ 30 - 9
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java

@@ -20,6 +20,10 @@ package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
 import com.google.common.collect.Lists;
 import java.util.Optional;
 import java.util.function.Function;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadDriver;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper;
 import java.io.IOException;
@@ -35,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -73,9 +76,11 @@ import static org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditR
  * are replayed. For example, a rate factor of 2 would make the replay occur
  * twice as fast, and a rate factor of 0.5 would make it occur half as fast.
  */
-public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
+public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text,
+    UserCommandKey, CountTimeWritable> {

   public static final String INPUT_PATH_KEY = "auditreplay.input-path";
+  public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
   public static final String NUM_THREADS_KEY = "auditreplay.num-threads";
   public static final int NUM_THREADS_DEFAULT = 1;
   public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks";
@@ -170,11 +175,6 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
   private AuditCommandParser commandParser;
   private ScheduledThreadPoolExecutor progressExecutor;

-  @Override
-  public Class<? extends InputFormat> getInputFormat(Configuration conf) {
-    return NoSplitTextInputFormat.class;
-  }
-
   @Override
   public String getDescription() {
     return "This mapper replays audit log files.";
@@ -185,6 +185,7 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
     return Lists.newArrayList(
         INPUT_PATH_KEY
             + " (required): Path to directory containing input files.",
+        OUTPUT_PATH_KEY + " (required): Path to destination for output files.",
         NUM_THREADS_KEY + " (default " + NUM_THREADS_DEFAULT
             + "): Number of threads to use per mapper for replay.",
         CREATE_BLOCKS_KEY + " (default " + CREATE_BLOCKS_DEFAULT
@@ -199,7 +200,8 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {

   @Override
   public boolean verifyConfigurations(Configuration conf) {
-    return conf.get(INPUT_PATH_KEY) != null;
+    return conf.get(INPUT_PATH_KEY) != null
+        && conf.get(OUTPUT_PATH_KEY) != null;
   }

   @Override
@@ -256,7 +258,8 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
   }

   @Override
-  public void cleanup(Mapper.Context context) throws InterruptedException {
+  public void cleanup(Mapper.Context context)
+      throws InterruptedException, IOException {
     for (AuditReplayThread t : threads) {
       // Add in an indicator for each thread to shut down after the last real
       // command
@@ -266,6 +269,7 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
     for (AuditReplayThread t : threads) {
       t.join();
       t.drainCounters(context);
+      t.drainCommandLatencies(context);
       if (t.getException() != null) {
         threadException = Optional.of(t.getException());
       }
@@ -287,4 +291,21 @@ public class AuditReplayMapper extends WorkloadMapper<LongWritable, Text> {
       LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps);
     }
   }
+
+  @Override
+  public void configureJob(Job job) {
+    job.setMapOutputKeyClass(UserCommandKey.class);
+    job.setMapOutputValueClass(CountTimeWritable.class);
+    job.setInputFormatClass(NoSplitTextInputFormat.class);
+
+    job.setNumReduceTasks(1);
+    job.setReducerClass(AuditReplayReducer.class);
+    job.setOutputKeyClass(UserCommandKey.class);
+    job.setOutputValueClass(CountTimeWritable.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    TextOutputFormat.setOutputPath(job, new Path(
+        job.getConfiguration().get(OUTPUT_PATH_KEY)));
+    job.getConfiguration().set(TextOutputFormat.SEPARATOR, ",");
+  }
 }

+ 44 - 0
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayReducer.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+/**
+ * AuditReplayReducer aggregates the returned latency values from
+ * {@link AuditReplayMapper} and sums them up by {@link UserCommandKey}, which
+ * combines the id of the user who ran the command, the command name, and the
+ * command type (READ/WRITE).
+ */
+public class AuditReplayReducer extends Reducer<UserCommandKey,
+    CountTimeWritable, UserCommandKey, CountTimeWritable> {
+
+  @Override
+  protected void reduce(UserCommandKey key, Iterable<CountTimeWritable> values,
+      Context context) throws IOException, InterruptedException {
+    long countSum = 0;
+    long timeSum = 0;
+    for (CountTimeWritable v : values) {
+      countSum += v.getCount();
+      timeSum += v.getTime();
+    }
+    context.write(key, new CountTimeWritable(countSum, timeSum));
+  }
+}
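
To make the aggregation concrete: for each key, the reduce step simply sums the per-thread partial (count, time) pairs. The same logic run outside MapReduce, as an illustrative sketch using the `CountTimeWritable` type added later in this commit:

```java
import java.util.Arrays;
import java.util.List;

public final class ReduceLogicDemo {
  public static void main(String[] args) {
    // Two partial results for one (user, command, type) key.
    List<CountTimeWritable> values = Arrays.asList(
        new CountTimeWritable(1, 50), new CountTimeWritable(2, 100));
    long countSum = 0;
    long timeSum = 0;
    for (CountTimeWritable v : values) {
      countSum += v.getCount();
      timeSum += v.getTime();
    }
    // Prints "3,150": 3 operations, 150 ms of cumulative latency.
    System.out.println(new CountTimeWritable(countSum, timeSum));
  }
}
```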

+ 18 - 0
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayThread.java

@@ -76,6 +76,8 @@ public class AuditReplayThread extends Thread {
   // and merge them all together at the end.
   private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
   private Map<String, Counter> individualCommandsMap = new HashMap<>();
+  private Map<UserCommandKey, CountTimeWritable> commandLatencyMap
+      = new HashMap<>();

   AuditReplayThread(Mapper.Context mapperContext,
       DelayQueue<AuditReplayCommand> queue,
@@ -123,6 +125,14 @@ public class AuditReplayThread extends Thread {
     }
   }

+  void drainCommandLatencies(Mapper.Context context)
+      throws InterruptedException, IOException {
+    for (Map.Entry<UserCommandKey, CountTimeWritable> ent
+        : commandLatencyMap.entrySet()) {
+      context.write(ent.getKey(), ent.getValue());
+    }
+  }
+
   /**
    * Add a command to this thread's processing queue.
    *
@@ -279,6 +289,14 @@ public class AuditReplayThread extends Thread {
         throw new RuntimeException("Unexpected command: " + replayCommand);
       }
       long latency = System.currentTimeMillis() - startTime;
+
+      UserCommandKey userCommandKey = new UserCommandKey(command.getSimpleUgi(),
+          replayCommand.toString(), replayCommand.getType().toString());
+      commandLatencyMap.putIfAbsent(userCommandKey, new CountTimeWritable());
+      CountTimeWritable latencyWritable = commandLatencyMap.get(userCommandKey);
+      latencyWritable.setCount(latencyWritable.getCount() + 1);
+      latencyWritable.setTime(latencyWritable.getTime() + latency);
+
       switch (replayCommand.getType()) {
       case WRITE:
         replayCountersMap.get(REPLAYCOUNTERS.TOTALWRITECOMMANDLATENCY)

+ 82 - 0
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/CountTimeWritable.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * CountTimeWritable is a {@link Writable} used as a composite value that
+ * accumulates the count and cumulative latency of replayed commands. It is
+ * used as the output value for AuditReplayMapper and AuditReplayReducer.
+ */
+public class CountTimeWritable implements Writable {
+  private LongWritable count;
+  private LongWritable time;
+
+  public CountTimeWritable() {
+    count = new LongWritable();
+    time = new LongWritable();
+  }
+
+  public CountTimeWritable(LongWritable count, LongWritable time) {
+    this.count = count;
+    this.time = time;
+  }
+
+  public CountTimeWritable(long count, long time) {
+    this.count = new LongWritable(count);
+    this.time = new LongWritable(time);
+  }
+
+  public long getCount() {
+    return count.get();
+  }
+
+  public long getTime() {
+    return time.get();
+  }
+
+  public void setCount(long count) {
+    this.count.set(count);
+  }
+
+  public void setTime(long time) {
+    this.time.set(time);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    count.write(out);
+    time.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    count.readFields(in);
+    time.readFields(in);
+  }
+
+  @Override
+  public String toString() {
+    return getCount() + "," + getTime();
+  }
+}
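
A quick round trip through Hadoop's in-memory serialization buffers, as an illustrative sketch of the Writable contract implemented above, much as the shuffle would exercise it:

```java
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public final class CountTimeWritableDemo {
  public static void main(String[] args) throws Exception {
    CountTimeWritable in = new CountTimeWritable(2, 150); // 2 ops, 150 ms

    // Serialize as MapReduce would when writing map output.
    DataOutputBuffer out = new DataOutputBuffer();
    in.write(out);

    // Deserialize into a fresh instance on the reduce side.
    DataInputBuffer buf = new DataInputBuffer();
    buf.reset(out.getData(), out.getLength());
    CountTimeWritable copy = new CountTimeWritable();
    copy.readFields(buf);

    System.out.println(copy); // prints "2,150" via toString()
  }
}
```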

+ 111 - 0
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/UserCommandKey.java

@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+import javax.annotation.Nonnull;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * UserCommandKey is a {@link org.apache.hadoop.io.Writable} used as a composite
+ * key combining the user id, command name, and type of a replayed command. It
+ * is used as the output key for both AuditReplayMapper and AuditReplayReducer.
+ */
+public class UserCommandKey implements WritableComparable {
+  private Text user;
+  private Text command;
+  private Text type;
+
+  public UserCommandKey() {
+    user = new Text();
+    command = new Text();
+    type = new Text();
+  }
+
+  public UserCommandKey(Text user, Text command, Text type) {
+    this.user = user;
+    this.command = command;
+    this.type = type;
+  }
+
+  public UserCommandKey(String user, String command, String type) {
+    this.user = new Text(user);
+    this.command = new Text(command);
+    this.type = new Text(type);
+  }
+
+  public String getUser() {
+    return user.toString();
+  }
+
+  public String getCommand() {
+    return command.toString();
+  }
+
+  public String getType() {
+    return type.toString();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    user.write(out);
+    command.write(out);
+    type.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    user.readFields(in);
+    command.readFields(in);
+    type.readFields(in);
+  }
+
+  @Override
+  public int compareTo(@Nonnull Object o) {
+    return toString().compareTo(o.toString());
+  }
+
+  @Override
+  public String toString() {
+    return getUser() + "," + getType() + "," + getCommand();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UserCommandKey that = (UserCommandKey) o;
+    return getUser().equals(that.getUser()) &&
+        getCommand().equals(that.getCommand()) &&
+        getType().equals(that.getType());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getUser(), getCommand(), getType());
+  }
+}
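
Note that `compareTo` delegates to `toString()`, so keys order lexicographically by `user,type,command`. A small illustrative sketch:

```java
import java.util.TreeSet;

public final class UserCommandKeyDemo {
  public static void main(String[] args) {
    TreeSet<UserCommandKey> keys = new TreeSet<>();
    keys.add(new UserCommandKey("hdfs", "MKDIRS", "WRITE"));
    keys.add(new UserCommandKey("hdfs", "CREATE", "WRITE"));
    keys.add(new UserCommandKey("alice", "OPEN", "READ"));

    // Iteration order:
    //   alice,READ,OPEN
    //   hdfs,WRITE,CREATE
    //   hdfs,WRITE,MKDIRS
    keys.forEach(System.out::println);
  }
}
```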

+ 35 - 7
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/TestWorkloadGenerator.java

@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.tools.dynamometer.workloadgenerator;

+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditCommandParser;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayMapper;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,9 +38,12 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ImpersonationProvider;
+import org.jline.utils.Log;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS;
 import static org.junit.Assert.assertEquals;
@@ -46,6 +53,8 @@ import static org.junit.Assert.assertTrue;

 /** Tests for {@link WorkloadDriver} and related classes. */
 public class TestWorkloadGenerator {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestWorkloadGenerator.class);

   private Configuration conf;
   private MiniDFSCluster miniCluster;
@@ -73,22 +82,27 @@ public class TestWorkloadGenerator {
   }

   @Test
-  public void testAuditWorkloadDirectParser() throws Exception {
+  public void testAuditWorkloadDirectParserWithOutput() throws Exception {
     String workloadInputPath = TestWorkloadGenerator.class.getClassLoader()
         .getResource("audit_trace_direct").toString();
+    String auditOutputPath = "/tmp/trace_output_direct";
     conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
+    conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath);
     conf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60 * 1000);
-    testAuditWorkload();
+    testAuditWorkloadWithOutput(auditOutputPath);
   }

   @Test
-  public void testAuditWorkloadHiveParser() throws Exception {
-    String workloadInputPath = TestWorkloadGenerator.class.getClassLoader()
-        .getResource("audit_trace_hive").toString();
+  public void testAuditWorkloadHiveParserWithOutput() throws Exception {
+    String workloadInputPath =
+        TestWorkloadGenerator.class.getClassLoader()
+            .getResource("audit_trace_hive").toString();
+    String auditOutputPath = "/tmp/trace_output_hive";
     conf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
+    conf.set(AuditReplayMapper.OUTPUT_PATH_KEY, auditOutputPath);
     conf.setClass(AuditReplayMapper.COMMAND_PARSER_KEY,
         AuditLogHiveTableParser.class, AuditCommandParser.class);
-    testAuditWorkload();
+    testAuditWorkloadWithOutput(auditOutputPath);
   }

   /**
@@ -114,7 +128,8 @@ public class TestWorkloadGenerator {
     }
   }

-  private void testAuditWorkload() throws Exception {
+  private void testAuditWorkloadWithOutput(String auditOutputPath)
+      throws Exception {
     long workloadStartTime = System.currentTimeMillis() + 10000;
     Job workloadJob = WorkloadDriver.getJobForSubmission(conf,
         dfs.getUri().toString(), workloadStartTime, AuditReplayMapper.class);
@@ -132,5 +147,18 @@ public class TestWorkloadGenerator {
     assertTrue(
         dfs.getFileStatus(new Path("/tmp/testDirRenamed")).isDirectory());
     assertFalse(dfs.exists(new Path("/denied")));
+
+    assertTrue(dfs.exists(new Path(auditOutputPath)));
+    try (FSDataInputStream auditOutputFile = dfs.open(new Path(auditOutputPath,
+        "part-r-00000"))) {
+      String auditOutput = IOUtils.toString(auditOutputFile,
+          StandardCharsets.UTF_8);
+      Log.info(auditOutput);
+      assertTrue(auditOutput.matches(
+          ".*(hdfs,WRITE,[A-Z]+,[13]+,[0-9]+\\n){3}.*"));
+      // Matches three lines of the format "hdfs,WRITE,name,count,time"
+      // Using [13] for the count group because each operation is run either
+      // 1 or 3 times but the output order isn't guaranteed
+    }
   }
 }

+ 3 - 0
hadoop-tools/hadoop-dynamometer/src/site/markdown/Dynamometer.md

@@ -232,6 +232,9 @@ within that file. A best effort is made to faithfully replay the audit log event
 originally occurred (optionally, this can be adjusted by specifying `auditreplay.rate-factor` which is a multiplicative
 factor towards the rate of replay, e.g. use 2.0 to replay the events at twice the original speed).

+The AuditReplayMapper will output the benchmark results to a file `part-r-00000` in the output directory in CSV format.
+Each line is in the format `user,type,operation,numops,cumulativelatency`, e.g. `hdfs,WRITE,MKDIRS,2,150`.
+
 ### Integrated Workload Launch

 To have the infrastructure application client launch the workload automatically, parameters for the workload job
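
As a usage sketch (not part of this commit), consuming the documented CSV needs no special tooling; the snippet below assumes only the five-field line format described above:

```java
/** Parse one benchmark result line and derive the average latency. */
public final class ResultLineParser {
  public static void main(String[] args) {
    String line = "hdfs,WRITE,MKDIRS,2,150"; // example line from the docs

    String[] fields = line.split(",");
    String user = fields[0];
    String type = fields[1];
    String operation = fields[2];
    long numOps = Long.parseLong(fields[3]);
    long cumulativeLatencyMs = Long.parseLong(fields[4]);

    // e.g. "WRITE MKDIRS by hdfs: 2 ops, avg 75.0 ms"
    System.out.printf("%s %s by %s: %d ops, avg %.1f ms%n",
        type, operation, user, numOps, (double) cumulativeLatencyMs / numOps);
  }
}
```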