Browse Source

HADOOP-992. Fix MiniMR unit tests to use MiniDFS when specified, rather than the local FS. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@505362 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
a8cad5adbc

+ 3 - 0
CHANGES.txt

@@ -28,6 +28,9 @@ Branch 0.11 - unreleased
     from neededReplications after a replication target was selected.
     (Hairong Kuang via cutting)
 
+ 5. HADOOP-992.  Fix MiniMR unit tests to use MiniDFS when specified,
+    rather than the local FS.  (omalley via cutting)
+
 
 Release 0.11.0 - 2007-02-02
 

+ 15 - 10
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -52,14 +52,6 @@ public class JobConf extends Configuration {
     addDefaultResource("mapred-default.xml");
   }
   
-  private void initialize(Class exampleClass) {
-    initialize();
-    String jar = findContainingJar(exampleClass);
-    if (jar != null) {
-      setJar(jar);
-    }   
-  }
-  
   /**
    * Construct a map/reduce job configuration.
    */
@@ -72,7 +64,8 @@ public class JobConf extends Configuration {
    * @param exampleClass a class whose containing jar is used as the job's jar.
    */
   public JobConf(Class exampleClass) {
-    initialize(exampleClass);
+    initialize();
+    setJarByClass(exampleClass);
   }
   
   /**
@@ -93,7 +86,8 @@ public class JobConf extends Configuration {
    */
   public JobConf(Configuration conf, Class exampleClass) {
     this(conf);
-    initialize(exampleClass);
+    initialize();
+    setJarByClass(exampleClass);
   }
 
 
@@ -117,6 +111,17 @@ public class JobConf extends Configuration {
 
   public String getJar() { return get("mapred.jar"); }
   public void setJar(String jar) { set("mapred.jar", jar); }
+  
+  /**
+   * Set the job's jar file by finding an example class location.
+   * @param cls the example class
+   */
+  public void setJarByClass(Class cls) {
+    String jar = findContainingJar(cls);
+    if (jar != null) {
+      setJar(jar);
+    }   
+  }
 
   public Path getSystemDir() {
     return new Path(get("mapred.system.dir", "/tmp/hadoop/mapred/system"));

+ 4 - 6
src/test/org/apache/hadoop/mapred/MRCaching.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.MapReduceBase;
-import java.io.*;
 import org.apache.hadoop.filecache.*;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -125,12 +124,12 @@ public class MRCaching {
     }
   }
 
-  public static boolean launchMRCache(String jobTracker, String indir,
-      String outdir, String fileSys, JobConf conf, String input)
+  public static boolean launchMRCache(String indir,
+      String outdir, JobConf conf, String input)
       throws IOException {
     final Path inDir = new Path(indir);
     final Path outDir = new Path(outdir);
-    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    FileSystem fs = FileSystem.get(conf);
     fs.delete(outDir);
     if (!fs.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
@@ -140,8 +139,6 @@ public class MRCaching {
       file.writeBytes(input);
       file.close();
     }
-    conf.set("fs.default.name", fileSys);
-    conf.set("mapred.job.tracker", jobTracker);
     conf.setJobName("cachetest");
 
     // the keys are words (strings)
@@ -170,6 +167,7 @@ public class MRCaching {
     fs.copyFromLocalFile(jarPath, cacheTest);
     fs.copyFromLocalFile(zipPath, cacheTest);
     // setting the cached archives to zip, jar and simple text files
+    String fileSys = fs.getName();
     String archive1 = "dfs://" + fileSys + "/tmp/cachedir/test.jar";
     String archive2 = "dfs://" + fileSys + "/tmp/cachedir/test.zip"; 
     String file1 = "dfs://" + fileSys + "/tmp/cachedir/test.txt";

+ 15 - 14
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -56,13 +56,7 @@ public class MiniMRCluster {
          */
         public void run() {
             try {
-                JobConf jc = new JobConf();
-                jc.set("fs.name.node", namenode);
-                jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
-                jc.set("mapred.job.tracker.info.port", jobTrackerInfoPort);
-                // this timeout seems to control the minimum time for the test, so
-                // set it down at 2 seconds.
-                jc.setInt("ipc.client.timeout", 1000);
+                JobConf jc = createJobConf();
                 jc.set("mapred.local.dir","build/test/mapred/local");
                 JobTracker.startTracker(jc);
             } catch (Throwable e) {
@@ -105,12 +99,7 @@ public class MiniMRCluster {
          */
         public void run() {
             try {
-                JobConf jc = new JobConf();
-                jc.set("fs.name.node", namenode);
-                jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
-                // this timeout seems to control the minimum time for the test, so
-                // set it down at 2 seconds.
-                jc.setInt("ipc.client.timeout", 1000);
+                JobConf jc = createJobConf();
                 jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++);
                 jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++);
                 File localDir = new File(jc.get("mapred.local.dir"));
@@ -219,6 +208,17 @@ public class MiniMRCluster {
         return jobTrackerPort;
     }
 
+    public JobConf createJobConf() {
+      JobConf result = new JobConf();
+      result.set("fs.default.name", namenode);
+      result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
+      result.set("mapred.job.tracker.info.port", jobTrackerInfoPort);
+      // this timeout controls the minimum time for the test, so
+      // set it down at 1 seconds.
+      result.setInt("ipc.client.timeout", 1000);
+      return result;
+    }
+    
     /**
      * Create the config and start up the servers.  The ports supplied by the user are
      * just used as suggestions.  If those ports are already in use, new ports
@@ -229,7 +229,8 @@ public class MiniMRCluster {
                          int numTaskTrackers,
                          String namenode,
                          boolean taskTrackerFirst) throws IOException {
-        this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, taskTrackerFirst, 1);
+        this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
+             taskTrackerFirst, 1);
     } 
   
     public MiniMRCluster(int jobTrackerPort,

+ 4 - 9
src/test/org/apache/hadoop/mapred/PiEstimator.java

@@ -134,20 +134,15 @@ public class PiEstimator {
    * This is the main driver for computing the value of Pi using
    * monte-carlo method.
    */
-  static double launch(int numMaps, int numPoints, String jt, String dfs)
+  static double launch(int numMaps, int numPoints, JobConf jobConf)
   throws IOException {
 
-    Configuration conf = new Configuration();
-    JobConf jobConf = new JobConf(conf, PiEstimator.class);
-    if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
-    if (dfs != null) { jobConf.set("fs.default.name", dfs); }
+    jobConf.setJarByClass(PiEstimator.class);
     jobConf.setJobName("test-mini-mr");
     
     // turn off speculative execution, because DFS doesn't handle
     // multiple writers to the same file.
     jobConf.setSpeculativeExecution(false);
-    jobConf.setInputKeyClass(IntWritable.class);
-    jobConf.setInputValueClass(IntWritable.class);
     jobConf.setInputFormat(SequenceFileInputFormat.class);
         
     jobConf.setOutputKeyClass(IntWritable.class);
@@ -211,7 +206,7 @@ public class PiEstimator {
         int nMaps = Integer.parseInt(argv[0]);
         int nSamples = Integer.parseInt(argv[1]);
         
-	System.out.println("Estimated value of PI is "+
-                launch(nMaps, nSamples, null, null));
+        System.out.println("Estimated value of PI is "+
+                           launch(nMaps, nSamples, new JobConf()));
     }
 }

+ 4 - 11
src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java

@@ -19,13 +19,9 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
-import java.util.*;
 import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
 
 /**
  * A JUnit test to test caching with DFS
@@ -37,19 +33,16 @@ public class TestMiniMRDFSCaching extends TestCase {
   public void testWithDFS() throws IOException {
     MiniMRCluster mr = null;
     MiniDFSCluster dfs = null;
-    String namenode = null;
     FileSystem fileSys = null;
     try {
       JobConf conf = new JobConf();
       dfs = new MiniDFSCluster(65314, conf, true);
       fileSys = dfs.getFileSystem();
-      namenode = fileSys.getName();
-      mr = new MiniMRCluster(60050, 50060, 2, namenode, true, 4);
+      mr = new MiniMRCluster(60050, 50060, 2, fileSys.getName(), true, 4);
       // run the wordcount example with caching
-      boolean ret = MRCaching.launchMRCache("localhost:"+mr.getJobTrackerPort(),
-                                            "/testing/wc/input",
-                                            "/testing/wc/output", namenode,
-                                            conf,
+      boolean ret = MRCaching.launchMRCache("/testing/wc/input",
+                                            "/testing/wc/output",
+                                            mr.createJobConf(),
                                             "The quick brown fox\nhas many silly\n"
                                                 + "red fox sox\n");
       assertTrue("Archives not matching", ret);

+ 7 - 6
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -35,20 +35,21 @@ public class TestMiniMRLocalFS extends TestCase {
       MiniMRCluster mr = null;
       try {
           mr = new MiniMRCluster(60030, 60040, 2, "local", false, 3);
-          String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
-          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobTrackerName, "local");
+          double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
+                                               mr.createJobConf());
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-          JobConf jconf = new JobConf();
           // run the wordcount example with caching
-          boolean ret = MRCaching.launchMRCache(jobTrackerName, "/tmp/wc/input",
-                                                "/tmp/wc/output", "local", jconf,
+          JobConf job = mr.createJobConf();
+          boolean ret = MRCaching.launchMRCache("/tmp/wc/input",
+                                                "/tmp/wc/output", 
+                                                job,
                                                 "The quick brown fox\nhas many silly\n"
                                                     + "red fox sox\n");
           // assert the number of lines read during caching
           assertTrue("Failed test archives not matching", ret);
           // test the task report fetchers
-          JobClient client = new JobClient(jconf);
+          JobClient client = new JobClient(job);
           TaskReport[] reports = client.getMapTaskReports("job_0001");
           assertEquals("number of maps", 10, reports.length);
           reports = client.getReduceTaskReports("job_0001");

+ 23 - 18
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -21,6 +21,9 @@ package org.apache.hadoop.mapred;
 import java.io.*;
 import java.util.*;
 import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,19 +38,19 @@ import org.apache.hadoop.examples.WordCount;
  * @author Milind Bhandarkar
  */
 public class TestMiniMRWithDFS extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestMiniMRWithDFS.class.getName());
   
-    static final int NUM_MAPS = 10;
-    static final int NUM_SAMPLES = 100000;
+  static final int NUM_MAPS = 10;
+  static final int NUM_SAMPLES = 100000;
   
-  public static String launchWordCount(String fileSys,
-                                       String jobTracker,
-                                       JobConf conf,
+  public static String launchWordCount(JobConf conf,
                                        String input,
                                        int numMaps,
                                        int numReduces) throws IOException {
     final Path inDir = new Path("/testing/wc/input");
     final Path outDir = new Path("/testing/wc/output");
-    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    FileSystem fs = FileSystem.get(conf);
     fs.delete(outDir);
     if (!fs.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
@@ -57,8 +60,6 @@ public class TestMiniMRWithDFS extends TestCase {
       file.writeBytes(input);
       file.close();
     }
-    conf.set("fs.default.name", fileSys);
-    conf.set("mapred.job.tracker", jobTracker);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
     
@@ -75,6 +76,12 @@ public class TestMiniMRWithDFS extends TestCase {
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);
     JobClient.runJob(conf);
+    return readOutput(outDir, conf);
+  }
+
+  public static String readOutput(Path outDir, 
+                                  JobConf conf) throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
     StringBuffer result = new StringBuffer();
     {
       Path[] fileList = fs.listPaths(outDir);
@@ -108,6 +115,7 @@ public class TestMiniMRWithDFS extends TestCase {
     for(int i=0; i < trackers; ++i) {
       int numNotDel = 0;
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
+      LOG.debug("Tracker directory: " + localDir);
       File trackerDir = new File(localDir, "taskTracker");
       assertTrue("local dir " + localDir + " does not exist.", 
                    localDir.isDirectory());
@@ -124,6 +132,7 @@ public class TestMiniMRWithDFS extends TestCase {
       for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
         String name = contents[fileIdx];
         if (!("taskTracker".equals(contents[fileIdx]))) {
+          LOG.debug("Looking at " + name);
           int idx = neededDirs.indexOf(name);
           assertTrue("Spurious directory " + name + " found in " +
                      localDir, idx != -1);
@@ -141,7 +150,6 @@ public class TestMiniMRWithDFS extends TestCase {
   }
   
   public void testWithDFS() throws IOException {
-      String namenode = null;
       MiniDFSCluster dfs = null;
       MiniMRCluster mr = null;
       FileSystem fileSys = null;
@@ -152,22 +160,20 @@ public class TestMiniMRWithDFS extends TestCase {
           Configuration conf = new Configuration();
           dfs = new MiniDFSCluster(65314, conf, 4, true);
           fileSys = dfs.getFileSystem();
-          namenode = fileSys.getName();
           mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, 
-                                 namenode, true);
-          final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+                                 fileSys.getName(), true);
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, 
-                                               jobTrackerName, namenode);
+                                               mr.createJobConf());
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
           checkTaskDirectories(mr, new String[]{}, new String[]{});
           
           // Run a word count example
-          JobConf jobConf = new JobConf();
+          JobConf jobConf = mr.createJobConf();
           // Keeping tasks that match this pattern
           jobConf.setKeepTaskFilesPattern("task_[0-9]*_m_000001_.*");
           String result;
-          result = launchWordCount(namenode, jobTrackerName, jobConf, 
+          result = launchWordCount(jobConf, 
                                    "The quick brown fox\nhas many silly\n" + 
                                    "red fox sox\n",
                                    3, 1);
@@ -175,9 +181,8 @@ public class TestMiniMRWithDFS extends TestCase {
                        "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
           checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
           // test with maps=0
-          jobConf = new JobConf();
-          result = launchWordCount(namenode, jobTrackerName, jobConf, 
-                                   "owen is oom", 0, 1);
+          jobConf = mr.createJobConf();
+          result = launchWordCount(jobConf, "owen is oom", 0, 1);
           assertEquals("is\t1\noom\t1\nowen\t1\n", result);
       } finally {
           if (fileSys != null) { fileSys.close(); }