Explorar o código

MAPREDUCE-5060. Fetch failures that time out only count against the first map task. Contributed by Robert Joseph Evans

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1455743 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe %!s(int64=12) %!d(string=hai) anos
pai
achega
6fec2f0331

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -94,6 +94,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi
     Prakash via tgraves)
 
+    MAPREDUCE-5060. Fetch failures that time out only count against the first
+    map task (Robert Joseph Evans via jlowe)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 3 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -211,7 +211,6 @@ class Fetcher<K,V> extends Thread {
     
     // Construct the url and connect
     DataInputStream input;
-    boolean connectSucceeded = false;
     
     try {
       URL url = getMapOutputURL(host, maps);
@@ -227,7 +226,6 @@ class Fetcher<K,V> extends Thread {
       // set the read timeout
       connection.setReadTimeout(readTimeout);
       connect(connection, connectionTimeout);
-      connectSucceeded = true;
       input = new DataInputStream(connection.getInputStream());
 
       // Validate response code
@@ -255,18 +253,10 @@ class Fetcher<K,V> extends Thread {
 
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
-      if (!connectSucceeded) {
-        for(TaskAttemptID left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
-        }
-      } else {
-        // If we got a read error at this stage, it implies there was a problem
-        // with the first map, typically lost map. So, penalize only that map
-        // and add the rest
-        TaskAttemptID firstMap = maps.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
+      for(TaskAttemptID left: remaining) {
+        scheduler.copyFailed(left, host, false, connectExcpt);
       }
-      
+     
       // Add back all the remaining maps, WITHOUT marking them as failed
       for(TaskAttemptID left: remaining) {
         scheduler.putBackKnownMapOutput(host, left);

+ 49 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
 
@@ -70,6 +71,54 @@ public class TestFetcher {
     }
   }
   
+  @SuppressWarnings("unchecked")
+  @Test(timeout=30000)
+  public void testCopyFromHostConnectionTimeout() throws Exception {
+    LOG.info("testCopyFromHostConnectionTimeout");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+    when(connection.getInputStream()).thenThrow(
+        new SocketTimeoutException("This is a fake timeout :)"));
+    
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+      .thenReturn(allErrs);
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    
+    underTest.copyFromHost(host);
+    
+    verify(connection)
+      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+          encHash);
+    
+    verify(allErrs).increment(1);
+    verify(ss).copyFailed(map1ID, host, false, false);
+    verify(ss).copyFailed(map2ID, host, false, false);
+    
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+  
   @SuppressWarnings("unchecked")
   @Test
   public void testCopyFromHostBogusHeader() throws Exception {