Prechádzať zdrojové kódy

HADOOP-1245. Use the mapred.tasktracker.tasks.maximum value configured on each
node rather than the global number configured on the job tracker. Contributed
by Michael Bieniosek.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@588068 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 rokov pred
rodič
commit
5e876da186

+ 7 - 0
CHANGES.txt

@@ -3,6 +3,13 @@ Hadoop Change Log
 
 Trunk (unreleased changes)
 
+  INCOMPATIBLE CHANGES
+
+    HADOOP-1245.  Use the mapred.tasktracker.tasks.maximum value
+    configured on each tasktracker when allocating tasks, instead of
+    the value configured on the jobtracker. InterTrackerProtocol
+    version changed from 5 to 6. (Michael Bieniosek via omalley)
+    
   IMPROVEMENTS
 
     HADOOP-2045.  Change committer list on website to a table, so that

+ 2 - 1
src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -34,8 +34,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
    * version 4 changed TaskReport for HADOOP-549.
    * version 5 introduced that removes locateMapOutputs and instead uses
    * getTaskCompletionEvents to figure finished maps and fetch the outputs
+   * version 6 adds maxTasks to TaskTrackerStatus for HADOOP-1245
    */
-  public static final long versionID = 5L;
+  public static final long versionID = 6L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 9 - 7
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -507,7 +507,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   long startTime;
   int totalSubmissions = 0;
 
-  private int maxCurrentTasks;
+  private int totalTaskCapacity;
   private HostsFileReader hostsReader;
 
   //
@@ -621,7 +621,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     //
     TASKTRACKER_EXPIRY_INTERVAL = 
       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
-    maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
     RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
     RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
     TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f);
@@ -1231,6 +1230,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      totalTaskCapacity -= oldStatus.getMaxTasks();
       if (status == null) {
         taskTrackers.remove(trackerName);
       }
@@ -1238,6 +1238,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      totalTaskCapacity += status.getMaxTasks();
       taskTrackers.put(trackerName, status);
     }
     return oldStatus != null;
@@ -1297,7 +1298,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     int remainingMapLoad = 0;
     int numTaskTrackers;
     TaskTrackerStatus tts;
-	
+
     synchronized (taskTrackers) {
       numTaskTrackers = taskTrackers.size();
       tts = taskTrackers.get(taskTracker);
@@ -1306,7 +1307,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
       return null;
     }
-    int totalCapacity = numTaskTrackers * maxCurrentTasks;
 
     synchronized(jobsByPriority){
       for (Iterator it = jobsByPriority.iterator(); it.hasNext();) {
@@ -1320,6 +1320,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       }   
     }
 
+    int maxCurrentTasks = tts.getMaxTasks();
+    
     // find out the maximum number of maps or reduces that we are willing
     // to run on any node.
     int maxMapLoad = 0;
@@ -1381,7 +1383,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
             padding = Math.min(maxCurrentTasks,
                                (int)(totalNeededMaps * PAD_FRACTION));
           }
-          if (totalMaps + padding >= totalCapacity) {
+          if (totalMaps + padding >= totalTaskCapacity) {
             break;
           }
         }
@@ -1419,7 +1421,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
               Math.min(maxCurrentTasks,
                        (int) (totalNeededReduces * PAD_FRACTION));
           }
-          if (totalReduces + padding >= totalCapacity) {
+          if (totalReduces + padding >= totalTaskCapacity) {
             break;
           }
         }
@@ -1575,7 +1577,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       return new ClusterStatus(taskTrackers.size(),
                                totalMaps,
                                totalReduces,
-                               maxCurrentTasks,
+                               totalTaskCapacity,
                                state);          
     }
   }

+ 2 - 2
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -391,7 +391,7 @@ public class TaskTracker
     this.reduceTotal = 0;
     this.acceptNewTasks = true;
     this.status = null;
-        
+
     this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
     this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
     int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
