Selaa lähdekoodia

HADOOP-2790. Optimizes hasSpeculativeTask to do with getting the value of time (System.getCurrentTimeMillis()). This is now done only once in the beginning and is used for all TIPs. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@632572 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 vuotta sitten
vanhempi
commit
32bd5fad82

+ 4 - 0
CHANGES.txt

@@ -191,6 +191,10 @@ Release 0.16.1 - Unreleased
     HADOOP-2918.  Improve error logging so that dfs writes failure with
     HADOOP-2918.  Improve error logging so that dfs writes failure with
     "No lease on file" can be diagnosed. (dhruba)
     "No lease on file" can be diagnosed. (dhruba)
 
 
+    HADOOP-2790.  Optimizes hasSpeculativeTask to do with getting the value of
+    time (System.getCurrentTimeMillis()). This is now done only once in the
+    beginning and is used for all TIPs. (Owen O'Malley via ddas).
+
 Release 0.16.0 - 2008-02-07
 Release 0.16.0 - 2008-02-07
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 21 - 11
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -108,6 +108,8 @@ class JobInProgress {
 
 
   private LocalFileSystem localFs;
   private LocalFileSystem localFs;
   private String jobId;
   private String jobId;
+  private boolean hasSpeculativeMaps;
+  private boolean hasSpeculativeReduces;
 
 
   // Per-job counters
   // Per-job counters
   public static enum Counter { 
   public static enum Counter { 
@@ -179,6 +181,8 @@ class JobInProgress {
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid);
     this.jobMetrics.setTag("jobId", jobid);
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
   }
   }
 
 
   /**
   /**
@@ -632,7 +636,7 @@ class JobInProgress {
     
     
     
     
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps);
+                             maps, nodesToMaps, hasSpeculativeMaps);
     if (target == -1) {
     if (target == -1) {
       return null;
       return null;
     }
     }
@@ -668,7 +672,7 @@ class JobInProgress {
     }
     }
 
 
     int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
     int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null);
+                             reduces, null, hasSpeculativeReduces);
     if (target == -1) {
     if (target == -1) {
       return null;
       return null;
     }
     }
@@ -754,10 +758,11 @@ class JobInProgress {
     return trackerErrors;
     return trackerErrors;
   }
   }
     
     
-  private boolean shouldRunSpeculativeTask(TaskInProgress task, 
-                                          double avgProgress,
-                                          String taskTracker) {
-    return task.hasSpeculativeTask(avgProgress) && 
+  private boolean shouldRunSpeculativeTask(long currentTime,
+                                           TaskInProgress task, 
+                                           double avgProgress,
+                                           String taskTracker) {
+    return task.hasSpeculativeTask(currentTime, avgProgress) && 
            !task.hasRunOnMachine(taskTracker);
            !task.hasRunOnMachine(taskTracker);
   }
   }
   
   
@@ -769,13 +774,15 @@ class JobInProgress {
    * @param tasks The list of potential tasks to try
    * @param tasks The list of potential tasks to try
    * @param firstTaskToTry The first index in tasks to check
    * @param firstTaskToTry The first index in tasks to check
    * @param cachedTasks A list of tasks that would like to run on this node
    * @param cachedTasks A list of tasks that would like to run on this node
+   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    * @return the index in tasks of the selected task (or -1 for no task)
    */
    */
   private int findNewTask(TaskTrackerStatus tts, 
   private int findNewTask(TaskTrackerStatus tts, 
                           int clusterSize,
                           int clusterSize,
                           double avgProgress,
                           double avgProgress,
                           TaskInProgress[] tasks,
                           TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks) {
+                          Map<Node,List<TaskInProgress>> cachedTasks,
+                          boolean hasSpeculative) {
     String taskTracker = tts.getTrackerName();
     String taskTracker = tts.getTrackerName();
     int specTarget = -1;
     int specTarget = -1;
 
 
@@ -799,6 +806,7 @@ class JobInProgress {
       }
       }
       return -1;
       return -1;
     }
     }
+    long currentTime = System.currentTimeMillis();
         
         
     //
     //
     // See if there is a split over a block that is stored on
     // See if there is a split over a block that is stored on
@@ -845,8 +853,9 @@ class JobInProgress {
               }
               }
               return cacheTarget;
               return cacheTarget;
             }
             }
-            if (specTarget == -1 &&
-                shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
+            if (hasSpeculative && specTarget == -1 &&
+                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
+                                         taskTracker)) {
               specTarget = tip.getIdWithinJob();
               specTarget = tip.getIdWithinJob();
             }
             }
           }
           }
@@ -881,8 +890,9 @@ class JobInProgress {
           if (!isRunning) {
           if (!isRunning) {
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             return i;
             return i;
-          } else if (specTarget == -1 &&
-                     shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
+          } else if (hasSpeculative && specTarget == -1 &&
+                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
+                                              taskTracker)) {
             specTarget = i;
             specTarget = i;
           }
           }
         }
         }

+ 2 - 11
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -96,7 +96,6 @@ class TaskInProgress {
   // currently runnings
   // currently runnings
   private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   private JobConf conf;
   private JobConf conf;
-  private boolean runSpeculative;
   private Map<String,List<String>> taskDiagnosticData =
   private Map<String,List<String>> taskDiagnosticData =
     new TreeMap<String,List<String>>();
     new TreeMap<String,List<String>>();
   /**
   /**
@@ -131,7 +130,6 @@ class TaskInProgress {
     this.conf = conf;
     this.conf = conf;
     this.partition = partition;
     this.partition = partition;
     setMaxTaskAttempts();
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getMapSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
     init(JobTracker.getJobUniqueString(jobid));
   }
   }
         
         
@@ -149,7 +147,6 @@ class TaskInProgress {
     this.job = job;
     this.job = job;
     this.conf = conf;
     this.conf = conf;
     setMaxTaskAttempts();
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getReduceSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
     init(JobTracker.getJobUniqueString(jobid));
   }
   }
   
   
@@ -204,11 +201,6 @@ class TaskInProgress {
    */
    */
   void init(String jobUniqueString) {
   void init(String jobUniqueString) {
     this.startTime = System.currentTimeMillis();
     this.startTime = System.currentTimeMillis();
-    if ("true".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = true;
-    } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = false;
-    }
     this.taskIdPrefix = makeUniqueString(jobUniqueString);
     this.taskIdPrefix = makeUniqueString(jobUniqueString);
     this.id = "tip_" + this.taskIdPrefix;
     this.id = "tip_" + this.taskIdPrefix;
   }
   }
@@ -691,16 +683,15 @@ class TaskInProgress {
    * far behind, and has been behind for a non-trivial amount of 
    * far behind, and has been behind for a non-trivial amount of 
    * time.
    * time.
    */
    */
-  boolean hasSpeculativeTask(double averageProgress) {
+  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
     //
     //
     // REMIND - mjc - these constants should be examined
     // REMIND - mjc - these constants should be examined
     // in more depth eventually...
     // in more depth eventually...
     //
     //
       
       
     if (activeTasks.size() <= MAX_TASK_EXECS &&
     if (activeTasks.size() <= MAX_TASK_EXECS &&
-        runSpeculative &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
+        (currentTime - startTime >= SPECULATIVE_LAG) 
         && completes == 0 && !isOnlyCommitPending()) {
         && completes == 0 && !isOnlyCommitPending()) {
       return true;
       return true;
     }
     }