Kaynağa Gözat

MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du

(cherry picked from commit 177e8090f5809beb3ebcb656cd0affbb3f487de8)
Jason Lowe 10 yıl önce
ebeveyn
işleme
9e8d1d7d60

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -231,6 +231,9 @@ Release 2.6.0 - 2014-11-15
     MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
     (Emilio Coppa and jlowe via kihwal)
 
+    MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused
+    correctly (Junping Du via jlowe)
+
 Release 2.5.2 - 2014-11-10
 
   INCOMPATIBLE CHANGES

+ 30 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -407,7 +407,7 @@ class Fetcher<K,V> extends Thread {
         }
         if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
           LOG.warn("Failed to connect to host: " + url + "after " 
-              + fetchRetryTimeout + "milliseconds.");
+              + fetchRetryTimeout + " milliseconds.");
           throw e;
         }
         try {
@@ -596,7 +596,7 @@ class Fetcher<K,V> extends Thread {
     } else {
       // timeout, prepare to be failed.
       LOG.warn("Timeout for copying MapOutput with retry on host " + host 
-          + "after " + fetchRetryTimeout + "milliseconds.");
+          + "after " + fetchRetryTimeout + " milliseconds.");
       
     }
   }
@@ -678,28 +678,49 @@ class Fetcher<K,V> extends Thread {
     } else if (connectionTimeout > 0) {
       unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
     }
+    long startTime = Time.monotonicNow();
+    long lastTime = startTime;
+    int attempts = 0;
     // set the connect timeout to the unit-connect-timeout
     connection.setConnectTimeout(unit);
     while (true) {
       try {
+        attempts++;
         connection.connect();
         break;
       } catch (IOException ioe) {
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
+        long currentTime = Time.monotonicNow();
+        long retryTime = currentTime - startTime;
+        long leftTime = connectionTimeout - retryTime;
+        long timeSinceLastIteration = currentTime - lastTime;
         // throw an exception if we have waited for timeout amount of time
         // note that the updated value of timeout is used here
-        if (connectionTimeout == 0) {
+        if (leftTime <= 0) {
+          int retryTimeInSeconds = (int) retryTime/1000;
+          LOG.error("Connection retry failed with " + attempts + 
+              " attempts in " + retryTimeInSeconds + " seconds");
           throw ioe;
         }
-
         // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
+        if (leftTime < unit) {
+          unit = (int)leftTime;
           // reset the connect time out for the final connect
           connection.setConnectTimeout(unit);
         }
+        
+        if (timeSinceLastIteration < unit) {
+          try {
+            // sleep for the remainder of the current unit interval
+            sleep(unit - timeSinceLastIteration);
+          } catch (InterruptedException e) {
+            LOG.warn("Sleep in connection retry get interrupted.");
+            if (stopped) {
+              return;
+            }
+          }
+        }
+        // remember when this iteration finished so the next one can time its own connect attempt
+        lastTime = Time.monotonicNow();
       }
     }
   }