ソースを参照

svn merge --change -1363454 for reverting MAPREDUCE-4423

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1363935 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 13 年 前
コミット
d45922de2c

+ 0 - 3
hadoop-mapreduce-project/CHANGES.txt

@@ -739,9 +739,6 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4448. Fix NM crash during app cleanup if aggregation didn't
     init. (Jason Lowe via daryn)
 
-    MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
-    via tgraves)
-
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 22 - 40
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -49,8 +49,7 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.annotations.VisibleForTesting;
-
+@SuppressWarnings({"deprecation"})
 class Fetcher<K,V> extends Thread {
   
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -176,18 +175,13 @@ class Fetcher<K,V> extends Thread {
     }
   }
 
-  @VisibleForTesting
-  protected HttpURLConnection openConnection(URL url) throws IOException {
-    return (HttpURLConnection)url.openConnection();
-  }
-  
   /**
    * The crux of the matter...
    * 
    * @param host {@link MapHost} from which we need to  
    *              shuffle available map-outputs.
    */
-  protected void copyFromHost(MapHost host) throws IOException {
+  private void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
     
@@ -197,11 +191,9 @@ class Fetcher<K,V> extends Thread {
       return;
     }
     
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Fetcher " + id + " going to fetch from " + host);
-      for (TaskAttemptID tmp: maps) {
-        LOG.debug(tmp);
-      }
+    LOG.debug("Fetcher " + id + " going to fetch from " + host);
+    for (TaskAttemptID tmp: maps) {
+      LOG.debug(tmp);
     }
     
     // List of maps to be fetched yet
@@ -213,7 +205,7 @@ class Fetcher<K,V> extends Thread {
     
     try {
       URL url = getMapOutputURL(host, maps);
-      HttpURLConnection connection = openConnection(url);
+      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
       
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -274,24 +266,17 @@ class Fetcher<K,V> extends Thread {
     
     try {
       // Loop through available map-outputs and fetch them
-      // On any error, faildTasks is not null and we exit
-      // after putting back the remaining maps to the 
-      // yet_to_be_fetched list and marking the failed tasks.
-      TaskAttemptID[] failedTasks = null;
-      while (!remaining.isEmpty() && failedTasks == null) {
-        failedTasks = copyMapOutput(host, input, remaining);
-      }
-      
-      if(failedTasks != null) {
-        for(TaskAttemptID left: failedTasks) {
-          scheduler.copyFailed(left, host, true);
-        }
+      // On any error, good becomes false and we exit after putting back
+      // the remaining maps to the yet_to_be_fetched list
+      boolean good = true;
+      while (!remaining.isEmpty() && good) {
+        good = copyMapOutput(host, input, remaining);
       }
       
       IOUtils.cleanup(LOG, input);
       
       // Sanity check
-      if (failedTasks == null && !remaining.isEmpty()) {
+      if (good && !remaining.isEmpty()) {
         throw new IOException("server didn't return all expected map outputs: "
             + remaining.size() + " left.");
       }
@@ -300,9 +285,10 @@ class Fetcher<K,V> extends Thread {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
-  }
+      
+   }
   
-  private TaskAttemptID[] copyMapOutput(MapHost host,
+  private boolean copyMapOutput(MapHost host,
                                 DataInputStream input,
                                 Set<TaskAttemptID> remaining) {
     MapOutput<K,V> mapOutput = null;
@@ -324,15 +310,14 @@ class Fetcher<K,V> extends Thread {
       } catch (IllegalArgumentException e) {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
-        //Don't know which one was bad, so consider all of them as bad
-        return remaining.toArray(new TaskAttemptID[remaining.size()]);
+        return false;
       }
 
  
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
-        return new TaskAttemptID[] {mapId};
+        return false;
       }
       
       LOG.debug("header: " + mapId + ", len: " + compressedLength + 
@@ -344,7 +329,7 @@ class Fetcher<K,V> extends Thread {
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        return new TaskAttemptID[] {mapId};
+        return false;
       } 
       
       // Go!
@@ -366,18 +351,14 @@ class Fetcher<K,V> extends Thread {
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();
-      return null;
+      return true;
     } catch (IOException ioe) {
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
                  mapId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
-        if(mapId == null) {
-          return remaining.toArray(new TaskAttemptID[remaining.size()]);
-        } else {
-          return new TaskAttemptID[] {mapId};
-        }
+        return false;
       }
       
       LOG.info("Failed to shuffle output of " + mapId + 
@@ -385,8 +366,9 @@ class Fetcher<K,V> extends Thread {
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
+      scheduler.copyFailed(mapId, host, true);
       metrics.failedFetch();
-      return new TaskAttemptID[] {mapId};
+      return false;
     }
 
   }

+ 0 - 121
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce.task.reduce;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.ArrayList;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.junit.Test;
-
-/**
- * Test that the Fetcher does what we expect it to.
- */
-public class TestFetcher {
-
-  public static class FakeFetcher<K,V> extends Fetcher<K,V> {
-
-    private HttpURLConnection connection;
-
-    public FakeFetcher(JobConf job, TaskAttemptID reduceId,
-        ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter,
-        ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
-        SecretKey jobTokenSecret, HttpURLConnection connection) {
-      super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
-          jobTokenSecret);
-      this.connection = connection;
-    }
-    
-    @Override
-    protected HttpURLConnection openConnection(URL url) throws IOException {
-      if(connection != null) {
-        return connection;
-      }
-      return super.openConnection(url);
-    }
-  }
-  
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testCopyFromHostBogusHeader() throws Exception {
-    JobConf job = new JobConf();
-    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
-    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
-    MergeManager<Text, Text> mm = mock(MergeManager.class);
-    Reporter r = mock(Reporter.class);
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
-    ExceptionReporter except = mock(ExceptionReporter.class);
-    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
-    HttpURLConnection connection = mock(HttpURLConnection.class);
-    
-    Counters.Counter allErrs = mock(Counters.Counter.class);
-    when(r.getCounter(anyString(), anyString()))
-      .thenReturn(allErrs);
-    
-    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
-        r, metrics, except, key, connection);
-    
-
-    MapHost host = new MapHost("localhost", "http://localhost:8080/");
-    
-    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
-    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
-    maps.add(map1ID);
-    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
-    maps.add(map2ID);
-    when(ss.getMapsForHost(host)).thenReturn(maps);
-    
-    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
-    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
-    
-    when(connection.getResponseCode()).thenReturn(200);
-    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
-      .thenReturn(replyHash);
-    ByteArrayInputStream in = new ByteArrayInputStream(
-        "5 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes());
-    when(connection.getInputStream()).thenReturn(in);
-    
-    underTest.copyFromHost(host);
-    
-    verify(connection)
-      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
-          encHash);
-    
-    verify(allErrs).increment(1);
-    verify(ss).copyFailed(map1ID, host, true);
-    verify(ss).copyFailed(map2ID, host, true);
-  }
-
-}