瀏覽代碼

HADOOP-513. Replace map output handling with a servlet. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@451451 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 年之前
父節點
當前提交
155fddd073

+ 5 - 0
CHANGES.txt

@@ -95,6 +95,11 @@ Trunk (unreleased changes)
     DFS, prohibiting the use of "." or ".." as directory or file
     DFS, prohibiting the use of "." or ".." as directory or file
     names.  (Wendy Chien via cutting)
     names.  (Wendy Chien via cutting)
 
 
+24. HADOOP-513.  Replace map output handling with a servlet, rather
+    than a JSP page.  This fixes an issue where
+    IllegalStateExceptions were logged, sets content-length
+    correctly, and better handles some errors.  (omalley via cutting)
+
 
 
 Release 0.6.2 (unreleased)
 Release 0.6.2 (unreleased)
 
 

+ 1 - 4
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -161,10 +161,7 @@ public class DataNode implements FSConstants, Runnable {
         String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
         String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
         this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
         this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
         //create a servlet to serve full-file content
         //create a servlet to serve full-file content
-        try {
-          this.infoServer.addServlet(null, "/streamFile/*",
-                "org.apache.hadoop.dfs.StreamFile", null);
-        } catch (Exception e) {LOG.warn("addServlet threw exception", e);}
+        this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
         this.infoServer.start();
         this.infoServer.start();
         this.dnRegistration.infoPort = this.infoServer.getPort();
         this.dnRegistration.infoPort = this.infoServer.getPort();
         datanodeObject = this;
         datanodeObject = this;

+ 1 - 1
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -87,7 +87,7 @@ class MapOutputLocation implements Writable {
   }
   }
 
 
   public String toString() {
   public String toString() {
-    return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" + 
+    return "http://" + host + ":" + port + "/mapOutput?map=" + 
             mapTaskId;
             mapTaskId;
   }
   }
   
   

+ 28 - 13
src/java/org/apache/hadoop/mapred/StatusHttpServer.java

@@ -21,11 +21,12 @@ import java.io.UnsupportedEncodingException;
 import java.net.URL;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.net.URLDecoder;
 
 
+import javax.servlet.http.HttpServlet;
+
 import org.mortbay.http.HttpContext;
 import org.mortbay.http.HttpContext;
 import org.mortbay.http.handler.ResourceHandler;
 import org.mortbay.http.handler.ResourceHandler;
 import org.mortbay.http.SocketListener;
 import org.mortbay.http.SocketListener;
 import org.mortbay.jetty.servlet.WebApplicationContext;
 import org.mortbay.jetty.servlet.WebApplicationContext;
-import org.mortbay.jetty.servlet.ServletHttpContext;
 
 
 /**
 /**
  * Create a Jetty embedded server to answer http requests. The primary goal
  * Create a Jetty embedded server to answer http requests. The primary goal
@@ -97,19 +98,33 @@ public class StatusHttpServer {
    * @param name The name of the servlet (can be passed as null)
    * @param name The name of the servlet (can be passed as null)
    * @param pathSpec The path spec for the servlet
    * @param pathSpec The path spec for the servlet
    * @param classname The class name for the servlet
    * @param classname The class name for the servlet
-   * @param contextPath The context path (can be null, defaults to "/")
    */
    */
