
Fix for HADOOP-81. Job-specific parameters should be read from the job-specific configuration, not the daemon's. This allows speculative execution, the number of map and reduce tasks, etc. to be set per job. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@386224 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 19 years ago
Commit 98a20a93b4

+ 16 - 0
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -244,6 +244,22 @@ public class JobConf extends Configuration {
     setClass("mapred.combiner.class", theClass, Reducer.class);
   }
   
+  /**
+   * Should speculative execution be used for this job?
+   * @return Defaults to true
+   */
+  public boolean getSpeculativeExecution() { 
+    return getBoolean("mapred.speculative.execution", true);
+  }
+  
+  /**
+   * Turn on or off speculative execution for this job.
+   * In general, it should be turned off for map jobs that have side effects.
+   */
+  public void setSpeculativeExecution(boolean new_val) {
+    setBoolean("mapred.speculative.execution", new_val);
+  }
+  
   public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
   public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
 
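For context, a minimal driver sketch of how a job might use the new per-job setters added above; the class name and the default Configuration are illustrative and not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical driver: with this change the JobTracker reads these values
// from the submitted job's own configuration, not from the daemon's config.
public class SpeculativeExampleDriver {
  public static void main(String[] args) {
    JobConf job = new JobConf(new Configuration());
    job.setSpeculativeExecution(false); // e.g. off for map tasks with side effects
    job.setNumMapTasks(10);
    job.setNumReduceTasks(2);
    // job.getSpeculativeExecution() now reflects the job file; it defaults to true.
  }
}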

+ 15 - 12
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -50,43 +50,46 @@ class JobInProgress {
     long finishTime;
     String deleteUponCompletion = null;
 
-    Configuration conf;
+    private JobConf conf;
     boolean tasksInited = false;
 
     /**
      * Create a JobInProgress with the given job file, plus a handle
      * to the tracker.
      */
-    public JobInProgress(String jobFile, JobTracker jobtracker, Configuration conf) throws IOException {
+    public JobInProgress(String jobFile, JobTracker jobtracker, 
+                         Configuration default_conf) throws IOException {
         String jobid = "job_" + jobtracker.createUniqueId();
         String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
-        this.conf = conf;
         this.jobtracker = jobtracker;
         this.profile = new JobProfile(jobid, jobFile, url);
         this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
         this.startTime = System.currentTimeMillis();
 
-        this.localJobFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".xml");
-        this.localJarFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".jar");
-        FileSystem fs = FileSystem.get(conf);
+        JobConf default_job_conf = new JobConf(default_conf);
+        this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, 
+            jobid + ".xml");
+        this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, 
+            jobid + ".jar");
+        FileSystem fs = FileSystem.get(default_conf);
         fs.copyToLocalFile(new File(jobFile), localJobFile);
 
-        JobConf jd = new JobConf(localJobFile);
+        conf = new JobConf(localJobFile);
 
-        String jarFile = jd.getJar();
+        String jarFile = conf.getJar();
         if (jarFile != null) {
           fs.copyToLocalFile(new File(jarFile), localJarFile);
-          jd.setJar(localJarFile.getCanonicalPath());
+          conf.setJar(localJarFile.getCanonicalPath());
         }
 
-        this.numMapTasks = jd.getNumMapTasks();
-        this.numReduceTasks = jd.getNumReduceTasks();
+        this.numMapTasks = conf.getNumMapTasks();
+        this.numReduceTasks = conf.getNumReduceTasks();
 
         //
         // If a jobFile is in the systemDir, we can delete it (and
         // its JAR) upon completion
         //
-        File systemDir = jd.getSystemDir();
+        File systemDir = conf.getSystemDir();
         if (jobFile.startsWith(systemDir.getPath())) {
             this.deleteUponCompletion = jobFile;
         }
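The key change above is that JobInProgress now builds its JobConf from the localized job file, so per-job values take effect instead of the daemon's. A rough sketch of that lookup, with an illustrative file path that is not from this commit:

import java.io.File;
import org.apache.hadoop.mapred.JobConf;

// Sketch: a JobConf built from a submitted job file returns the job's own
// settings, falling back to the documented defaults when a key is absent.
public class JobFileLookupSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf(new File("/tmp/job_0001.xml")); // path is illustrative
    boolean speculative = jobConf.getSpeculativeExecution(); // job's value, default true
    int maps = jobConf.getNumMapTasks();                     // job's value, default 1
    System.out.println("speculative=" + speculative + " maps=" + maps);
  }
}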

+ 25 - 25
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -47,37 +47,37 @@ class TaskInProgress {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.TaskInProgress");
 
     // Defines the TIP
-    String jobFile = null;
-    FileSplit split = null;
-    String hints[][] = null;
-    TaskInProgress predecessors[] = null;
-    int partition;
-    JobTracker jobtracker;
-    String id;
-    String totalTaskIds[];
-    JobInProgress job;
+    private String jobFile = null;
+    private FileSplit split = null;
+    private String hints[][] = null;
+    private TaskInProgress predecessors[] = null;
+    private int partition;
+    private JobTracker jobtracker;
+    private String id;
+    private String totalTaskIds[];
+    private JobInProgress job;
 
     // Status of the TIP
-    int numTaskFailures = 0;
-    double progress = 0;
-    String state = "";
-    long startTime = 0;
-    int completes = 0;
-    boolean failed = false;
-    TreeSet usableTaskIds = new TreeSet();
-    TreeSet recentTasks = new TreeSet();
-    Configuration conf;
+    private int numTaskFailures = 0;
+    private double progress = 0;
+    private String state = "";
+    private long startTime = 0;
+    private int completes = 0;
+    private boolean failed = false;
+    private TreeSet usableTaskIds = new TreeSet();
+    private TreeSet recentTasks = new TreeSet();
+    private JobConf conf;
     
-    TreeMap taskDiagnosticData = new TreeMap();
-    TreeMap taskStatuses = new TreeMap();
+    private TreeMap taskDiagnosticData = new TreeMap();
+    private TreeMap taskStatuses = new TreeMap();
 
-    TreeSet machinesWhereFailed = new TreeSet();
-    TreeSet tasksReportedClosed = new TreeSet();
+    private TreeSet machinesWhereFailed = new TreeSet();
+    private TreeSet tasksReportedClosed = new TreeSet();
 
     /**
      * Constructor for MapTask
      */
-    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, Configuration conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) {
         this.jobFile = jobFile;
         this.split = split;
         this.jobtracker = jobtracker;
@@ -89,7 +89,7 @@ class TaskInProgress {
     /**
      * Constructor for ReduceTask
      */
-    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, Configuration conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) {
         this.jobFile = jobFile;
         this.predecessors = predecessors;
         this.partition = partition;
@@ -408,7 +408,7 @@ class TaskInProgress {
         //
         if (isMapTask() &&
             recentTasks.size() <= MAX_TASK_EXECS &&
-            conf.getBoolean("mapred.speculative.execution", true) &&
+            conf.getSpeculativeExecution() &&
             (averageProgress - progress >= SPECULATIVE_GAP) &&
             (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
             return true;
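The condition above now consults the per-job flag through JobConf instead of reading the daemon-wide boolean. A standalone sketch of that decision, with constants and method shape assumed for illustration (they are not the actual TaskInProgress members):

import org.apache.hadoop.mapred.JobConf;

// Sketch of the speculation check after this change; constant values are assumed.
public class SpeculationCheckSketch {
  static final int MAX_TASK_EXECS = 1;           // assumed value
  static final double SPECULATIVE_GAP = 0.2;     // assumed value
  static final long SPECULATIVE_LAG = 60 * 1000; // assumed value, in milliseconds

  static boolean shouldSpeculate(JobConf jobConf, boolean isMapTask, int runningAttempts,
                                 double averageProgress, double progress, long startTime) {
    return isMapTask
        && runningAttempts <= MAX_TASK_EXECS
        && jobConf.getSpeculativeExecution()      // per-job flag, not daemon-wide
        && (averageProgress - progress >= SPECULATIVE_GAP)
        && (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG);
  }
}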