Prechádzať zdrojové kódy

HADOOP-2790. Fixed inefficient method hasSpeculativeTask by removing
repetitive calls to get the current time and late checking to see if
we want speculation on at all.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@632722 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 rokov pred
rodič
commit
e8f4535373

+ 4 - 0
CHANGES.txt

@@ -52,6 +52,10 @@ Trunk (unreleased changes)
 
   OPTIMIZATIONS
 
+    HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing
+    repetitive calls to get the current time and late checking to see if
+    we want speculation on at all. (omalley)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

+ 21 - 11
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -108,6 +108,8 @@ class JobInProgress {
 
   private LocalFileSystem localFs;
   private String jobId;
+  private boolean hasSpeculativeMaps;
+  private boolean hasSpeculativeReduces;
 
   // Per-job counters
   public static enum Counter { 
@@ -179,6 +181,8 @@ class JobInProgress {
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid);
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
   }
 
   /**
@@ -632,7 +636,7 @@ class JobInProgress {
     
     
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, nodesToMaps);
+                             maps, nodesToMaps, hasSpeculativeMaps);
     if (target == -1) {
       return null;
     }
@@ -668,7 +672,7 @@ class JobInProgress {
     }
 
     int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
-                             reduces, null);
+                             reduces, null, hasSpeculativeReduces);
     if (target == -1) {
       return null;
     }
@@ -754,10 +758,11 @@ class JobInProgress {
     return trackerErrors;
   }
     
-  private boolean shouldRunSpeculativeTask(TaskInProgress task, 
-                                          double avgProgress,
-                                          String taskTracker) {
-    return task.hasSpeculativeTask(avgProgress) && 
+  private boolean shouldRunSpeculativeTask(long currentTime,
+                                           TaskInProgress task, 
+                                           double avgProgress,
+                                           String taskTracker) {
+    return task.hasSpeculativeTask(currentTime, avgProgress) && 
            !task.hasRunOnMachine(taskTracker);
   }
   
@@ -769,13 +774,15 @@ class JobInProgress {
    * @param tasks The list of potential tasks to try
    * @param firstTaskToTry The first index in tasks to check
    * @param cachedTasks A list of tasks that would like to run on this node
+   * @param hasSpeculative Should it try to find speculative tasks
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private int findNewTask(TaskTrackerStatus tts, 
                           int clusterSize,
                           double avgProgress,
                           TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks) {
+                          Map<Node,List<TaskInProgress>> cachedTasks,
+                          boolean hasSpeculative) {
     String taskTracker = tts.getTrackerName();
     int specTarget = -1;
 
@@ -799,6 +806,7 @@ class JobInProgress {
       }
       return -1;
     }
+    long currentTime = System.currentTimeMillis();
         
     //
     // See if there is a split over a block that is stored on
@@ -845,8 +853,9 @@ class JobInProgress {
               }
               return cacheTarget;
             }
-            if (specTarget == -1 &&
-                shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
+            if (hasSpeculative && specTarget == -1 &&
+                shouldRunSpeculativeTask(currentTime, tip, avgProgress, 
+                                         taskTracker)) {
               specTarget = tip.getIdWithinJob();
             }
           }
@@ -881,8 +890,9 @@ class JobInProgress {
           if (!isRunning) {
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             return i;
-          } else if (specTarget == -1 &&
-                     shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
+          } else if (hasSpeculative && specTarget == -1 &&
+                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
+                                              taskTracker)) {
             specTarget = i;
           }
         }

+ 2 - 11
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -96,7 +96,6 @@ class TaskInProgress {
   // currently runnings
   private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   private JobConf conf;
-  private boolean runSpeculative;
   private Map<String,List<String>> taskDiagnosticData =
     new TreeMap<String,List<String>>();
   /**
@@ -131,7 +130,6 @@ class TaskInProgress {
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getMapSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
         
@@ -149,7 +147,6 @@ class TaskInProgress {
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
-    this.runSpeculative = conf.getReduceSpeculativeExecution();
     init(JobTracker.getJobUniqueString(jobid));
   }
   
@@ -204,11 +201,6 @@ class TaskInProgress {
    */
   void init(String jobUniqueString) {
     this.startTime = System.currentTimeMillis();
-    if ("true".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = true;
-    } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
-      this.runSpeculative = false;
-    }
     this.taskIdPrefix = makeUniqueString(jobUniqueString);
     this.id = "tip_" + this.taskIdPrefix;
   }
@@ -691,16 +683,15 @@ class TaskInProgress {
    * far behind, and has been behind for a non-trivial amount of 
    * time.
    */
-  boolean hasSpeculativeTask(double averageProgress) {
+  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
     //
     // REMIND - mjc - these constants should be examined
     // in more depth eventually...
     //
       
     if (activeTasks.size() <= MAX_TASK_EXECS &&
-        runSpeculative &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
+        (currentTime - startTime >= SPECULATIVE_LAG) 
         && completes == 0 && !isOnlyCommitPending()) {
       return true;
     }