Browse Source

Fix for HADOOP-12. The JobTracker now loads the InputFormat from the job's jar file.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@376474 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
6a533c06ff
1 changed files with 30 additions and 2 deletions
  1. 30 2
      src/java/org/apache/hadoop/mapred/JobInProgress.java

+ 30 - 2
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -20,6 +20,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.LogFormatter;
 
 import java.io.*;
+import java.net.*;
 import java.util.*;
 import java.util.logging.*;
 
@@ -35,6 +36,7 @@ class JobInProgress {
     JobProfile profile;
     JobStatus status;
     File localJobFile = null;
+    File localJarFile = null;
 
     TaskInProgress maps[] = null;
     TaskInProgress reduces[] = null;
@@ -64,10 +66,18 @@ class JobInProgress {
         this.startTime = System.currentTimeMillis();
 
         this.localJobFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".xml");
+        this.localJarFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".jar");
         FileSystem fs = FileSystem.get(conf);
         fs.copyToLocalFile(new File(jobFile), localJobFile);
 
         JobConf jd = new JobConf(localJobFile);
+
+        String jarFile = jd.getJar();
+        if (jarFile != null) {
+          fs.copyToLocalFile(new File(jarFile), localJarFile);
+          jd.setJar(localJarFile.getCanonicalPath());
+        }
+
         this.numMapTasks = jd.getNumMapTasks();
         this.numReduceTasks = jd.getNumReduceTasks();
 
@@ -97,8 +107,22 @@ class JobInProgress {
 
         JobConf jd = new JobConf(localJobFile);
         FileSystem fs = FileSystem.get(conf);
-        FileSplit[] splits =
-            jd.getInputFormat().getSplits(fs, jd, numMapTasks);
+        String ifClassName = jd.get("mapred.input.format.class");
+        InputFormat inputFormat;
+        if (ifClassName != null && localJarFile != null) {
+          try {
+            ClassLoader loader =
+              new URLClassLoader(new URL[]{ localJarFile.toURL() });
+            Class inputFormatClass = loader.loadClass(ifClassName);
+            inputFormat = (InputFormat)inputFormatClass.newInstance();
+          } catch (Exception e) {
+            throw new IOException(e.toString());
+          }
+        } else {
+          inputFormat = jd.getInputFormat();
+        }
+
+        FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks);
 
         //
         // sort splits by decreasing length, to reduce job's tail
@@ -417,6 +441,10 @@ class JobInProgress {
             localJobFile.delete();
             localJobFile = null;
         }
+        if (localJarFile != null) {
+            localJarFile.delete();
+            localJarFile = null;
+        }
 
         //
         // If the job file was in the temporary system directory,