-  public void addServlet(String name, String pathSpec, String classname,
-     String contextPath) 
- throws ClassNotFoundException, InstantiationException, IllegalAccessException {
-    String tmpContextPath = contextPath;
-    if (tmpContextPath == null) tmpContextPath = "/";
-    ServletHttpContext context = 
-                    (ServletHttpContext)webServer.getContext(tmpContextPath);
-    if (name == null)
-      context.addServlet(pathSpec, classname);
-    else
-      context.addServlet(name, pathSpec, classname);
+  public <T extends HttpServlet> 
+  void addServlet(String name, String pathSpec, 
+                  Class<T> servletClass) {
+    WebApplicationContext context = webAppContext;
+    try {
+      if (name == null) {
+        context.addServlet(pathSpec, servletClass.getName());
+      } else {
+        context.addServlet(name, pathSpec, servletClass.getName());
+      } 
+    } catch (ClassNotFoundException ex) {
+      throw makeRuntimeException("Problem instantiating class", ex);
+    } catch (InstantiationException ex) {
+      throw makeRuntimeException("Problem instantiating class", ex);
+    } catch (IllegalAccessException ex) {
+      throw makeRuntimeException("Problem instantiating class", ex);
+    }
+  }
+  
+  private static RuntimeException makeRuntimeException(String msg, 
+                                                       Throwable cause) {
+    RuntimeException result = new RuntimeException(msg);
+    if (cause != null) {
+      result.initCause(cause);
+    }
+    return result;
   }
   }
   
   
   /**
   /**

+ 60 - 4
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -28,8 +28,12 @@ import java.net.*;
 import java.util.*;
 import java.util.*;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
-import org.apache.hadoop.metrics.ContextFactory;
-import org.apache.hadoop.metrics.MetricsContext;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.DNS;
 
 
@@ -361,8 +365,6 @@ public class TaskTracker
       this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);
       this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);
       int workerThreads = conf.getInt("tasktracker.http.threads", 40);
       int workerThreads = conf.getInt("tasktracker.http.threads", 40);
       server.setThreads(1, workerThreads);
       server.setThreads(1, workerThreads);
-      server.start();
-      this.httpPort = server.getPort();
       // let the jsp pages get to the task tracker, config, and other relevant
       // let the jsp pages get to the task tracker, config, and other relevant
       // objects
       // objects
       FileSystem local = FileSystem.getNamed("local", conf);
       FileSystem local = FileSystem.getNamed("local", conf);
@@ -370,6 +372,9 @@ public class TaskTracker
       server.setAttribute("local.file.system", local);
       server.setAttribute("local.file.system", local);
       server.setAttribute("conf", conf);
       server.setAttribute("conf", conf);
       server.setAttribute("log", LOG);
       server.setAttribute("log", LOG);
+      server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+      server.start();
+      this.httpPort = server.getPort();
       initialize();
       initialize();
     }
     }
 
 
@@ -1318,4 +1323,55 @@ public class TaskTracker
             System.exit(-1);
             System.exit(-1);
         }
         }
     }
     }
+    
+    /**
+     * This class is used in TaskTracker's Jetty to serve the map outputs
+     * to other nodes.
+     * @author Owen O'Malley
+     */
+    public static class MapOutputServlet extends HttpServlet {
+      public void doGet(HttpServletRequest request, 
+                        HttpServletResponse response
+                       ) throws ServletException, IOException {
+        String mapId = request.getParameter("map");
+        String reduceId = request.getParameter("reduce");
+        if (mapId == null || reduceId == null) {
+          throw new IOException("map and reduce parameters are required");
+        }
+        ServletContext context = getServletContext();
+        int reduce = Integer.parseInt(reduceId);
+        byte[] buffer = new byte[64*1024];
+        OutputStream outStream = response.getOutputStream();
+        JobConf conf = (JobConf) context.getAttribute("conf");
+        FileSystem fileSys = 
+          (FileSystem) context.getAttribute("local.file.system");
+        Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
+        response.setContentLength((int) fileSys.getLength(filename));
+        InputStream inStream = null;
+        try {
+          inStream = fileSys.open(filename);
+          try {
+            int len = inStream.read(buffer);
+            while (len > 0) {
+              outStream.write(buffer, 0, len);
+              len = inStream.read(buffer);
+            }
+          } finally {
+            inStream.close();
+            outStream.close();
+          }
+        } catch (IOException ie) {
+          TaskTracker tracker = 
+            (TaskTracker) context.getAttribute("task.tracker");
+          Log log = (Log) context.getAttribute("log");
+          String errorMsg = "getMapOutput(" + mapId + "," + reduceId + 
+          ") failed :\n"+
+          StringUtils.stringifyException(ie);
+          log.warn(errorMsg);
+          tracker.mapOutputLost(mapId, errorMsg);
+          response.sendError(HttpServletResponse.SC_GONE, errorMsg);
+          throw ie;
+        } 
+      }
+    }
 }
 }

+ 0 - 56
src/webapps/task/getMapOutput.jsp

@@ -1,56 +0,0 @@
-<%@ page
-  contentType="application/octet-stream"
-  session="false"
-  buffer="64kb"
-  import="javax.servlet.*"
-  import="javax.servlet.http.*"
-  import="java.io.*"
-  import="java.util.*"
-  import="org.apache.hadoop.fs.*"
-  import="org.apache.hadoop.mapred.*"
-  import="org.apache.hadoop.util.*"
-  import="org.apache.commons.logging.*"
-%><%
-  String mapId = request.getParameter("map");
-  String reduceId = request.getParameter("reduce");
-  if (mapId == null || reduceId == null) {
-    throw new IOException("map and reduce parameters are required");
-  }
-  int reduce = Integer.parseInt(reduceId);
-  byte[] buffer = new byte[64*1024];
-  OutputStream outStream = response.getOutputStream();
-  JobConf conf = (JobConf) application.getAttribute("conf");
-  FileSystem fileSys = 
-     (FileSystem) application.getAttribute("local.file.system");
-  Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
-  response.resetBuffer();
-  InputStream inStream = null;
-  try {
-    inStream = fileSys.open(filename);
-    int len = inStream.read(buffer);
-    while (len > 0) {
-      try {
-        outStream.write(buffer, 0, len);
-      } catch (Exception e) {
-        break;
-      }
-      len = inStream.read(buffer);
-    }
-  } catch (IOException ie) {
-    TaskTracker tracker = 
-       (TaskTracker) application.getAttribute("task.tracker");
-    Log log = (Log) application.getAttribute("log");
-    String errorMsg = "getMapOutput(" + mapId + "," + reduceId + ") failed : "+
-                       StringUtils.stringifyException(ie);
-    log.warn(errorMsg);
-    tracker.mapOutputLost(mapId, errorMsg);
-    throw ie;
-  } finally {
-    if (inStream != null) {
-      inStream.close();
-    }
-    if (outStream != null) {
-      outStream.close();
-    }
-  }
-%>