Jelajahi Sumber

MAPREDUCE-5251. Reducer should not implicate map attempt if it has insufficient space to fetch map output. Contributed by Ashwin Shankar

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1507384 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 tahun lalu
induk
melakukan
00b09a3852

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -21,6 +21,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
     jlowe)
 
+    MAPREDUCE-5251. Reducer should not implicate map attempt if it has
+    insufficient space to fetch map output (Ashwin Shankar via jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 8 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -338,7 +338,14 @@ class Fetcher<K,V> extends Thread {
       }
       
       // Get the location for the map output - either in-memory or on-disk
-      mapOutput = merger.reserve(mapId, decompressedLength, id);
+      try {
+        mapOutput = merger.reserve(mapId, decompressedLength, id);
+      } catch (IOException ioe) {
+        // kill this reduce attempt
+        ioErrs.increment(1);
+        scheduler.reportLocalError(ioe);
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      }
       
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {

+ 12 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -301,6 +303,16 @@ class ShuffleScheduler<K,V> {
     host.addKnownMap(mapId);
   }
 
+  public void reportLocalError(IOException ioe) {
+    try {
+      LOG.error("Shuffle failed : local error on this node: "
+          + InetAddress.getLocalHost());
+    } catch (UnknownHostException e) {
+      LOG.error("Shuffle failed : local error on this node");
+    }
+    reporter.reportException(ioe);
+  }
+
   public synchronized MapHost getHost() throws InterruptedException {
       while(pendingHosts.isEmpty()) {
         wait();

+ 51 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.junit.Test;
 
 /**
@@ -72,6 +73,56 @@ public class TestFetcher {
     }
   }
   
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReduceOutOfDiskSpace() throws Throwable {
+    LOG.info("testReduceOutOfDiskSpace");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[] { 0, 0, 0,
+        0 });
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString())).thenReturn(allErrs);
+
+    Fetcher<Text, Text> underTest = new FakeFetcher<Text, Text>(job, id, ss,
+        mm, r, metrics, except, key, connection);
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    when(connection.getResponseCode()).thenReturn(200);
+    when(
+        connection
+            .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    when(connection.getInputStream()).thenReturn(in);
+
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())).thenThrow(
+        new DiskErrorException("No disk space available"));
+
+    underTest.copyFromHost(host);
+    verify(ss).reportLocalError(any(IOException.class));
+  }
+  
   @SuppressWarnings("unchecked")
   @Test(timeout=30000)
   public void testCopyFromHostConnectionTimeout() throws Exception {