Explorar o código

Fix for HADOOP-86. Errors while reading map output now cause map task to fail and be re-executed.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@387279 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting hai 19 anos
pai
achega
74d3933823

+ 7 - 6
src/java/org/apache/hadoop/mapred/MapOutputFile.java

@@ -23,6 +23,8 @@ import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskTracker.MapOutputServer;
 
 /** A local file to be transferred via the {@link MapOutputProtocol}. */ 
 class MapOutputFile implements Writable, Configurable {
@@ -106,16 +108,15 @@ class MapOutputFile implements Writable, Configurable {
     UTF8.writeString(out, reduceTaskId);
     out.writeInt(partition);
     
-    // write the length-prefixed file content to the wire
     File file = getOutputFile(mapTaskId, partition);
-    out.writeLong(file.length());
-
     FSDataInputStream in = null;
     try {
+      // write the length-prefixed file content to the wire
+      out.writeLong(file.length());
       in = FileSystem.getNamed("local", this.jobConf).open(file);
-    } catch (IOException e) {
-      // log a SEVERE exception in order to cause TaskTracker to exit
+    } catch (FileNotFoundException e) {
       TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e);
+      ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
       throw e;
     }
     try {
@@ -127,8 +128,8 @@ class MapOutputFile implements Writable, Configurable {
         try {
           l = in.read(buffer);
         } catch (IOException e) {
-          // log a SEVERE exception in order to cause TaskTracker to exit
           TaskTracker.LOG.log(Level.SEVERE,"Can't read map output:" + file, e);
+          ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId);
           throw e;
         }
       }

+ 38 - 6
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -70,6 +70,15 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
 
     private int maxCurrentTasks;
 
+    class MapOutputServer extends RPC.Server { // RPC.Server subclass that can hand back its enclosing TaskTracker
+      private MapOutputServer(int port, int threads) {
+        super(TaskTracker.this, fConf, port, threads, false); // passes the enclosing TaskTracker and its fConf through to RPC.Server
+      }
+      public TaskTracker getTaskTracker() { // lets code holding this server reach the tracker, e.g. to call mapOutputLost(taskid)
+        return TaskTracker.this;
+      }
+    }
+
     /**
      * Start with the local machine name, and the default JobTracker
      */
@@ -127,7 +136,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
         }
         while (true) {
             try {
-                this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, maxCurrentTasks, false, this.fConf);
+                this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks);
                 this.mapOutputServer.start();
                 break;
             } catch (BindException e) {
@@ -305,11 +314,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
               }
             }
             lastHeartbeat = now;
-
-            if (LogFormatter.hasLoggedSevere()) {
-              LOG.info("Severe problem detected.  TaskTracker exiting.");
-              return STALE_STATE;
-            }
         }
 
         return 0;
@@ -538,6 +542,22 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
             }
         }
 
+        /**
+         * The map output has been lost. If this task's runstate is SUCCEEDED it is flipped to FAILED and the task is put back into runningTasks (with mapTotal incremented); in any other runstate only a warning is logged.
+         */
+        public synchronized void mapOutputLost() throws IOException {
+            if (runstate == TaskStatus.SUCCEEDED) {
+              LOG.info("Reporting output lost:"+task.getTaskId());
+              runstate = TaskStatus.FAILED;       // change status to failure
+              synchronized (TaskTracker.this) {   // force into next heartbeat
+                runningTasks.put(task.getTaskId(), this);
+                mapTotal++;
+              }
+            } else { // NOTE(review): reached for every runstate other than SUCCEEDED, not only after an earlier loss report
+              LOG.warning("Output already reported lost:"+task.getTaskId());
+            }
+        }
+
         /**
          * We no longer need anything from this task.  Either the 
          * controlling job is all done and the files have been copied
@@ -645,6 +665,18 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
         }
     }
 
+    /**
+     * A completed map task's output has been lost. Looks up the TaskInProgress registered under taskid in tasks and calls its mapOutputLost(); an unknown taskid is logged as a warning and otherwise ignored.
+     */
+    public synchronized void mapOutputLost(String taskid) throws IOException {
+        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
+        if (tip != null) {
+          tip.mapOutputLost();
+        } else {
+          LOG.warning("Unknown child with bad map output: "+taskid+". Ignored.");
+        }
+    }
+
     /** 
      * The main() for child processes. 
      */