
HADOOP-964. Fix a problem introduced by HADOOP-830 where jobs whose comparators and/or i/o types are in the job's jar would fail. Contributed by Dennis Kubes.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@502694 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
6855d8d704

+ 4 - 0
CHANGES.txt

@@ -131,6 +131,10 @@ Trunk (unreleased changes)
 40. HADOOP-967.  Change RPC clients to start sending a version header.
     (omalley via cutting)
 
+41. HADOOP-964.  Fix a bug introduced by HADOOP-830 where jobs whose
+    comparators and/or i/o types were in the job's jar failed.
+    (Dennis Kubes via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

+ 37 - 0
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -315,10 +315,47 @@ class ReduceTaskRunner extends TaskRunner implements MRConstants {
     }
 
   }
+  
+  private void configureClasspath(JobConf conf)
+    throws IOException {
+    
+    // get the task and the current classloader which will become the parent
+    Task task = getTask();
+    ClassLoader parent = conf.getClassLoader();   
+    
+    // get the work directory which holds the elements we are dynamically
+    // adding to the classpath
+    File workDir = new File(task.getJobFile()).getParentFile();
+    File jobCacheDir = new File(workDir.getParent(), "work");
+    ArrayList<URL> urllist = new ArrayList<URL>();
+    
+    // add the jars and directories to the classpath
+    String jar = conf.getJar();
+    if (jar != null) {      
+      File[] libs = new File(jobCacheDir, "lib").listFiles();
+      if (libs != null) {
+        for (int i = 0; i < libs.length; i++) {
+          urllist.add(libs[i].toURL());
+        }
+      }
+      urllist.add(new File(jobCacheDir, "classes").toURL());
+      urllist.add(jobCacheDir.toURL());
+     
+    }
+    urllist.add(workDir.toURL());
+    
+    // create a new classloader with the old classloader as its parent
+    // then set that classloader as the one used by the current jobconf
+    URL[] urls = urllist.toArray(new URL[urllist.size()]);
+    URLClassLoader loader = new URLClassLoader(urls, parent);
+    conf.setClassLoader(loader);
+  }
 
   public ReduceTaskRunner(Task task, TaskTracker tracker, 
                           JobConf conf) throws IOException {
+    
     super(task, tracker, conf);
+    configureClasspath(conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
 

+ 94 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java

@@ -75,6 +75,7 @@ public class TestMiniMRClasspath extends TestCase {
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     {
+      Path[] parents = fs.listPaths(outDir.getParent());
       Path[] fileList = fs.listPaths(outDir);
       for(int i=0; i < fileList.length; ++i) {
         BufferedReader file = 
@@ -90,7 +91,62 @@ public class TestMiniMRClasspath extends TestCase {
     }
     return result.toString();
   }
-  
+
+  static String launchExternal(String fileSys, String jobTracker, JobConf conf,
+                               String input, int numMaps, int numReduces)
+    throws IOException {
+
+    final Path inDir = new Path("/testing/ext/input");
+    final Path outDir = new Path("/testing/ext/output");
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    fs.delete(outDir);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+    conf.set("fs.default.name", fileSys);
+    conf.set("mapred.job.tracker", jobTracker);
+    conf.setJobName("wordcount");
+    conf.setInputFormat(TextInputFormat.class);
+
+    // the values are counts
+    conf.setOutputValueClass(IntWritable.class);
+    // the keys are the messages
+    conf.set("mapred.output.key.class", "ExternalWritable");
+
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
+    
+    conf.set("mapred.mapper.class", "ExternalMapperReducer"); 
+    conf.set("mapred.reducer.class", "ExternalMapperReducer");
+
+    //pass a job.jar already included in the hadoop build
+    conf.setJar("build/test/testjar/testjob.jar");
+    JobClient.runJob(conf);
+    StringBuffer result = new StringBuffer();
+
+    Path[] fileList = fs.listPaths(outDir);
+    for (int i = 0; i < fileList.length; ++i) {
+      BufferedReader file = new BufferedReader(new InputStreamReader(
+        fs.open(fileList[i])));
+      String line = file.readLine();
+      while (line != null) {
+        result.append(line);
+        result.append("\n");
+        line = file.readLine();
+      }
+      file.close();
+    }
+
+    return result.toString();
+  }
+   
   public void testClassPath() throws IOException {
       String namenode = null;
       MiniDFSCluster dfs = null;
@@ -124,4 +180,41 @@ public class TestMiniMRClasspath extends TestCase {
       }
   }
   
+  public void testExternalWritable()
+    throws IOException {
+ 
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      
+      final int taskTrackers = 4;
+      final int jobTrackerPort = 60050;
+
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(65314, conf, true);
+      fileSys = dfs.getFileSystem();
+      namenode = fileSys.getName();
+      mr = new MiniMRCluster(jobTrackerPort, 50060, taskTrackers, namenode, 
+        true, 3);      
+      JobConf jobConf = new JobConf();
+      String result;
+      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+      
+      result = launchExternal(namenode, jobTrackerName, jobConf, 
+                               "Dennis was here!\nDennis again!",
+                               3, 1);
+      assertEquals("Dennis again!\t1\nDennis was here!\t1\n", result);
+      
+    }
+    finally {
+      if (fileSys != null) { fileSys.close(); }
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+  
 }
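
The test names the external classes as strings (mapred.output.key.class, mapred.mapper.class) because ExternalWritable and ExternalMapperReducer are compiled only into testjob.jar and are not on the test's own classpath; they resolve at task runtime through the classloader installed by configureClasspath. The asserted output also reflects the shuffle's key ordering: reduce input is sorted by the key class's compareTo, so "Dennis again!" precedes "Dennis was here!". A pure-Java sketch of the count-and-sort the job performs (the input strings come from the test above; everything else is illustrative):

import java.util.Map;
import java.util.TreeMap;

public class ExpectedOutputDemo {
  public static void main(String[] args) {
    // One (line, 1) pair per input line, summed per distinct key and
    // emitted in sorted key order, as the MapReduce shuffle would do.
    String[] lines = { "Dennis was here!", "Dennis again!" };
    Map<String, Integer> counts = new TreeMap<String, Integer>();
    for (String line : lines) {
      Integer c = counts.get(line);
      counts.put(line, c == null ? 1 : c + 1);
    }
    StringBuffer result = new StringBuffer();
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      result.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
    }
    System.out.print(result);  // Dennis again!\t1\nDennis was here!\t1\n
  }
}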

+ 48 - 0
src/test/testjar/ExternalMapperReducer.java

@@ -0,0 +1,48 @@
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ExternalMapperReducer
+  implements Mapper, Reducer {
+
+  public void configure(JobConf job) {
+
+  }
+
+  public void close()
+    throws IOException {
+
+  }
+
+  public void map(WritableComparable key, Writable value,
+    OutputCollector output, Reporter reporter)
+    throws IOException {
+    
+    if (value instanceof Text) {
+      Text text = (Text)value;
+      ExternalWritable ext = new ExternalWritable(text.toString());
+      output.collect(ext, new IntWritable(1));
+    }
+  }
+
+  public void reduce(WritableComparable key, Iterator values,
+    OutputCollector output, Reporter reporter)
+    throws IOException {
+    
+    int count = 0;
+    while (values.hasNext()) {
+      count++;
+      values.next();
+    }
+    output.collect(key, new IntWritable(count));
+  }
+}

+ 67 - 0
src/test/testjar/ExternalWritable.java

@@ -0,0 +1,67 @@
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A simple example Writable class.  It is external to the Hadoop IO classes
+ * and is used for testing user-defined Writable classes.
+ * 
+ * @author Dennis E. Kubes
+ */
+public class ExternalWritable
+  implements WritableComparable {
+
+  private String message = null;
+  
+  public ExternalWritable() {
+    
+  }
+  
+  public ExternalWritable(String message) {
+    this.message = message;
+  }
+  
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public void readFields(DataInput in)
+    throws IOException {
+    
+    message = null;
+    boolean hasMessage = in.readBoolean();
+    if (hasMessage) {
+      message = in.readUTF();   
+    }
+  }
+
+  public void write(DataOutput out)
+    throws IOException {
+    
+    boolean hasMessage = (message != null && message.length() > 0);
+    out.writeBoolean(hasMessage);
+    if (hasMessage) {
+      out.writeUTF(message);
+    }
+  }
+  
+  public int compareTo(Object o) {
+    
+    if (!(o instanceof ExternalWritable)) {
+      throw new IllegalArgumentException("Input not an ExternalWritable");
+    }
+    
+    ExternalWritable that = (ExternalWritable)o;
+    return this.message.compareTo(that.message);
+  }
+
+  public String toString() {
+    return this.message;
+  }
+}
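
Since ExternalWritable handles its own serialization, a quick way to sanity-check that write and readFields are symmetric is a round trip through a byte array. A minimal sketch, assuming ExternalWritable is on the classpath; the class name and test string are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ExternalWritableRoundTrip {
  public static void main(String[] args) throws IOException {
    ExternalWritable original = new ExternalWritable("Dennis was here!");

    // Serialize: writes a has-message flag, then the UTF-encoded message.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Deserialize into a fresh instance and verify the round trip.
    ExternalWritable copy = new ExternalWritable();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy);  // prints: Dennis was here!
  }
}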