Browse source code

MAPREDUCE-3015. Add local dir failure info to metrics and the web UI. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1201813 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins 13 years ago
parent
commit
352f42dcca

+ 2 - 0
CHANGES.txt

@@ -61,6 +61,8 @@ Release 0.20.206.0 - unreleased
 
     MAPREDUCE-3395. Add mapred.disk.healthChecker.interval to mapred-default.xml. (eli via harsh)
 
+    MAPREDUCE-3015. Add local dir failure info to metrics and the web UI. (eli)
+
 Release 0.20.205.1 - unreleased
 
   NEW FEATURES

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -71,8 +71,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
    * Version 28: Adding user name to the serialized Task for use by TT.
    * Version 29: Adding available memory and CPU usage information on TT to
    *             TaskTrackerStatus for MAPREDUCE-1218
+   * Version 30: Adding disk failure to TaskTrackerStatus for MAPREDUCE-3015
    */
-  public static final long versionID = 29L;
+  public static final long versionID = 30L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 3 - 2
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1555,7 +1555,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       // III. Create the dummy tasktracker status
       TaskTrackerStatus ttStatus = 
         new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
-                              0 , 0, 0);
+                              0 , 0, 0, 0);
       ttStatus.setLastSeen(clock.getTime());
 
       synchronized (JobTracker.this) {
@@ -5331,7 +5331,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           put("reduce_slots", redSlots);
           put("reduce_slots_used", redSlots - tts.getAvailableReduceSlots());
         }});
-        put("failures", tts.getFailures());
+        put("failures", tts.getTaskFailures());
+        put("dir_failures", tts.getDirFailures());
       }});
     }
     return info;

+ 10 - 5
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -325,7 +325,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   private Localizer localizer;
   private int maxMapSlots;
   private int maxReduceSlots;
-  private int failures;
+  private int taskFailures;
   final long mapRetainSize;
   final long reduceRetainSize;
 
@@ -1772,7 +1772,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
                                        httpPort, 
                                        cloneAndResetRunningTaskStatuses(
                                          sendCounters), 
-                                       failures, 
+                                       taskFailures,
+                                       localStorage.numFailures(),
                                        maxMapSlots,
                                        maxReduceSlots); 
       }
