瀏覽代碼

Merge -r 669478:669479 from trunk onto 0.18 branch. Fixes HADOOP-3333.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@669532 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 年之前
父節點
當前提交
de69c6a1c8

+ 5 - 0
CHANGES.txt

@@ -628,6 +628,11 @@ Release 0.18.0 - Unreleased
     HADOOP-3580. Fixes a problem to do with specifying a har as an input to 
     a job. (Mahadev Konar via ddas)
 
+    HADOOP-3333. Don't assign a task to a tasktracker on a host where it
+    failed to execute earlier (this used to happen with lost tasktrackers,
+    where the tasktracker would reinitialize and bind to a different port).
+    (Jothi Padmanabhan and Arun Murthy via ddas)
+
     HADOOP-3534. Log IOExceptions that happen in closing the name
     system when the NameNode shuts down. (Tsz Wo (Nicholas) Sze via omalley)
 

+ 39 - 25
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -536,7 +536,7 @@ class JobInProgress {
         }
         
         // Tell the job to fail the relevant task
-        failedTask(tip, status.getTaskID(), status, status.getTaskTracker(),
+        failedTask(tip, status.getTaskID(), status, ttStatus,
                    wasRunning, wasComplete, metrics);
 
         // Did the task failure lead to tip failure?
@@ -652,14 +652,16 @@ class JobInProgress {
    * Return a MapTask, if appropriate, to run on the given tasktracker
    */
   public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
-                                            int clusterSize
+                                            int clusterSize, 
+                                            int numUniqueHosts
                                            ) throws IOException {
     if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
         
-    int target = findNewMapTask(tts, clusterSize, status.mapProgress());
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
+                                status.mapProgress());
     if (target == -1) {
       return null;
     }
@@ -685,14 +687,16 @@ class JobInProgress {
    *  work on temporary MapRed files.  
    */
   public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
-                                               int clusterSize
+                                               int clusterSize,
+                                               int numUniqueHosts
                                               ) throws IOException {
     if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
 
-    int  target = findNewReduceTask(tts, clusterSize, status.reduceProgress());
+    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts, 
+                                    status.reduceProgress());
     if (target == -1) {
       return null;
     }
@@ -940,11 +944,14 @@ class JobInProgress {
   /**
    * Find a non-running task in the passed list of TIPs
    * @param tips a collection of TIPs
-   * @param taskTracker the tracker that has requested a task to run
+   * @param ttStatus the status of tracker that has requested a task to run
+   * @param numUniqueHosts number of unique hosts that run task trackers
    * @param removeFailedTip whether to remove the failed tips
    */
   private synchronized TaskInProgress findTaskFromList(
-      Collection<TaskInProgress> tips, String taskTracker, boolean removeFailedTip) {
+      Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
+      int numUniqueHosts,
+      boolean removeFailedTip) {
     Iterator<TaskInProgress> iter = tips.iterator();
     while (iter.hasNext()) {
       TaskInProgress tip = iter.next();
@@ -960,8 +967,8 @@ class JobInProgress {
       // (3) when the TIP is non-schedulable (running, killed, complete)
       if (tip.isRunnable() && !tip.isRunning()) {
         // check if the tip has failed on this host
-        if (!tip.hasFailedOnMachine(taskTracker) || 
-             tip.getNumberOfFailedMachines() >= clusterSize) {
+        if (!tip.hasFailedOnMachine(ttStatus.getHost()) || 
+             tip.getNumberOfFailedMachines() >= numUniqueHosts) {
           // check if the tip has failed on all the nodes
           iter.remove();
           return tip;
@@ -988,8 +995,8 @@ class JobInProgress {
    * @return a tip that can be speculated on the tracker
    */
   private synchronized TaskInProgress findSpeculativeTask(
-      Collection<TaskInProgress> list, String taskTracker, double avgProgress,
-      long currentTime, boolean shouldRemove) {
+      Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
+      double avgProgress, long currentTime, boolean shouldRemove) {
     
     Iterator<TaskInProgress> iter = list.iterator();
 
@@ -1001,7 +1008,8 @@ class JobInProgress {
         continue;
       }
 
-      if (!tip.hasRunOnMachine(taskTracker)) {
+      if (!tip.hasRunOnMachine(ttStatus.getHost(), 
+                               ttStatus.getTrackerName())) {
         if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
           // In case of shared list we don't remove it. Since the TIP failed 
           // on this tracker can be scheduled on some other tracker.
@@ -1026,11 +1034,13 @@ class JobInProgress {
    * Find new map task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
+   * @param numUniqueHosts The number of hosts that run task trackers
    * @param avgProgress The average progress of this kind of task in this job
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private synchronized int findNewMapTask(TaskTrackerStatus tts, 
                                           int clusterSize,
+                                          int numUniqueHosts,
                                           double avgProgress) {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
@@ -1073,7 +1083,8 @@ class JobInProgress {
       for (int level = 0; level < maxLevel; ++level) {
         List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
         if (cacheForLevel != null) {
-          tip = findTaskFromList(cacheForLevel, taskTracker, level == 0);
+          tip = findTaskFromList(cacheForLevel, tts, 
+                                 numUniqueHosts,level == 0);
           if (tip != null) {
             // Add to running cache
             scheduleMap(tip);
@@ -1122,7 +1133,7 @@ class JobInProgress {
 
       List<TaskInProgress> cache = nonRunningMapCache.get(parent);
       if (cache != null) {
-        tip = findTaskFromList(cache, taskTracker, false);
+        tip = findTaskFromList(cache, tts, numUniqueHosts, false);
         if (tip != null) {
           // Add to the running cache
           scheduleMap(tip);
@@ -1138,7 +1149,7 @@ class JobInProgress {
     }
 
     // 3. Search non-local tips for a new task
-    tip = findTaskFromList(nonLocalMaps, taskTracker, false);
+    tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
     if (tip != null) {
       // Add to the running list
       scheduleMap(tip);
@@ -1160,7 +1171,7 @@ class JobInProgress {
         for (int level = 0; level < maxLevel; ++level) {
           Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
           if (cacheForLevel != null) {
-            tip = findSpeculativeTask(cacheForLevel, taskTracker, 
+            tip = findSpeculativeTask(cacheForLevel, tts, 
                                       avgProgress, currentTime, level == 0);
             if (tip != null) {
               if (cacheForLevel.size() == 0) {
@@ -1193,7 +1204,7 @@ class JobInProgress {
 
         Set<TaskInProgress> cache = runningMapCache.get(parent);
         if (cache != null) {
-          tip = findSpeculativeTask(cache, taskTracker, avgProgress, 
+          tip = findSpeculativeTask(cache, tts, avgProgress, 
                                     currentTime, false);
           if (tip != null) {
             // remove empty cache entries
@@ -1208,7 +1219,7 @@ class JobInProgress {
       }
 
       // 3. Check non-local tips for speculation
-      tip = findSpeculativeTask(nonLocalRunningMaps, taskTracker, avgProgress, 
+      tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, 
                                 currentTime, false);
       if (tip != null) {
         LOG.info("Choosing a non-local task " + tip.getTIPId() 
@@ -1223,11 +1234,13 @@ class JobInProgress {
    * Find new reduce task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
+   * @param numUniqueHosts The number of hosts that run task trackers
    * @param avgProgress The average progress of this kind of task in this job
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
                                              int clusterSize,
+                                             int numUniqueHosts,
                                              double avgProgress) {
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
@@ -1241,7 +1254,7 @@ class JobInProgress {
 
     // 1. check for a never-executed reduce tip
     // reducers don't have a cache and so pass -1 to explicitly call that out
-    tip = findTaskFromList(nonRunningReduces, taskTracker, false);
+    tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
     if (tip != null) {
       scheduleReduce(tip);
       return tip.getIdWithinJob();
@@ -1249,7 +1262,7 @@ class JobInProgress {
 
     // 2. check for a reduce tip to be speculated
     if (hasSpeculativeReduces) {
-      tip = findSpeculativeTask(runningReduces, taskTracker, avgProgress, 
+      tip = findSpeculativeTask(runningReduces, tts, avgProgress, 
                                 System.currentTimeMillis(), false);
       if (tip != null) {
         scheduleReduce(tip);
@@ -1441,14 +1454,15 @@ class JobInProgress {
    * obtain the map task's output.
    */
   private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
-                          TaskStatus status, String trackerName,
+                          TaskStatus status, 
+                          TaskTrackerStatus taskTrackerStatus,
                           boolean wasRunning, boolean wasComplete,
                           JobTrackerMetrics metrics) {
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
 
     // Mark the taskid as FAILED or KILLED
-    tip.incompleteSubTask(taskid, trackerName, this.status);
+    tip.incompleteSubTask(taskid, taskTrackerStatus, this.status);
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
@@ -1490,8 +1504,8 @@ class JobInProgress {
     }
         
     // update job history
-    String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
-                               status.getTaskTracker()).getHost()).toString();
+    String taskTrackerName = jobtracker.getNode(
+                               taskTrackerStatus.getHost()).toString();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                 taskTrackerName);
@@ -1526,7 +1540,7 @@ class JobInProgress {
     // Note down that a task has failed on this tasktracker 
     //
     if (status.getRunState() == TaskStatus.State.FAILED) { 
-      addTrackerTaskFailure(trackerName);
+      addTrackerTaskFailure(taskTrackerStatus.getTrackerName());
     }
         
     //

+ 31 - 4
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -565,8 +566,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   //
   int totalMaps = 0;
   int totalReduces = 0;
-  private TreeMap<String, TaskTrackerStatus> taskTrackers =
-    new TreeMap<String, TaskTrackerStatus>();
+  private HashMap<String, TaskTrackerStatus> taskTrackers =
+    new HashMap<String, TaskTrackerStatus>();
+  HashMap<String,Integer>uniqueHostsMap = new HashMap<String, Integer>();
   List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
   ExpireTrackers expireTrackers = new ExpireTrackers();
   Thread expireTrackersThread = null;
@@ -1387,6 +1389,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
       if (status == null) {
         taskTrackers.remove(trackerName);
+        Integer numTaskTrackersInHost = 
+          uniqueHostsMap.get(oldStatus.getHost());
+        numTaskTrackersInHost --;
+        if (numTaskTrackersInHost > 0)  {
+          uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
+        }
+        else {
+          uniqueHostsMap.remove(oldStatus.getHost());
+        }
       }
     }
     if (status != null) {
@@ -1394,7 +1405,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       totalReduces += status.countReduceTasks();
       totalMapTaskCapacity += status.getMaxMapTasks();
       totalReduceTaskCapacity += status.getMaxReduceTasks();
+      boolean alreadyPresent = false;
+      if (taskTrackers.containsKey(trackerName)) {
+        alreadyPresent = true;
+      }
       taskTrackers.put(trackerName, status);
+
+      if (!alreadyPresent)  {
+        Integer numTaskTrackersInHost = 
+          uniqueHostsMap.get(status.getHost());
+        if (numTaskTrackersInHost == null) {
+          numTaskTrackersInHost = 0;
+        }
+        numTaskTrackersInHost ++;
+        uniqueHostsMap.put(status.getHost(), numTaskTrackersInHost);
+      }
     }
     return oldStatus != null;
   }
@@ -1579,7 +1604,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
             continue;
           }
 
-          Task t = job.obtainNewMapTask(tts, numTaskTrackers);
+          Task t = job.obtainNewMapTask(tts, numTaskTrackers,
+                                        uniqueHostsMap.size());
           if (t != null) {
             expireLaunchingTasks.addNewTask(t.getTaskID());
             myMetrics.launchMap();
@@ -1616,7 +1642,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
             continue;
           }
 
-          Task t = job.obtainNewReduceTask(tts, numTaskTrackers);
+          Task t = job.obtainNewReduceTask(tts, numTaskTrackers, 
+                                           uniqueHostsMap.size());
           if (t != null) {
             expireLaunchingTasks.addNewTask(t.getTaskID());
             myMetrics.launchReduce();

+ 14 - 9
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -431,11 +431,15 @@ class TaskInProgress {
    * Indicate that one of the taskids in this TaskInProgress
    * has failed.
    */
-  public void incompleteSubTask(TaskAttemptID taskid, String trackerName, 
+  public void incompleteSubTask(TaskAttemptID taskid, 
+                                TaskTrackerStatus ttStatus,
                                 JobStatus jobStatus) {
     //
     // Note the failure and its location
     //
+    String trackerName = ttStatus.getTrackerName();
+    String trackerHostName = ttStatus.getHost();
+     
     TaskStatus status = taskStatuses.get(taskid);
     TaskStatus.State taskState = TaskStatus.State.FAILED;
     if (status != null) {
@@ -480,7 +484,7 @@ class TaskInProgress {
 
     if (taskState == TaskStatus.State.FAILED) {
       numTaskFailures++;
-      machinesWhereFailed.add(trackerName);
+      machinesWhereFailed.add(trackerHostName);
     } else {
       numKilledTasks++;
     }
@@ -722,21 +726,22 @@ class TaskInProgress {
     
   /**
    * Has this task already failed on this machine?
-   * @param tracker The task tracker name
+   * @param trackerHost The task tracker hostname
    * @return Has it failed?
    */
-  public boolean hasFailedOnMachine(String tracker) {
-    return machinesWhereFailed.contains(tracker);
+  public boolean hasFailedOnMachine(String trackerHost) {
+    return machinesWhereFailed.contains(trackerHost);
   }
     
   /**
    * Was this task ever scheduled to run on this machine?
-   * @param tracker The task tracker name
+   * @param trackerHost The task tracker hostname 
+   * @param trackerName The tracker name
    * @return Was task scheduled on the tracker?
    */
-  public boolean hasRunOnMachine(String tracker){
-    return this.activeTasks.values().contains(tracker) || 
-      hasFailedOnMachine(tracker);
+  public boolean hasRunOnMachine(String trackerHost, String trackerName) {
+    return this.activeTasks.values().contains(trackerName) || 
+      hasFailedOnMachine(trackerHost);
   }
   /**
    * Get the number of machines where this task has failed.

+ 12 - 1
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -23,6 +23,7 @@ import java.util.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -360,8 +361,18 @@ public class MiniMRCluster {
       throw new IllegalArgumentException( "The length of hosts [" + hosts.length
           + "] is less than the number of tasktrackers [" + numTaskTrackers + "].");
     }
+     
+     //Generate rack names if required
+     if (racks == null) {
+       System.out.println("Generating rack names for tasktrackers");
+       racks = new String[numTaskTrackers];
+       for (int i=0; i < racks.length; ++i) {
+         racks[i] = NetworkTopology.DEFAULT_RACK;
+       }
+     }
+     
     //Generate some hostnames if required
-    if (racks != null && hosts == null) {
+    if (hosts == null) {
       System.out.println("Generating host names for tasktrackers");
       hosts = new String[numTaskTrackers];
       for (int i = 0; i < numTaskTrackers; i++) {