소스 검색

HADOOP-3130. Make the connect timeout smaller for getFile. Contributed by Amar Ramesh Kamat and Runping Qi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@648697 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 년 전
부모
커밋
9393f9c913
3개의 변경된 파일88개의 추가작업 그리고 8개의 파일을 삭제
  1. 3 0
      CHANGES.txt
  2. 82 6
      src/java/org/apache/hadoop/mapred/MapOutputLocation.java
  3. 3 2
      src/java/org/apache/hadoop/mapred/ReduceTask.java

+ 3 - 0
CHANGES.txt

@@ -18,6 +18,9 @@ Trunk (unreleased changes)
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().
     (Lohit Vjayarenu via rangadi)
 
+    HADOOP-3130. Make the connect timeout smaller for getFile.
+    (Amar Ramesh Kamat via ddas)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 82 - 6
src/java/org/apache/hadoop/mapred/MapOutputLocation.java

@@ -49,6 +49,10 @@ class MapOutputLocation implements Writable, MRConstants {
   private String host;
   private int port;
   private String jobId;
+  // basic/unit connection timeout (in milliseconds)
+  private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
+  // default read timeout (in milliseconds)
+  private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
   /** RPC constructor **/
   public MapOutputLocation() {
@@ -102,6 +106,51 @@ class MapOutputLocation implements Writable, MRConstants {
            "&map=" + mapTaskId;
   }
   
+  /** 
+   * The connection establishment is attempted multiple times and is given up 
+   * only on the last failure. Instead of connecting with a timeout of 
+   * X, we try connecting with a timeout of x < X but multiple times. 
+   */
+  private InputStream getInputStream(URLConnection connection, 
+                                     int connectionTimeout, 
+                                     int readTimeout) 
+  throws IOException {
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout "
+                            + "[timeout = " + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
+             ? connectionTimeout
+             : UNIT_CONNECT_TIMEOUT;
+    }
+    // set the read timeout to the total timeout
+    connection.setReadTimeout(readTimeout);
+    // set the connect timeout to the unit-connect-timeout
+    connection.setConnectTimeout(unit);
+    while (true) {
+      try {
+        return connection.getInputStream();
+      } catch (IOException ioe) {
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value if timeout is used here
+        if (connectionTimeout == 0) {
+          throw ioe;
+        }
+
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+      }
+    }
+  }
+
   /**
    * Get the map output into a local file (either in the inmemory fs or on the 
    * local fs) from the remote server.
@@ -113,7 +162,7 @@ class MapOutputLocation implements Writable, MRConstants {
    * @param lDirAlloc the LocalDirAllocator object
    * @param conf the Configuration object
    * @param reduce the reduce id to get for
-   * @param timeout number of ms for connection and read timeout
+   * @param timeout number of milliseconds for connection timeout
    * @return the path of the file that got created
    * @throws IOException when something goes wrong
    */
@@ -124,6 +173,36 @@ class MapOutputLocation implements Writable, MRConstants {
                       LocalDirAllocator lDirAlloc,
                       Configuration conf, int reduce,
                       int timeout, Progressable progressable) 
+  throws IOException, InterruptedException {
+    return getFile(inMemFileSys, localFileSys, shuffleMetrics, localFilename, 
+                   lDirAlloc, conf, reduce, timeout, DEFAULT_READ_TIMEOUT, 
+                   progressable);
+  }
+
+  /**
+   * Get the map output into a local file (either in the inmemory fs or on the 
+   * local fs) from the remote server.
+   * We use the file system so that we generate checksum files on the data.
+   * @param inMemFileSys the inmemory filesystem to write the file to
+   * @param localFileSys the local filesystem to write the file to
+   * @param shuffleMetrics the metrics context
+   * @param localFilename the filename to write the data into
+   * @param lDirAlloc the LocalDirAllocator object
+   * @param conf the Configuration object
+   * @param reduce the reduce id to get for
+   * @param connectionTimeout number of milliseconds for connection timeout
+   * @param readTimeout number of milliseconds for read timeout
+   * @return the path of the file that got created
+   * @throws IOException when something goes wrong
+   */
+  public Path getFile(InMemoryFileSystem inMemFileSys,
+                      FileSystem localFileSys,
+                      ShuffleClientMetrics shuffleMetrics,
+                      Path localFilename, 
+                      LocalDirAllocator lDirAlloc,
+                      Configuration conf, int reduce,
+                      int connectionTimeout, int readTimeout, 
+                      Progressable progressable) 
   throws IOException, InterruptedException {
     boolean good = false;
     long totalBytes = 0;
@@ -132,11 +211,8 @@ class MapOutputLocation implements Writable, MRConstants {
     URL path = new URL(toString() + "&reduce=" + reduce);
     try {
       URLConnection connection = path.openConnection();
-      if (timeout > 0) {
-        connection.setConnectTimeout(timeout);
-        connection.setReadTimeout(timeout);
-      }
-      InputStream input = connection.getInputStream();
+      InputStream input = getInputStream(connection, connectionTimeout, 
+                                         readTimeout); 
       OutputStream output = null;
       
       //We will put a file in memory if it meets certain criteria:

+ 3 - 2
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -1030,8 +1030,9 @@ class ReduceTask extends Task {
         // loop until we get all required outputs
         while (!neededOutputs.isEmpty() && mergeThrowable == null) {
           
-          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
-          " map output(s)");
+          LOG.info(reduceTask.getTaskId() + " Need another " 
+                   + neededOutputs.size() + " map output(s) where " 
+                   + numInFlight + " is already in progress");
           
           try {
             // Put the hash entries for the failed fetches. Entries here