
HADOOP-16418. [Dynamometer] Fix checkstyle and findbugs warnings. Contributed by Erik Krogen.

Erik Krogen 5 years ago
parent
commit
fc0656dd30
10 changed files with 132 additions and 136 deletions
  1. + 8 - 42
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/main/java/org/apache/hadoop/tools/dynamometer/blockgenerator/BlockInfo.java
  2. + 1 - 1
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/main/java/org/apache/hadoop/tools/dynamometer/blockgenerator/GenerateDNBlockInfosReducer.java
  3. + 4 - 0
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/main/java/org/apache/hadoop/tools/dynamometer/blockgenerator/XMLParser.java
  4. + 4 - 3
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/test/java/org/apache/hadoop/tools/dynamometer/blockgenerator/TestXMLParser.java
  5. + 5 - 4
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/AMOptions.java
  6. + 5 - 5
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java
  7. + 6 - 1
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/DynoInfraUtils.java
  8. + 7 - 6
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/SimulatedDataNodes.java
  9. + 84 - 68
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java
  10. + 8 - 6
      hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java

+ 8 - 42
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/main/java/org/apache/hadoop/tools/dynamometer/blockgenerator/BlockInfo.java

@@ -34,69 +34,35 @@ import org.apache.hadoop.io.Writable;
 
 public class BlockInfo implements Writable {
 
-  public LongWritable getBlockId() {
+  LongWritable getBlockId() {
     return blockId;
   }
 
-  public void setBlockId(LongWritable blockId) {
-    this.blockId = blockId;
-  }
-
-  public LongWritable getBlockGenerationStamp() {
+  LongWritable getBlockGenerationStamp() {
     return blockGenerationStamp;
   }
 
-  public void setBlockGenerationStamp(LongWritable blockGenerationStamp) {
-    this.blockGenerationStamp = blockGenerationStamp;
-  }
-
-  public LongWritable getSize() {
+  LongWritable getSize() {
     return size;
   }
 
-  public void setSize(LongWritable size) {
-    this.size = size;
-  }
-
-  public short getReplication() {
+  short getReplication() {
     return replication;
   }
 
-  public void setReplication(short replication) {
-    this.replication = replication;
-  }
-
   private LongWritable blockId;
   private LongWritable blockGenerationStamp;
   private LongWritable size;
   private transient short replication;
 
-  public BlockInfo(BlockInfo blockInfo) {
-    this.blockId = blockInfo.getBlockId();
-    this.blockGenerationStamp = blockInfo.getBlockGenerationStamp();
-    this.size = blockInfo.getSize();
-    this.replication = replication;
-  }
-
-  public BlockInfo() {
+  @SuppressWarnings("unused") // Used via reflection
+  private BlockInfo() {
     this.blockId = new LongWritable();
     this.blockGenerationStamp = new LongWritable();
-    this.size = new LongWritable(1);
-  }
-
-  public BlockInfo(long blockid, long blockgenerationstamp) {
-    this.blockId = new LongWritable(blockid);
-    this.blockGenerationStamp = new LongWritable(blockgenerationstamp);
-    this.size = new LongWritable(1);
-  }
-
-  public BlockInfo(long blockid, long blockgenerationstamp, long size) {
-    this.blockId = new LongWritable(blockid);
-    this.blockGenerationStamp = new LongWritable(blockgenerationstamp);
-    this.size = new LongWritable(size);
+    this.size = new LongWritable();
   }
 
-  public BlockInfo(long blockid, long blockgenerationstamp, long size,
+  BlockInfo(long blockid, long blockgenerationstamp, long size,
       short replication) {
     this.blockId = new LongWritable(blockid);
     this.blockGenerationStamp = new LongWritable(blockgenerationstamp);

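Note: besides narrowing the accessors and constructors to package-private, this hunk removes the copy constructor, whose `this.replication = replication;` assigned the field to itself, the kind of bug findbugs reports as SA_FIELD_SELF_ASSIGNMENT. A minimal standalone sketch of that bug class, using a hypothetical class rather than BlockInfo itself:

```java
// Hypothetical example of the self-assignment findbugs flags:
class CopyExample {
  private short replication;

  CopyExample(CopyExample other) {
    // Bug: "this.replication = replication;" copies the field onto itself,
    // silently leaving it at its default value.
    this.replication = other.replication; // the intended fix
  }
}
```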
+ 1 - 1
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/main/java/org/apache/hadoop/tools/dynamometer/blockgenerator/GenerateDNBlockInfosReducer.java

@@ -76,7 +76,7 @@ public class GenerateDNBlockInfosReducer
 
     Text out = new Text();
     while (it.hasNext()) {
-      BlockInfo blockInfo = new BlockInfo(it.next());
+      BlockInfo blockInfo = it.next();
       String blockLine = blockInfo.getBlockId() + ","
           + blockInfo.getBlockGenerationStamp() + "," + blockInfo.getSize();
       out.set(blockLine);

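Worth noting for the change above: Hadoop's reduce-side value iterators generally reuse a single Writable instance across `next()` calls, so dropping the defensive copy is safe here only because each `BlockInfo` is fully consumed within the loop body before the next iteration. A self-contained sketch of the aliasing hazard, with a plain holder object standing in for a reused Writable:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {
  static class Holder { int value; }

  public static void main(String[] args) {
    Holder shared = new Holder();
    // Iterator that recycles one mutable object, as reduce iterators do.
    Iterator<Holder> it = new Iterator<Holder>() {
      private int next = 0;
      public boolean hasNext() { return next < 3; }
      public Holder next() { shared.value = next++; return shared; }
    };
    List<Holder> saved = new ArrayList<>();
    while (it.hasNext()) {
      saved.add(it.next()); // every element aliases the same Holder
    }
    System.out.println(saved.get(0).value); // prints 2, not 0
  }
}
```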
+ 4 - 0
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/main/java/org/apache/hadoop/tools/dynamometer/blockgenerator/XMLParser.java

@@ -27,6 +27,10 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+// Checkstyle complains about the XML tags even though they are wrapped
+// within <pre> and {@code} tags. The SuppressWarnings has to go before the
+// Javadoc to take effect.
+@SuppressWarnings("checkstyle:javadocstyle")
 /**
  * This class parses an fsimage file in XML format. It accepts the file
  * line-by-line and maintains an internal state machine to keep track of

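A compact illustration of the placement rule the new comment describes (hypothetical class, not part of the patch): the suppression annotation is written before the Javadoc block so that checkstyle associates it with the comment as well as the declaration.

```java
// Per the comment in the patch: placed here, before the Javadoc,
// the suppression also covers the Javadoc content below.
@SuppressWarnings("checkstyle:javadocstyle")
/**
 * Parses fragments such as <pre>{@code <inode><id>1</id></inode>}</pre>.
 */
class JavadocSuppressionExample { }
```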
+ 4 - 3
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-blockgen/src/test/java/org/apache/hadoop/tools/dynamometer/blockgenerator/TestXMLParser.java

@@ -51,10 +51,11 @@ public class TestXMLParser {
         "</INodeSection>"
     };
 
+    short replCount = 0; // This is ignored
     Map<BlockInfo, Short> expectedBlockCount = new HashMap<>();
-    expectedBlockCount.put(new BlockInfo(6, 7, 8), (short) 3);
-    expectedBlockCount.put(new BlockInfo(9, 10, 11), (short) 3);
-    expectedBlockCount.put(new BlockInfo(13, 14, 15), (short) 12);
+    expectedBlockCount.put(new BlockInfo(6, 7, 8, replCount), (short) 3);
+    expectedBlockCount.put(new BlockInfo(9, 10, 11, replCount), (short) 3);
+    expectedBlockCount.put(new BlockInfo(13, 14, 15, replCount), (short) 12);
 
     final Map<BlockInfo, Short> actualBlockCount = new HashMap<>();
     XMLParser parser = new XMLParser();

+ 5 - 4
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/AMOptions.java

@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration;
  * Options supplied to the Client which are then passed through to the
  * ApplicationMaster.
  */
-class AMOptions {
+final class AMOptions {
 
   public static final String NAMENODE_MEMORY_MB_ARG = "namenode_memory_mb";
   public static final String NAMENODE_MEMORY_MB_DEFAULT = "2048";
@@ -75,9 +75,10 @@ class AMOptions {
   // Extended shellEnv including custom environment variables
   private final Map<String, String> shellEnv;
 
-  AMOptions(int datanodeMemoryMB, int datanodeVirtualCores, String datanodeArgs,
-      String datanodeNodeLabelExpression, int datanodesPerCluster,
-      String datanodeLaunchDelay, int namenodeMemoryMB,
+  @SuppressWarnings("checkstyle:parameternumber")
+  private AMOptions(int datanodeMemoryMB, int datanodeVirtualCores,
+      String datanodeArgs, String datanodeNodeLabelExpression,
+      int datanodesPerCluster, String datanodeLaunchDelay, int namenodeMemoryMB,
       int namenodeVirtualCores, String namenodeArgs,
       String namenodeNodeLabelExpression, int namenodeMetricsPeriod,
       String namenodeNameDir, String namenodeEditsDir,

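Context, hedged: checkstyle's ParameterNumber check limits methods and constructors to 7 parameters by default, so an options holder like this one either gets a per-member suppression (as here) or a builder. A minimal sketch of the suppression with a hypothetical class; the constructor becoming private also suggests construction flows through a factory method elsewhere in the class, though that is outside the lines shown here.

```java
final class Options {
  private final int memoryMb;
  private final int vcores;

  // Hypothetical: suppression scoped to the one member that
  // legitimately exceeds checkstyle's parameter limit.
  @SuppressWarnings("checkstyle:parameternumber")
  private Options(int memoryMb, int vcores, int a, int b,
      int c, int d, int e, int f) {
    this.memoryMb = memoryMb;
    this.vcores = vcores;
    // remaining parameters elided for brevity
  }
}
```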
+ 5 - 5
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/Client.java

@@ -860,11 +860,11 @@ public class Client extends Configured implements Tool {
       String relativePath = file.getAbsolutePath()
           .substring(root.getAbsolutePath().length() + 1);
       try {
-        FileInputStream in = new FileInputStream(file.getAbsolutePath());
-        out.putNextEntry(new ZipEntry(relativePath));
-        IOUtils.copyBytes(in, out, getConf(), false);
-        out.closeEntry();
-        in.close();
+        try (FileInputStream in = new FileInputStream(file.getAbsolutePath())) {
+          out.putNextEntry(new ZipEntry(relativePath));
+          IOUtils.copyBytes(in, out, getConf(), false);
+          out.closeEntry();
+        }
       } catch (FileNotFoundException fnfe) {
         LOG.warn("Skipping file; it is a symlink with a nonexistent target: {}",
             file);

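The rewrite above is the standard try-with-resources fix for the findbugs complaint that the input stream leaks if `putNextEntry` or the copy throws. A self-contained sketch of the same shape (the `transferTo` call is a stand-in for Hadoop's `IOUtils.copyBytes`):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

final class ZipHelper {
  // Adds one file to an open ZipOutputStream; the input stream is closed
  // on every path, while the shared output stream stays open for the
  // caller to add further entries.
  static void addEntry(ZipOutputStream out, String entryName, String filePath)
      throws IOException {
    try (FileInputStream in = new FileInputStream(filePath)) {
      out.putNextEntry(new ZipEntry(entryName));
      in.transferTo(out); // stand-in for IOUtils.copyBytes(in, out, conf, false)
      out.closeEntry();
    }
  }
}
```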
+ 6 - 1
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/DynoInfraUtils.java

@@ -139,7 +139,11 @@ public final class DynoInfraUtils {
           APACHE_DOWNLOAD_MIRROR_DEFAULT);
     }
 
-    destinationDir.mkdirs();
+    if (!destinationDir.exists()) {
+      if (!destinationDir.mkdirs()) {
+        throw new IOException("Unable to create local dir: " + destinationDir);
+      }
+    }
     URL downloadURL = new URL(apacheMirror + String
         .format(APACHE_DOWNLOAD_MIRROR_SUFFIX_FORMAT, version, version));
     log.info("Downloading tarball from: <{}> to <{}>", downloadURL,
@@ -441,6 +445,7 @@
    * @param shouldExit Should return true iff this should stop waiting.
    * @param log Where to log information.
    */
+  @SuppressWarnings("checkstyle:parameternumber")
   private static void waitForNameNodeJMXValue(String valueName,
       String jmxBeanQuery, String jmxProperty, double threshold,
       double printThreshold, boolean decreasing, Properties nameNodeProperties,

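The mkdirs change addresses findbugs' ignored-return-value warning: `File.mkdirs()` returns false both when creation fails and when the directory already exists, so the existence check keeps the pre-existing-directory case from being treated as an error. The same guard in isolation:

```java
import java.io.File;
import java.io.IOException;

final class DirUtil {
  // Fails loudly only when the directory is absent and cannot be created.
  static void ensureDir(File dir) throws IOException {
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Unable to create local dir: " + dir);
    }
  }
}
```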
+ 7 - 6
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/main/java/org/apache/hadoop/tools/dynamometer/SimulatedDataNodes.java

@@ -21,6 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -79,7 +80,7 @@ public class SimulatedDataNodes extends Configured implements Tool {
   static void printUsageExit(String err) {
     System.out.println(err);
     System.out.println(USAGE);
-    System.exit(1);
+    throw new RuntimeException(err);
   }
 
   public static void main(String[] args) throws Exception {
@@ -120,7 +121,7 @@ public class SimulatedDataNodes extends Configured implements Tool {
       mc.formatDataNodeDirs();
     } catch (IOException e) {
       System.out.println("Error formatting DataNode dirs: " + e);
-      System.exit(1);
+      throw new RuntimeException("Error formatting DataNode dirs", e);
     }
 
     try {
@@ -144,10 +145,10 @@ public class SimulatedDataNodes extends Configured implements Tool {
 
       for (int dnIndex = 0; dnIndex < blockListFiles.size(); dnIndex++) {
         Path blockListFile = blockListFiles.get(dnIndex);
-        try (FSDataInputStream fsdis = blockListFile.getFileSystem(getConf())
-            .open(blockListFile)) {
-          BufferedReader reader = new BufferedReader(
-              new InputStreamReader(fsdis));
+        try (FSDataInputStream fsdis =
+            blockListFile.getFileSystem(getConf()).open(blockListFile);
+            BufferedReader reader = new BufferedReader(
+                new InputStreamReader(fsdis, StandardCharsets.UTF_8))) {
           List<Block> blockList = new ArrayList<>();
           int cnt = 0;
           for (String line = reader.readLine(); line != null; line = reader

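Two findbugs patterns are addressed above: `System.exit()` inside library-style code (replaced with an exception the caller can handle) and a reader built without an explicit charset, which silently depends on the platform default encoding. A standalone sketch of the charset-explicit, two-resource try block:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

final class ReadUtil {
  // Both resources are registered in one try-with-resources; they are
  // closed in reverse order of declaration, reader first.
  static String firstLine(InputStream in) throws IOException {
    try (InputStreamReader isr =
            new InputStreamReader(in, StandardCharsets.UTF_8);
        BufferedReader reader = new BufferedReader(isr)) {
      return reader.readLine();
    }
  }
}
```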
+ 84 - 68
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-infra/src/test/java/org/apache/hadoop/tools/dynamometer/TestDynamometerInfra.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.tools.dynamometer;
 
 import com.google.common.collect.Sets;
 import java.util.Optional;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import org.apache.hadoop.test.PlatformAssumptions;
 import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
@@ -307,59 +308,12 @@ public class TestDynamometerInfra {
 
   @Test(timeout = 15 * 60 * 1000)
   public void testNameNodeInYARN() throws Exception {
-    final Client client = new Client(JarFinder.getJar(ApplicationMaster.class),
-        JarFinder.getJar(Assert.class));
     Configuration localConf = new Configuration(yarnConf);
     localConf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60000);
-    client.setConf(localConf);
 
-    Thread appThread = new Thread(() -> {
-      try {
-        client.run(new String[] {"-" + Client.MASTER_MEMORY_MB_ARG, "128",
-            "-" + Client.CONF_PATH_ARG, confZip.toString(),
-            "-" + Client.BLOCK_LIST_PATH_ARG,
-            blockImageOutputDir.toString(), "-" + Client.FS_IMAGE_DIR_ARG,
-            fsImageTmpPath.getParent().toString(),
-            "-" + Client.HADOOP_BINARY_PATH_ARG,
-            hadoopTarballPath.getAbsolutePath(),
-            "-" + AMOptions.DATANODES_PER_CLUSTER_ARG, "2",
-            "-" + AMOptions.DATANODE_MEMORY_MB_ARG, "128",
-            "-" + AMOptions.DATANODE_NODELABEL_ARG, DATANODE_NODELABEL,
-            "-" + AMOptions.NAMENODE_MEMORY_MB_ARG, "256",
-            "-" + AMOptions.NAMENODE_METRICS_PERIOD_ARG, "1",
-            "-" + AMOptions.NAMENODE_NODELABEL_ARG, NAMENODE_NODELABEL,
-            "-" + AMOptions.SHELL_ENV_ARG,
-            "HADOOP_HOME=" + getHadoopHomeLocation(),
-            "-" + AMOptions.SHELL_ENV_ARG,
-            "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop",
-            "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
-            "-" + Client.WORKLOAD_INPUT_PATH_ARG,
-            fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(),
-            "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1",
-            "-" + Client.WORKLOAD_START_DELAY_ARG, "10s",
-            "-" + AMOptions.NAMENODE_ARGS_ARG,
-            "-Ddfs.namenode.safemode.extension=0"});
-      } catch (Exception e) {
-        LOG.error("Error running client", e);
-      }
-    });
+    final Client client = createAndStartClient(localConf);
 
-    appThread.start();
-    LOG.info("Waiting for application ID to become available");
-    GenericTestUtils.waitFor(() -> {
-      try {
-        List<ApplicationReport> apps = yarnClient.getApplications();
-        if (apps.size() == 1) {
-          infraAppId = apps.get(0).getApplicationId();
-          return true;
-        } else if (apps.size() > 1) {
-          fail("Unexpected: more than one application");
-        }
-      } catch (IOException | YarnException e) {
-        fail("Unexpected exception: " + e);
-      }
-      return false;
-    }, 1000, 60000);
+    awaitApplicationStartup();
 
     Supplier<Boolean> falseSupplier = () -> false;
     Optional<Properties> namenodeProperties = DynoInfraUtils
@@ -372,25 +326,7 @@
     DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false,
         falseSupplier, localConf, LOG);
 
-    // Test that we can successfully write to / read from the cluster
-    try {
-      URI nameNodeUri =
-          DynoInfraUtils.getNameNodeHdfsUri(namenodeProperties.get());
-      DistributedFileSystem dynoFS =
-          (DistributedFileSystem) FileSystem.get(nameNodeUri, localConf);
-      Path testFile = new Path("/tmp/test/foo");
-      dynoFS.mkdir(testFile.getParent(), FsPermission.getDefault());
-      FSDataOutputStream out = dynoFS.create(testFile, (short) 1);
-      out.write(42);
-      out.hsync();
-      out.close();
-      FileStatus[] stats = dynoFS.listStatus(testFile.getParent());
-      assertEquals(1, stats.length);
-      assertEquals("foo", stats[0].getPath().getName());
-    } catch (IOException e) {
-      LOG.error("Failed to write or read", e);
-      throw e;
-    }
+    assertClusterIsFunctional(localConf, namenodeProperties.get());
 
     Map<ContainerId, Container> namenodeContainers = miniYARNCluster
         .getNodeManager(0).getNMContext().getContainers();
@@ -459,6 +395,86 @@
     }, 3000, 60000);
   }
 
+  private void assertClusterIsFunctional(Configuration localConf,
+      Properties namenodeProperties) throws IOException {
+    // Test that we can successfully write to / read from the cluster
+    try {
+      URI nameNodeUri = DynoInfraUtils.getNameNodeHdfsUri(namenodeProperties);
+      DistributedFileSystem dynoFS =
+          (DistributedFileSystem) FileSystem.get(nameNodeUri, localConf);
+      Path testFile = new Path("/tmp/test/foo");
+      dynoFS.mkdir(testFile.getParent(), FsPermission.getDefault());
+      FSDataOutputStream out = dynoFS.create(testFile, (short) 1);
+      out.write(42);
+      out.hsync();
+      out.close();
+      FileStatus[] stats = dynoFS.listStatus(testFile.getParent());
+      assertEquals(1, stats.length);
+      assertEquals("foo", stats[0].getPath().getName());
+    } catch (IOException e) {
+      LOG.error("Failed to write or read", e);
+      throw e;
+    }
+  }
+
+  private void awaitApplicationStartup()
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for application ID to become available");
+    GenericTestUtils.waitFor(() -> {
+      try {
+        List<ApplicationReport> apps = yarnClient.getApplications();
+        if (apps.size() == 1) {
+          infraAppId = apps.get(0).getApplicationId();
+          return true;
+        } else if (apps.size() > 1) {
+          fail("Unexpected: more than one application");
+        }
+      } catch (IOException | YarnException e) {
+        fail("Unexpected exception: " + e);
+      }
+      return false;
+    }, 1000, 60000);
+  }
+
+  private Client createAndStartClient(Configuration localConf) {
+    final Client client = new Client(JarFinder.getJar(ApplicationMaster.class),
+        JarFinder.getJar(Assert.class));
+    client.setConf(localConf);
+    Thread appThread = new Thread(() -> {
+      try {
+        client.run(new String[] {"-" + Client.MASTER_MEMORY_MB_ARG, "128",
+            "-" + Client.CONF_PATH_ARG, confZip.toString(),
+            "-" + Client.BLOCK_LIST_PATH_ARG,
+            blockImageOutputDir.toString(), "-" + Client.FS_IMAGE_DIR_ARG,
+            fsImageTmpPath.getParent().toString(),
+            "-" + Client.HADOOP_BINARY_PATH_ARG,
+            hadoopTarballPath.getAbsolutePath(),
+            "-" + AMOptions.DATANODES_PER_CLUSTER_ARG, "2",
+            "-" + AMOptions.DATANODE_MEMORY_MB_ARG, "128",
+            "-" + AMOptions.DATANODE_NODELABEL_ARG, DATANODE_NODELABEL,
+            "-" + AMOptions.NAMENODE_MEMORY_MB_ARG, "256",
+            "-" + AMOptions.NAMENODE_METRICS_PERIOD_ARG, "1",
+            "-" + AMOptions.NAMENODE_NODELABEL_ARG, NAMENODE_NODELABEL,
+            "-" + AMOptions.SHELL_ENV_ARG,
+            "HADOOP_HOME=" + getHadoopHomeLocation(),
+            "-" + AMOptions.SHELL_ENV_ARG,
+            "HADOOP_CONF_DIR=" + getHadoopHomeLocation() + "/etc/hadoop",
+            "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
+            "-" + Client.WORKLOAD_INPUT_PATH_ARG,
+            fs.makeQualified(new Path("/tmp/audit_trace_direct")).toString(),
+            "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1",
+            "-" + Client.WORKLOAD_START_DELAY_ARG, "10s",
+            "-" + AMOptions.NAMENODE_ARGS_ARG,
+            "-Ddfs.namenode.safemode.extension=0"});
+      } catch (Exception e) {
+        LOG.error("Error running client", e);
+      }
+    });
+
+    appThread.start();
+    return client;
+  }
+
   private static URI getResourcePath(String resourceName) {
     try {
       return TestDynamometerInfra.class.getClassLoader()

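The extracted `awaitApplicationStartup()` keeps using Hadoop's polling helper; assuming the `GenericTestUtils.waitFor(check, checkEveryMillis, waitForMillis)` signature visible in this test, a minimal usage sketch:

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.test.GenericTestUtils;

final class WaitSketch {
  // Polls the flag every second, giving up after 60 seconds.
  static void awaitFlag(AtomicBoolean flag)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(flag::get, 1000, 60000);
  }
}
```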
+ 8 - 6
hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/WorkloadDriver.java

@@ -124,7 +124,7 @@ public class WorkloadDriver extends Configured implements Tool {
       startTimestampMs = tmpConf.getTimeDuration(tmpConfKey, 0,
           TimeUnit.MILLISECONDS) + System.currentTimeMillis();
     }
-    Class<? extends WorkloadMapper> mapperClass = getMapperClass(
+    Class<? extends WorkloadMapper<?, ?>> mapperClass = getMapperClass(
         cli.getOptionValue(MAPPER_CLASS_NAME));
     if (!mapperClass.newInstance().verifyConfigurations(getConf())) {
       System.err
@@ -140,9 +140,8 @@ public class WorkloadDriver extends Configured implements Tool {
   }
 
   public static Job getJobForSubmission(Configuration baseConf, String nnURI,
-      long startTimestampMs, Class<? extends WorkloadMapper> mapperClass)
-      throws IOException, ClassNotFoundException, InstantiationException,
-      IllegalAccessException {
+      long startTimestampMs, Class<? extends WorkloadMapper<?, ?>> mapperClass)
+      throws IOException, InstantiationException, IllegalAccessException {
     Configuration conf = new Configuration(baseConf);
     conf.set(NN_URI, nnURI);
     conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
@@ -173,7 +172,10 @@ public class WorkloadDriver extends Configured implements Tool {
     System.exit(ToolRunner.run(driver, args));
   }
 
-  private Class<? extends WorkloadMapper> getMapperClass(String className)
+  // The cast is actually checked via isAssignableFrom but the compiler doesn't
+  // recognize this
+  @SuppressWarnings("unchecked")
+  private Class<? extends WorkloadMapper<?, ?>> getMapperClass(String className)
       throws ClassNotFoundException {
     if (!className.contains(".")) {
       className = WorkloadDriver.class.getPackage().getName() + "." + className;
@@ -183,7 +185,7 @@ public class WorkloadDriver extends Configured implements Tool {
       throw new IllegalArgumentException(className + " is not a subclass of "
           + WorkloadMapper.class.getCanonicalName());
     }
-    return (Class<? extends WorkloadMapper>) mapperClass;
+    return (Class<? extends WorkloadMapper<?, ?>>) mapperClass;
   }
 
   private String getMapperUsageInfo(String mapperClassName)
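The suppression added above is the usual pattern for a cast the compiler cannot prove but the code verifies at runtime via `isAssignableFrom`. The same idea in standalone form, using `Number` as a hypothetical stand-in for `WorkloadMapper`:

```java
final class CastUtil {
  // Runtime-checked narrowing of a raw Class token; the unchecked
  // warning is safe to suppress because of the explicit check.
  @SuppressWarnings("unchecked")
  static Class<? extends Number> asNumberClass(Class<?> raw) {
    if (!Number.class.isAssignableFrom(raw)) {
      throw new IllegalArgumentException(raw + " is not a subclass of "
          + Number.class.getCanonicalName());
    }
    return (Class<? extends Number>) raw;
  }
}
```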