Przeglądaj źródła

HADOOP-186. Better error handling in TaskTracker's top-level loop. Also improve calculation of time to send next heartbeat. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@398994 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 lat temu
rodzic
commit
afbcd7e2b7
2 zmienionych plików z 36 dodań i 15 usunięć
  1. 4 0
      CHANGES.txt
  2. 32 15
      src/java/org/apache/hadoop/mapred/TaskTracker.java

+ 4 - 0
CHANGES.txt

@@ -143,6 +143,10 @@ Trunk (unreleased)
     replication counts are now automatically adjusted to be within the
     newly configured bounds. (Hairong Kuang via cutting)
 
+38. HADOOP-186.  Better error handling in TaskTracker's top-level
+    loop.  Also improve calculation of time to send next heartbeat.
+    (omalley via cutting)
+
 Release 0.1.1 - 2006-04-08
 
  1. Added CHANGES.txt, logging all significant changes to Hadoop.  (cutting)

+ 32 - 15
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -18,7 +18,6 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
@@ -215,6 +214,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
      */
     int offerService() throws Exception {
         long lastHeartbeat = 0;
+        this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
 
         while (running) {
             long now = System.currentTimeMillis();
@@ -227,15 +227,16 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
                 }
                 continue;
             }
+            lastHeartbeat = now;
 
             //
             // Emit standard hearbeat message to check in with JobTracker
             //
             Vector taskReports = new Vector();
             synchronized (this) {
-                for (Iterator it = runningTasks.keySet().iterator(); it.hasNext(); ) {
-                    String taskid = (String) it.next();
-                    TaskInProgress tip = (TaskInProgress) runningTasks.get(taskid);
+                for (Iterator it = runningTasks.values().iterator(); 
+                     it.hasNext(); ) {
+                    TaskInProgress tip = (TaskInProgress) it.next();
                     TaskStatus status = tip.createStatus();
                     taskReports.add(status);
                     if (status.getRunState() != TaskStatus.RUNNING) {
@@ -252,9 +253,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
             //
             // Xmit the heartbeat
             //
-            if (justStarted) {
-                this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
-            }
             
             TaskTrackerStatus status = 
               new TaskTrackerStatus(taskTrackerName, localHostname, 
@@ -269,11 +267,16 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
             //
             // Check if we should create a new Task
             //
-            if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
-                Task t = jobClient.pollForNewTask(taskTrackerName);
-                if (t != null) {
-                  startNewTask(t);
-                }
+            try {
+              if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
+                  Task t = jobClient.pollForNewTask(taskTrackerName);
+                  if (t != null) {
+                    startNewTask(t);
+                  }
+              }
+            } catch (IOException ie) {
+              LOG.info("Problem launching task: " + 
+                       StringUtils.stringifyException(ie));
             }
 
             //
@@ -292,7 +295,12 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
                                      " seconds. Killing.";
                         LOG.info(tip.getTask().getTaskId() + ": " + msg);
                         tip.reportDiagnosticInfo(msg);
-                        tip.killAndCleanup(true);
+                        try {
+                          tip.killAndCleanup(true);
+                        } catch (IOException ie) {
+                          LOG.info("Problem cleaning task up: " +
+                                   StringUtils.stringifyException(ie));
+                        }
                     }
                 }
             }
@@ -307,16 +315,25 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
             //
             // Check for any Tasks whose job may have ended
             //
+            try {
             String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
             if (toCloseIds != null) {
               synchronized (this) {
                 for (int i = 0; i < toCloseIds.length; i++) {
                   TaskInProgress tip = (TaskInProgress) tasks.get(toCloseIds[i]);
-                  tip.jobHasFinished();                        
+                  try {
+                    tip.jobHasFinished();
+                  } catch (IOException ie) {
+                    LOG.info("problem finishing task: " +
+                             StringUtils.stringifyException(ie));
+                  }
                 }
               }
             }
-            lastHeartbeat = now;
+            } catch (IOException ie) {
+              LOG.info("Problem getting closed tasks: " +
+                       StringUtils.stringifyException(ie));
+            }
         }
 
         return 0;