@@ -2868,7 +2869,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         }
         if (!done) {
           if (!wasKilled) {
-            failures += 1;
+            taskFailures++;
             setTaskFailState(true);
             // call the script here for the failed tasks.
             if (debugCommand != null) {
@@ -3108,7 +3109,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
           isCleaningup()) {
         wasKilled = true;
         if (wasFailure) {
-          failures += 1;
+          taskFailures++;
         }
         // runner could be null if task-cleanup attempt is not localized yet
         if (runner != null) {
@@ -3117,7 +3118,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         setTaskFailState(wasFailure);
       } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
-          failures += 1;
+          taskFailures++;
           taskStatus.setRunState(TaskStatus.State.FAILED);
         } else {
           taskStatus.setRunState(TaskStatus.State.KILLED);
@@ -4055,6 +4056,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     return maxReduceSlots;
   }
 
+  int getNumDirFailures() {
+    return localStorage.numFailures();
+  }
+
   //called from unit test
   synchronized void setMaxMapSlots(int mapSlots) {
     maxMapSlots = mapSlots;

+ 3 - 0
src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsSource.java

@@ -41,6 +41,8 @@ public class TaskTrackerMetricsSource extends TaskTrackerInstrumentation
       registry.newGauge("mapTaskSlots", "", 0);
   final MetricMutableGaugeInt redSlots =
       registry.newGauge("reduceTaskSlots", "", 0);
+  final MetricMutableGaugeInt failedDirs =
+      registry.newGauge("failedDirs", "", 0);
   final MetricMutableCounterInt completedTasks =
       registry.newCounter("tasks_completed", "", 0);
   final MetricMutableCounterInt timedoutTasks =
@@ -61,6 +63,7 @@ public class TaskTrackerMetricsSource extends TaskTrackerInstrumentation
     redsRunning.set(tt.reduceTotal);
     mapSlots.set(tt.getMaxCurrentMapTasks());
     redSlots.set(tt.getMaxCurrentReduceTasks());
+    failedDirs.set(tt.getNumDirFailures());
     registry.snapshot(builder.addRecord(registry.name()), all);
   }
 

+ 20 - 8
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -47,7 +47,8 @@ public class TaskTrackerStatus implements Writable {
   String trackerName;
   String host;
   int httpPort;
-  int failures;
+  int taskFailures;
+  int dirFailures;
   List<TaskStatus> taskReports;
     
   volatile long lastSeen;
@@ -357,14 +358,15 @@ public class TaskTrackerStatus implements Writable {
    */
   public TaskTrackerStatus(String trackerName, String host, 
                            int httpPort, List<TaskStatus> taskReports, 
-                           int failures, int maxMapTasks,
-                           int maxReduceTasks) {
+                           int taskFailures, int dirFailures,
+                           int maxMapTasks, int maxReduceTasks) {
     this.trackerName = trackerName;
     this.host = host;
     this.httpPort = httpPort;
 
     this.taskReports = new ArrayList<TaskStatus>(taskReports);
-    this.failures = failures;
+    this.taskFailures = taskFailures;
+    this.dirFailures = dirFailures;
     this.maxMapTasks = maxMapTasks;
     this.maxReduceTasks = maxReduceTasks;
     this.resStatus = new ResourceStatus();
@@ -394,8 +396,16 @@ public class TaskTrackerStatus implements Writable {
    * Get the number of tasks that have failed on this tracker.
    * @return The number of failed tasks
    */
-  public int getFailures() {
-    return failures;
+  public int getTaskFailures() {
+    return taskFailures;
+  }
+
+  /**
+   * Get the number of local directories that have failed on this tracker.
+   * @return The number of failed local directories
+   */
+  public int getDirFailures() {
+    return dirFailures;
   }
     
   /**
@@ -652,7 +662,8 @@ public class TaskTrackerStatus implements Writable {
     Text.writeString(out, trackerName);
     Text.writeString(out, host);
     out.writeInt(httpPort);
-    out.writeInt(failures);
+    out.writeInt(taskFailures);
+    out.writeInt(dirFailures);
     out.writeInt(maxMapTasks);
     out.writeInt(maxReduceTasks);
     resStatus.write(out);
@@ -668,7 +679,8 @@ public class TaskTrackerStatus implements Writable {
     this.trackerName = Text.readString(in);
     this.host = Text.readString(in);
     this.httpPort = in.readInt();
-    this.failures = in.readInt();
+    this.taskFailures = in.readInt();
+    this.dirFailures = in.readInt();
     this.maxMapTasks = in.readInt();
     this.maxReduceTasks = in.readInt();
     resStatus.readFields(in);

+ 3 - 3
src/test/org/apache/hadoop/mapred/TestClusterStatus.java

@@ -154,7 +154,7 @@ public class TestClusterStatus extends TestCase {
       List<TaskStatus> taskStatuses) {
     return new TaskTrackerStatus(trackerName, 
       JobInProgress.convertTrackerNameToHostName(trackerName), 0,
-      taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+      taskStatuses, 0, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
   }
   
   public void testClusterMetrics() throws IOException, InterruptedException {
@@ -267,10 +267,10 @@ public class TestClusterStatus extends TestCase {
     TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
     TaskTrackerStatus status1 = new TaskTrackerStatus(
         trackers[0],JobInProgress.convertTrackerNameToHostName(
-            trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+            trackers[0]),0,new ArrayList<TaskStatus>(), 0, 0, 2, 2);
     TaskTrackerStatus status2 = new TaskTrackerStatus(
         trackers[1],JobInProgress.convertTrackerNameToHostName(
-            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 0, 2, 2);
     tt1.setStatus(status1);
     tt2.setStatus(status2);
     

+ 15 - 2
src/test/org/apache/hadoop/mapred/TestDiskFailures.java

@@ -26,6 +26,9 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -76,28 +79,38 @@ public class TestDiskFailures extends ClusterMapReduceTestCase {
     startCluster(true, props, numMapredLocalDirs);
 
     MiniMRCluster cluster = getMRCluster();
+    TaskTracker tt = cluster.getTaskTrackerRunner(0).getTaskTracker();
+    MetricsRecordBuilder rb = getMetrics(new TaskTrackerMetricsSource(tt));
     String[] localDirs = cluster.getTaskTrackerLocalDirs(0);
 
+    assertGauge("failedDirs", 0, rb);
+
     // Make 1 disk fail and verify if TaskTracker gets re-inited or not and
     // the good mapred local dirs list gets updated properly in TaskTracker.
     prepareDirToFail(localDirs[2]);
     String expectedMapredLocalDirs = localDirs[0] + "," + localDirs[1] + ","
                                      + localDirs[3];
     verifyReinitTaskTrackerAfterDiskFailure(expectedMapredLocalDirs, cluster);
-    
+    rb = getMetrics(new TaskTrackerMetricsSource(tt));
+    assertGauge("failedDirs", 1, rb);
+
     // Make 2 more disks fail and verify if TaskTracker gets re-inited or not
     // and the good mapred local dirs list gets updated properly in TaskTracker.
     prepareDirToFail(localDirs[0]);
     prepareDirToFail(localDirs[3]);
     expectedMapredLocalDirs = localDirs[1];
     verifyReinitTaskTrackerAfterDiskFailure(expectedMapredLocalDirs, cluster);
-    
+    rb = getMetrics(new TaskTrackerMetricsSource(tt));
+    assertGauge("failedDirs", 3, rb);
+
     // Fail the remaining single disk(i.e. the remaining good mapred-local-dir).
     prepareDirToFail(localDirs[1]);
     waitForDiskHealthCheck();
     assertTrue(
         "Tasktracker is not dead even though all mapred local dirs became bad.",
         cluster.getTaskTrackerRunner(0).isDead);
+    rb = getMetrics(new TaskTrackerMetricsSource(tt));
+    assertGauge("failedDirs", 4, rb);
   }
 
   /**

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

@@ -130,13 +130,13 @@ public class TestJobQueueTaskScheduler extends TestCase {
       
       TaskTracker tt1 = new TaskTracker("tt1");
       tt1.setStatus(new TaskTrackerStatus("tt1", "tt1.host", 1,
-                    new ArrayList<TaskStatus>(), 0,
+                    new ArrayList<TaskStatus>(), 0, 0,
                     maxMapTasksPerTracker, maxReduceTasksPerTracker));
       trackers.put("tt1", tt1);
       
       TaskTracker tt2 = new TaskTracker("tt2");
       tt2.setStatus(new TaskTrackerStatus("tt2", "tt2.host", 2,
-                    new ArrayList<TaskStatus>(), 0,
+                    new ArrayList<TaskStatus>(), 0, 0,
                     maxMapTasksPerTracker, maxReduceTasksPerTracker));
       trackers.put("tt2", tt2);
     }

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestParallelInitialization.java

@@ -91,7 +91,7 @@ public class TestParallelInitialization extends TestCase {
       JobConf conf = new JobConf();
       queueManager = new QueueManager(conf);
       trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
-                   new ArrayList<TaskStatus>(), 0,
+                   new ArrayList<TaskStatus>(), 0, 0,
                    maxMapTasksPerTracker, maxReduceTasksPerTracker));
     }
     

+ 8 - 5
src/webapps/job/machines.jsp

@@ -49,7 +49,8 @@
                 "<td><b># running tasks</b></td>" +
                 "<td><b>Max Map Tasks</b></td>" +
                 "<td><b>Max Reduce Tasks</b></td>" +
-                "<td><b>Failures</b></td>" +
+                "<td><b>Task Failures</b></td>" +
+                "<td><b>Directory Failures</b></td>" +
                 "<td><b>Node Health Status</b></td>" +
                 "<td><b>Seconds Since Node Last Healthy</b></td>");
       if (type.equals("blacklisted")) {
@@ -88,18 +89,20 @@
           it2.next();
           numCurTasks++;
         }
-        int numFailures = tt.getFailures();
-        if (numFailures > maxFailures) {
-          maxFailures = numFailures;
+        int numTaskFailures = tt.getTaskFailures();
+        if (numTaskFailures > maxFailures) {
+          maxFailures = numTaskFailures;
           failureKing = tt.getTrackerName();
         }
+        int numDirFailures = tt.getDirFailures();
         out.print("<tr><td><a href=\"http://");
         out.print(tt.getHost() + ":" + tt.getHttpPort() + "/\">");
         out.print(tt.getTrackerName() + "</a></td><td>");
         out.print(tt.getHost() + "</td><td>" + numCurTasks +
                   "</td><td>" + tt.getMaxMapSlots() +
                   "</td><td>" + tt.getMaxReduceSlots() + 
-                  "</td><td>" + numFailures +
+                  "</td><td>" + numTaskFailures +
+                  "</td><td>" + numDirFailures +
                   "</td><td>" + healthString +
                   "</td><td>" + sinceHealthCheck); 
         if (type.equals("blacklisted")) {