@@ -860,7 +860,7 @@ public class TaskTracker
       synchronized (this) {
         status = new TaskTrackerStatus(taskTrackerName, localHostname, 
                                        httpPort, cloneAndResetRunningTaskStatuses(), 
-                                       failures); 
+                                       failures, maxCurrentTasks); 
       }
     } else {
       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +

+ 16 - 1
src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -46,6 +46,7 @@ class TaskTrackerStatus implements Writable {
   List<TaskStatus> taskReports;
     
   volatile long lastSeen;
+  int maxTasks;
     
   /**
    */
@@ -57,13 +58,15 @@ class TaskTrackerStatus implements Writable {
    */
   public TaskTrackerStatus(String trackerName, String host, 
                            int httpPort, List<TaskStatus> taskReports, 
-                           int failures) {
+                           int failures, int maxTasks) {
     this.trackerName = trackerName;
     this.host = host;
     this.httpPort = httpPort;
 
     this.taskReports = new ArrayList<TaskStatus>(taskReports);
     this.failures = failures;
+
+    this.maxTasks = maxTasks;
   }
 
   /**
@@ -149,6 +152,16 @@ class TaskTrackerStatus implements Writable {
     this.lastSeen = lastSeen;
   }
 
+  /**
+   * Get the maximum concurrent tasks for this node.  (This applies
+   * per type of task - a node with maxTasks==1 will run up to 1 map
+   * and 1 reduce concurrently).
+   * @return maximum tasks this node supports
+   */
+  public int getMaxTasks() {
+    return maxTasks;
+  }
+  
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
@@ -157,6 +170,7 @@ class TaskTrackerStatus implements Writable {
     UTF8.writeString(out, host);
     out.writeInt(httpPort);
     out.writeInt(failures);
+    out.writeInt(maxTasks);
 
     out.writeInt(taskReports.size());
     for (TaskStatus taskStatus : taskReports) {
@@ -169,6 +183,7 @@ class TaskTrackerStatus implements Writable {
     this.host = UTF8.readString(in);
     this.httpPort = in.readInt();
     this.failures = in.readInt();
+    this.maxTasks = in.readInt();
 
     taskReports.clear();
     int numTasks = in.readInt();

+ 11 - 5
src/webapps/job/jobtracker.jsp

@@ -67,15 +67,21 @@
   public void generateSummaryTable(JspWriter out,
                                    JobTracker tracker) throws IOException {
     ClusterStatus status = tracker.getClusterStatus();
+    String tasksPerNode = status.getTaskTrackers() > 0 ?
+      percentFormat.format(((double)status.getMaxTasks()) / status.getTaskTrackers()) :
+      "-";
     out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n"+
               "<tr><th>Maps</th><th>Reduces</th>" + 
-              "<th>Tasks/Node</th><th>Total Submissions</th>" +
-              "<th>Nodes</th></tr>\n");
+              "<th>Total Submissions</th>" +
+              "<th>Nodes</th><th>Task Capacity</th><th>Avg. Tasks/Node</th></tr>\n");
     out.print("<tr><td>" + status.getMapTasks() + "</td><td>" +
               status.getReduceTasks() + "</td><td>" + 
-              status.getMaxTasks() + "</td><td>" +
-              tracker.getTotalSubmissions() + "</td><td><a href=\"machines.jsp\">" +
-              status.getTaskTrackers() + "</a></td></tr></table>\n");
+              tracker.getTotalSubmissions() +
+              "</td><td><a href=\"machines.jsp\">" +
+              status.getTaskTrackers() +
+              "</a></td><td>" + status.getMaxTasks() +
+	      "</td><td>" + tasksPerNode +
+              "</td></tr></table>\n");
   }%>
 
 <%@page import="org.apache.hadoop.dfs.JspHelper"%>

+ 4 - 2
src/webapps/job/machines.jsp

@@ -23,9 +23,10 @@
     } else {
       out.print("<center>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-      out.print("<tr><td align=\"center\" colspan=\"5\"><b>Task Trackers</b></td></tr>\n");
+      out.print("<tr><td align=\"center\" colspan=\"6\"><b>Task Trackers</b></td></tr>\n");
       out.print("<tr><td><b>Name</b></td><td><b>Host</b></td>" +
-                "<td><b># running tasks</b></td><td><b>Failures</b></td>" +
+                "<td><b># running tasks</b></td><td><b>Max Tasks</b></td>" +
+                "<td><b>Failures</b></td>" +
                 "<td><b>Seconds since heartbeat</b></td></tr>\n");
       int maxFailures = 0;
       String failureKing = null;
@@ -49,6 +50,7 @@
         out.print(tt.getHost() + ":" + tt.getHttpPort() + "/\">");
         out.print(tt.getTrackerName() + "</a></td><td>");
         out.print(tt.getHost() + "</td><td>" + numCurTasks +
+                  "</td><td>" + tt.getMaxTasks() + 
                   "</td><td>" + numFailures + 
                   "</td><td>" + sinceHeartbeat + "</td></tr>\n");
       }